热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

用logstash作数据的聚合统计

用logstash作数据的聚合统计以spark-streaming处理消费数据,统计日志经sparksql存储在mysql中日志写入方式为appendvalwordsDataFram

用logstash 作数据的聚合统计

以spark-streaming 处理消费数据,统计日志经spark sql存储在mysql中

日志写入方式为append
val wordsDataFrame = rdd.toDF("supplier", "type", "domain", "pdate", "count", "idate")
wordsDataFrame
.write
.format("jdbc")
.mode("append").options(mysql_conf)
.save()
因为spark-streaming是批处理服务,这些日志只是中间结果,会有大量小批次的统计信息
表名statistics_temp
id(自增id) domain idate count
1 www.baidu.com 2018-06-13 1
2 www.baidu.com 2018-06-13 2
3 www.taobao.com 2018-06-13 3

求每日每domain的数据还需再一次作聚合

原本的构想是通过logstash将日志导入es,elk方案,kibana直接划分时间间隔聚合出结果展示

现在有个需求是要存入每日的量级入一张表,直接在mysql展示

不考虑从kibana聚合再写入es,statistics_temp数据量较大,每次查询都从statistics_temp用聚合显然不可行

因此需要由这张临时表,导出完真正的有效表statistics,这种需求很常见,通常是记录时间点,定时执行,读取时间点后的数据,统计再upsert

导出表名statistics(domain+idate 配置联合唯一索引,DUPLICATE才生效)
id(自增id) domain idate count
1 www.baidu.com 2018-06-13 3
3 www.taobao.com 2018-06-13 3

最近作数据流程设计搭建,不想写这种代码,因为对logstash比较熟悉,因此便想试试logstash的方案,最终实验可行

环境
logstash 5.5
mysql 5.7

聚合 需要官方插件
https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-example2
写入 mysql需要第三方插件
https://github.com/theangryangel/logstash-output-jdbc

1安装插件
logstash5.5 默认不含logstash-filters-aggregate 需单独安装,其他版本未知
logstash-plugins install logstash-filters-aggregate
第三方插件,默认不含,需单独安装
logstash-plugins install logstash-output-jdbc

2执行
由于jdbc_fetch_size参数不生效,因此会拿出大量数据(mysql-connector-java-5.1.44-bin.jar,默认的数据量)需要调整堆大小,避免oom

export LS_JAVA_OPTS="-Xms4g -Xmx8g"
logstash -f /logstash/statistic.conf

重点是/logstash/statistic.conf的内容

input {
jdbc {
jdbc_driver_library => "/logstash/mysql-connector-java-5.1.44-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://host:3306/db1"
jdbc_user => "root"
jdbc_password => "root"
schedule => "* * * * *"
use_column_value => true
tracking_column => "id"
jdbc_fetch_size => 2000
tracking_column_type => "numeric"
statement => "SELECT * from statistics WHERE id > :sql_last_value"
clean_run => false
record_last_run => true
last_run_metadata_path => "/home/logstash/.statistic"
}
}

filter {
aggregate {
task_id => "%{domain}_%{idate}"
code => "
map['domain'] = event.get('domain')
map['idate'] = event.get('idate')
map['count'] ||= 0
map['count'] += event.get('count')
event.cancel()
"
push_previous_map_as_event => true
}
}

output {
stdout{
codec=>rubydebug{}
}
jdbc {
driver_jar_path => "/logstash/mysql-connector-java-5.1.44-bin.jar"
driver_class => "com.mysql.jdbc.Driver"
connection_string => "jdbc:mysql://host:3306/db1?user=root&password=root"
statement => [ "INSERT INTO `bbs`.`statistics2` (`domain`,`idate`,`count`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE `count` = `count` + ?;", "domain","idate","count","count" ]
}
}


tracking_column => "id"
tracking_column_type => "numeric"
statement => "SELECT * from statistics WHERE id > :sql_last_value"
last_run_metadata_path => "/home/logstash/.statistic"

以上四个参数,配置记录点的信息

该例子为自增id numeric类型
记录文件示例
bash-4.3# cat /home/logstash/.statistic
--- 127986843

若是时间类型
tracking_column => "updated_on"
tracking_column_type => "timestamp"
statement => "SELECT * from user WHERE updated_on > :sql_last_value"
last_run_metadata_path => "/home/logstash/.statistic"

记录文件示例
bash-4.3# cat /home/logstash/.statistic
--- 2018-06-13 10:57:59.000000000 +00:00

可以通过,停logstash服务,改记录文件,再启动logstash服务,来重新聚合统计某记录点后的数据。


示例
count 对比
statistics_temp:
mysql> select count(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate |
+--------------+------------+
| 15290 | 2018-06-16 |
| 27176 | 2018-06-17 |
| 18997 | 2018-06-18 |
| 21785 | 2018-06-19 |
+--------------+------------+
statistics:
mysql> select count(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+--------------+------------+
| count(count) | idate |
+--------------+------------+
| 532 | 2018-06-16 |
| 533 | 2018-06-17 |
| 534 | 2018-06-18 |
| 535 | 2018-06-19 |
+--------------+------------+

sum 对比
statistics_temp
mysql> select sum(count),idate from statistics_temp where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate |
+------------+------------+
| 390499 | 2018-06-16 |
| 705807 | 2018-06-17 |
| 462147 | 2018-06-18 |
| 600657 | 2018-06-19 |
+------------+------------+
statistics
mysql> select sum(count),idate from statistics where domain='www.abc.com' and idate>'2018-06-15' group by idate;
+------------+------------+
| sum(count) | idate |
+------------+------------+
| 390499 | 2018-06-16 |
| 705807 | 2018-06-17 |
| 462147 | 2018-06-18 |
| 600657 | 2018-06-19 |
+------------+------------+

相比statistics_temp,statistics count减少,sum一致,聚合成功


推荐阅读
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • 【shell】网络处理:判断IP是否在网段、两个ip是否同网段、IP地址范围、网段包含关系
    本文介绍了使用shell脚本判断IP是否在同一网段、判断IP地址是否在某个范围内、计算IP地址范围、判断网段之间的包含关系的方法和原理。通过对IP和掩码进行与计算,可以判断两个IP是否在同一网段。同时,还提供了一段用于验证IP地址的正则表达式和判断特殊IP地址的方法。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 纠正网上的错误:自定义一个类叫java.lang.System/String的方法
    本文纠正了网上关于自定义一个类叫java.lang.System/String的错误答案,并详细解释了为什么这种方法是错误的。作者指出,虽然双亲委托机制确实可以阻止自定义的System类被加载,但通过自定义一个特殊的类加载器,可以绕过双亲委托机制,达到自定义System类的目的。作者呼吁读者对网上的内容持怀疑态度,并带着问题来阅读文章。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • Vagrant虚拟化工具的安装和使用教程
    本文介绍了Vagrant虚拟化工具的安装和使用教程。首先介绍了安装virtualBox和Vagrant的步骤。然后详细说明了Vagrant的安装和使用方法,包括如何检查安装是否成功。最后介绍了下载虚拟机镜像的步骤,以及Vagrant镜像网站的相关信息。 ... [详细]
  • ***byte(字节)根据长度转成kb(千字节)和mb(兆字节)**parambytes*return*publicstaticStringbytes2kb(longbytes){ ... [详细]
  • 本文介绍了SparkStreaming微批量处理的方法与技巧,包括参考文章spark_streaming_微批量处理Spark流的内容。通过本文的阅读,读者可以了解到在SparkStreaming中如何进行微批量处理,并掌握相关的方法和技巧。阅读本文可以帮助读者更好地理解和应用SparkStreaming的微批量处理功能。 ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • 初探PLC 的ST 语言转换成C++ 的方法
    自动控制软件绕不开ST(StructureText)语言。它是IEC61131-3标准中唯一的一个高级语言。目前,大多数PLC产品支持ST ... [详细]
  • 点击上方“新机器视觉”,选择加”星标”或“置顶”重磅干货,第一时间送达很早就想总结一下前段时间学习HALCON的心得,但由于其他的事情总是抽不出时间。去年有过一段时间的集中学习,做 ... [详细]
author-avatar
歪果仁
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有