热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Flume笔记一基础

FlumeFlume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定

Flume

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统, Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

架构
在这里插入图片描述

运行机制:

Flume 的核心是把数据从数据源(source)收集过来,在将收集到的数据送到指定的目的地(sink)。为了保证输送的过程一定成功,在送到目的地(sink)之前,会先缓存数据(channel),待数据真正到达目的地(sink)后,flume 在删除自己缓存的数据。

核心的角色是 agent, agent 本身是一个 Java 进程, 一般运行在日志收集节点。 flume 采集系统就是由一个个 agent 所连接起来形成。

agent的三个组件:

Source:
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

Channel:
Channel 是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带两种 Channel:Memory Channel 和 File Channel 以及 Kafka Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

Sink:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。

Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。

在这里插入图片描述

安装:

1.解压
2.修改flume-env.sh
添加java环境变量
3.验证
flume-ng version

案例:


nc

netcat 源

1.flume配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 对当前agent的命名组件 a1:当前agent的名字 如果在同一节点有多个agent
# 需要区别开 source,sink,channel后边加s说明可能会有多个组件# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# 描述和配置当前的source 监听的节点和端口# Describe the sink
a1.sinks.k1.type = logger
# sink的类型是log# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# c1类型是内存级别 缓冲大小阈值单位:事件 一次传输的事件量# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 绑定source和sink到channel sink只能绑定一个channel 所以后面没有s

flume启动命令:

flume-ng agent --conf-file 配置文件 --name a1 -Dflume.root.logger=INFO,console
#agent:启动一个agent
#Dflume.root.logger=INFO,console 打印到控制台 不常用

flume启动后相当于开启了一个服务端
在这里插入图片描述
在另一个会话页面:

nc localhost 44444

相当于开启了一个客户端
此时在客户端输入就会在服务端以log形式打印到控制台
在这里插入图片描述
在这里插入图片描述

案例2


利用exec源监控某个文件

Exec Source在启动时运行给定的Unix命令,并期望进程在标准输出上产生连续的数据(除非属性logStdErr设置为true,否则stderr将被丢弃)。 如果进程由于任何原因退出,source也会退出,并且不会生成更多数据。

a1.sources = r1
a1.sinks = k1
a1.channels = c1a1.sources.r1.type = exec
a1.sources.r1.command = tail -f a1.sinks.k1.type = loggera1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

追加内容到要监控的文件

cat 2.txt >> flume.log

在这里插入图片描述

案例3:


flume-hdfs

flume要想将数据输出到hdfs,需要有hadoop相关jar包
在这里插入图片描述
flume官方手册

http://flume.apache.org/releases/content/1.7.0/FlumeUserGuide.html

滚动文件:rollsize 设为 hdfs块大小 128mb

滚动文件夹:用处:一天滚动一个文件夹

可以配合hive分区 按天分区load数据就会很方便

a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/test.log
a2.sources.r2.shell = /bin/bash -c #解析方式a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

案例4:


监控多个文件

spooldir source

通过此源,您可以通过将要摄取的文件放入磁盘上的“Spooling”目录中来摄取数据。该源将监视指定目录中的新文件,并从出现的新文件中解析事件。事件解析逻辑是可插入的。将给定文件完全读入通道后,将其重命名以指示完成(或选择删除)。

与Exec源不同,此源是可靠的,即使Flume重新启动或终止,它也不会丢失数据。为了获得这种可靠性,必须仅将不可变的唯一命名的文件放入Spooling目录中。Flume尝试检测这些问题情况,如果违反这些条件,将返回失败:

如果将文件放入Spooling目录后写入文件,Flume将在其日志文件中打印错误并停止处理。
如果以后再使用文件名,Flume将在其日志文件中打印错误并停止处理。
为避免上述问题,将唯一的标识符(例如时间戳)添加到日志文件名称(当它们移到Spooling目录中时)可能会很有用。

尽管有此来源的可靠性保证,但是在某些情况下,如果发生某些下游故障,则事件可能会重复。这与Flume其他组件提供的保证是一致的。

a2.sources = r2
a2.sinks = k2
a2.channels = c2a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume/upload a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://192.168.56.20:9000/flume/%Y%m%d/%Ha2.sinks.k2.hdfs.round = true # 按照时间滚动文件夹
a2.sinks.k2.hdfs.roundValue = 1 # 多长时间创建一个新文件夹
a2.sinks.k2.hdfs.roundUnit = hour # 重新定义时间单位a2.sinks.k2.hdfs.useLocalTimeStamp = true # 使用本地时间戳a2.sinks.k2.hdfs.batchSize = 1000 # 积攒多少个Event flush到hdfs一次
a2.sinks.k2.hdfs.fileType = DataStream # 设置文件类型
a2.sinks.k2.hdfs.rollIntreval = 60 # 多久生成一个新文件 s
a2.sinks.k2.hdfs.rollSize = 134217700 # 文件滚动与Event数量无关 设置大小比一个hdfs块128MB稍小
a2.sinks.k2.hdfs.rollCount = 0 # 文件滚动与Event数量无关a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

先上传 后改名为已读

但是不能动态监控变化的文件

案例5:


监控动态多文件

1.7版本 Talldir实现断点续传

在通过Flume收集日志的业务场景中,一般都会遇到下面的情况,在日志收集服务器的某个目录下,会按照一段时间生成一个日志文件,并且日志会不断的追加到这个文件中,比如,每小时一个命名规则为log_20151015_10.log的日志文件,所有10点产生的日志都会追加到这个文件中,到了11点,就会生成另一个log_20151015_11.log的文件。

这种场景如果通过flume(1.6)收集,当前提供的Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前正在开发的flume1.7版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

a1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = 第一个路径a1.sources.r1.positionFile = 路径

在这里插入图片描述
位置文件:
实现断点续传 json格式 inode(linux文件系统文件标识)记录了被监控文件位置信息
在这里插入图片描述


推荐阅读
  • Flume 开源分布式日志收集系统
    为什么80%的码农都做不了架构师?Flume--开源分布式日志收集系统Flume是Cloudera提供的一个高可用的、高可靠的开源分布式海量日志收集系统 ... [详细]
  • Android中高级面试必知必会,积累总结
    本文介绍了Android中高级面试的必知必会内容,并总结了相关经验。文章指出,如今的Android市场对开发人员的要求更高,需要更专业的人才。同时,文章还给出了针对Android岗位的职责和要求,并提供了简历突出的建议。 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 深入理解Kafka服务端请求队列中请求的处理
    本文深入分析了Kafka服务端请求队列中请求的处理过程,详细介绍了请求的封装和放入请求队列的过程,以及处理请求的线程池的创建和容量设置。通过场景分析、图示说明和源码分析,帮助读者更好地理解Kafka服务端的工作原理。 ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • 2019我的金三银四
    先讲一下自己的情况吧,二本学生,17年毕业,目前在一家跨境电商从事Java技术开发工作(不是阿里,没那么厉害),技术栈目前偏向于容器云、持续集成持续交付这一块,也就是SpringBoot、Kuber ... [详细]
  • 博客_2018年博客总结
    本文由编程笔记#小编为大家整理,主要介绍了2018年博客总结相关的知识,希望对你有一定的参考价值。前言     ... [详细]
  • Java开发实战讲解!字节跳动三场技术面+HR面
    二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有 ... [详细]
  • Java开发面试问题,2021网易Java高级面试题及答案,实战案例
    前言大厂面试真题向来都是各大求职者的最佳练兵场,而今天小编带来的便是“HUAWEI”面经!这是一次真实的面试经历,虽然不是我自己亲身经历 ... [详细]
  • 前言最近一段时间在整公司项目里一个功能的优化,用到了多线程处理。期间也是踩了不少的坑,在这里想说下我遇到的问题和注意事项。以及怎样知道启动的那些多线程都 ... [详细]
  • 如何使用最流行框架Tensorflow进行时序预测和时间序列分析
    一、看深度学习框架排名第一的TensorFlow如何进行时序预测摘要:2017年深度学习框架关注度排名tensorflow以绝对的优势占领榜首,本文通过一个小例子介绍了Tensor ... [详细]
  • 大数据的明天将驶向何方?
    http:www.infoq.comcnarticleswhere-will-big-data--tomorrow-sail-to大数据的明天将驶向何方?作者 36Kr 发布于20 ... [详细]
  • 搞懂 ELK 并不是一件特别难的事
    点击下方“民工哥技术之路”,选择“设为星标”回复“1024”获取独家整理的学习资料!本篇文章主要介绍ELK的一些框架组成,原理和实践&#x ... [详细]
  • CDH4简介
    原文地址:CDH4简介作者:HadoopChinaWebelievethatduring2012,enterprisedistributionsofHa ... [详细]
  • 本篇内容介绍了“web日志类型有哪些”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处 ... [详细]
author-avatar
手机用户2502907603
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有