热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparkstreaming连接消费nsq

为什么80%的码农都做不了架构师?spark-streaming连接消费nsq目的使用NSQ作为消息流使用spark-streaming进行消费对数据进行清洗后

为什么80%的码农都做不了架构师?>>>   hot3.png

spark-streaming连接消费nsq

目的

  • 使用 NSQ作为消息流
  • 使用 spark-streaming 进行消费
  • 对数据进行清洗后,保存到hive仓库中

连接方案

1、编写Spark Streaming Custom Receivers(spark-streaming 自定义接收器),详细见文档

2、使用 nsq 官方提供的Java程序连接包 JavaNSQClient ,详细见文档

详细代码

自定义连接器

ReliableNSQReceiver.scala

import com.github.brainlag.nsq.callbacks.NSQMessageCallback
import com.github.brainlag.nsq.lookup.DefaultNSQLookup
import com.github.brainlag.nsq.{NSQConsumer, NSQMessage}
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverclass MessageCallbacks(store_fun:String => Unit) extends NSQMessageCallback with Logging {def message(message: NSQMessage): Unit ={val s = new String(message.getMessage())store_fun(s)message.finished()}
}
/* 自定义连接器 */
class ReliableNSQReceiver(host: String, port: Int, topic: String, channel: String)extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {var consumer: NSQConsumer = nulldef onStart() {// 启动通过连接接收数据的线程new Thread("Socket Receiver") {override def run() { receive() }}.start()}def onStop() {logInfo("Stopped receiving")consumer.close}/** 接收数据 */private def receive() {try {val lookup = new DefaultNSQLookuplookup.addLookupAddress(host, port)consumer = new NSQConsumer(lookup, topic, channel, new MessageCallbacks(store))consumer.start} catch {case e: java.net.ConnectException =>restart("Error connecting to " + host + ":" + port, e)case t: Throwable =>restart("Error receiving data", t)}}}

使用连接器

import com.google.gson.JsonParser
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*
* 在定义一个 context 之后,您必须执行以下操作.* 通过创建输入 DStreams 来定义输入源.
* 通过应用转换和输出操作 DStreams 定义流计算(streaming computations).
* 开始接收输入并且使用 streamingContext.start() 来处理数据.
* 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者由于任何错误).
* 使用 streamingContext.stop() 来手动的停止处理.*/object ELKStreaming extends Logging{def main(args: Array[String]): Unit &#61;{if (args.length <4) {System.err.println("Usage: ELKStreaming ")System.exit(1)}logInfo("start &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>")StreamingExamples.setStreamingLogLevels()val sparkConf &#61; new SparkConf().setAppName("ELKStreaming").setMaster("yarn").set("hive.metastore.uris", "thrift://hadoop15.bigdata.org:9083")// 创建一个批次间隔为10val ssc &#61; new StreamingContext(sparkConf, Seconds(args(2).toInt))// 使用自定义的NSQReceiverval lines &#61; ssc.receiverStream(new ReliableNSQReceiver(args(0), args(1).toInt, "log", "scalatest"))val hiveStream: DStream[(String, String)] &#61; lines.map(line &#61;> prefix_exit(line))// 将计算后的数据保存到hive中hiveStream.foreachRDD(rdd &#61;> {// 利用SparkConf来初始化SparkSession。val sparkSession: SparkSession &#61; SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()// 导入隐式转换来将RDDimport sparkSession.implicits._// 将RDD转换成DFval df: DataFrame &#61; rdd.toDF("str", "ymd")// 取出表中的字段logInfo("df count &#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;&#61;>"&#43; df.count)df.createOrReplaceTempView("spark_logs")sparkSession.sql("insert into "&#43;args(3)&#43;" partition (ymd) select str,ymd from spark_logs")})ssc.start()ssc.awaitTermination()}def prefix_exit(line:String):(String,String) &#61;{// 对数据进行清洗计算val obj &#61; new JsonParser().parse(line).getAsJsonObjectval data_str1 &#61; obj.get("recv_timestamp").toString().split("T|Z|\"")val data_str2 &#61; data_str1(1).split(&#39;-&#39;)val data_str3 &#61; data_str2(1)&#43;"/"&#43;data_str2(2)&#43;"/"&#43;data_str2(0)&#43;" "&#43;data_str1(2)&#43;" [I] "&#43;obj.get("index_type").toString().split("\"")(1)&#43;" "&#43;lineval data_str4 &#61; data_str2(0)&#43;data_str2(1)&#43;data_str2(2)(data_str3.toString(), data_str4.toString())}
}


转:https://my.oschina.net/2devil/blog/3003109



推荐阅读
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • ubuntu用sqoop将数据从hive导入mysql时,命令: ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文介绍了关系型数据库和NoSQL数据库的概念和特点,列举了主流的关系型数据库和NoSQL数据库,同时描述了它们在新闻、电商抢购信息和微博热点信息等场景中的应用。此外,还提供了MySQL配置文件的相关内容。 ... [详细]
  • AFNetwork框架(零)使用NSURLSession进行网络请求
    本文介绍了AFNetwork框架中使用NSURLSession进行网络请求的方法,包括NSURLSession的配置、请求的创建和执行等步骤。同时还介绍了NSURLSessionDelegate和NSURLSessionConfiguration的相关内容。通过本文可以了解到AFNetwork框架中使用NSURLSession进行网络请求的基本流程和注意事项。 ... [详细]
  • 本文介绍了在Python3中如何使用选择文件对话框的格式打开和保存图片的方法。通过使用tkinter库中的filedialog模块的asksaveasfilename和askopenfilename函数,可以方便地选择要打开或保存的图片文件,并进行相关操作。具体的代码示例和操作步骤也被提供。 ... [详细]
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • Python实现变声器功能(萝莉音御姐音)的方法及步骤
    本文介绍了使用Python实现变声器功能(萝莉音御姐音)的方法及步骤。首先登录百度AL开发平台,选择语音合成,创建应用并填写应用信息,获取Appid、API Key和Secret Key。然后安装pythonsdk,可以通过pip install baidu-aip或python setup.py install进行安装。最后,书写代码实现变声器功能,使用AipSpeech库进行语音合成,可以设置音量等参数。 ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • 安卓select模态框样式改变_微软Office风格的多端(Web、安卓、iOS)组件库——Fabric UI...
    介绍FabricUI是微软开源的一套Office风格的多端组件库,共有三套针对性的组件,分别适用于web、android以及iOS,Fab ... [详细]
author-avatar
金婉jessica氵_573
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有