在Storm中将两个螺栓的输出发送到一个螺栓?

 小song喪_774 发布于 2023-01-12 15:35

将BoltA和BoltB的输出发送到BoltC的最简单方法是什么.我是否必须使用联接或是否有任何更简单的解决方案.A和B具有相同的字段(ts,metric_name,metric_count).

    // KafkaSpout --> LogDecoder
    builder.setBolt(LOGDECODER_BOLT_ID, logdecoderBolt, 10).shuffleGrouping(KAFKA_SPOUT_ID);

    // LogDecoder --> CountBolt
    builder.setBolt(COUNT_BOLT_ID, countBolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);

    // LogDecoder --> HttpResCodeCountBolt
    builder.setBolt(HTTP_RES_CODE_COUNT_BOLT_ID, http_res_code_count_bolt, 10).shuffleGrouping(LOGDECODER_BOLT_ID);


    # And now I want to send CountBolt and HttpResCodeCountBolt output to Aggregator Bolt.

    // CountBolt --> AggregatwBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((COUNT_BOLT_ID), new Fields("ts"));

    // HttpResCodeCountBolt --> AggregatwBolt
    builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5).fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), new Fields("ts"));

这可能吗 ?

1 个回答
  • 是.只需在fieldsGrouping调用中添加stream-id(下面的"stream1"和"stream2"):

    BoltDeclarer bd = builder.setBolt(AGGREGATE_BOLT_ID, aggregateBolt, 5); 
    bd.fieldsGrouping((COUNT_BOLT_ID), "stream1",  new Fields("ts"));
    bd.fieldsGrouping((HTTP_RES_CODE_COUNT_BOLT_ID), "stream2", new Fields("ts"));
    

    然后在BoltC的execute()方法中,您可以测试以查看元组来自哪个流:

    public void execute(Tuple tuple) {
    
        if ("stream1".equals(tuple.getSourceStreamId())) {
            // this came from stream1
        } else if ("stream2".equals(tuple.getSourceStreamId())) {
            // this came from stream2
        }
    

    由于您知道元组来自哪个流,因此您不需要在两个流上具有相同形状的元组.您只需根据stream-id对元组进行解组.

    您还可以检查元组来自哪个组件(因为我输入这个,我认为这可能更适合您的情况)以及发出元组的组件(任务)的实例.

    2023-01-12 15:36 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有