我是Storm的新手,并创建了一个程序来读取增加的数字一段时间.我在Spout中使用了一个计数器,在" nextTuple() "方法中,计数器正在被发射并递增
_collector.emit(new Values(new Integer(currentNumber++))); /* how this method is being continuously called...*/
并且在Tuple类的execute()方法中有
public void execute(Tuple input) { int number = input.getInteger(0); logger.info("This number is (" + number + ")"); _outputCollector.ack(input); } /*this part I am clear as Bolt would receive the input from Spout*/
在我的Main类执行中,我有以下代码
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("NumberSpout", new NumberSpout()); builder.setBolt("NumberBolt", new PrimeNumberBolt()) .shuffleGrouping("NumberSpout"); Config config = new Config(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("NumberTest", config, builder.createTopology()); Utils.sleep(10000); localCluster.killTopology("NumberTest"); localCluster.shutdown();
程序完美正常.目前我在这里看到的是Storm框架如何在内部连续调用nextTuple()方法.我确信我的理解在这里缺少一些东西,由于这个差距,我无法连接到这个框架的内部逻辑.
你们中的任何人都可以帮助我清楚地理解这部分,然后这将对我有很大的帮助,因为我必须在我的项目中实现这个概念.如果我在概念上清楚,那么我可以取得重大进展.感谢是否有人可以在这里快速协助我.等待回复......
Storm框架如何在内部连续调用nextTuple()方法.
我相信这实际上涉及一个关于风暴拓扑的整个生命周期的非常详细的讨论,以及不同实体的明确概念,如工人,执行者,任务等.拓扑的实际处理由StormSubmitter
类及其submitTopology
方法执行.
它首先要做的是使用Nimbus的Thrift接口开始上传jar ,然后调用 submitTopology,最终将拓扑提交给Nimbus.
然后,Nimbus首先对拓扑进行规范化(来自doc:规范化的主要目的是确保每个任务都具有相同的序列化注册,这对于使序列化正常工作至关重要),然后是序列化,zookeeper握手,主管和工人流程启动等.它太宽泛而无法讨论但是如果你真的想要挖掘更多,你可以经历风暴拓扑的生命周期,它可以很好地解释整个过程中一步一步的动作.
(文档中的快速说明)
首先是关于拓扑的几个重要说明:
运行的实际拓扑与用户指定的拓扑不同.实际拓扑具有隐式流,并添加了隐式"acker"螺栓来管理acking框架(用于保证数据处理).
隐式拓扑是通过系统拓扑创建的!功能.系统拓扑!在两个地方使用:
- 当Nimbus为拓扑代码创建任务时
- 在worker中,它知道将消息路由到代码所需的位置
现在这里有一些我可以尝试分享的线索......
Spouts或Bolts实际上是进行实际处理(逻辑)的组件.在风暴术语中,他们在整个结构中执行尽可能多的任务.
从doc页面:每个任务对应一个执行线程
现在,在许多其他情况下,风暴中的一个worker process
(在这里阅读)的一个典型责任是监视拓扑是否活动的拓扑,并将该特定状态存储在名为的变量中storm-active-atom
.任务使用此变量来确定是否调用该nextTuple
方法.因此,只要您的拓扑结构处于活动状态(您没有将您的喷嘴代码放在假设中),直到您的计时器处于活动状态(正如您所说的那样)它会继续调用nextTuple方法.您可以进一步深入了解风暴的Acking框架实现,以了解一旦元组成功处理后它如何理解和确认并保证消息处理
我确信我的理解在这里缺少一些东西,由于这个差距,我无法连接到这个框架的内部逻辑
话虽如此,我认为更清楚地了解如何使用风暴而不是如何在早期阶段了解风暴更为重要.例如,不是学习风暴的内部机制,重要的是要知道如果我们设置一个喷口逐行读取文件,那么它继续使用该_collector.emit
方法发射每一行直到它达到EOF.并且连接到它的螺栓在其execute(tuple input)
方法中 接收相同的螺栓
希望这有助于您将来与我们分享更多