Apache Storm java.nio.channels.ClosedChannelException:null

Posted by 熊孝琳_506 on 2023-01-06 15:03

We are trying to process a large volume of (fake) messages with Apache Storm. Example message:

"{"clientName":"Sergey Bakulin","sum":12925,"group":"property","suspicious":false,"clientId":2,"dt":1404387303764,"coord":{"lat":55.767842588357645,"lon":37.46920361823332}}".

We use Apache Kafka as the message source for the Storm cluster. Our goal is to be able to process at least 50k msg/sec/node. As soon as we run on more than one node, we keep hitting the following error (log snippet from the worker-*.log files):

2014-07-03 15:14:47 b.s.m.n.Client [INFO] failed to send requests to ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701: java.nio.channels.ClosedChannelException: null
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:381) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:349) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:107) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:312) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:88) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.6.3.Final.jar:na]
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.6.3.Final.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
2014-07-03 15:14:47 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-ip-172-31-23-123.eu-west-1.compute.internal/172.31.23.123:6701

Our current Storm configuration:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
  - "172.31.*.*"

storm.local.dir: "/home/*/storm/data"
nimbus.host: "127.0.0.1"
supervisor.slots.ports:
  - 6701
  - 6702

ui.port: 8090

worker.childopts: "-Xmx6g -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=1%ID% -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun$

supervisor.childopts: "-Xmx1024m -Djava.net.preferIPv4Stack=true"
supervisor.worker.start.timeout.secs: 10
supervisor.worker.timeout.secs: 10
supervisor.monitor.frequency.secs: 3
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

storm.messaging.netty.server_worker_threads: 2
storm.messaging.netty.client_worker_threads: 2
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 25
storm.messaging.netty.max_wait_ms: 1000
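
(As we understand it, the storm.messaging.netty.* values above can presumably also be supplied per topology at submit time via the merged topology configuration; this is an assumption on our side, and cluster-level keys such as supervisor.* of course stay in storm.yaml. A minimal sketch:)

import backtype.storm.Config;

// Sketch only: mirrors a few of the storm.yaml values above as per-topology
// overrides, assuming the workers honor them from the merged topology conf.
Config topoConf = new Config();
topoConf.setNumWorkers(6);                                   // matches run(builder, args, 6)
topoConf.put("storm.messaging.netty.buffer_size", 5242880);  // 5 MB, same as storm.yaml
topoConf.put("storm.messaging.netty.max_retries", 25);
topoConf.put("storm.messaging.netty.max_wait_ms", 1000);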

Our Storm topology:

Properties conf = Util.readProperties(ClientTopology.class, "storm.properties");

prepareRedisDB(conf);

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("kafka_trans_spout", getKafkaSpout(conf, conf.getProperty("kafka_trans_topic")), 3);
builder.setSpout("kafka_socevent_spout", getKafkaSpout(conf, conf.getProperty("kafka_socevent_topic")), 3);

builder.setBolt("json_to_tuple_trans_bolt", new JSONToTupleBolt(Transaction.class), 6)
        .shuffleGrouping("kafka_trans_spout");
builder.setBolt("json_to_tuple_socevent_bolt", new JSONToTupleBolt(SocialEvent.class), 3)
        .shuffleGrouping("kafka_socevent_spout");

builder.setBolt("alert_bolt", new AlertBolt(conf), 3)
        .fieldsGrouping("json_to_tuple_trans_bolt", new Fields("cl_id"))
        .fieldsGrouping("json_to_tuple_socevent_bolt", new Fields("cl_id"));
builder.setBolt("offer_bolt", new NearestOfferBolt(conf), 3)
        .shuffleGrouping("json_to_tuple_trans_bolt");

run(builder, args, 6);
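
(The run(...) helper is a small utility of ours and is not shown; a sketch of what it essentially does, assuming the usual local-vs-remote submission pattern, is below. Names and details are illustrative.)

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

// Sketch of the run() helper: submit to the cluster when a topology name is
// passed on the command line, otherwise run in-process for local testing.
private static void run(TopologyBuilder builder, String[] args, int workers) throws Exception {
    Config conf = new Config();
    conf.setNumWorkers(workers);
    if (args != null && args.length > 0) {
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("client-topology", conf, builder.createTopology());
    }
}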

private static KafkaSpout getKafkaSpout(Properties conf, String topic) {
    SpoutConfig spoutConfig = new SpoutConfig(
            new ZkHosts(conf.getProperty("zk_host"), "/brokers"),
            topic,
            "/brokers",
            conf.getProperty("kafka_consumer_group_id"));
    List<String> zkServers = new ArrayList<String>();
    zkServers.add(conf.getProperty("zk_host"));
    spoutConfig.zkServers = zkServers;
    spoutConfig.zkPort = Integer.valueOf(conf.getProperty("zk_port"));
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    spoutConfig.forceFromStart = true;
    spoutConfig.fetchSizeBytes = 5*1024*1024;
    spoutConfig.bufferSizeBytes = 5*1024*1024;
    return new KafkaSpout(spoutConfig);
}
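
(JSONToTupleBolt is likewise our own class and is not shown above. As a sketch of what it does: with StringScheme the KafkaSpout emits the raw JSON in a single field named "str", and the bolt parses it and emits the client id used by the fieldsGrouping. The version below is hypothetical, handles only Transaction for brevity, and assumes Gson for parsing.)

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.google.gson.Gson;

// Hypothetical sketch of JSONToTupleBolt (Transaction-only for brevity):
// parses the raw JSON string from the KafkaSpout and emits ("cl_id", "payload").
public class JSONToTupleBolt extends BaseBasicBolt {
    private transient Gson gson;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        if (gson == null) {
            gson = new Gson(); // created lazily on the worker; Gson is not Serializable
        }
        String json = input.getStringByField("str"); // StringScheme names its single field "str"
        Transaction t = gson.fromJson(json, Transaction.class);
        collector.emit(new Values(t.clientId, t));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("cl_id", "payload"));
    }
}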

We are using AWS c3.2xlarge machines, Apache Storm 0.9.2-incubating, and Apache Kafka 2.9.2-0.8.1.1.
