热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flume+Kafka+Spark小案例

🎃生产者&消费者小案例主要通过Flume进行数据的收集,作为数据的生产者,接入kafka作为消息中间件,用SparkStr

🎃生产者&消费者小案例




主要通过Flume进行数据的收集,作为数据的生产者,接入kafka作为消息中间件,用SparkStreaming作为消费者进行数据的消费【前提Linux已准备好Flume+Kafka环境,网上很多教程这里不多赘述,直接进入主要内容】




1、准备Flume的配置文件

apache-flume-1.9.0-bin/options/exec2kafka.conf文件:

## flume-ng agent -n a1 -c options/ -f exec2kafka.conf -Dflume.root.logger=INFO,console##定义a1的三个组件的名称
a1.sources = r1
a1.sinks = k1
a1.channels = c1##定义Source的类型
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/baidu.log##定义Channel的类型
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100##定义Sink的类型
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = baidu
a1.sinks.k1.kafka.bootstrap.servers = 192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1##组装source channel sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1



2、编写SparkStreaming消费者

object KafkaSpark {def main(args: Array[String]): Unit = {//创建val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Hello07Kafka")val streamingContext = new StreamingContext(sparkConf, Seconds(1))//配置信息val kafkaParams = Map[String, Object]("bootstrap.servers" -> "node01:9092,node02:9092,node03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "yjx_kafka","auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))//主题val topics = Array("baidu")//开始创建Kafkaval linesDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))//打印数据linesDStream.map(_.value()).foreachRDD(rdd => rdd.foreach(println))streamingContext.start()streamingContext.awaitTermination()}
}



3、启动环境


① 启动zookeeper集群

【1】【2】【3】zkServer.sh start



② 启动kafka集群

#启动kafka集群
【1】【2】【3】kafka-server-start.sh /opt/yjx/kafka_2.12-0.11.0.3/config/server.properties
#创建主题
【1】kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 3 --topic baidu

③ 启动Flume

#flume启动命令
【1】flume-ng agent -n a1 -f /opt/yjx/apache-flume-1.9.0-bin/options/exec2kafka.conf -Dflume.root.logger=INFO,console

最后启动消费者SparkStreaming代码~


在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



4、进行数据测试


① 通过ping baidu进行测试

#ping结果到baidu.log文件中
ping www.baidu.com >> /root/baidu.log 2>&1 &

在这里插入图片描述
在这里插入图片描述



② 通过文件追加

[root@node01 ~]# echo helloworld>>/root/baidu.log
[root@node01 ~]# echo helloworld1>>/root/baidu.log
[root@node01 ~]# echo helloworld2>>/root/baidu.log

在这里插入图片描述



推荐阅读
  • Java在运行已编译完成的类时,是通过java虚拟机来装载和执行的,java虚拟机通过操作系统命令JAVA_HOMEbinjava–option来启 ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • Linux磁盘的分区、格式化的观察和操作步骤
    本文介绍了如何观察Linux磁盘的分区状态,使用lsblk命令列出系统上的所有磁盘列表,并解释了列表中各个字段的含义。同时,还介绍了使用parted命令列出磁盘的分区表类型和分区信息的方法。在进行磁盘分区操作时,根据分区表类型选择使用fdisk或gdisk命令,并提供了具体的分区步骤。通过本文,读者可以了解到Linux磁盘分区和格式化的基本知识和操作步骤。 ... [详细]
  • 利用Visual Basic开发SAP接口程序初探的方法与原理
    本文介绍了利用Visual Basic开发SAP接口程序的方法与原理,以及SAP R/3系统的特点和二次开发平台ABAP的使用。通过程序接口自动读取SAP R/3的数据表或视图,在外部进行处理和利用水晶报表等工具生成符合中国人习惯的报表样式。具体介绍了RFC调用的原理和模型,并强调本文主要不讨论SAP R/3函数的开发,而是针对使用SAP的公司的非ABAP开发人员提供了初步的接口程序开发指导。 ... [详细]
  • Android源码深入理解JNI技术的概述和应用
    本文介绍了Android源码中的JNI技术,包括概述和应用。JNI是Java Native Interface的缩写,是一种技术,可以实现Java程序调用Native语言写的函数,以及Native程序调用Java层的函数。在Android平台上,JNI充当了连接Java世界和Native世界的桥梁。本文通过分析Android源码中的相关文件和位置,深入探讨了JNI技术在Android开发中的重要性和应用场景。 ... [详细]
  • 本文介绍了Linux Shell中括号和整数扩展的使用方法,包括命令组、命令替换、初始化数组以及算术表达式和逻辑判断的相关内容。括号中的命令将会在新开的子shell中顺序执行,括号中的变量不能被脚本余下的部分使用。命令替换可以用于将命令的标准输出作为另一个命令的输入。括号中的运算符和表达式符合C语言运算规则,可以用在整数扩展中进行算术计算和逻辑判断。 ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • 本文介绍了Windows操作系统的版本及其特点,包括Windows 7系统的6个版本:Starter、Home Basic、Home Premium、Professional、Enterprise、Ultimate。Windows操作系统是微软公司研发的一套操作系统,具有人机操作性优异、支持的应用软件较多、对硬件支持良好等优点。Windows 7 Starter是功能最少的版本,缺乏Aero特效功能,没有64位支持,最初设计不能同时运行三个以上应用程序。 ... [详细]
  • 怎么在PHP项目中实现一个HTTP断点续传功能发布时间:2021-01-1916:26:06来源:亿速云阅读:96作者:Le ... [详细]
  • 解决.net项目中未注册“microsoft.ACE.oledb.12.0”提供程序的方法
    在开发.net项目中,通过microsoft.ACE.oledb读取excel文件信息时,报错“未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序”。本文提供了解决这个问题的方法,包括错误描述和代码示例。通过注册提供程序和修改连接字符串,可以成功读取excel文件信息。 ... [详细]
  • Centos7搭建ELK(Elasticsearch、Logstash、Kibana)教程及注意事项
    本文介绍了在Centos7上搭建ELK(Elasticsearch、Logstash、Kibana)的详细步骤,包括下载安装包、安装Elasticsearch、创建用户、修改配置文件等。同时提供了使用华为镜像站下载安装包的方法,并强调了保证版本一致的重要性。 ... [详细]
  • Ihaveaworkfolderdirectory.我有一个工作文件夹目录。holderDir.glob(*)>holder[ProjectOne, ... [详细]
author-avatar
唐僧
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有