2019独角兽企业重金招聘Python工程师标准>>>
插件安装、卸载、更新的命令。
官网:https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html#installing-local-plugins
1、查询已安装的插件:
Logstash发行包捆绑常见的插件,以便您可以开箱即用。可以查询当前可用的插件:
bin/logstash-plugin list 将列出所有安装的插件
bin/logstash-plugin list --verbose 将列出已安装的插件与版本信息
bin/logstash-plugin list '*namefragment*' 将列出所有安装的包含namefragment的插件
bin/logstash-plugin list --group output 将列出特定组的所有安装的插件(输入,过滤器,编解码器,输出)
2、安装插件:
线上安装插件:
bin/logstash-plugin install logstash-output-kafka
线下安装插件:
bin/logstash-plugin install /path/to/logstash-output-kafka-1.0.0.gem
3、更新插件:
bin/logstash-plugin update 更新所有已经安装的插件
bin/logstash-plugin update logstash-output-kafka 更新插件logstash-output-kafka
4、移除插件:
bin/logstash-plugin remove logstash-output-kafka 二、实现mysql数据同步。
1、安装相关插件:
input我这里使用logstash-input-jdbc插件,output使用logstash-output-jdbc插件。因为logstash自带了logstash-input-jdbc插件。所以这里只安装logstash-output-jdbc。
我这里使用线上安装:
bin/logstash-plugin install logstash-output-jdbc
1、下载插件。下载地址:https://github.com/theangryangel/logstash-output-jdbc。
2、线下安装,参考下面2个博客:
http://blog.csdn.net/yeyuma/article/details/50240595
https://my.oschina.net/MrYx3en/blog/508230
2、配置文件:
input {stdin { } jdbc {jdbc_driver_library => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/xnc"jdbc_user => "root"jdbc_password => "123456"# or jdbc_password_filepath => "/path/to/my/password_file"# where p.update_time >= :sql_last_startstatement => "SELECT id, product_spec_id,zone_id,recorded_by,CAST(price_per_unit AS CHAR) price_per_unit,uom,latest,recording_date,create_time,update_time from price p"jdbc_paging_enabled => "true"jdbc_page_size => "50000"}
}
output {#标准输出。为了测试stdout { codec => rubydebug }jdbc {driver_class => "com.mysql.jdbc.Driver"driver_jar_path => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"connection_string => "jdbc:mysql://localhost:3306/xncprice"username => "root"password => "123456"statement => [ "INSERT INTO price (id, product_spec_id, zone_id,recorded_by,price_per_unit,uom,latest,recording_date,create_time,update_time) VALUES(?,?,?,?,CAST(? AS decimal),?,?,?,?,?)", "id", "product_spec_id", "zone_id", "recorded_by","price_per_unit", "uom" , "latest" , "recording_date" , "create_time" , "update_time" ]}}
3、运行。
logstash -f G:\logstash-5.5.1\config\my-logstash.conf
这里需要注意数据库类型。因为我的price_per_unit是decimal类型的,在运行之后,price_per_unit插入的数据一直是null,改了目标类型为double还是不行。解决方式有2个:
- 改源类型(input中price_per_unit的数据类型)为double。
- 查询的时候把源的decimal转换为char类型,插入目标的时候再转为decimal。(我使用这个)。
4、同步update操作。
把output使用REPLACE 语句,如果数据id重复时候,直接用后面的数据替换前面的数据。
output {#标准输出。为了测试stdout { codec => rubydebug }jdbc {driver_class => "com.mysql.jdbc.Driver"driver_jar_path => "G:/MvnRepository/mysql/mysql-connector-java/5.1.41/mysql-connector-java-5.1.41.jar"connection_string => "jdbc:mysql://localhost:3306/xncprice"username => "root"password => "123456"statement => [ "REPLACE INTO price (id, product_spec_id, zone_id,recorded_by,price_per_unit,uom,latest,recording_date,create_time,update_time) VALUES(?,?,?,?,CAST(? AS decimal),?,?,?,?,?)", "id", "product_spec_id", "zone_id", "recorded_by","price_per_unit", "uom" , "latest" , "recording_date" , "create_time" , "update_time" ]}