wKiom1YXgJTSc0iIAANa7Kqg9ks285.jpg




wKiom1YXgdfhGaezAAKdR4FlMw8922.jpg


wKiom1YXj07ThBGrAAJY1KXzirs696.jpg


1.  构建拓扑代码

package demo;import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;public class AreaAmtTopo {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new OrderBaseSpout(KafkaProperties.Order_topic),5);
builder.setBolt("filter",new AreaFilterBolt(),5).shuffleGrouping("spout");
builder.setBolt("areabolt",new AreaAmtBolt(),2).fieldsGrouping("filter",new Fields("area_id"));
builder.setBolt("rsltbolt",new AreaRsltBolt(),1).shuffleGrouping("areabolt");}}



2.一级过滤bolt

package demo;import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//一级的过滤bolt
public class AreaFilterBolt implements IBasicBolt {@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {// TODO Auto-generated method stubdeclarer.declare(new Fields("area_id","order_amt","create_time"));//tuple里面每个value的对应name}@Overridepublic Map getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void cleanup() {// TODO Auto-generated method stub}@Overridepublic void execute(Tuple input, BasicOutputCollector collector) {//order_id,order_amt,create_time,area_idString order=input.getString(0);//取出集合values中的第一个valueif(order!=null){String orderArr[]=order.split("\\t");collector.emit(new Values(orderArr[3],orderArr[1],DateFmt.getCountDate(orderArr[2], DateFmt.date_short)));//area_id,order_amt,create_time}}@Overridepublic void prepare(Map arg0, TopologyContext arg1) {// TODO Auto-generated method stub}}


3. 局部汇总bolt(按日期和区域汇总)

package demo;import java.util.HashMap;
import java.util.Map;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;//局部汇总
public class AreaAmtBolt implements IBasicBolt {Map countsMap=null;@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("date_area","amt"));}@Overridepublic Map getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map paramMap, TopologyContext paramTopologyContext) {// TODO Auto-generated method stubcountsMap =new HashMap();}@Overridepublic void execute(Tuple input,BasicOutputCollector collector) {if(input!=null)//如果spout端没数据就会发空值,所以要做判断再往下发{String area_id=input.getString(0);Double order_amt=input.getDouble(1);String  order_date=input.getStringByField("order_date");Double count=countsMap.get(area_id+"_"+order_date);if (count==null){count = 0.0;    }count+=order_amt;countsMap.put(area_id+"_"+order_date,count);System.err.println("areaAmtBolt"+order_date+"_"+area_id+"="+count);collector.emit(new Values(area_id+"_"+order_date,count));}}@Overridepublic void cleanup() {countsMap.clear();}}


4. 最终结果写入HBase

package demo;import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;//结果定时写入hbase的bolt
public class AreaRsltBolt implements IBasicBolt {Map countsMap=null;long beginTime=System.currentTimeMillis();long endTime=0L;HBaseDao dao=null;@Overridepublic void declareOutputFields(OutputFieldsDeclarer paramOutputFieldsDeclarer) {// TODO Auto-generated method stub}@Overridepublic Map getComponentConfiguration() {// TODO Auto-generated method stubreturn null;}@Overridepublic void prepare(Map paramMap, TopologyContext paramTopologyContext) {countsMap =new HashMap();dao=new HBaseDAOImp();}@Overridepublic void execute(Tuple input,BasicOutputCollector paramBasicOutputCollector) {String date_areaid=input.getString(0);double  order_amt=input.getDouble(1); countsMap.put(date_areaid,order_amt);endTime=System.currentTimeMillis();if (endTime-beginTime>=5*1000){for(String key:countsMap.keySet()){//put into hbase//2014-05-05_1,amtdao.insert("area_order","cf","order_amt",countsMap.get(key));System.err.println("rsltBolt put hbase: key="+key+"; order_amt="+countsMap.get(key));}beginTime=System.currentTimeMillis();}}@Overridepublic void cleanup() {// TODO Auto-generated method stub}}


5. DateFmt代码

package demo;import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * Small date helpers built on {@link SimpleDateFormat}.
 */
public class DateFmt {

    /** Pattern with date and time of day. */
    public static final String date_long = "yyyy-MM-dd HH:mm:ss";

    /** Pattern with the date only. */
    public static final String date_short = "yyyy-MM-dd";

    /**
     * Shared formatter kept only so existing references still compile.
     * {@code SimpleDateFormat} is not safe for concurrent use and this instance can be
     * reconfigured by any caller, so the methods of this class no longer use it.
     */
    public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

    /**
     * Parses {@code date} with {@code patton} and formats the result with the same
     * pattern, which normalises the text (for example dropping a time-of-day suffix
     * when the pattern is {@link #date_short}).
     *
     * <p>When {@code date} is {@code null} or cannot be parsed, the current time is
     * formatted instead.
     *
     * @param date   text to normalise, may be {@code null}
     * @param patton {@code SimpleDateFormat} pattern used for both parsing and formatting
     * @return the normalised date text
     */
    public static String getCountDate(String date, String patton) {
        // A fresh formatter per call avoids sharing mutable formatter state.
        SimpleDateFormat formatter = new SimpleDateFormat(patton);
        Calendar cal = Calendar.getInstance();
        if (date != null) {
            try {
                cal.setTime(formatter.parse(date));
            } catch (ParseException e) {
                // Best-effort fallback kept from the original: use the current time,
                // but report which input was rejected.
                System.err.println("DateFmt: cannot parse '" + date + "' with pattern '"
                        + patton + "', using current time: " + e.getMessage());
            }
        }
        return formatter.format(cal.getTime());
    }

    /**
     * Parses a {@link #date_short} formatted string.
     *
     * @param dateStr text such as {@code 2014-05-05}
     * @return the parsed date
     * @throws Exception if {@code dateStr} does not match {@link #date_short}
     */
    public static Date parseDate(String dateStr) throws Exception {
        // Do not use the shared static formatter: it is not thread-safe and callers
        // may have changed its pattern.
        return new SimpleDateFormat(date_short).parse(dateStr);
    }

    public static void main(String[] args) {
        System.out.println(DateFmt.getCountDate("2015-09-08 09:09:08 ", DateFmt.date_long));
    }
}