`

storm事务

阅读更多

转:http://blog.csdn.net/yangbutao/article/details/17844799

 

1、storm事务性topology的提出

对于容错机制,Storm通过一个系统级别的组件acker,结合xor校验机制判断一个msg是否发送成功,进而spout可以重发该msg,保证一个msg在出错的情况下至少被重发一次。但是在一些事务性要求比较高的场景中,需要保障一次只有一次的语义,比如需要精确统计tuple的数量等等。Storm 0.7.0引入了Transactional Topology, 它可以保证每个tuple”被且仅被处理一次”, 这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。

2、API介绍

 

IBatchBolt有三个方法

execute(Tuple tuple)

finishBatch()

prepare (java.util.Map conf, TopologyContext context, BatchOutputCollector collector,T id)

 

ITransactionalSpout有以下几个主要方法:

ITransactionalSpout.Coordinator<T> getCoordinator(java.util.Map conf,

                                                 TopologyContext context)

ITransactionalSpout.Emitter<T> getEmitter(java.util.Map conf,

                                          TopologyContext context)

 

3、事务机制原理分析

1) 对于一次只有一次的语义,从原理上来讲,需要在发送tuple的时候带上xid,在需要事务处理的时候,根据该xid是否以前已经处理成功来决定是否进行处理,当然需要把xid和处理结果一起做保存。并且需要保障顺序性,在当前请求xid提交前,所有比自己低xid请求都已经提交。

在事务处理时单个处理tuple效率比较低,因此storm中引入batch处理,一批tuple赋予一个xid,为了提高batch之间处理的并行度,storm采用了pipeline 处理的模型。参见下图pipeline模型,多个事务可以并行执行,但是commit的是严格按照顺序的。

 

对应到storm中的具体实现中,把一个batch的计算分成了两个阶段processing和commit阶段:

 

Processing阶段:多个batch可以并行计算,上面例子中bolt2是普通的batchbolt(实现BaseBatchBolt),那么多个batch在bolt2的task之间可以并行执行,比如对batch3和batch4并行执行execute或finishbatch(什么时候调用该操作,后面会介绍)方法。

Commiting阶段:batch之间强制按照顺序进行提交,上图中Bolt3实现BaseBatchBolt并且标记需要事务处理的(实现了ICommitter接口或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么在Storm认为可以提交(至于什么时候可以提交,后面会介绍)batch的时候调用finishbatch,在finishBatch做xid的比较以及状态保存工作。例子中batch2必须等待batch1提交后,才可以进行提交。

Storm事务性的拓扑看起来比较复杂,需要对batch的commit进行管理,错误的发现,batch的发射以及处理等等,其内部实现完全基于storm的相关底层操作进行抽象。

当使用Transactional Topologies的时候, storm为你做下面这些事情:

 

  • 管理状态: Storm把所有实现Transactional Topologies所必须的状态保存在zookeeper里面。 这包括当前transaction id以及定义每个batch的一些元数据。
  • 协调事务: Storm帮你管理所有事情, 以帮你决定在任何一个时间点是该proccessing还是该committing。
  • 错误检测: Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring — storm帮你搞定所有事情。
  • 内置的批处理API: Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transaction所产生的中间数据。
  • 最后,需要注意的一点是Transactional Topologies需要一个可以完全重发(replay)一个特定batch的消息的队列系统(Message Queue)。storm-contrib里面的storm-kafka实现了这个。

 

事务性topology从实现上来讲,包括事务性的spout,以及事务性的bolt。

2) 事务性的spout需要实现ITransactionalSpout,这个接口包含两个内部类Coordinator和Emitter。在topology运行的时候,事务性的spout内部包含一个子的topology,类似下面这个结构:

 

其中coordinator是spout,emitter是bolt。

这里面有两种类型的tuple,一种是事务性的tuple,一种是真实batch中的tuple;

coordinator为事务性batch发射tuple,Emitter负责为每个batch实际发射tuple。

具体如下:

  • coordinator只有一个,emitter根据并行度可以有多个实例
  • emitter以all grouping(广播)的方式订阅coordinator的”batch emit”流
  • coordinator (其实是是一个内部的spout)开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到”batch emit”流

        *****说明******

       TransactionalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。

       TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的

       ,而且不管这个batch    replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了,

       我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同版本

        metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放在zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。

 

       **************

  • Emiter接收到这个tuble后,会进行batch tuple的发送
  • Storm通过anchoring/acking机制来检测事务是否已经完成了processing 阶段;
  • Processing阶段完成后,并且之前的transactions都已经提交了,coordinator发射一个tuble到” commit”流,进入commit阶段。
  • commiting bolts通过all grouping方式订阅该”commit”流,事务提交后,coordinator同样通过anchoring/acking机制确认已经完成了commit阶段,接收到ack后,在zookeeper上把该transaction标记为完成。

3) 事务性的Bolt继承BaseTransactionalBolt,处理batch在一起的tuples,对于每一个tuple调用调用execute方法,而在整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶 段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何知道batch的processing完成了,也就是bolt是否接收处理了batch里面所有的tuple;在bolt内部,有一个CoordinatedBolt的模型。

 

CoordinateBolt具体原理如下:

 

CoordinateBolt具体原理如下:

 

  • 真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为real bolt。
  • 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些tuple发送信息(同样根据groping信息)
  •  Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪个task了。
  • 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过 tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理 完了所有的tuple。
  • 下游CoordinateBolt会重复上面的步骤,通知其下游。

事务性的拓扑在storm中的一个应用是Trident,它是在storm的原语和事务性的基础上做更高层次的抽象,做到一致性和恰好一次的语义,后续章节会对trident做分析。

官网:

http://storm.apache.org/documentation/Transactional-topologies.html

分享到:
评论

相关推荐

    storm事务详解(transactionTopology

    storm事务详解(transactionTopology)共3页.pdf.zip

    storm事务1111

    storm事务

    Storm事务性拓扑详解教程.docx

    Storm0.7.0实现了一个新特性——事务性拓扑,这一特性使消息在语义上确保你可以安全的方式重发消息,并保证它们只会被处理一次。在不支持事务性拓扑的情况下,你无法在准确性,可扩展性,以空错性上得到保证的前提下...

    Storm中涉及到的类

    Storm事务中涉及到的类,以及在非事务、事务、分区事务、不透明分区事务的比较的信息汇总

    Storm入门教程 之Storm原理和概念详解

    Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...

    细细品味Storm_Storm简介及安装

    第7章演示集成Storm和非事务型系统 的复杂性,通过集成Storm和开源探索性分析架构 Druid实现一个可配置的实时系统来分析金融事件。 第8章探讨Lambda体系结构的实现方法,讲解如何 将批处理机制和实时处理引擎结合...

    基于Storm流计算天猫双十一作战室项目实战

    3、注重实践,对较抽象难懂的技术点如Grouping策略、并发度及线程安全、批处理事务、DRPC、Storm Trident均结合企业场景开发案例进行讲解,让学员觉得简单易懂; 4、每个技术均采用最新稳定版本,学完后会员可以从...

    Storm实战:构建大数据实时计算

    第6章~第8章详细而系统地讲解了几个高级特性:事务、DRPC和Trident;第9章以实例的方式讲解了Storm在实际业务场景中的应用;第10章总结了几个在大数据场景应用过程中遇到的经典问题,以及详细的排查过程。

    Storm实时数据处理.pdf

    Storm实时数据处理.pdf

    Storm实时数据处理中文完整版

    Storm实时数据处理中文完整版,带有完整的书签

    微服务架构的分布式事务解决方案+Dubbo分布式服务框架视频教程+redis+zookeeper+storm+mycat

    微服务架构的分布式事务解决方案 Dubbo分布式服务框架视频教程+redis+zookeeper+storm+mycat 资源为百度云连接+密码

    storm_trident_state

    storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。

    storm-trident-elasticsearch:基于 Elasticsearch 的 Trident State 实现

    它支持非事务性、事务性和不透明状态类型。 Maven 依赖 &lt; groupId&gt;com.github.fhuss&lt;/ groupId&gt; &lt; artifactId&gt;storm-elasticsearch &lt; version&gt;0.3.0 TupleMapper / TridentTupleMapper 要将文档索引到 ...

    storm-cassandra-cql

    对于 Cassandra,CQL 对轻量级事务、批处理和集合有更好的支持。 此外,CQL 可能会比传统的 Thrift 界面获得更多关注。 出于这些原因,我们决定创建一个基于 CQL 的 C* 状态实现。 Storm-Cassandra-Cql 提供了三种...

    trident-gcd:Storm Trident API 的 Google Cloud Datastore 状态实现

    该库在 Google Cloud Datastore ( ) 之上实现了 Trident 状态。 它支持非事务性、事务性和不透明状态类型。

    购物数据流处理+可视化实时数据大屏.rar

    整个项目主要实现了从模拟生成购物数据,到通过kafka传输数据,到通过storm的高级事务处理trident来进行实时流数据处理,最后,将实时生成的统计数据进行实时的可视化,生成类似天猫双十一实时数据大屏的可视化效果

    storm:用锈写成的内存中与数据库无关的ORM

    事务日志,用于将更改推送到数据库和内存中。 表以不同的策略和表存储方式加载到内存中,以实现快速访问。 可以与读取-&gt;队列-&gt;写入锁定模型一起使用,以实现最大的并发性。 加载/保存是异步的。 添加行/表的版本...

    谷歌浏览器

    Storm流计算之项目篇...JQuery,含3个完整实际项目) http://www.ibeifeng.com/goods-461.html Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) http://www.ibeifeng.co

    基于go语言的大数据分布式有序计算(例如行情数据处理-定时任务调度等).zip

    分布式强有序实时计算 -- 第一版(一个master节点 多个slave节点,轻量级storm golang版本) 应用场景:大数据(行情数据)处理/定时任务调度等 功能: 1:选举出master节点/slave节点 2:master节点指派slave节点...

    transactional_topo_opaque_partition

    storm不透明分区事务,引入storm 0.9.0.1和jdk1.7

Global site tag (gtag.js) - Google Analytics