热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flow的采集与分析vflow项目分析和解读

前言vflow这个开源项目,是我一开始做flow这块的最开始参考的项目。虽然到最后,我们没有使用这个项目,但是该项目是一个企业级的处理netflow等协议的项目,有许多地方值得去研

前言

vflow这个开源项目,是我一开始做flow这块的最开始参考的项目。虽然到最后,我们没有使用这个项目,但是该项目是一个企业级的处理netflow等协议的项目,有许多地方值得去研究。
《flow的采集与分析---vflow项目分析和解读》

特性

  • PFIX RFC7011 collector
  • sFLow v5 raw header packet collector
  • Netflow v9 collector
  • Decoding sFlow raw header L2/L3/L4
  • Produce to Apache Kafka, NSQ, NATS
  • Replicate IPFIX to 3rd party collector
  • Supports IPv4 and IPv6
  • Monitoring with InfluxDB and OpenTSDB backend

架构

《flow的采集与分析---vflow项目分析和解读》

Dynamic pool

提供了work pool,而且wokrer数量是根据buff堆积消息数量动态调整的,这点之前文章介绍过实现方式。

插件架构

vFlow 支持 Kafka,NSQ,NATS三种消息队列,但是你可以很容易地将消息写到其他的消息队列,比如 RabbitMQ。

package producer
import (
"log"
"sync"
)
// Producer represents messaging queue
type Producer struct {
MQ MQueue
MQConfigFile string
MQErrorCount *uint64
Topic string
Chan chan []byte
Logger *log.Logger
}
// MQueue represents messaging queue methods
type MQueue interface {
setup(string, *log.Logger) error
inputMsg(string, chan []byte, *uint64)
}
// NewProducer constructs new Messaging Queue
func NewProducer(mqName string) *Producer {
var mqRegistered = map[string]MQueue{
"kafka": new(Kafka),
"nsq": new(NSQ),
"nats": new(NATS),
"rawSocket": new(RawSocket),
}
return &Producer{
MQ: mqRegistered[mqName],
}
}
// Run configs and tries to be ready to produce
func (p *Producer) Run() error {
var (
wg sync.WaitGroup
err error
)
err = p.MQ.setup(p.MQConfigFile, p.Logger)
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
topic := p.Topic
p.MQ.inputMsg(topic, p.Chan, p.MQErrorCount)
}()
wg.Wait()
return nil
}
// Shutdown stops the producer
func (p *Producer) Shutdown() {
close(p.Chan)
}

提供了接口,如果扩展到其他的消息队列,实现接口即可。

扩展支持更多的协议

vflow暂时支持sflow和netflow协议,对于cflow和netstream,我们可以扩展实现。我已经扩展了netstream的模块。在vflow.go中,加入指定的flow worker。如下:

//版本太老,不再支持
netflow5 := NewNetFlowV5()
netstream5 := NewNetStreamV5()
// 主要支持四大协议
sFlow := NewSFlow()
ipfix := NewIPFIX()
netflow9 := NewNetflowV9()
netstream9 := NewNetStreamV9()
protos[0] = sFlow
protos[1] = ipfix
protos[2] = netflow9
protos[3] = netstream9
protos[4] = netflow5
protos[5] = netstream5
for _, p := range protos {
wg.Add(1)
go func(p proto) {
defer wg.Done()
p.run()
}(p)
}

支持监控

在monitor文件夹下提供了监控相关,目前支持metrics导入到influxdb和opentsdb。
提供了完善的metrics指标。
《flow的采集与分析---vflow项目分析和解读》

硬件要求

LoadIPFIX PPSCPURAM
low<1K2-464M
moderate<10K8+256M
high<50K12+512M
x-high<100K24+1G

总结

1)udp的负载均衡
这里nginx就支持udp协议的代理。
2)关于最终存放flow采集以后的数据,vflow提供了clickhouse和memsql以及spark三种思路。
3)当然我们之所以没有采用vflow,是因为我们编写了telegraf的flow input插件,可以充分利用telegraf提供的各种输出插件以及过滤器等。


推荐阅读
  • 数组的排序:数组本身有Arrays类中的sort()方法,这里写几种常见的排序方法。(1)冒泡排序法publicstaticvoidmain(String[]args ... [详细]
  • Go Cobra命令行工具入门教程
    本文介绍了Go语言实现的命令行工具Cobra的基本概念、安装方法和入门实践。Cobra被广泛应用于各种项目中,如Kubernetes、Hugo和Github CLI等。通过使用Cobra,我们可以快速创建命令行工具,适用于写测试脚本和各种服务的Admin CLI。文章还通过一个简单的demo演示了Cobra的使用方法。 ... [详细]
  • (三)多表代码生成的实现方法
    本文介绍了一种实现多表代码生成的方法,使用了java代码和org.jeecg框架中的相关类和接口。通过设置主表配置,可以生成父子表的数据模型。 ... [详细]
  • 认真一点学 Go:18. 并发
    收录于《Go基础系列》,作者:潇洒哥老苗。>>原文链接学到什么并发与并行的区别?什么是Goroutine?什么是通道?Goroutine如何通信?相关函数的使用?sel ... [详细]
  • 按照之前我对map的理解,map中的数据应该是有序二叉树的存储顺序,正常的遍历也应该是有序的遍历和输出,但实际试了一下,却发现并非如此,网上查了下,发现从Go1开始,遍历的起始节点就是随机了,当然随机 ... [详细]
  • golang 解析磁力链为 torrent 相关的信息
    其实通过http请求已经获得了种子的信息了,但是传播存储种子好像是违法的,所以就存储些描述信息吧。之前python跑的太慢了。这个go并发不知道写的有没有问题?!packag ... [详细]
  • 本文介绍了解决Netty拆包粘包问题的一种方法——使用特殊结束符。在通讯过程中,客户端和服务器协商定义一个特殊的分隔符号,只要没有发送分隔符号,就代表一条数据没有结束。文章还提供了服务端的示例代码。 ... [详细]
  • 本文介绍了django中视图函数的使用方法,包括如何接收Web请求并返回Web响应,以及如何处理GET请求和POST请求。同时还介绍了urls.py和views.py文件的配置方式。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • WhenIusepythontoapplythepymysqlmoduletoaddafieldtoatableinthemysqldatabase,itdo ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 本文介绍了RxJava在Android开发中的广泛应用以及其在事件总线(Event Bus)实现中的使用方法。RxJava是一种基于观察者模式的异步java库,可以提高开发效率、降低维护成本。通过RxJava,开发者可以实现事件的异步处理和链式操作。对于已经具备RxJava基础的开发者来说,本文将详细介绍如何利用RxJava实现事件总线,并提供了使用建议。 ... [详细]
  • ConsumerConfiguration在kafka0.9使用JavaConsumer替代了老版本的scalaConsumer。新版的配置如下:bootstrap. ... [详细]
  • druid接入kafka indexing service整个流程
    先介绍下我们的druid集群配置Overload1台Coordinator1台Middlemanager3台Broker3台Historical一共12台,其中cold6台,hot ... [详细]
author-avatar
神奇伟哥
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有