amazonriver: real-time synchronization to Elasticsearch through logical replication, written in Go, open-sourced by Hellobike

amazonriver

 

I. Introduction

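amazonriver is a service that synchronizes PostgreSQL inserts, deletes, and updates to Elasticsearch (ES) or Kafka in real time, built on PostgreSQL's logical replication feature. It is written in Go and was open-sourced by Hellobike.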

Git repository: https://github.com/hellobike/amazonriver

A Java version is also available at https://github.com/hellobike/tunnel (it depends on ZooKeeper).

Supported versions

  • PostgreSQL 9.4 or later
  • Kafka 0.8 or later
  • ElasticSearch 5.x

 

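amazonriver relies on PostgreSQL's built-in logical replication. It creates a logical replication slot in PostgreSQL, receives the database's logical changes through that slot, and parses the messages, which arrive in the test_decoding output format, to recover the changed rows. Communication with ES follows a publish/subscribe model.

This works much better than triggers and is simple to configure.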
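
To make the parsing step more concrete, here is a minimal Python sketch of how one test_decoding change line can be broken into table, operation, and column values. It is not amazonriver's own parser (which is written in Go); the function name parse_change and the sample column "name" are made up for illustration, and the sketch assumes unquoted identifiers and the simple row format without array values.

```python
import re

# One row change, e.g.:
#   table public.ds_command: INSERT: command_id[integer]:1 name[text]:'abc'
ROW_RE = re.compile(
    r"^table (?P<schema>[^.]+)\.(?P<table>[^:]+): "
    r"(?P<op>INSERT|UPDATE|DELETE): (?P<cols>.*)$"
)
# One column, e.g. name[text]:'abc'. Quoted values escape ' by doubling it.
COL_RE = re.compile(
    r"(?P<name>\w+)\[(?P<type>[^\]]+)\]:(?P<value>'(?:[^']|'')*'|\S+)"
)


def parse_change(line):
    """Return a dict describing a row change, or None for other lines
    (BEGIN, COMMIT, or anything this sketch does not recognize)."""
    m = ROW_RE.match(line)
    if m is None:
        return None
    columns = {}
    for col in COL_RE.finditer(m.group("cols")):
        value = col.group("value")
        if value.startswith("'"):
            # Strip the surrounding quotes and undo the '' escaping.
            value = value[1:-1].replace("''", "'")
        elif value == "null":
            value = None
        columns[col.group("name")] = value
    return {
        "schema": m.group("schema"),
        "table": m.group("table"),
        "op": m.group("op"),
        "columns": columns,
    }


change = parse_change(
    "table public.ds_command: INSERT: command_id[integer]:1 name[text]:'abc'"
)
print(change["op"], change["table"], change["columns"])
```

All values are kept as strings here; a real consumer would convert them according to the type given in the brackets.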

 

 

Installation

Download the latest release:

https://github.com/hellobike/amazonriver/releases

 

 

In the archive, the file named amazonriver is the executable; place it in the /opt/amazonriver directory.

 

II. Configuration

amazonriver configuration

Create a new configuration file, config.json, in that directory.

 

 

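The contents of config.json are shown below. The lines beginning with # are annotations for the reader; standard JSON has no comment syntax, so they should be left out of the actual file.

{
    "pg_dump_path": "",
    "subscribes": [{
        # Historical data import; in testing this feature did not work
        "dump": false,
        # If several amazonriver instances are running, each needs a different value here
        "slotName": "slot_for_es1",
        "pgConnConf": {
            # PostgreSQL connection settings
            "host": "10.168.4.91",
            "port": 5321,
            "database": "xzdssituation",
            "schema": "public",
            "user": "postgres",
            "password": "postgres"
        },
        "rules": [
            {
                # Table name
                "table": "ds_command",
                # Primary key of the table
                "pks": ["command_id"],
                # Document ID in ES
                "esid": ["command_id"],
                "index": "ds_command",
                # Fixed as "data"; must not start with an underscore
                "type": "data"
            },
            {
                "table": "ds_alarm",
                "pks": ["command_id"],
                "esid": ["command_id"],
                "index": "ds_alarm",
                "type": "data"
            }
        ],
        "esConf": {
            # Elasticsearch address
            "addrs": "http://10.168.4.60:9200/",
            "user": "",
            "password": ""
        },
        # Retries on failure: 0 disables retrying, -1 retries until it succeeds
        "retry": -1
    }],
    # Setting this to ":8080" opens a port for Prometheus monitoring; left unconfigured here
    "prometheus_address": ""
}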
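
Because the listing above carries # annotations that a JSON parser will reject, it can help to check the file before starting the service. The following Python sketch drops the annotation lines, parses the rest, and checks a few of the rules stated in the annotations. The helper names load_annotated_config and check_config are hypothetical, and checking slotName uniqueness within a single file is an assumption extended from the note about multiple instances; amazonriver itself may validate differently.

```python
import json


def load_annotated_config(text):
    """Parse config text after dropping lines whose first
    non-blank character is '#' (the article's annotations)."""
    kept = [ln for ln in text.splitlines() if not ln.lstrip().startswith("#")]
    return json.loads("\n".join(kept))


def check_config(cfg):
    """Return a list of human-readable problems (empty if none found)."""
    problems = []
    seen_slots = set()
    for sub in cfg.get("subscribes", []):
        slot = sub.get("slotName", "")
        if not slot:
            problems.append("subscribe entry has no slotName")
        elif slot in seen_slots:
            problems.append(f"duplicate slotName: {slot}")
        seen_slots.add(slot)
        for rule in sub.get("rules", []):
            table = rule.get("table", "?")
            if not rule.get("pks"):
                problems.append(f"{table}: pks is empty")
            if rule.get("type", "").startswith("_"):
                problems.append(f"{table}: type must not start with an underscore")
    return problems


sample = """
{
    "subscribes": [{
        # annotation lines like this one are removed before parsing
        "slotName": "slot_for_es1",
        "rules": [{"table": "ds_command", "pks": ["command_id"], "type": "data"}]
    }]
}
"""
print(check_config(load_annotated_config(sample)))
```

An empty list means none of the checked rules were violated; it does not prove the file is complete.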

 

PostgreSQL configuration

1. Edit /app/pgsql_data/postgresql.conf

wal_level = 'logical'
max_replication_slots = 5 # this value must be greater than 1

PostgreSQL must be restarted for these changes to take effect.
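
Before restarting, the edited file can be sanity-checked offline. The sketch below is a deliberately simplified reader for postgresql.conf text: it assumes every setting is written as name = value on one line (the real file format also allows omitting the equals sign and using include directives, which are ignored here). The function names are hypothetical, and the "greater than 1" threshold simply follows the note above.

```python
import re

# name = value, where value is either 'quoted' or a bare token.
# Lines starting with '#' do not match because '#' is not a word character.
SETTING_RE = re.compile(r"^\s*(\w+)\s*=\s*(?:'([^']*)'|([^\s#]+))")


def read_settings(conf_text):
    """Collect settings from postgresql.conf text; later lines win."""
    settings = {}
    for line in conf_text.splitlines():
        m = SETTING_RE.match(line)
        if m:
            value = m.group(2) if m.group(2) is not None else m.group(3)
            settings[m.group(1).lower()] = value
    return settings


def check_logical_replication(settings):
    """Return a list of problems with the two settings discussed above."""
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level is not set to logical")
    try:
        slots = int(settings.get("max_replication_slots", ""))
    except ValueError:
        problems.append("max_replication_slots is missing or not an integer")
    else:
        if slots <= 1:
            problems.append("max_replication_slots should be greater than 1")
    return problems


conf = "wal_level = 'logical'\nmax_replication_slots = 5 # must be greater than 1\n"
print(check_logical_replication(read_settings(conf)))
```

This only inspects the file. Once the server is running again, SHOW wal_level; in psql reports the value actually in effect.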

2. Create a user with the REPLICATION privilege

CREATE ROLE syn_rep LOGIN  ENCRYPTED PASSWORD 'postgres' REPLICATION;
GRANT CONNECT ON DATABASE xzdssituation to syn_rep;
COMMIT;

3. Update the client whitelist

Add the following entry to /app/pgsql_data/pg_hba.conf:

 host replication syn_rep all md5

A reload is required for this change to take effect.

4. Restart PostgreSQL

su - postgres

./stop-pgsql.sh

./start-pgsql.sh

III. Starting and stopping

First make amazonriver executable:

chmod +x /opt/amazonriver/amazonriver

Start (specifying the configuration file and the log output)

This command has been saved as a script: /opt/amazonriver/start.sh

nohup /opt/amazonriver/amazonriver -config /opt/amazonriver/config.json -level debug > /opt/amazonriver/logfile.log 2>&1 & echo $! > /opt/amazonriver/pid

Stop

This command has been saved as a script: /opt/amazonriver/stop.sh

kill `cat /opt/amazonriver/pid`
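
The one-line stop command fails with an error if the pid file is missing or points at a process that has already exited. As an illustration only, here is a small Python sketch of a more forgiving stop step; stop_from_pidfile is a hypothetical helper, not part of amazonriver, and it assumes a Unix-like system.

```python
import os
import signal


def stop_from_pidfile(pid_path):
    """Send SIGTERM to the process recorded in pid_path.

    Returns True if the signal was delivered, False if the pid file is
    missing or unreadable, or if no such process exists. A readable pid
    file is removed afterwards so a stale PID is not reused later.
    """
    try:
        with open(pid_path) as f:
            pid = int(f.read().strip())
    except (FileNotFoundError, ValueError):
        return False
    try:
        os.kill(pid, signal.SIGTERM)
        delivered = True
    except ProcessLookupError:
        # The process already exited; only the stale file remains.
        delivered = False
    os.remove(pid_path)
    return delivered


if __name__ == "__main__":
    stopped = stop_from_pidfile("/opt/amazonriver/pid")
    print("stopped" if stopped else "nothing to stop")
```

SIGTERM is the same signal that kill sends by default, so the shutdown behavior of the service is unchanged.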

IV. Viewing the logs

tail -200f /opt/amazonriver/logfile.log

 

Appendix:

/app/pgsql/bin/pg_ctl controls starting and stopping PostgreSQL

pg_ctl start [-w] [-s] [-D datadir] [-l filename] [-o options] [-p path]
pg_ctl stop [-W] [-s] [-D datadir] [-m s[mart] | f[ast] | i[mmediate] ]
pg_ctl restart [-w] [-s] [-D datadir] [-m s[mart] | f[ast] | i[mmediate] ] [-o options]
pg_ctl reload [-s] [-D datadir]
pg_ctl status [-D datadir]
pg_ctl kill [signal_name] [process_id]
pg_ctl register [-N servicename] [-U username] [-P password] [-D datadir] [-w] [-o options]
pg_ctl unregister [-N servicename]