码友18年 study together Java

【Jstorm基础】Jstorm 介绍

JStorm 简介

Storm是开源的分布式容错实时计算系统,目前被托管在GitHub上,遵循 Eclipse Public License 1.0。最初由BackType开发,现在已被Twitter收入麾下。Storm最新版本是Storm 0.9,核心采用Clojure实现。Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息;Storm也可被用于“连续计算”(continuous computation),对数据流做连续处理,在计算时就将结果以流的形式输出给用户;它还可被用于“分布式RPC”,以并行的方式执行运算。

Storm主要特点如下:
0、简单的编程模型。类似于MapReduce降低了并行批处理复杂性,Storm降低了实时处理的复杂性。
1、语言无关。Storm的消息处理组件可以用任何语言来定义。默认支持Clojure、Java、Ruby和Python。要增加对其他语言的支持,只需实现一个简单的Storm通信协议即可。
2、容错性。如果在消息处理过程中出了一些异常,Storm会重新调度出问题的处理逻辑。Storm保证一个处理单元永远运行,除非显式杀掉。
3、可伸缩性。Storm的可伸缩性可以使其每秒处理的消息量达到很高。为了扩展一个实时计算任务,需要做的就是增加节点并且提高计算任务的并行度设置(parallelism setting)。Storm应用在10个节点的集群上每秒可以处理高达1000000个消息,包括每秒一百多次的数据库调用[5]。同时Storm使用ZooKeeper来协调集群内的各种配置使得Storm的集群可以很容易扩展。
4、保证无数据丢失。实时系统必须保证所有的数据被成功的处理。 那些会丢失数据的系统的适用场景非常窄,而Storm保证每一条消息都会被处理。
5、适用场景广泛。消息流处理、持续计算、分布式方法调用等是Storm适用场景广泛的基础,Storm的这些基础原语可以满足大量的场景。

虽然Storm具备诸多优势,但也存在不足:
0、Storm目前还存在Nimbus SPOF的问题;
1、存在雪崩问题;
2、资源粒度较粗;
3、Clojure实现引入了学习成本;

为此,阿里巴巴中间件团队用Java重新实现了类Storm的JStorm,同样被托管在GitHub上,遵循 Eclipse Public License 1.0,目前版本0.9.3。相关资料显示,阿里巴巴内部已经大规模部署了Storm/JStorm集群。
JStorm继承了Storm的所有优点,同时与Storm相比JStorm所特有的如下特点:
0、兼容Storm接口。开发者在Storm上运行的程序无需任何修改即可运行在JStorm上。
1、Nimbus HA。解决了Storm的Nimbus单点问题,支持自动热备切换Nimbus。
2、更细粒度的资源划分。JStorm从CPU、MEMORY、DISK和NET四个维度进行任务调度,同时不存在任务抢占问题。
3、可定制的任务调度机制。(Storm的任务调度目前也可定制)
4、更好的性能。通过底层ZeroMQ和Netty使JStorm具有更好的性能,同时具有更好的稳定性。
5、解决了Storm的雪崩问题。通过Netty和disruptor机制实现RPC保证可以匹配的数据发送和接收速度避免雪崩问题。

此外,JStorm通过减少对zookeeper的访问量、增加反序列化线程、优化ACK、增加监控内容及JAVA本身优势等各个方面优化了Storm的性能和稳定性。总之,JStorm比Storm更强大、更稳定、性能更好。
(本文后面所述关于JStorm的部分内容同样适用Storm)

JStorm 应用场景

从应用的角度来说,JStorm它是一种分布式的应用;从系统层面来说,它又类似于MapReduce这样的调度系统;而从数据方面来说,它又 是一种基于流水数据的实时处理解决方案。如今,DT时代的当下,用户和企业也不仅仅只满足于离线数据,对于数据的实时性要求也越来越高了。

  在早期,Storm和JStorm未问世之前,业界有很多实时计算系统,可谓百家争鸣,自Storm和JStorm出世之后,基本这两者占据主要地位,原因如下:

  • 易开发:接口简单,上手容易,只需要按照Spout,Bolt以及Topology的编程规范即可开发一个扩展性良好的应用,底层的细节我们可以不用去深究其原因。

  • 扩展性:可线性扩展性能。

  • 容错:当Worker异常或挂起,会自动分配新的Worker去工作。

  • 数据精准:其包含Ack机制,规避了数据丢失的风险。使用事物机制,提高数据精度。

  JStorm处理数据的方式流程是基于流式处理,因此,我们会用它做以下处理:

  • 日志分析:从收集的日志当中,统计出特定的数据结果,并将统计后的结果持久化到外界存储介质中,如:DB。当下,实时统计主流使用JStorm和Storm。

  • 消息转移:将接受的消息进行Filter后,定向的存储到另外的消息中间件中。

JStorm 基本术语

Spout和Bolt

在JStorm中,它认为每个Stream都有一个Stream的来源,即Tuple的源头,所以它将这个源头抽象为Spout,而Spout可能是一个消息中间件,如:MQ,Kafka等。并不断的发出消息,也可能是从某个队列中不断读取队列的元数据。

  在有了Spout后,接下来如何去处理相关内容,以类似的思想,将JStorm的处理过程抽象为Bolt,Bolt可以消费任意数量的输入流, 只要将流方向导到该Bolt即可,同时,它也可以发送新的流给其他的Bolt使用,因而,我们只需要开启特定的Spout,将Spout流出的Tuple 导向特定的Bolt,然后Bolt对导入的流做处理后再导向其它的Bolt等。

  那么,通过上述描述,其实,我们可以用一个形象的比喻来理解这个流程。我们可以认为Spout就是一个个的水龙头,并且每个水龙头中的水是不同 的,我们想要消费那种水就去开启对应的水龙头,然后使用管道将水龙头中的水导向一个水处理器,即Bolt,水处理器处理完后会再使用管道导向到另外的处理 器或者落地到存储介质。

Topology

有向无环图,JStorm将这个图抽象为Topology,它是JStorm中最高层次的一个抽象概念,它可以处理代码层面 当中直接于JStorm打交道的,可以被提交到JStorm集群执行对应的任务,一个Topology即为一个数据流转换图,图中的每个节点是一个 Spout或者Bolt,当Spout或Bolt发送Tuple到流时,它就发送Tuple到每个订阅了该流的Bolt上。


Tuple

  JStorm当中将Stream中数据抽象为了Tuple,一个Tuple就是一个Value List,List值的每个Value都有一个Name,并且该Value可以是基本类型,字符类型,字节数组等,当然也可以是其它可序列化的类型。 Topology的每个节点都要说明它所发射出的Tuple的字段的Name,其它节点只需要订阅该Name就可以接收处理相应的内容。

Worker和Task

  Work和Task在JStorm中的职责是一个执行单元,一个Worker表示一个进程,一个Task表示一个线程,一个Worker可以运 行多个Task。而Worker可以通过setNumWorkers(int workers)方法来设置对应的数目,表示这个Topology运行在多个JVM(PS:一个JVM为一个进程,即一个Worker);另外 setSpout(String id, IRichSpout spout, Number parallelism_hint)和setBolt(String id, IRichBolt bolt,Number parallelism_hint)方法中的参数parallelism_hint代表这样一个Spout或Bolt有多少个实例,即对应多少个线程,一 个实例对应一个线程。

Slot

  在JStorm当中,Slot的类型分为四种,他们分别是:CPU,Memory,Disk,Port;与Storm有所区别(Storm局限 于Port)。一个Supervisor可以提供的对象有:CPU Slot、Memory Slot、Disk Slot以及Port Slot。

  • 在JStorm中,一个Worker消耗一个Port Slot,默认一个Task会消耗一个CPU Slot和一个Memory Slot

  • 在Task执行较多的任务时,可以申请更多的CPU Slot

  • 在Task需要更多的内存时,可以申请更多的额Memory Slot

  • 在Task磁盘IO较多时,可以申请Disk Slot

JStorm 数据模型

JStorm通过一系列基本元素实现实时计算的目标,其中包括了Topology、Stream、Spout、Bolt等等。JStorm在模型上和MapReduce有很多相似的地方,下表从不同维度对JStorm和MapReduce进行了比较。


MapReduce

JStorm

Role

JobTracker

Nimbus

TaskTracker

Supervisor

Child

Worker

Application

Job

Topology

Interface

Mapper/Reducer

Spout/Bolt

实时计算任务需要打包成Topology提交,和MapReduce Job相似,不同的是,MapReduce Job在计算完成后结束,而JStorm的Topology任务一旦提交永远不会结束,除非显式停止。

计算任务Topology是由不同的Spout和Bolt通过Stream连接起来的DAG图。下面是一个典型Topology的结构示意图:

 

其中:

Spout:JStorm的消息源。用于生产消息,一般是从外部数据源(如MQ/RDBMS/NoSQL/RTLog等)不间断读取数据并向下游发送消息。

Bolt:JStorm的消息处理者。用于为Topology进行消息处理,Bolt可以执行查询、过滤、聚合及各种复杂运算操作,Bolt的消息处理结果可以作为下游Bolt的输入不断迭代。

Stream:JStorm中对数据进行的抽象,它是时间上无界的Tuple元组序列。在Topology中Spout是Stream的源头,负责从特定数据源发射Stream;Bolt可以接收任意多个Stream输入然后进行数据的加工处理,如果需要Bolt还可以发射出新Stream给下游Bolt。

Tuple:JStorm使用Tuple作为数据模型,存在于任意两个有数据交互的组件(Spout/Bolt)之间。每个Tuple是一组具有各自名称的值,值可以是任何类型,JStorm支持所有的基本类型、字符串以及字节数组,也可以使用自定义类型(需实现对应序列化器)作为值类型。简单来说,Tuple就是一组实现了序列化器带有名称的Java对象集合。

从整个Topology上看,Spout/Bolt可以看作DAG的节点,Stream是连接不同节点之间的有向边,Tuple则是流过Stream的数据集合。
下面是一个Topology内部Spout和Bolt之间的数据流关系:

 

Topology中每一个计算组件(Spout和Bolt)都有一个并行度,在创建Topology时指定(默认为1),JStorm在集群内分配对应个数的线程Task并行。

如上图示,既然对于Spout/Bolt都会有多个线程来并行执行,那么如何在两个组件(Spout和Bolt)之间发送Tuple会成为新的问题。

JStorm通过定义Topology时为每个Bolt指定输入Stream以及指定提供的若干种数据流分发(Stream Grouping)策略用来解决这一问题。

JStorm提供了以下几种Stream Grouping策略:
0) Shuffle Grouping:随机分组,随机派发Stream里面的Tuple,保证每个Bolt接收到的Tuple数目大致相同,通过轮询随机的方式使得下游Bolt之间接收到的Tuple数目差值不超过1。
1) Fields Grouping:按字段分组,具有同样字段值的Tuple会被分到相同Bolt里的Task,不同字段值则会被分配到不同Task。
2) All Grouping:广播分组,每一个Tuple,所有的Bolt都会收到。
3) Global Grouping:全局分组,Tuple被分配到Bolt中ID值最低的的一个Task。
4) Non Grouping:不分组,Tuple会按照完全随机的方式分发到下游Bolt。
5) Direct Grouping:直接分组,Tuple需要指定由Bolt的哪个Task接收。 只有被声明为Direct Stream的消息流可以声明这种分组方法。
6) Local or Shuffle Grouping:基本同Shuffle Grouping。
7) Custom Grouping:用户自定义分组策略,CustomStreamGrouping是自定义分组策略时用户需要实现的接口。

JStorm 关键流程

0、Topology提交

JStorm为用户提供了StormSubmitter. submitTopology用来向集群提交Topology,整个提交流程:

Client端:
0)客户端简单验证;
1)检查是否已经存在同名Topology;
2)提交jar包;
3)向Nimbus提交Topology;

Nimbus端:
0)Nimbus端简单合法性检查;
1)生成Topology Name;
2)序列化配置文件和Topology Code;
3)Nimbus本地准备运行时所需数据;
4)向ZK注册Topology和Task;
5)将Task压入分配队列等待TopologyAssign分配;

1、任务调度策略

从0.9.0开始,JStorm提供非常强大的调度功能,基本上可以满足大部分的需求,同时支持自定义任务调度策略。JStorm的资源不再仅是Worker的端口,而从CPU/Memory/Disk/Net等四个维度综合考虑。
Nimbus任务调度算法[2]如下:
0)优先使用自定义任务分配算法,当资源无法满足需求时,该任务放到下一级任务分配算法;
1)使用历史任务分配算法(如果打开使用历史任务属性),当资源无法满足需求时,该任务放到下一级任务分配算法;
2)使用默认资源平衡算法,计算每个Supervisor上剩余资源权值,取权值最高的Supervisor分配任务。

2、Acker机制

为保证无数据丢失,Storm/JStorm使用了非常漂亮的可靠性处理机制,如图当定义Topology时指定Acker,JStorm除了Topology本身任务外,还会启动一组称为Acker的特殊任务,负责跟踪Topolgogy DAG中的每个消息。每当发现一个DAG被成功处理完成,Acker就向创建根消息的Spout任务发送一个Ack信号。Topology中Acker任务的并行度默认parallelism hint=1,当系统中有大量的消息时,应该适当提高Acker任务的并行度。

 

Acker按照Tuple Tree的方式跟踪消息。当Spout发送一个消息的时候,它就通知对应的Acker一个新的根消息产生了,这时Acker就会创建一个新的Tuple Tree。当Acker发现这棵树被完全处理之后,他就会通知对应的Spout任务。

 

Acker任务保存了数据结构Map<MessageID,Map< TaskID, Value>>,
其中MessageID是Spout根消息ID,TaskID是Spout任务ID,Value表示一个64bit的长整型数字,是树中所有消息的随机ID的异或结果。通过TaskID,Acker知道当消息树处理完成后通知哪个Spout任务,通过MessageID,Acker知道属于Spout任务的哪个消息被成功处理完成。Value表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的时候都会有相同的MessageID发送过来做异或。当Acker发现一棵树的Value值为0的时候,表明这棵树已经被成功处理完成。

例如,对于前面Topology中消息树,Acker数据的变化过程:
Step0.A发送T0给B后:
R0=r0
<id0,<taskA,R0>>
Step1.B接收到T0并成功处理后向C发送T1,向D发送T2:
R1=R0^r1^r2=r0^r1^r2
<id0,<taskA,R0^R1>>
=<id0,<taskA,r0^r0^r1^r2>>
=<id0,<taskA,r1^r2>>
Step2.C接收到T1并成功处理后:
R2=r1
<id0,<taskA,r1^r2^R2>>
=<id0,<taskA,r1^r2^r1>>
=<id0,<taskA,r2>>
Step3.D接收到T2并成功处理后:
R3=r2
<id0,<taskA,r2^R3>>
=<id0,<taskA,r2^r2>>
=<id0,<taskA,0>>
当结果为0时Acker可以通知taskA根消息id0的消息树已被成功处理完成。

需要指出的是,Acker并不是必须的,当实际业务可以容忍数据丢失情况下可以不用Acker,对数据丢失零容忍的业务必须打开Acker,另外当系统的消息规模较大是可适当增加Acker的并行度。

3、故障恢复

0)节点故障

Nimbus故障。Nimbus本身无状态,所以Nimbus故障不会影响正在正常运行任务,另外Nimbus HA保证Nimbus故障后可以及时被备份Nimbus接管。
Supervisors节点故障。Supervisor故障后,Nimbus会将故障节点上的任务迁移到其他可用节点上继续运行,但是Supervisor故障需要外部监控并及时手动重启。
Worker故障。Worker健康状况监控由Supervisor负责,当Woker出现故障时,Supervisor会及时在本机重试重启。
Zookeeper节点故障。Zookeeper本身具有很好的故障恢复机制,能保证至少半数以上节点在线就可正常运行,及时修复故障节点即可。

1)任务失败

Spout失败。消息不能被及时被Pull到系统中,造成外部大量消息不能被及时处理,而外部大量计算资源空闲。
Bolt失败。消息不能被处理,Acker持有的所有与该Bolt相关的消息反馈值都不能回归到0,最后因为超时最终Spout的fail将被调用。
Acker失败。Acker持有的所有反馈信息不管成功与否都不能及时反馈到Spout,最后同样因为超时Spout的fail将被调用。
任务失败后,需要Nimbus及时监控到并重新分配失败任务。