Lesson 98: Multi-dimensional Analysis of Forum/Website User Behavior with Spark Streaming in Practice (Part 1)
Three things:
1. Starting tomorrow: 100 days to surpass the American development teams
2. In 2016, invest 100 million US dollars in 100 Spark startup service companies, providing technical support
3. 100 days of hell-mode, death-mode training
================================================================================
Scenario: e-commerce, covering inventory, logistics, and procurement.
Contents of this lesson:
1. A Kafka producer that simulates user click behavior, operating Kafka through code
2. PV and UV
Copy the code of SparkSQLDataManually.java into SparkStreamingDataMannuallyProducerForKafka.
The generated data is sent to Kafka by the producer; the Spark Streaming program then pulls the forum/website
users' online behavior from Kafka and performs multi-dimensional analysis on it.
To keep writing data to Kafka we need a thread that writes continuously, so we build one; extending Thread directly is the simpler option.
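The pattern just described, subclassing Thread and overriding run(), can be sketched minimally. The class and field names below are illustrative, not from the course code, and the Kafka send is replaced by a simple counter so the sketch runs standalone:

```java
// Minimal sketch of the "extend Thread, override run()" pattern used by the producer.
// Illustrative names; the real class sends records to Kafka instead of counting them.
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerThreadSketch extends Thread {
    private final AtomicInteger sent = new AtomicInteger(0);

    @Override
    public void run() {
        // Business logic first: "send" a fixed number of records.
        for (int i = 0; i < 5; i++) {
            sent.incrementAndGet();
        }
    }

    public int sentCount() {
        return sent.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerThreadSketch t = new ProducerThreadSketch();
        t.start();   // start() schedules run() on a new thread
        t.join();    // wait for completion before reading the count
        System.out.println(t.sentCount()); // prints 5
    }
}
```

Calling start() (not run() directly) is what makes the loop execute on its own thread, which is why the producer class below extends Thread.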
public class SparkStreamingDataMannuallyProducerForKafka extends Thread
Write a constructor that takes the topic:
private String topic; // the category (topic) of the data sent to Kafka
private Producer<Integer, String> producerForKafka;
private static String dateToday;
private static Random random;
public SparkStreamingDataMannuallyProducerForKafka(String topic) {
    dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    this.topic = topic;
    // connect to Kafka
    Properties conf = new Properties();
    conf.put("metadata.broker.list", "master:9092,worker1:9092,worker2:9092");
    conf.put("serializer.class", "kafka.serializer.StringEncoder");
    // the core of the constructor is preparing to generate and send data
    // first import the Kafka classes: kafka.javaapi.producer.Producer and kafka.producer.ProducerConfig
    producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
}
// As a thread, we must override run(): write the business logic first, then the control flow
@Override
public void run() {
    int counter = 0; // pause after every 500 records
    while (true) { // simulate reality: keep looping; this is an asynchronous process, it cannot be synchronous
        counter++;
        String userLog = userLogs();
        System.out.println("product:" + userLog);
        // send() is the core call
        producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));
        if (0 == counter % 500) {
            counter = 0;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
In the userLogs() method, change the code so the record starts with today's date:
userLogBuffer.append(dateToday)
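A hedged sketch of what userLogs() might produce. The exact field layout here is an assumption, chosen so that after split("\t") the page id sits at index 3 and the action at index 5, which matches how the consumer code later indexes the record; the real method lives in SparkSQLDataManually.java:

```java
// Assumed record layout: date \t timestamp \t userID \t pageID \t channelID \t action.
// Illustrative only; field choices are an assumption matching the consumer's indexing.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class UserLogsSketch {
    private static final Random random = new Random();
    private static final String[] ACTIONS = {"view", "register"};

    public static String userLogs(String dateToday) {
        StringBuffer userLogBuffer = new StringBuffer();
        userLogBuffer.append(dateToday)                                        // 0: date
                .append("\t").append(System.currentTimeMillis())               // 1: timestamp
                .append("\t").append(random.nextInt(10000))                    // 2: userID
                .append("\t").append(random.nextInt(100))                      // 3: pageID
                .append("\t").append(random.nextInt(50))                       // 4: channelID
                .append("\t").append(ACTIONS[random.nextInt(ACTIONS.length)]); // 5: action
        return userLogBuffer.toString();
    }

    public static void main(String[] args) {
        String today = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        String log = userLogs(today);
        System.out.println(log);
        System.out.println(log.split("\t").length); // 6 fields
    }
}
```

Whatever the real layout is, the consumer's logs[3] (pageId) and logs[5] (action) accesses must agree with it.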
======================================
main is now trivial (note that the topic name must match the one the consumer subscribes to, case-sensitively):
public static void main(String[] args) {
    new SparkStreamingDataMannuallyProducerForKafka("UserLogs").start();
}
============================
Data generation is done; next let's write the UV/PV side.
===========================
OnlineBBSUserLogs.java
SparkConf conf = new SparkConf().setAppName("OnlineBBSUserLogs")
        .setMaster("local[2]");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(15));
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list", "master:9092,worker1:9092,worker2:9092");
Set<String> topics = new HashSet<String>();
topics.add("UserLogs");
// StringDecoder.class requires importing the Kafka package (kafka.serializer.StringDecoder)
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
        jsc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
Look at the source code: the type createDirectStream returns extends InputDStream.
Business logic: compute PV. There are multiple action types, such as view and register, so first write a filter that keeps only "view":
JavaPairDStream<String, String> logsDStream = lines.filter(
        new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> v1) throws Exception {
                String[] logs = v1._2.split("\t");
                String action = logs[5];
                if ("view".equals(action)) {
                    return true;
                } else {
                    return false;
                }
            }
        });
JavaPairDStream<Long, Long> pairs = logsDStream.mapToPair(
        new PairFunction<Tuple2<String, String>, Long, Long>() {
            @Override
            public Tuple2<Long, Long> call(Tuple2<String, String> t) throws Exception {
                String[] logs = t._2.split("\t");
                Long pageId = Long.valueOf(logs[3]);
                return new Tuple2<Long, Long>(pageId, 1L);
            }
        });
// The target data is filtered out; the next step is reduceByKey
JavaPairDStream<Long, Long> wordsCount = pairs.reduceByKey(
        new Function2<Long, Long, Long>() {
            // Accumulate values for the same key (reducing at both the local and the reducer level)
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        });
// Print the results. In a production environment, the computed data would typically be saved to Redis or a DB,
// with J2EE-style tooling drawing the trend charts; management loves these, like a live-updating stock ticker: real online monitoring.
wordsCount.print();
jsc.start();
jsc.awaitTermination();
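The per-batch logic above (filter "view" records, map to (pageId, 1), reduceByKey) can be mirrored on a plain list with java.util.stream, which is handy for unit-testing the record handling without a Spark cluster. The sample records and class name below are made up for illustration:

```java
// Plain-Java mirror of the streaming pipeline: filter -> map to (pageId, 1) -> reduce by key.
// Illustrative sketch; sample records follow the assumed tab-separated layout with
// pageId at index 3 and action at index 5.
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PvBatchSketch {
    public static Map<Long, Long> pageViews(List<String> logs) {
        return logs.stream()
                .map(line -> line.split("\t"))
                .filter(f -> "view".equals(f[5]))        // keep only page views
                .collect(Collectors.groupingBy(
                        f -> Long.valueOf(f[3]),         // key: pageId
                        Collectors.counting()));         // value: view count
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
                "2016-01-01\t1\tu1\t42\tc1\tview",
                "2016-01-01\t2\tu2\t42\tc1\tview",
                "2016-01-01\t3\tu3\t7\tc2\tregister");
        System.out.println(pageViews(batch)); // {42=2}
    }
}
```

The register record is dropped by the filter, and the two views of page 42 are accumulated, exactly as reduceByKey accumulates values per key within each 15-second batch.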