喵星之旅-调皮的大象-maxwell

介绍

Maxwell是由美国Zendesk公司开源,使用Java编写的MySQL变更数据抓取软件。他会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以JSON的格式发送给Kafka、Kinesi等流数据处理平台。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
插入
mysql> insert into bunny.student values(1,'zhangsan');
{
"database":"bunny",
"table":"student",
"type":"insert",
"ts":1234567,
"xid":154422,
"commit":true,
"data":{
"id":1,
"name":"zhangsan"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
更新
mysql> update bunny.student set name = 'lisi' where id=1;
{
"database":"bunny",
"table":"student",
"type":"update",
"ts":1245678,
"xid":146733,
"commit":true,
"data":{
"id":1,
"name":"lisi"
},
"old":{
"name":"zhangsan"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
删除
mysql> delete from bunny.student where id =1;
{
"database":"bunny",
"table":"student",
"type":"delete",
"ts":1633335751,
"xid":1535632,
"commit":true,
"data":{
"id":1,
"name":"lisi"
}
}
1
2
3
4
5
6
7
8
9
10
11
字段说明:
1. database:变更数据所属的数据库
2. table: 变更数据所属的表
3. type: 数据变更类型
4. ts: 数据变更发生的时间
5. xid: 事务id
6. commit: 事务提交标志,可用于重新组装事务
7. data:对于insert类型,表示插入的数据
对于update类型,表示修改之后的数据
对于delete的类型,表示删除的数据
8. old: 对于update的类型,表示修改前的数据,仅包含变更字段

原理

Maxwell的工作原理是实时读取Mysql数据库的二进制日志–Binlog,从中获取变更数据,再将变更数据以Json的格式发送至Kafka等流处理平台。

Maxwell的实现原理就是将自己伪装成Slave,并遵循Mysql主从复制的协议,从master中同步数据。

部署

下载解压

安装包下载地址:https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz

1
tar -zxvf maxwell-1.29.2.tar.gz -C /opt/bunny/

配置mysql

Maxwell要求Binlog采用Row-based模式.

1
2
show VARIABLES like  '%log_bin%'
show VARIABLES like '%binlog_format%'

参考配置

1
2
3
4
5
6
7
8
9
10
11
12
sudo vim /etc/my.cnf


[mysqld]
#数据库id
server-id=1
#启动Binlog,该参数的值会作为binlog的文件名前缀
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启动binlog的数据库,需根据实际情况修改配置
binlog-do-db=gmall

配置maxwell

1
2
3
4
cd /opt/bunny/maxwell-1.29.2/
mv config.properties.example config.properties
vim config.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
#Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producer=kafka
#目标Kafka集群地址
kafka.bootstrap.servers=h102:9092,h103:9092,h104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=maxwell

#MySQL相关配置
host=u2.kittybunny.cn
port=7006
user=root
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

使用

Maxwell发送数据的目的地是kafka集群,需要首先将kafka集群启动。

1
2
3
4
5
6
7
8
zk.sh start
kafka.sh start

cd /opt/bunny/kafka/
bin/kafka-topics.sh --bootstrap-server h102:9092 --create --replication-factor 3 --partitions 3 --topic maxwell

bin/kafka-console-consumer.sh --bootstrap-server h102:9092 --topic maxwell

启动Maxwell

1
2
cd /opt/bunny/maxwell-1.29.2/
bin/maxwell --config config.properties --daemon

关闭使用kill

模拟生成数据

1
2
3
4
5
6
cd /opt/bunny/db_log/
# 根据需要配置
vi application.properties
# 运行
java -jar gmall2020-mock-db-2021-11-14.jar

Maxwell启动停止脚本maxwell.sh

1
2
cd /home/bunny/bin/
vi maxwell.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#!/bin/bash

MAXWELL_HOME=/opt/bunny/maxwell-1.29.2

status_maxwell(){
result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`
return $result
}


start_maxwell(){
status_maxwell
if [[ $? -lt 1 ]]; then
echo "启动Maxwell"
$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon
else
echo "Maxwell正在运行"
fi
}


stop_maxwell(){
status_maxwell
if [[ $? -gt 0 ]]; then
echo "停止Maxwell"
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9
else
echo "Maxwell未在运行"
fi
}


case $1 in
start )
start_maxwell
;;
stop )
stop_maxwell
;;
restart )
stop_maxwell
start_maxwell
;;
esac
1
2
chmod +x /home/bunny/bin/maxwell.sh
xsync /home/bunny/bin/

utf8mb3问题

数据库采用mysql5不会有问题,如果使用mysql8需要改maxwell源码。据说是这个类(com.zendesk.maxwell.schema.columndef.StringColumnDef),未验证。里面有utf8和utf8mb4的判断,加上utf8mb3判断即可。

文章目录
  1. 介绍
  2. 原理
  3. 部署
    1. 下载解压
    2. 配置mysql
    3. 配置maxwell
  4. 使用
  5. Maxwell启动停止脚本maxwell.sh
  6. utf8mb3问题
|