热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flume与kafka的整合

案例1:syslog-memory-kafka将flume采集到的数据落地到kafka上,即sink是kafka(生产者身份)vimsyslog-mem-kafka.conf

案例1:syslog-memory-kafka

将flume采集到的数据落地到kafka上,即sink是kafka(生产者身份)

vim syslog-mem-kafka.conf
# 命名个组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1#source属性
a1.sources.r1.type = syslogtcp
a1.sources.r1.host=mypc01
a1.sources.r1.port=10086# 描述channel属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述sink属性
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = mypc01:9092,mypc:9092,mypc03:9092
# 主题必须提前存在
a1.sinks.k1.kafka.topic = pet
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1# 关联source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

在这里插入图片描述
启动flume

#bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/syslog-mem-kafka.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002

先启动消费者准备接受消息

kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

测试

echo "aaaaa" | nc mypc01 10086

案例2 kafka-memory-hdfs

kafka的source类型从kafka集群读取数据,就是消费者身份,将数据封装成event落地到hdfs
在这里插入图片描述

vim kafka-mem-kafka.conf
# 命名个组件
a1.sources = r1
a1.sinks = k1
a1.channels = c1#source属性
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = mypc01:9092,mypc02:9092,mypc03:9092
a1.sources.r1.kafka.consumer.group.id=g1
a1.sources.r1.kafka.topics=pet# 描述channel属性
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# 描述sink属性
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://mypc01:8020/kafka/pet/%Y%m%d
a1.sinks.k1.hdfs.filePrefix=FlumeData
a1.sinks.k1.hdfs.fileSuffix = .kafka
a1.sinks.k1.hdfs.rollSize=102400
a1.sinks.k1.hdfs.rollCount = 0
#单位为s
b1001.sinks.k1.hdfs.rollInterval=60
b1001.sinks.k1.hdfs.useLocalTimeStamp = true# 关联source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

#bin/bash
/usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf \
-f /usr/local/flume/flumeconf/kafka-mem-hdfs.conf \
-n a1 -Dflume.root.logger=INFO,console -Dflume.monitoring.type=http -Dflume.monitoring.port=31002

启动生产者,使用生产者发送消息

kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet

之后就可以在hdfs上看到生成的文件了.

在这里插入图片描述


总结


  • kafka可以作为source,也可以作为sink

推荐阅读
  • ZABBIX 3.0 配置监控NGINX性能【OK】
    1.在agent端查看配置:nginx-V查看编辑时是否加入状态监控模块:--with-http_stub_status_module--with-http_gzip_stat ... [详细]
  • Python瓦片图下载、合并、绘图、标记的代码示例
    本文提供了Python瓦片图下载、合并、绘图、标记的代码示例,包括下载代码、多线程下载、图像处理等功能。通过参考geoserver,使用PIL、cv2、numpy、gdal、osr等库实现了瓦片图的下载、合并、绘图和标记功能。代码示例详细介绍了各个功能的实现方法,供读者参考使用。 ... [详细]
  • 本文介绍了在mac环境下使用nginx配置nodejs代理服务器的步骤,包括安装nginx、创建目录和文件、配置代理的域名和日志记录等。 ... [详细]
  • 本文介绍了Python爬虫技术基础篇面向对象高级编程(中)中的多重继承概念。通过继承,子类可以扩展父类的功能。文章以动物类层次的设计为例,讨论了按照不同分类方式设计类层次的复杂性和多重继承的优势。最后给出了哺乳动物和鸟类的设计示例,以及能跑、能飞、宠物类和非宠物类的增加对类数量的影响。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 本文介绍了在MacOS系统上安装MySQL的步骤,并详细说明了如何设置MySQL服务的开机启动和如何修改MySQL的密码。通过下载MySQL的macos版本并按照提示一步一步安装,在系统偏好设置中可以找到MySQL的图标进行设置。同时,还介绍了通过终端命令来修改MySQL的密码的具体操作步骤。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 本文整理了Java中org.gwtbootstrap3.client.ui.Icon.addDomHandler()方法的一些代码示例,展示了Icon.ad ... [详细]
  • PHP连接MySQL的2种方法小结以及防止乱码【PHP】
    后端开发|php教程PHP,MySQL,乱码后端开发-php教程PHP的MySQL配置报错信息:ClassmysqlinotfoundinAnswer:1.在confphp.ini ... [详细]
  • 视图分区_组复制常规操作网络分区amp;混合使用IPV6与IPV4 | 全方位认识 MySQL 8.0 Group Replication...
    网络分区对于常规事务而言,每当组内有事务数据需要被复制时,组内的成员需要达成共识(要么都提交,要么都回滚)。对于组成员资格的变更也和保持组 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • Python项目实战10.2:MySQL读写分离性能优化
    本文介绍了在Python项目实战中进行MySQL读写分离的性能优化,包括主从同步的配置和Django实现,以及在两台centos 7系统上安装和配置MySQL的步骤。同时还介绍了创建从数据库的用户和权限的方法。摘要长度为176字。 ... [详细]
  • Annotation的大材小用
    为什么80%的码农都做不了架构师?最近在开发一些通用的excel数据导入的功能,由于涉及到导入的模块很多,所以开发了一个比较通用的e ... [详细]
author-avatar
在这里啊
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有