Linux,  MySQL,  PostgreSQL

如何实现MySQL实时同步到PostgreSQL数据库

0 项目背景说明

公司有一项目,采用MySQL 5.7.39作为业务数据库。项目主管部门有一套数据汇总系统,要求我们把部分业务表数据实时推送到汇总系统。他们提供一套前置的瀚高PostgreSQL数据库出来,让我们把MySQL中的对应业务表推送到该数据库中。同时,要求我们把数据推送到目标PostgreSQL数据库之后,如果源端的MySQL数据库有插入操作,则需要在目标PostgreSQL数据库表中通过添加4个字段记录下来:I_ID 自增,I_STATE = 1,I_FLAG = I,I_TIME 表示记录插入时间。

经过分析调研测试验证,在网上找到有网友在Kubernetes里通过Debezium and KafkaConnect来说实现过,这个场景对我们来说不太贴合,大概了解一下原理,就略过了。链接:Sync MySQL to PostgreSQL Using Debezium and KafkaConnect

同时,看到有网友通过轻量级 CDC debezium-server-databend 构建实时数据同步,简单测试了下,该网友实现了把MySQL同步到databend数据库里,没有直接落库到PostgreSQL数据库。有开发同事,对该项目进行了简单的二开,有实现了部分需求。暂时没有足够的时间和精力继续二开,就地搁置。https://zhuanlan.zhihu.com/p/647470177?utm_id=0

网上有Tapdata工具可以实现该场景,但是需要联系厂家申请试用,他们的人有联系我。但是,我隐隐约约感觉到对方可能会收费。https://bbs.huaweicloud.com/blogs/320806

最后,找到一款go-mysql-postgres的开源工具,经过测试验证,以及在目标端的PostgreSQL数据库表上创建触发器,可以初步实现该功能。

核心操作流程:直接在源端的MySQL数据库服务器上执行操作:安装配置go-mysql-postgres工具,用于读取本地的MySQL数据库的binlog,然后把数据写入到目标端的PostgreSQL数据库。

而go-mysql-postgres工具的主要思路是,源端的MySQL数据库开启binlog,且格式要求是row,源端MySQL和目标端PostgreSQL中的表必须都得有主键。通过mysqldump工具把MySQL中要同步的表备份出来,记录下binlog position,用于后面从哪儿开始读取binlog进行同步。把备份的建表语句改成兼容PostgreSQL的SQL,在PostgreSQL中执行建表,并且把数据执行插入到PostgreSQL数据库里。准备就绪之后,开启go-mysql-postgres程序来从记录下的binlog position位置处开始执行同步。

本文档用于记录如何采用go-mysql-postgres和触发器来实现MySQL数据实时同步到PostgreSQL,并且在目标库表上增加4个标记位字段的操作步骤和详细流程。

1 安装PostgreSQL 15.0

在源端MySQL数据库服务器上执行安装,目的是通过该PostgreSQL安装的psql客户端工具可以连接到目标端的PostgreSQL数据库。

groupadd -g 1688 postgres
useradd -g postgres -u 1688 postgres
echo 'D1ng0D1888!'|passwd --stdin docker
mkdir -p /data/postgres/15.0/
chown -R postgres:postgres /data/postgres/
cp postgresql-15.0.tar.gz  /home/postgres/
​
##解压,编译安装
su - postgres
tar -zxvf postgresql-15.0.tar.gz
​
cd postgresql-15.0/
​
./configure --prefix=/data/postgres/15.0
​
gmake world && gmake install-world
​
​
###修改~/.bashrc
[postgres@dbserver ~]$ cat .bashrc 
# Source default setting
[ -f /etc/bashrc ] && . /etc/bashrc
​
# User environment PATH
PATH="$HOME/.local/bin:$HOME/bin:$PATH:/data/postgres/15.0/bin/"
export PATH
​
export PGPASSWORD=123456Aa*
[postgres@dbserver ~]$     
​
##使之生效
source ~/.bashrc
​
#连接目标端瀚高数据库,这里隐去对应的数据库IP,端口,账户名和密码等信息
[postgres@dbserver ~]$ psql -h pg_ip -p port -U user_name -d db_name
NOTICE:  
-------------------------------------------
Login User: user_name 
Login time: 2024-10-14 16:01:21.042908+08 
Login Address: pg_ip 
Last Login Status: SUCCESS 
Login Failures: 0 
Valied Until: infinity 
-------------------------------------------
​
psql (15.0, server 12.7)
Type "help" for help.
​
db_name=> 

2 下载安装go-mysql-postgres

同样在源端MySQL数据库服务器上执行安装,将来直接在数据库服务器上启动该程序,读取MySQL的binlog,并把数据推送到目标端的PostgreSQL数据库中去。

#下载安装golang,因为go-mysql-postges是用golang写的,所以需要先安装配置golang环境,才可以编译go-mysql-postgres程序
wget https://dl.google.com/go/go1.21.3.linux-amd64.tar.gz
# 解压到/usr/local路径下
tar -C /usr/local -xzf go1.21.3.linux-amd64.tar.gz
​
vi /etc/profile
#添加下述配置项
​
export PATH=$PATH:/usr/local/go/bin
export GOROOT=/usr/local/go
export GOPATH=$HOME/Documents/go
​
#读取golang配置,使之生效
source /etc/profile
​
​
#下载编译go-mysql-postgres
git clone https://gitee.com/tangjunhu/go-mysql-postgres.git
cd go-mysql-postgres/
make
[root@dbserver ~]# go version
go version go1.21.3 linux/amd64
[root@dbserver ~]# cd go-mysql-postgres/
[root@dbserver go-mysql-postgres]# ll
total 60
-rw-r--r-- 1 root root  199 Oct 11 10:45 build.sh
-rw-r--r-- 1 root root  425 Oct 11 10:45 clear_vendor.sh
drwxr-xr-x 3 root root 4096 Oct 11 10:45 cmd
-rw-r--r-- 1 root root  455 Oct 11 10:45 Dockerfile
drwxr-xr-x 2 root root 4096 Oct 11 10:45 elastic
drwxr-xr-x 2 root root 4096 Oct 11 10:45 etc
-rw-r--r-- 1 root root 1046 Oct 11 10:45 go.mod
-rw-r--r-- 1 root root 5088 Oct 11 10:45 go.sum
-rw-r--r-- 1 root root 1076 Oct 11 10:45 LICENSE
-rw-r--r-- 1 root root  255 Oct 11 10:45 Makefile
-rw-r--r-- 1 root root 3399 Oct 11 10:45 README.md
drwxr-xr-x 2 root root 4096 Oct 11 10:45 river
drwxr-xr-x 2 root root 4096 Oct 11 10:45 util
drwxr-xr-x 3 root root 4096 Oct 11 10:45 vendor
[root@dbserver go-mysql-postgres]# make
GO111MODULE=on go build -o bin/go-mysql-postgresql-mw ./cmd/go-mysql-elasticsearch
cat build.sh >build 
chmod a+x build
[root@dbserver go-mysql-postgres]#

3 下载mysql2pgsql工具脚本

该工具脚本的主要作用是可以把mysqldump导出的含有建表语句的SQL和插入SQL转换成兼容PostgreSQL的SQL。其中,会自动把MySQL的自增主键id转换为PostgreSQL的serial序列自增类型字段。缺点是,在建表语句里没有把MySQL建表语句中的USING BTREE关键字去掉,这个需要手工处理一下。

https://github.com/ahammond/mysql2pgsql/blob/master/mysql2pgsql.pl

4 导出MySQL表结构和表数据

mkdir -p /tmp/mysql2pg/
cd /tmp/mysql2pg/
​
mysqldump -h 127.0.0.1 -P端口 -uroot -proot密码 数据库名  --master-data=2  t_policies > t_policies.mysql

5 转换为PostgreSQL的表结构和数据

cd /tmp/mysql2pg/
/root/mysql2pgsql.pl t_policies.mysql t_policies.pg
​
#去掉建表语句中的USING BTREE
sed -i 's/USING BTREE/ /' /tmp/mysql2pg/t_policies.pg

6 导入PostgreSQL数据库:

su - postgres
psql -h -xxxx #连接到目标PostgreSQL数据库,执行SQL建表语句和插入数据 
yth_ta_qlk=> \i /tmp/mysql2pg/t_policies.pg 
psql:/tmp/mysql2pg/t_policies.pg:40: ERROR:  table "t_policies" does not exist
CREATE TABLE
INSERT 0 53
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
COMMENT
yth_ta_qlk=>

7 修改go-mysql-postgres配置文件

该文件放在/root/go-mysql-postgres路径下,名为river.toml,可以参考/root/go-mysql-postgres/etc/river.toml实例文件进行配置。核心配置内容如下:

# MySQL data source
[[source]]
schema = "mysql_db_name"    #MySQL源库数据库名
tables = ["t_news","t_policies"] #待同步的MySQL表名,如果有多个表的话,以逗号分隔,列出多个要同步的表名即可
​
# 目标PG的连接配置
[[target]]
pg_name = "159.xxx_yth_ta_qlk"      #指定一个目标PostgreSQL数据库的连接标识符
pg_host = "pg_ip"            #目标PostgreSQL数据库的IP
pg_port = pg_port                    #目标PostgreSQL数据库的端口
pg_user = "pg_useranme"  #目标PostgreSQL数据库的用户名
pg_pass = "pg_password"               #目标PostgreSQL数据库的密码
pg_dbname = "pg_dbname"            #目标PostgreSQL数据库的数据库名
​
# MySQL 数据到 PG 后的分发规则
[[rule]]
#mysql 库表的配置
schema = "mysql_db_name"               #源端MySQL数据库名,
table = "t_news"                   #源端待同步的表名
# pg 库表的配置
pg_schema = "tas_fgw_zxqyrzzhxyfwpt"  #目标端pg schema
pg_table = "t_news"                   #目标端表名
​
[[rule]]
#mysql 库表的配置
schema = "mysql_db_name"              #源端MySQL数据库名,
table = "t_policies"              #源端MySQL数据库名,
# pg 库表的配置
pg_schema = "tas_fgw_zxqyrzzhxyfwpt"  #目标端pg schema
pg_table = "t_policies"               #目标端表名

新建/root/go-mysql-postgres/var/master.info文件,该路径和文件默认不存在,需要手工创建出来,其内容如下:bin_name执行MySQL数据库当前的binary log日志文件名,bin_pos指向当前的binary log position位置处。

[root@dbserver go-mysql-postgres]# cat var/master.info 
bin_name = "binlog.000001"
bin_pos = 17466296
[root@dbserver go-mysql-postgres]#

可以通过在MySQL命令行里执行show master status或者show binary logs来获取:

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 | 17466296 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
​
mysql> show binary logs;
+---------------+-----------+
| Log_name      | File_size |
+---------------+-----------+
| binlog.000001 |  17466296 |
+---------------+-----------+
1 row in set (0.00 sec)
​
mysql>   

这里的bin_pos至少要从第一次dump MySQL表时的位置处开始,在第4步骤的导出命令里加了–master-data=2选项,可以从导出文件中获取该信息。一旦同步开启之后,go-mysql-postgres会自动更新该文件信息。

8 启动go-mysql-postgres

cd /root/go-mysql-postgres
​
nohup ./bin/go-mysql-postgresql-mw  -config=river.toml > /root/go-mysql-postgres/reps.log 2 >&1 &
​
tail -f reps.log
[root@dbserver go-mysql-postgres]# tail -f reps.log
[2024/10/14 16:54:53] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:54] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:55] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:56] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:57] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:58] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:54:59] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:55:00] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:55:01] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:55:02] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:55:03] [info] master.go:110 save position [binlog.000001,17434752]
[2024/10/14 16:55:04] [info] master.go:110 save position [binlog.000001,17434752]
....
​
#查看帮助
[root@dbserver go-mysql-postgres]# ./bin/go-mysql-postgresql-mw -h
Usage of ./bin/go-mysql-postgresql-mw:
  -config string
        go-mysql-postgres config file (default "./etc/river.toml")
  -data_dir string
        path for go-mysql-elasticsearch to save data
  -es_addr string
        Elasticsearch addr
  -exec string
        mysqldump execution path
  -flavor string
        flavor: mysql or mariadb
  -log_level string
        log level (default "info")
  -max_procs int
        GOMAXPROCS
  -my_addr string
        MySQL addr
  -my_pass string
        MySQL password
  -my_user string
        MySQL user
  -server_id int
        MySQL server id, as a pseudo slave
[root@dbserver go-mysql-postgres]# 

9 PostgreSQL表添加字段和触发器

alter table t_policies add column i_state varchar(2);
alter table t_policies add column i_flag varchar(2);
alter table t_policies add column i_time timestamp;
alter table t_policies add column i_id serial;
comment on table t_policies is '政策法规信息';
​
--insert
create or replace function t_policies_insert_state_flag_time() returns trigger as
$$
begin
    new.i_state = 1;
    new.i_flag ='I';
    new.i_time = current_timestamp;
    return new;
end
$$
language plpgsql;
​
create trigger t_policies_auto_insert_state_flag_time
  before insert on t_policies 
  for each row execute procedure t_policies_insert_state_flag_time();

10 测试验证

--MySQL插入数据
insert into t_policies(id,title,sort) values(9999,'测试',9999);
​
--pg查看数据
select id,title,sort,i_id,i_state,i_flag,i_time from t_policies;
​
--看到插入数据
 9999 | 测试                                                                                                                                                                             | 9999 |   54 | 1       | I      | 2024-10-12 11:25:23.947781
 
--MySQL删除测试数据
delete from t_policies where id=9999;

11 参考链接:

https://blog.csdn.net/Maslii/article/details/104762949

https://blog.csdn.net/qq_26373925/article/details/110506835

https://gitee.com/tangjunhu/go-mysql-postgres

https://github.com/ahammond/mysql2pgsql/blob/master/mysql2pgsql.pl

https://cloud.tencent.com/developer/article/1506974

https://cloud.tencent.com/developer/article/1506977?from_column=20421&from=20421

https://timothyzhang.medium.com/real-time-cdc-replications-between-mysql-and-postgresql-using-debezium-connectors-24aa33d58f1e

留言