Trident API
partition本地操作,无需网络io
等同于pig的generate
mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))
public class MyFunction extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
for(int i=0; i < tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
等同于pig的filter
mystream.each(new Fields("b", "a"), new MyFilter())
public class MyFilter extends BaseFilter {
public boolean isKeep(TridentTuple tuple) {
return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
}
}
partitionAggregate
等同于pig的combine操作(三种aggregate接口)
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
@@@
public class Count implements CombinerAggregator<Long> {
public Long init(TridentTuple tuple) {
return 1L;
}
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
public Long zero() {
return 0L;
}
}
@@@
public class Count implements ReducerAggregator<Long> {
public Long init() {
return 0L;
}
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
//最底层的aggregate,每个方法都有collector
public class CountAgg extends BaseAggregator<CountState> {
static class CountState {
long count = 0;
}
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count+=1;
}
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
---------------------
stateQuery and partitionPersist
--------------------------
projection
mystream.project(new Fields("b", "d"))
---------------------------
Repartitioning operations
shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions
broadcast: Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do a stateQuery on every partition of data.
partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.
global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.
batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.
partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping
----------------------------
Aggregation operations
mystream.aggregate(new Count(), new Fields("count"))
----------------------------
等同pig group by
Operations on grouped streams
groupBy(new Fields("word"))
--------------------------------
不同于sql的joins,做的是一个batch的join
Merges and joins
Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
相关推荐
Storm Trident API 使用详解.docx
该库在 Google Cloud Datastore ( ) 之上实现了 Trident 状态。 它支持非事务性、事务性和不透明状态类型。
Bolt/Trident API 实现 该库提供了核心storm bolt,并在Elasticsearch 之上实现了Trident 状态。 它支持非事务性、事务性和不透明状态类型。 Maven 依赖 < groupId>com.github.fhuss</ groupId> < artifactId>...
这个项目是 Storm's Trident 的游乐场。 在这个项目中,您可以找到我用于柏林的 Trident hackaton @ Big Data Beers #4 的指南: : 在 Demo.java 文件中。 您还可以在示例子包中找到我解释的概念的说明性示例。 ...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
Storm 平台的库,由 Trident api 描述。它专注于实时算法和在线学习算法,该库实现了一些经典算法类型,如分类器、聚类、回归、基数、和Average Counting.etc,你可以用这个库在大数据环境中构建一些智能应用程序,...
三叉戟lambda 使用Storm的作为实时层和作为批处理层的一个玩具示例。问题我们要实现对按日期分组的tweet中的#...解决方案提出的解决方案是使用“ lambda体系结构”并使用实现实时层, 是Storm之上的API,可简化构建
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...
07.Kafka Java API 简单开发测试 08.storm-kafka 详解和实战案例 09.S图表框架HighCharts介绍 10.HBase快速入门 11.基于HBase的Dao基类和实现类开发一 12.基于HBase的Dao基类和实现类开发二 13.项目1-地区销售额-...