热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

第98课:使用SparkStreaming实战对论坛网站动态行为的多维度分析(上)

第98课:使用SparkStreaming实战对论坛网站动态行为的多维度分析(上)三件事情:1、明天开始100天,超越美国开发团队2、2016年,投资1亿美金,投资100家spark

第98课:  使用Spark Streaming实战对论坛网站动态行为的多维度分析(上)
三件事情:
1、明天开始100天,超越美国开发团队
2、2016年,投资1亿美金,投资100家spark创业服务公司,技术支撑
3、100天地狱式、死亡式训练
================================================================================
场景: 电商 库存,物流,采购
这节课程的内容
1、kafka的producer,模拟用户的点击行为,通过代码的方式操作kafka
2、pv或uv

SparkSQLDataManually.java的代码复制过来SparkStreamingDataMannuallyProducerForKafka
生成的数据作为Producer的方式发送给Kafka,然后SparkSTreaming程序在Kafka中在线pull到论坛或网站的用

户在线行为信息,进行多维度的分析

给kafka写数据,要弄一个线程,不断的写数据,那我们构建一个线程;或者继承线程也可以,简化一点。
public class SparkStreamingDataMannuallyProducerForKafka extends Thread
写一个构造器,指定topic:
private String topic ;//发送给Kafka的数据的类别
private Producer producerForKafka;
private static Sting dateToday;
private static   Random random;


public SparkStreamingDataMannuallyProducerForKafka(String topic){
      dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
     this.topic = topic ;
     //连上kafka
     Properties cOnf= new Properties();
     conf.put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");
     conf.put("serializer.class","kafka.serializer.StringEncoder");
  //构造器的核心是要生成数据
          //先要导入kafka的jar包 import kafka。producer
     producerForKafka= new Producer (new ProducerConfig(conf));
   
}
//作为线程而言,要复写run方法,先写业务逻辑,再写控制

public void run() {
               int cunter =0 ;//搞500条
  while(true){ //模拟实际情况,不断循环,异步过程,不可能是同步
  counter ++; 
   String userLog = userLogs();
     System.out.println("product:"+userLog );
    //send是核心代码
    producerForKafka.send(new KeyedMessage(topic, userLog ));
  
                 if( 0 == counter%500) {
                    counter = 0;
                    try {
                    Thread.sleep(1000);}catch (....)
}

 }


userLogs()方法中 修改代码
userLogBuffer.append(dateToday)


======================================
main中很简单了
public static void main (String[] args){
   new SparkStreamingDataMannuallyProducerForKafka("userlogs").start();
}


============================
数据生成好了,下面写一下uv pv
===========================

OnlineBBSUserLogs.java
val sparkCOnf= new SparkConf().setAppName("OnlineBBSUserLogs ")
                           .setMaster("local[2]")
 val ssc = new StreamingContext(sparkConf, Seconds(15))

Map kafkaParameters =new HashMap();

kafkaParameters .put("metadata.broker.list","master:9092,worker1:9092,worker2:9092");

Set topics = new HashSet();

topics.add("UserLogs");

//StringDecoder.class要导入kafka的包
JavaPairInputDStream lines = KafkaUtils.createDirectStream

(jsc,String.class,String.class,          

StringDecoder.class,StringDecoder.class,kafkaParametes,topics);

看源代码,createDirectStream返回的类型看一下,继承InputDStream。

业务代码找pv,有两种view register,那么写一个filter代码
JavaPairDStream logsDStream= lines.filter(new

Funtion,Boolean(){
   @Override
   public Boolean call(Tuple2, v1) throws Exception {
      String[] logs = v1._2.split("\t");
      String action = logs[5];
      if ("view".equals(action)) {
            return true;
          } else {
             return false; }
     
  }

});
  
JavaPairDStream pairs= logsDStream.mapToPair(new

PairFuntion,Long,Long (){

 @Override
   public Tuple2 call(Tuple2, t) throws Exception {
     String[] logs = t._2.split("\t");
     Long pageId=Long.valudeOf(logs[3]);
      retun new Tuple2(pageId=Long.valudeOf,1L);

}
});

//过滤出了目标数据,下一步reducebykey
  JavaPairDStream wordsCount = pairs.reduceByKey(new Function2

Long, Long>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce)
   
   @Override
   public Long call(Long v1, Long v2) throws Exception {
    return v1 + v2;
   }
  });

//打印出来,在企业生产环境中,一般会把计算的数据保存到Redis或者DB中,采用J2EE等技术进行趋势绘制,

领导爱看这个,就像动态更新的股票交易一样,实现在线的监控
 wordsCount.print();
  

  jsc.start();
  
  jsc.awaitTermination();


推荐阅读
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • 使用freemaker生成Java代码的步骤及示例代码
    本文介绍了使用freemaker这个jar包生成Java代码的步骤,通过提前编辑好的模板,可以避免写重复代码。首先需要在springboot的pom.xml文件中加入freemaker的依赖包。然后编写模板,定义要生成的Java类的属性和方法。最后编写生成代码的类,通过加载模板文件和数据模型,生成Java代码文件。本文提供了示例代码,并展示了文件目录结构。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 感谢大家对IT十八掌大数据的支持,今天的作业如下:1.实践PreparedStament的CRUD操作。2.对比Statement和PreparedStatement的大批量操作耗时?(1 ... [详细]
  • SpringBoot uri统一权限管理的实现方法及步骤详解
    本文详细介绍了SpringBoot中实现uri统一权限管理的方法,包括表结构定义、自动统计URI并自动删除脏数据、程序启动加载等步骤。通过该方法可以提高系统的安全性,实现对系统任意接口的权限拦截验证。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • 本文介绍了使用Python解析C语言结构体的方法,包括定义基本类型和结构体类型的字典,并提供了一个示例代码,展示了如何解析C语言结构体。 ... [详细]
  • 本文介绍了在Python中使用zlib模块进行字符串的压缩与解压缩的方法,并探讨了其在内存优化方面的应用。通过压缩存储URL等长字符串,可以大大降低内存消耗,虽然处理时间会增加,但是整体效果显著。同时,给出了参考链接,供进一步学习和应用。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 2019独角兽企业重金招聘Python工程师标准
    本文介绍了2019独角兽企业对Python工程师的招聘标准,包括在AndroidManifest中定义meta-data的方法和获取meta-data值的代码。同时提供了获取meta-data值的具体实现方法。转载文章链接:https://my.oschina.net/u/244918/blog/685127 ... [详细]
  • JAVA调用存储过程CallableStatement对象的方法及使用示例
    本文介绍了使用JAVA调用存储过程CallableStatement对象的方法,包括创建CallableStatement对象、传入IN参数、注册OUT参数、传入INOUT参数、检索结果和OUT参数、处理NULL值等。通过示例代码演示了具体的调用过程。 ... [详细]
  • MySQL锁--(深入浅出读书笔记)
    MySQL锁的概述1.针对不同的引擎,采用不同的锁机制;(表锁,页面锁,行锁)myisam和memory存储引擎:表级锁;BOB存储引擎:页面锁,表级 ... [详细]
author-avatar
友爱锦锦_950
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有