专业编程基础技术教程

网站首页 > 基础教程 正文

一文掌握springboot实时订阅数据库binlog

ccvgpt 2024-12-18 12:36:27 基础教程 5 ℃

前言:

数据发生变更实时订阅在多种场景中发挥着重要作用。Binlog是MySQL等数据库用来记录所有数据库表变化的日志,通过订阅Binlog,可以实时获取数据库的所有变更,并将这些变更同步到其他系统或组件中。

一: Binlog(Binary Log)

Binlog(Binary Log)是MySQL数据库的一种二进制日志文件,用于记录用户对数据库执行的更新操作(如INSERT、UPDATE、DELETE等),但不包括查询操作。以下是一些实时订阅Binlog的主要应用场景:

一文掌握springboot实时订阅数据库binlog

  • 数据异构:通过订阅Binlog,可以将数据从一个系统或数据库异构到另一个系统或数据库中,例如将数据从MySQL同步到Elasticsearch或其他NoSQL数据库中,以满足不同的数据访问需求。
  • 缓存更新:当数据库中的数据发生变化时,通过订阅Binlog可以实时更新缓存中的数据,从而确保缓存与数据库之间的一致性。
  • 任务分发和消息通知:在一些业务场景中,当数据发生变化时,需要触发相应的任务或发送消息通知其他系统。通过订阅Binlog,可以在数据变更时实时触发这些任务或发送消息通知,从而实现业务逻辑的自动化处理。
  • 数据审计和监控:通过订阅Binlog,可以实时监控数据的更改情况,从而实现数据审计的需求。例如,可以记录用户对数据库的修改操作,以便在需要时进行追溯和审查。

二、binlog配置

  • 1.查看是否开启binlog
#查看是否开启binlog功能 ,OFF表示没开启,ON表示开启
show variables like 'log_bin'
  • 2.开启binlog
1.编辑配置文件
vi /etc/my.cnf

2.添加内容
log-bin=mysql-bin       #binlog文件名
binlog_format=ROW    #选择row模式
server_id=1                   #mysql实例id,不能重复,唯一

3.重启数据库:
  • 3.创建用户账号,并授权该用户拥有功能权限
#创建 test 的用户,密码为 123456,% 表示任意地址都可远程登录。
CREATE USER 'test'@'%' IDENTIFIED BY '123456';

#给test 用户授权同步复制等的权限(REPLICATION SLAVE)
GRANT REPLICATION SLAVE ON *.* TO 'test'@'%';  

#刷新
FLUSH PRIVILEGES;

三:代码实现

  • 1.mvn依赖
<dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.21.0</version>
</dependency>


  • 2.定义事件类
@Data
public class BinlogEvent<T> {
    /**
     * 操作类型(INSERT,UPDATE,DELETE)
     */
    private String operationType;

    /**
     * 来源 表
     */
    private String table;

    /**
     * 来源 库
     */
    private String database;

    /**
     * 新数据
     */
    private T data;

    /**
     * 原始数据(针对update事件)
     */
    private T originalData;

}
  • 3.定义列数据信息

这里对应的是数据库表中列的信息

@Data
public class ColumnMetadata {
    //列名称
    private String columnName;
    //列类型
    private String dataType;
    //字符集
    private String characterSetName;
}
  • 4.定义jdbc工具

这里是获取库表的列详情信息,这里可以优化成使用数据源,提高性能。

public class JdbcUtil {

    private static Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();

    public static List<ColumnMetadata> getColumns(String database, String table) {
        String tableSchema = String.format("%s.%s", database, table);
        List<ColumnMetadata> columns = columnMetadataMap.get(tableSchema);
        try {
            if (columns == null) {
                Class.forName("org.mariadb.jdbc.Driver");
                Connection connection = DriverManager.getConnection("jdbc:mariadb://192.168.180.128.7:3306/dsmp_center?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true",
                        "root", "123456");
              //查询表中列信息
                PreparedStatement statement = connection.prepareStatement(
                        "select COLUMN_NAME, DATA_TYPE, CHARACTER_SET_NAME from INFORMATION_SCHEMA.COLUMNS " +
                                "where TABLE_SCHEMA=? and TABLE_NAME=? order by ORDINAL_POSITION asc;");
                statement.setString(1, database);
                statement.setString(2, table);
                ResultSet resultSet = statement.executeQuery();
                List<ColumnMetadata> columnMetadataList = new ArrayList<>();
                while (resultSet.next()) {
                    ColumnMetadata column = new ColumnMetadata();
                    column.setColumnName(resultSet.getString("COLUMN_NAME"));
                    column.setDataType(resultSet.getString("DATA_TYPE"));
                    column.setCharacterSetName(resultSet.getString("CHARACTER_SET_NAME"));
                    columnMetadataList.add(column);
                }
                columns = columnMetadataList;
                columnMetadataMap.put(tableSchema, columnMetadataList);
            }
            return columns;
        } catch (SQLException e) {
        } catch (Exception e) {
        }
        return columns;
    }
}

5.定义转换成事件实体工具

这个工具类的作用是将binlog监听到表中的数据变更,进行与表的列一一对应。

public class ConverEntity {

    @SneakyThrows
    public static Object toEntity(List<ColumnMetadata> columns, Serializable[] data) {
        Map<String, Object> obj = new HashMap<>(columns.size());
        for (int i = 0; i < data.length; i++) {
            ColumnMetadata column = columns.get(i);
            Serializable fieldValue = data[i];
            if (fieldValue instanceof Date) {
                if (fieldValue != null) {
                    data[i] = new Date(((Date) fieldValue).getTime() + 0);
                }
            } else if (fieldValue instanceof byte[]) {
                if (fieldValue != null) {
                    data[i] = new String((byte[]) fieldValue, StandardCharsets.UTF_8);
                }
            } else if (fieldValue instanceof BitSet) {
                if (fieldValue != null) {
                    data[i] = !((BitSet) fieldValue).isEmpty();
                }
            }
            obj.put(column.getColumnName(), data[i]);
        }
        return obj;
    }
}

6.核心代码

@Slf4j
@Component
@RequiredArgsConstructor
public class BinlogClient implements CommandLineRunner {

    private final Map<Long, TableMapEventData> tableMap = new HashMap<>();
    private final Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();

    @Override
    public void run(String... args) throws Exception {
        log.info("启动 Binlog 客户端 - 连接服务端");
      	//这里对应上面创建的用户test/123456
        BinaryLogClient client = new BinaryLogClient("192.168.160.128", 3306, "test", "123456");
      	// #mysql实例id
        client.setServerId(1l);
      
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                EventHeaderV4 headerV4 = event.getHeader();
                EventType eventType = headerV4.getEventType();
          			//每张表有一个表id,对应事件信息(表名,库名等其他信息)
                if (eventType == EventType.TABLE_MAP) {
                    TableMapEventData eventData = event.getData();
                    tableMap.put(eventData.getTableId(), eventData);
                } else {
                  //insert,update,delete事件
                    if (EventType.isRowMutation(eventType)) {
                        EventData eventData = event.getData();
                      	
                      //update
                        if (eventData instanceof UpdateRowsEventData) {
                            UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
                            long tableId = updateRowsEventData.getTableId();
                            List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows();
                            TableMapEventData tableMapEventData = tableMap.get(tableId);

                            if (tableMapEventData != null) {
                                String database = tableMapEventData.getDatabase();
                                String table = tableMapEventData.getTable();
                                if (EventType.isUpdate(eventType)) {
                                    update(database, table, rows);
                                }
                            }
                        }

                      	//insert
                        if (eventData instanceof WriteRowsEventData) {
                            WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
                            long tableId = writeRowsEventData.getTableId();
                            List<Serializable[]> rows = writeRowsEventData.getRows();
                            TableMapEventData tableMapEventData = tableMap.get(tableId);
                            if (tableMapEventData != null) {
                                String database = tableMapEventData.getDatabase();
                                String table = tableMapEventData.getTable();
                                if (EventType.isWrite(eventType)) {
                                    insert(database, table, rows);
                                }
                            }
                        }

                      //delete
                        if (eventData instanceof DeleteRowsEventData) {
                            DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
                            long tableId = deleteRowsEventData.getTableId();
                            List<Serializable[]> rows = deleteRowsEventData.getRows();
                            TableMapEventData tableMapEventData = tableMap.get(tableId);
                            if (tableMapEventData != null) {
                                String database = tableMapEventData.getDatabase();
                                String table = tableMapEventData.getTable();
                                if (EventType.isDelete(eventType)) {
                                    delete(database, table, rows);
                                }
                            }
                        }
                    }
                }
            }
        });
			//连接
        client.connect();
    }

    //insert操作
       public void insert(String databaseName, String tableName, List<Serializable[]> data) {
        List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "insert");
        data.forEach(row -> {
            binlogEvent.setData(ConverEntity.toEntity(columns, row));
        });
        log.info("insert操作:库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
    }

     //update操作
    public void update(String databaseName, String tableName, List<Map.Entry<Serializable[], Serializable[]>> data) {
        List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "update");
        data.forEach(row -> {
            binlogEvent.setData(ConverEntity.toEntity(columns, row.getValue()));
            binlogEvent.setOriginalData(ConverEntity.toEntity(columns, row.getKey()));
        });
        log.info("库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
    }

       //delete操作
    public void delete(String databaseName, String tableName, List<Serializable[]> data) {
        List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "delete");
        data.forEach(row -> {
            binlogEvent.setData(ConverEntity.toEntity(columns, row));
        });
        log.info("库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
    }

    //构建BinlogEvent
    private BinlogEvent buildBinlogEvent(String databaseName, String tableName, String operationType) {
        BinlogEvent binlogEvent = new BinlogEvent<>();
        binlogEvent.setDatabase(databaseName);
        binlogEvent.setTable(tableName);
        binlogEvent.setOperationType(operationType);
        return binlogEvent;
    }

}


总结:

需要注意的是,虽然实时订阅Binlog带来了很多便利和优势,但也存在一些挑战和限制。例如,订阅Binlog会增加系统的复杂度和资源消耗;在极端情况下,可能会出现数据丢失或不一致的问题;此外,还需要合理管理Redis和数据库的连接以及消费者数量等。因此,在实际应用中需要根据具体场景和需求进行权衡和选择。

Tags:

最近发表
标签列表