如何实现MySQL实时同步到PostgreSQL数据库
Contents
0 项目背景说明
公司有一项目,采用MySQL 5.7.39作为业务数据库。项目主管部门有一套数据汇总系统,要求我们把部分业务表数据实时推送到汇总系统。他们提供一套前置的瀚高PostgreSQL数据库出来,让我们把MySQL中的对应业务表推送到该数据库中。同时,要求我们把数据推送到目标PostgreSQL数据库之后,如果源端的MySQL数据库有插入操作,则需要在目标PostgreSQL数据库表中通过添加4个字段记录下来:I_ID 自增,I_STATE = 1,I_FLAG = I,I_TIME 表示记录插入时间。
经过分析调研测试验证,在网上找到有网友在Kubernetes里通过Debezium and KafkaConnect来说实现过,这个场景对我们来说不太贴合,大概了解一下原理,就略过了。链接:Sync MySQL to PostgreSQL Using Debezium and KafkaConnect
同时,看到有网友通过轻量级 CDC debezium-server-databend 构建实时数据同步,简单测试了下,该网友实现了把MySQL同步到databend数据库里,没有直接落库到PostgreSQL数据库。有开发同事,对该项目进行了简单的二开,有实现了部分需求。暂时没有足够的时间和精力继续二开,就地搁置。https://zhuanlan.zhihu.com/p/647470177?utm_id=0
网上有Tapdata工具可以实现该场景,但是需要联系厂家申请试用,他们的人有联系我。但是,我隐隐约约感觉到对方可能会收费。https://bbs.huaweicloud.com/blogs/320806
最后,找到一款go-mysql-postgres的开源工具,经过测试验证,以及在目标端的PostgreSQL数据库表上创建触发器,可以初步实现该功能。
核心操作流程:直接在源端的MySQL数据库服务器上执行操作:安装配置go-mysql-postgres工具,用于读取本地的MySQL数据库的binlog,然后把数据写入到目标端的PostgreSQL数据库。
而go-mysql-postgres工具的主要思路是,源端的MySQL数据库开启binlog,且格式要求是row,源端MySQL和目标端PostgreSQL中的表必须都得有主键。通过mysqldump工具把MySQL中要同步的表备份出来,记录下binlog position,用于后面从哪儿开始读取binlog进行同步。把备份的建表语句改成兼容PostgreSQL的SQL,在PostgreSQL中执行建表,并且把数据执行插入到PostgreSQL数据库里。准备就绪之后,开启go-mysql-postgres程序来从记录下的binlog position位置处开始执行同步。
本文档用于记录如何采用go-mysql-postgres和触发器来实现MySQL数据实时同步到PostgreSQL,并且在目标库表上增加4个标记位字段的操作步骤和详细流程。
1 安装PostgreSQL 15.0
在源端MySQL数据库服务器上执行安装,目的是通过该PostgreSQL安装的psql客户端工具可以连接到目标端的PostgreSQL数据库。
groupadd -g 1688 postgres useradd -g postgres -u 1688 postgres echo 'D1ng0D1888!'|passwd --stdin docker mkdir -p /data/postgres/15.0/ chown -R postgres:postgres /data/postgres/ cp postgresql-15.0.tar.gz /home/postgres/ ##解压,编译安装 su - postgres tar -zxvf postgresql-15.0.tar.gz cd postgresql-15.0/ ./configure --prefix=/data/postgres/15.0 gmake world && gmake install-world ###修改~/.bashrc [postgres@dbserver ~]$ cat .bashrc # Source default setting [ -f /etc/bashrc ] && . /etc/bashrc # User environment PATH PATH="$HOME/.local/bin:$HOME/bin:$PATH:/data/postgres/15.0/bin/" export PATH export PGPASSWORD=123456Aa* [postgres@dbserver ~]$ ##使之生效 source ~/.bashrc #连接目标端瀚高数据库,这里隐去对应的数据库IP,端口,账户名和密码等信息 [postgres@dbserver ~]$ psql -h pg_ip -p port -U user_name -d db_name NOTICE: ------------------------------------------- Login User: user_name Login time: 2024-10-14 16:01:21.042908+08 Login Address: pg_ip Last Login Status: SUCCESS Login Failures: 0 Valied Until: infinity ------------------------------------------- psql (15.0, server 12.7) Type "help" for help. db_name=>
2 下载安装go-mysql-postgres
同样在源端MySQL数据库服务器上执行安装,将来直接在数据库服务器上启动该程序,读取MySQL的binlog,并把数据推送到目标端的PostgreSQL数据库中去。
#下载安装golang,因为go-mysql-postges是用golang写的,所以需要先安装配置golang环境,才可以编译go-mysql-postgres程序 wget https://dl.google.com/go/go1.21.3.linux-amd64.tar.gz # 解压到/usr/local路径下 tar -C /usr/local -xzf go1.21.3.linux-amd64.tar.gz vi /etc/profile #添加下述配置项 export PATH=$PATH:/usr/local/go/bin export GOROOT=/usr/local/go export GOPATH=$HOME/Documents/go #读取golang配置,使之生效 source /etc/profile #下载编译go-mysql-postgres git clone https://gitee.com/tangjunhu/go-mysql-postgres.git cd go-mysql-postgres/ make [root@dbserver ~]# go version go version go1.21.3 linux/amd64 [root@dbserver ~]# cd go-mysql-postgres/ [root@dbserver go-mysql-postgres]# ll total 60 -rw-r--r-- 1 root root 199 Oct 11 10:45 build.sh -rw-r--r-- 1 root root 425 Oct 11 10:45 clear_vendor.sh drwxr-xr-x 3 root root 4096 Oct 11 10:45 cmd -rw-r--r-- 1 root root 455 Oct 11 10:45 Dockerfile drwxr-xr-x 2 root root 4096 Oct 11 10:45 elastic drwxr-xr-x 2 root root 4096 Oct 11 10:45 etc -rw-r--r-- 1 root root 1046 Oct 11 10:45 go.mod -rw-r--r-- 1 root root 5088 Oct 11 10:45 go.sum -rw-r--r-- 1 root root 1076 Oct 11 10:45 LICENSE -rw-r--r-- 1 root root 255 Oct 11 10:45 Makefile -rw-r--r-- 1 root root 3399 Oct 11 10:45 README.md drwxr-xr-x 2 root root 4096 Oct 11 10:45 river drwxr-xr-x 2 root root 4096 Oct 11 10:45 util drwxr-xr-x 3 root root 4096 Oct 11 10:45 vendor [root@dbserver go-mysql-postgres]# make GO111MODULE=on go build -o bin/go-mysql-postgresql-mw ./cmd/go-mysql-elasticsearch cat build.sh >build chmod a+x build [root@dbserver go-mysql-postgres]#
3 下载mysql2pgsql工具脚本
该工具脚本的主要作用是可以把mysqldump导出的含有建表语句的SQL和插入SQL转换成兼容PostgreSQL的SQL。其中,会自动把MySQL的自增主键id转换为PostgreSQL的serial序列自增类型字段。缺点是,在建表语句里没有把MySQL建表语句中的USING BTREE关键字去掉,这个需要手工处理一下。
https://github.com/ahammond/mysql2pgsql/blob/master/mysql2pgsql.pl
4 导出MySQL表结构和表数据
mkdir -p /tmp/mysql2pg/ cd /tmp/mysql2pg/ mysqldump -h 127.0.0.1 -P端口 -uroot -proot密码 数据库名 --master-data=2 t_policies > t_policies.mysql
5 转换为PostgreSQL的表结构和数据
cd /tmp/mysql2pg/ /root/mysql2pgsql.pl t_policies.mysql t_policies.pg #去掉建表语句中的USING BTREE sed -i 's/USING BTREE/ /' /tmp/mysql2pg/t_policies.pg
6 导入PostgreSQL数据库:
su - postgres psql -h -xxxx #连接到目标PostgreSQL数据库,执行SQL建表语句和插入数据 yth_ta_qlk=> \i /tmp/mysql2pg/t_policies.pg psql:/tmp/mysql2pg/t_policies.pg:40: ERROR: table "t_policies" does not exist CREATE TABLE INSERT 0 53 COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT COMMENT yth_ta_qlk=>
7 修改go-mysql-postgres配置文件
该文件放在/root/go-mysql-postgres路径下,名为river.toml,可以参考/root/go-mysql-postgres/etc/river.toml实例文件进行配置。核心配置内容如下:
# MySQL data source [[source]] schema = "mysql_db_name" #MySQL源库数据库名 tables = ["t_news","t_policies"] #待同步的MySQL表名,如果有多个表的话,以逗号分隔,列出多个要同步的表名即可 # 目标PG的连接配置 [[target]] pg_name = "159.xxx_yth_ta_qlk" #指定一个目标PostgreSQL数据库的连接标识符 pg_host = "pg_ip" #目标PostgreSQL数据库的IP pg_port = pg_port #目标PostgreSQL数据库的端口 pg_user = "pg_useranme" #目标PostgreSQL数据库的用户名 pg_pass = "pg_password" #目标PostgreSQL数据库的密码 pg_dbname = "pg_dbname" #目标PostgreSQL数据库的数据库名 # MySQL 数据到 PG 后的分发规则 [[rule]] #mysql 库表的配置 schema = "mysql_db_name" #源端MySQL数据库名, table = "t_news" #源端待同步的表名 # pg 库表的配置 pg_schema = "tas_fgw_zxqyrzzhxyfwpt" #目标端pg schema pg_table = "t_news" #目标端表名 [[rule]] #mysql 库表的配置 schema = "mysql_db_name" #源端MySQL数据库名, table = "t_policies" #源端MySQL数据库名, # pg 库表的配置 pg_schema = "tas_fgw_zxqyrzzhxyfwpt" #目标端pg schema pg_table = "t_policies" #目标端表名
新建/root/go-mysql-postgres/var/master.info文件,该路径和文件默认不存在,需要手工创建出来,其内容如下:bin_name执行MySQL数据库当前的binary log日志文件名,bin_pos指向当前的binary log position位置处。
[root@dbserver go-mysql-postgres]# cat var/master.info bin_name = "binlog.000001" bin_pos = 17466296 [root@dbserver go-mysql-postgres]#
可以通过在MySQL命令行里执行show master status或者show binary logs来获取:
mysql> show master status; +---------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +---------------+----------+--------------+------------------+-------------------+ | binlog.000001 | 17466296 | | | | +---------------+----------+--------------+------------------+-------------------+ 1 row in set (0.00 sec) mysql> show binary logs; +---------------+-----------+ | Log_name | File_size | +---------------+-----------+ | binlog.000001 | 17466296 | +---------------+-----------+ 1 row in set (0.00 sec) mysql>
这里的bin_pos至少要从第一次dump MySQL表时的位置处开始,在第4步骤的导出命令里加了–master-data=2选项,可以从导出文件中获取该信息。一旦同步开启之后,go-mysql-postgres会自动更新该文件信息。
8 启动go-mysql-postgres
cd /root/go-mysql-postgres nohup ./bin/go-mysql-postgresql-mw -config=river.toml > /root/go-mysql-postgres/reps.log 2 >&1 & tail -f reps.log [root@dbserver go-mysql-postgres]# tail -f reps.log [2024/10/14 16:54:53] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:54] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:55] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:56] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:57] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:58] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:54:59] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:55:00] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:55:01] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:55:02] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:55:03] [info] master.go:110 save position [binlog.000001,17434752] [2024/10/14 16:55:04] [info] master.go:110 save position [binlog.000001,17434752] .... #查看帮助 [root@dbserver go-mysql-postgres]# ./bin/go-mysql-postgresql-mw -h Usage of ./bin/go-mysql-postgresql-mw: -config string go-mysql-postgres config file (default "./etc/river.toml") -data_dir string path for go-mysql-elasticsearch to save data -es_addr string Elasticsearch addr -exec string mysqldump execution path -flavor string flavor: mysql or mariadb -log_level string log level (default "info") -max_procs int GOMAXPROCS -my_addr string MySQL addr -my_pass string MySQL password -my_user string MySQL user -server_id int MySQL server id, as a pseudo slave [root@dbserver go-mysql-postgres]#
9 PostgreSQL表添加字段和触发器
alter table t_policies add column i_state varchar(2); alter table t_policies add column i_flag varchar(2); alter table t_policies add column i_time timestamp; alter table t_policies add column i_id serial; comment on table t_policies is '政策法规信息'; --insert create or replace function t_policies_insert_state_flag_time() returns trigger as $$ begin new.i_state = 1; new.i_flag ='I'; new.i_time = current_timestamp; return new; end $$ language plpgsql; create trigger t_policies_auto_insert_state_flag_time before insert on t_policies for each row execute procedure t_policies_insert_state_flag_time();
10 测试验证
--MySQL插入数据 insert into t_policies(id,title,sort) values(9999,'测试',9999); --pg查看数据 select id,title,sort,i_id,i_state,i_flag,i_time from t_policies; --看到插入数据 9999 | 测试 | 9999 | 54 | 1 | I | 2024-10-12 11:25:23.947781 --MySQL删除测试数据 delete from t_policies where id=9999;
11 参考链接:
https://blog.csdn.net/Maslii/article/details/104762949
https://blog.csdn.net/qq_26373925/article/details/110506835
https://gitee.com/tangjunhu/go-mysql-postgres
https://github.com/ahammond/mysql2pgsql/blob/master/mysql2pgsql.pl
https://cloud.tencent.com/developer/article/1506974
https://cloud.tencent.com/developer/article/1506977?from_column=20421&from=20421