介绍
Maxwell是由美国Zendesk公司开源,使用Java编写的MySQL变更数据抓取软件。他会实时监控Mysql数据库的数据变更操作(包括insert、update、delete),并将变更数据以JSON的格式发送给Kafka、Kinesi等流数据处理平台。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 插入 mysql> insert into bunny.student values(1,'zhangsan'); { "database":"bunny", "table":"student", "type":"insert", "ts":1234567, "xid":154422, "commit":true, "data":{ "id":1, "name":"zhangsan" } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| 更新 mysql> update bunny.student set name = 'lisi' where id=1; { "database":"bunny", "table":"student", "type":"update", "ts":1245678, "xid":146733, "commit":true, "data":{ "id":1, "name":"lisi" }, "old":{ "name":"zhangsan" } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 删除 mysql> delete from bunny.student where id =1; { "database":"bunny", "table":"student", "type":"delete", "ts":1633335751, "xid":1535632, "commit":true, "data":{ "id":1, "name":"lisi" } }
|
1 2 3 4 5 6 7 8 9 10 11
| 字段说明: 1. database:变更数据所属的数据库 2. table: 变更数据所属的表 3. type: 数据变更类型 4. ts: 数据变更发生的时间 5. xid: 事务id 6. commit: 事务提交标志,可用于重新组装事务 7. data:对于insert类型,表示插入的数据 对于update类型,表示修改之后的数据 对于delete的类型,表示删除的数据 8. old: 对于update的类型,表示修改前的数据,仅包含变更字段
|
原理
Maxwell的工作原理是实时读取Mysql数据库的二进制日志–Binlog,从中获取变更数据,再将变更数据以Json的格式发送至Kafka等流处理平台。
Maxwell的实现原理就是将自己伪装成Slave,并遵循Mysql主从复制的协议,从master中同步数据。
部署
下载解压
安装包下载地址:https://github.com/zendesk/maxwell/releases/download/v1.29.2/maxwell-1.29.2.tar.gz
1
| tar -zxvf maxwell-1.29.2.tar.gz -C /opt/bunny/
|
配置mysql
Maxwell要求Binlog采用Row-based模式.
1 2
| show VARIABLES like '%log_bin%' show VARIABLES like '%binlog_format%'
|
参考配置
1 2 3 4 5 6 7 8 9 10 11 12
| sudo vim /etc/my.cnf
[mysqld] #数据库id server-id=1 #启动Binlog,该参数的值会作为binlog的文件名前缀 log-bin=mysql-bin #binlog类型,maxwell要求为row类型 binlog_format=row #启动binlog的数据库,需根据实际情况修改配置 binlog-do-db=gmall
|
配置maxwell
1 2 3 4
| cd /opt/bunny/maxwell-1.29.2/ mv config.properties.example config.properties vim config.properties
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| #Maxwell数据发送目的地,可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis producer=kafka #目标Kafka集群地址 kafka.bootstrap.servers=h102:9092,h103:9092,h104:9092 #目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table} kafka_topic=maxwell
#MySQL相关配置 host=u2.kittybunny.cn port=7006 user=root password=maxwell jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai
|
使用
Maxwell发送数据的目的地是kafka集群,需要首先将kafka集群启动。
1 2 3 4 5 6 7 8
| zk.sh start kafka.sh start
cd /opt/bunny/kafka/ bin/kafka-topics.sh --bootstrap-server h102:9092 --create --replication-factor 3 --partitions 3 --topic maxwell
bin/kafka-console-consumer.sh --bootstrap-server h102:9092 --topic maxwell
|
启动Maxwell
1 2
| cd /opt/bunny/maxwell-1.29.2/ bin/maxwell --config config.properties --daemon
|
关闭使用kill
模拟生成数据
1 2 3 4 5 6
| cd /opt/bunny/db_log/ # 根据需要配置 vi application.properties # 运行 java -jar gmall2020-mock-db-2021-11-14.jar
|
Maxwell启动停止脚本maxwell.sh
1 2
| cd /home/bunny/bin/ vi maxwell.sh
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #!/bin/bash
MAXWELL_HOME=/opt/bunny/maxwell-1.29.2
status_maxwell(){ result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l` return $result }
start_maxwell(){ status_maxwell if [[ $? -lt 1 ]]; then echo "启动Maxwell" $MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemon else echo "Maxwell正在运行" fi }
stop_maxwell(){ status_maxwell if [[ $? -gt 0 ]]; then echo "停止Maxwell" ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9 else echo "Maxwell未在运行" fi }
case $1 in start ) start_maxwell ;; stop ) stop_maxwell ;; restart ) stop_maxwell start_maxwell ;; esac
|
1 2
| chmod +x /home/bunny/bin/maxwell.sh xsync /home/bunny/bin/
|
utf8mb3问题
数据库采用mysql5不会有问题,如果使用mysql8需要改maxwell源码。据说是这个类(com.zendesk.maxwell.schema.columndef.StringColumnDef),未验证。里面有utf8和utf8mb4的判断,加上utf8mb3判断即可。