`

storm trident api

阅读更多

Trident API

 

 

partition本地操作,无需网络io

等同于pig的generate

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

 

public class MyFunction extends BaseFunction {

    public void execute(TridentTuple tuple, TridentCollector collector) {

        for(int i=0; i < tuple.getInteger(0); i++) {

            collector.emit(new Values(i));

        }

    }

}

 

 

等同于pig的filter

mystream.each(new Fields("b", "a"), new MyFilter())

 

public class MyFilter extends BaseFilter {

    public boolean isKeep(TridentTuple tuple) {

        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;

    }

}

 

 

partitionAggregate

 

等同于pig的combine操作(三种aggregate接口)

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

mystream.chainedAgg()

        .partitionAggregate(new Count(), new Fields("count"))

        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

        .chainEnd()

 

 

@@@

public class Count implements CombinerAggregator<Long> {

    public Long init(TridentTuple tuple) {

        return 1L;

    }

 

    public Long combine(Long val1, Long val2) {

        return val1 + val2;

    }

 

    public Long zero() {

        return 0L;

    }

}

 

@@@

public class Count implements ReducerAggregator<Long> {

    public Long init() {

        return 0L;

    }

 

    public Long reduce(Long curr, TridentTuple tuple) {

        return curr + 1;

    }

}

 

//最底层的aggregate,每个方法都有collector

public class CountAgg extends BaseAggregator<CountState> {

    static class CountState {

        long count = 0;

    }

 

    public CountState init(Object batchId, TridentCollector collector) {

        return new CountState();

    }

 

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {

        state.count+=1;

    }

 

    public void complete(CountState state, TridentCollector collector) {

        collector.emit(new Values(state.count));

    }

}

 

---------------------

 

stateQuery and partitionPersist

 

--------------------------

 

projection

mystream.project(new Fields("b", "d"))

 

---------------------------

 

Repartitioning operations

 

shuffle: Use random round robin algorithm to evenly redistribute tuples across all target partitions

broadcast: Every tuple is replicated to all target partitions. This can useful during DRPC – for example, if you need to do a stateQuery on every partition of data.

partitionBy: partitionBy takes in a set of fields and does semantic partitioning based on that set of fields. The fields are hashed and modded by the number of target partitions to select the target partition. partitionBy guarantees that the same set of fields always goes to the same target partition.

global: All tuples are sent to the same partition. The same partition is chosen for all batches in the stream.

batchGlobal: All tuples in the batch are sent to the same partition. Different batches in the stream may go to different partitions.

partition: This method takes in a custom partitioning function that implements backtype.storm.grouping.CustomStreamGrouping

 

----------------------------

 

Aggregation operations

 

mystream.aggregate(new Count(), new Fields("count"))

 

----------------------------

 

等同pig group by

Operations on grouped streams

 

groupBy(new Fields("word"))

 

--------------------------------

 

不同于sql的joins,做的是一个batch的join

Merges and joins

 

Here's an example join between a stream containing fields ["key", "val1", "val2"] and another stream containing ["x", "val1"]:

 

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics