热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

windows下storm本地模式java开发实例

2019独角兽企业重金招聘Python工程师标准环境storm有本地模式和集群模式,本地模式什么都不需要,知道stormjar包就可以运行了&#x

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

环境

storm有本地模式和集群模式,本地模式什么都不需要,知道storm jar包就可以运行了,我在windows下eclipse中使用maven进行本地模式的编程。storm编程一种可以继承BaseRichSpout和BaseBasicBolt类,另一种是实现IRichSpout和IRichBolt接口。

maven下载storm jar包

从storm官网上可以查到storm maven配置http://storm.apache.org/downloads.html

我这里使用的是storm-0.10.0

groupId: org.apache.storm
artifactId: storm-core
version: 0.10.0

第一种方法 继承类

编写spout

package com.storm.stormDemo;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class RandomSpout extends BaseRichSpout{private SpoutOutputCollector collector;private static String[] words = {"happy","excited","angry"};/* (non-Javadoc)* @see backtype.storm.spout.ISpout#open(java.util.Map, backtype.storm.task.TopologyContext, backtype.storm.spout.SpoutOutputCollector)*/public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {// TODO Auto-generated method stubthis.collector = arg2;}/* (non-Javadoc)* @see backtype.storm.spout.ISpout#nextTuple()*/public void nextTuple() {// TODO Auto-generated method stubString word = words[new Random().nextInt(words.length)]; collector.emit(new Values(word));}/* (non-Javadoc)* @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)*/public void declareOutputFields(OutputFieldsDeclarer arg0) {// TODO Auto-generated method stubarg0.declare(new Fields("randomstring"));}
}

编写bolt

package com.storm.stormDemo;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
public class SenqueceBolt extends BaseBasicBolt{/* (non-Javadoc)* @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)*/public void execute(Tuple input, BasicOutputCollector collector) {// TODO Auto-generated method stubString word = (String) input.getValue(0);  String out = "I'm " + word +  "!";  System.out.println("out=" + out);}/* (non-Javadoc)* @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)*/public void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stub}
}

编写主类

package com.storm.stormDemo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
public class FirstTopo {public static void main(String[] args) throws Exception {  TopologyBuilder builder = new TopologyBuilder();   builder.setSpout("spout", new RandomSpout());  builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout"); Config conf = new Config();  conf.setDebug(false); if (args != null && args.length > 0) {  conf.setNumWorkers(3);  StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  } else {  LocalCluster cluster = new LocalCluster();  cluster.submitTopology("firstTopo", conf, builder.createTopology());  Utils.sleep(3000);  cluster.killTopology("firstTopo");  cluster.shutdown();  }  }  
}

执行结果

104929_8UPp_2000675.png

第二种方法 实现接口

编写spout

package com.storm.stormDemo; import java.io.BufferedReader; 
import java.io.FileNotFoundException;import java.io.FileReader;  
import java.util.Map;  
import backtype.storm.spout.SpoutOutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichSpout; 
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Fields;  
import backtype.storm.tuple.Values; 
public class WordReader implements IRichSpout {  private static final long serialVersionUID = 1L;  private SpoutOutputCollector collector;  private FileReader fileReader;  private boolean completed = false;  public boolean isDistributed() {  return false;  }  /** * 这是第一个方法,里面接收了三个参数,第一个是创建Topology时的配置, * 第二个是所有的Topology数据,第三个是用来把Spout的数据发射给bolt * **/  public void open(Map conf, TopologyContext context,  SpoutOutputCollector collector) {  try {  //获取创建Topology时指定的要读取的文件路径  this.fileReader = new FileReader(conf.get("wordsFile").toString());  } catch (FileNotFoundException e) {  throw new RuntimeException("Error reading file ["  + conf.get("wordFile") + "]");  }  //初始化发射器  this.collector = collector;  }  /** * 这是Spout最主要的方法,在这里我们读取文本文件,并把它的每一行发射出去(给bolt) * 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下 * **/  public void nextTuple() {  if (completed) {  try {  Thread.sleep(1000);  } catch (InterruptedException e) {  // Do nothing  }  return;  }  String str;  // Open the reader  BufferedReader reader = new BufferedReader(fileReader);  try {  // Read all lines  while ((str = reader.readLine()) != null) {  /** * 发射每一行,Values是一个ArrayList的实现 */  this.collector.emit(new Values(str), str);  }  } catch (Exception e) {  throw new RuntimeException("Error reading tuple", e);  } finally {  completed = true;  }  }  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("line"));  }  public void close() {  // TODO Auto-generated method stub  }  public void activate() {  // TODO Auto-generated method stub  }  public void deactivate() {  // TODO Auto-generated method stub  }  public void ack(Object msgId) {  System.out.println("OK:" + msgId);  }  public void fail(Object msgId) {  System.out.println("FAIL:" + msgId);  }  public Map getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}

编写第一个bolt

package com.storm.stormDemo;  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Map;  
import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichBolt;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Fields;  
import backtype.storm.tuple.Tuple; 
import backtype.storm.tuple.Values;  
public class WordNormalizer implements IRichBolt {  private OutputCollector collector;  public void prepare(Map stormConf, TopologyContext context,  OutputCollector collector) {  this.collector = collector;  }  /**这是bolt中最重要的方法,每当接收到一个tuple时,此方法便被调用 * 这个方法的作用就是把文本文件中的每一行切分成一个个单词,并把这些单词发射出去(给下一个bolt处理) * **/  public void execute(Tuple input) {  String sentence = input.getString(0);  String[] words = sentence.split(" ");  for (String word : words) {  word = word.trim();  if (!word.isEmpty()) {  word = word.toLowerCase();  // Emit the word  List a = new ArrayList();  a.add(input);  collector.emit(a, new Values(word));  }  }  //确认成功处理一个tuple  collector.ack(input);  }  public void declareOutputFields(OutputFieldsDeclarer declarer) {  declarer.declare(new Fields("word"));  }  public void cleanup() {  // TODO Auto-generated method stub  }  public Map getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}

编写第二个bolt

package com.storm.stormDemo;  
import java.util.HashMap;  
import java.util.Map;  
import backtype.storm.task.OutputCollector;  
import backtype.storm.task.TopologyContext;  
import backtype.storm.topology.IRichBolt;  
import backtype.storm.topology.OutputFieldsDeclarer;  
import backtype.storm.tuple.Tuple;  public class WordCounter implements IRichBolt {  Integer id;  String name;  Map counters;  private OutputCollector collector;  public void prepare(Map stormConf, TopologyContext context,  OutputCollector collector) {  this.counters = new HashMap();  this.collector = collector;  this.name = context.getThisComponentId();  this.id = context.getThisTaskId();  }  public void execute(Tuple input) {  String str = input.getString(0);  if (!counters.containsKey(str)) {  counters.put(str, 1);  } else {  Integer c = counters.get(str) + 1;  counters.put(str, c);  }  // 确认成功处理一个tuple  collector.ack(input);  }  /** * Topology执行完毕的清理工作,比如关闭连接、释放资源等操作都会写在这里 * 因为这只是个Demo,我们用它来打印我们的计数器 * */  public void cleanup() {  System.out.println("-- Word Counter [" + name + "-" + id + "] --");  for (Map.Entry entry : counters.entrySet()) {  System.out.println(entry.getKey() + ": " + entry.getValue());  }  counters.clear();  }  public void declareOutputFields(OutputFieldsDeclarer declarer) {  // TODO Auto-generated method stub  }  public Map getComponentConfiguration() {  // TODO Auto-generated method stub  return null;  }  
}

编写主类

package com.storm.stormDemo; 
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
public class WordCountTopologyMain {  public static void main(String[] args) throws InterruptedException {  //定义一个Topology  TopologyBuilder builder = new TopologyBuilder();  builder.setSpout("word-reader",new WordReader(),1);  builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");  builder.setBolt("word-counter", new WordCounter(),2).fieldsGrouping("word-normalizer", new Fields("word"));  //配置  Config conf = new Config();  conf.put("wordsFile", "f:/test.txt");  conf.setDebug(false);  //提交Topology  conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);  //创建一个本地模式cluster  LocalCluster cluster = new LocalCluster();  cluster.submitTopology("Getting-Started-Toplogie", conf,builder.createTopology());  Utils.sleep(3000);  cluster.killTopology("Getting-Started-Toplogie");  cluster.shutdown();  }  
}

执行结果

105514_LQRg_2000675.png

更多实例请参考:

http://ifeve.com/getting-started-with-storm6/

http://www.aboutyun.com/thread-8080-1-1.html

总结

在spout读取文件的时候,如果你设置spout的数量为2的时候,读取的数据就会重复,bolt就会处理重复的数据,这是我们在编写spout的时候要注意的。


转:https://my.oschina.net/u/2000675/blog/610753



推荐阅读
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
  • 如何实现JDK版本的切换功能,解决开发环境冲突问题
    本文介绍了在开发过程中遇到JDK版本冲突的情况,以及如何通过修改环境变量实现JDK版本的切换功能,解决开发环境冲突的问题。通过合理的切换环境,可以更好地进行项目开发。同时,提醒读者注意不仅限于1.7和1.8版本的转换,还要适应不同项目和个人开发习惯的需求。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • Centos7.6安装Gitlab教程及注意事项
    本文介绍了在Centos7.6系统下安装Gitlab的详细教程,并提供了一些注意事项。教程包括查看系统版本、安装必要的软件包、配置防火墙等步骤。同时,还强调了使用阿里云服务器时的特殊配置需求,以及建议至少4GB的可用RAM来运行GitLab。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • CEPH LIO iSCSI Gateway及其使用参考文档
    本文介绍了CEPH LIO iSCSI Gateway以及使用该网关的参考文档,包括Ceph Block Device、CEPH ISCSI GATEWAY、USING AN ISCSI GATEWAY等。同时提供了多个参考链接,详细介绍了CEPH LIO iSCSI Gateway的配置和使用方法。 ... [详细]
  • Week04面向对象设计与继承学习总结及作业要求
    本文总结了Week04面向对象设计与继承的重要知识点,包括对象、类、封装性、静态属性、静态方法、重载、继承和多态等。同时,还介绍了私有构造函数在类外部无法被调用、static不能访问非静态属性以及该类实例可以共享类里的static属性等内容。此外,还提到了作业要求,包括讲述一个在网上商城购物或在班级博客进行学习的故事,并使用Markdown的加粗标记和语句块标记标注关键名词和动词。最后,还提到了参考资料中关于UML类图如何绘制的范例。 ... [详细]
  • 图像因存在错误而无法显示 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
  • Python使用Pillow包生成验证码图片的方法
    本文介绍了使用Python中的Pillow包生成验证码图片的方法。通过随机生成数字和符号,并添加干扰象素,生成一幅验证码图片。需要配置好Python环境,并安装Pillow库。代码实现包括导入Pillow包和随机模块,定义随机生成字母、数字和字体颜色的函数。 ... [详细]
author-avatar
yan雀安知鸿鹄之志_647
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有