专业编程基础技术教程

网站首页 > 基础教程 正文

系统数据实时同步方案一落地 系统间数据同步解决方案

ccvgpt 2024-12-18 12:37:10 基础教程 3 ℃

本文:例如数据库binlog二进制日志文件,实现增量数据实时订阅。

原理:使用数据库主从复制功能。

系统数据实时同步方案一落地 系统间数据同步解决方案

一、数据实时同步binlog的作用

Binlog(Binary Log)是MySQL数据库的一种二进制日志文件,用于记录用户对数据库执行的更新操作(如INSERT、UPDATE、DELETE等),但不包括查询操作。数据实时同步binlog的主要作用包括:

  1. 数据库主从复制:通过binlog,可以实现MySQL数据库的主从复制,即将主数据库上的数据变更实时同步到从数据库上。
  2. 数据增量恢复:在数据库发生故障时,可以利用binlog进行数据的增量恢复,以恢复丢失的数据。
  3. 数据实时同步:binlog还可以用于不同系统或数据库之间的数据实时同步,确保数据的一致性和及时性。

二、落地实现

  1. 开启数据库binlog功能
  • 查看是否开启binlog功能:show variables like 'log_bin';



  • 编辑配置文件 my.cnf

vi /etc/my.cnf

追加内容:

log-bin=mysql-bin       #binlog文件名
binlog_format=ROW  #选择row模式
server_id=1                #mysql实例id,不能重复,唯一


重启数据库,再次查询:



  • 创建用户账号,并授权该用户拥有从机功能权限
#创建 test 的用户,密码为 123456,% 表示任意地址都可远程登录。
CREATE USER 'test'@'%' IDENTIFIED BY '123456';

#给test 用户授权同步复制等的权限(REPLICATION SLAVE)
GRANT REPLICATION SLAVE ON *.* TO 'test'@'%';  

FLUSH PRIVILEGES;
  1. 功能实现
  • mvn引入依赖
<dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.21.0</version>
</dependency>
  • 核心代码
@SpringBootTest
public class DataSyncTestApplication {

    private final Map<Long, TableMapEventData> tableMap = new HashMap<>();
    private final Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();
    @Resource
    private InforMationSchemaMapper inforMationSchemaMapper;

    @Test
    public void test() throws IOException {
        BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
            "test", "123456");
        client.setServerId(1);
        client.setKeepAlive(true);
        client.setKeepAliveInterval(1l);
        client.setHeartbeatInterval(3l);
        client.setConnectTimeout(6l);
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                EventHeaderV4 headerV4 = event.getHeader();
                EventType eventType = headerV4.getEventType();
                if (eventType == EventType.TABLE_MAP) {
                    TableMapEventData eventData = event.getData();
                    tableMap.put(eventData.getTableId(), eventData);
                } else {
                    if (EventType.isRowMutation(eventType)) {
                        RowMutationEventData rowMutationEventData = new RowMutationEventData(event.getData());
                        TableMapEventData tableMapEventData = tableMap.get(rowMutationEventData.getTableId());
                        if (tableMapEventData != null) {
                            String database = tableMapEventData.getDatabase();
                            String table = tableMapEventData.getTable();
                            if (EventType.isUpdate(eventType)) {
                                update(database, table, rowMutationEventData.getUpdateRows());
                                return;
                            }
                            if (EventType.isDelete(eventType)) {
                                delete(database, table, rowMutationEventData.getDeleteRows());
                                return;
                            }
                            if (EventType.isWrite(eventType)) {
                                insert(database, table, rowMutationEventData.getInsertRows());
                            }
                        }
                    }
                }
            }
        });
        client.connect();
    }


    @Data
    public static class RowMutationEventData {
        //表id
        private long tableId;
        //新增数据
        private List<Serializable[]> insertRows;
        //删除数据
        private List<Serializable[]> deleteRows;
        //更新数据
        private List<Map.Entry<Serializable[], Serializable[]>> updateRows;

        public RowMutationEventData(EventData eventData) {
            //更新数据
            if (eventData instanceof UpdateRowsEventData) {
                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
                this.tableId = updateRowsEventData.getTableId();
                this.updateRows = updateRowsEventData.getRows();
                return;
            }
            //新增数据
            if (eventData instanceof WriteRowsEventData) {
                WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
                this.tableId = writeRowsEventData.getTableId();
                this.insertRows = writeRowsEventData.getRows();
                return;
            }
            //删除数据
            if (eventData instanceof DeleteRowsEventData) {
                DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
                this.tableId = deleteRowsEventData.getTableId();
                this.deleteRows = deleteRowsEventData.getRows();
            }
        }
    }


    /**
     * 新增
     * @param databaseName
     * @param tableName
     * @param data
     */
    public void insert(String databaseName, String tableName, List<Serializable[]> data) {
        List<ColumnMetadata> columns = getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.INSERT.getDesc());
        data.forEach(row -> {
            binlogEvent.setData(toEntity(columns, row));
            System.out.println("新增数据===" + binlogEvent);
        });
    }

    /**
     * 更新
     * @param databaseName
     * @param tableName
     * @param data
     */
    public void update(String databaseName, String tableName, List<Map.Entry<Serializable[], Serializable[]>> data) {
        List<ColumnMetadata> columns = getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.UPDATE.getDesc());
        data.forEach(row -> {
            binlogEvent.setData(toEntity(columns, row.getValue()));
            binlogEvent.setOriginalData(toEntity(columns, row.getKey()));
            System.out.println("更新数据===" + binlogEvent);
        });
    }

    /**
     * 删除
     * @param databaseName
     * @param tableName
     * @param data
     */
    public void delete(String databaseName, String tableName, List<Serializable[]> data) {
        List<ColumnMetadata> columns = getColumns(databaseName, tableName);
        BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.DELETE.getDesc());
        data.forEach(row -> {
            binlogEvent.setData(toEntity(columns, row));
            System.out.println("删除数据===" + binlogEvent);
        });
    }

    //构建BinlogEvent
    private BinlogEvent buildBinlogEvent(String databaseName, String tableName, String operationType) {
        BinlogEvent binlogEvent = new BinlogEvent<>();
        binlogEvent.setDatabase(databaseName);
        binlogEvent.setTable(tableName);
        binlogEvent.setOperationType(operationType);
        binlogEvent.setTimestamp(System.currentTimeMillis());
        return binlogEvent;
    }
    
    @SneakyThrows
    public Object toEntity(List<ColumnMetadata> columns, Serializable[] data) {
        Map<String, Object> obj = new HashMap<>(columns.size());
        for (int i = 0; i < data.length; i++) {
            ColumnMetadata column = columns.get(i);
            Serializable fieldValue = data[i];
            if (fieldValue instanceof Date) {
                if (fieldValue != null) {
                    data[i] = new Date(((Date) fieldValue).getTime() + 0);
                }
            } else if (fieldValue instanceof byte[]) {
                if (fieldValue != null) {
                    data[i] = new String((byte[]) fieldValue, StandardCharsets.UTF_8);
                }
            } else if (fieldValue instanceof BitSet) {
                if (fieldValue != null) {
                    data[i] = !((BitSet) fieldValue).isEmpty();
                }
            }
            obj.put(column.getColumnName(), data[i]);
        }
        return obj;
    }

    //获取当前表字段与类型信息
    public List<ColumnMetadata> getColumns(String databaseName, String tableName) {
        String tableSchema = String.format("%s.%s", databaseName, tableName);
        List<ColumnMetadata> columns = columnMetadataMap.get(tableSchema);
        if (columns == null) {
            columns = inforMationSchemaMapper.getColumns(databaseName, tableName);
            columnMetadataMap.put(tableSchema, columns);
        }
        return columns;
    }

    //事件实体
    @Data
    public class BinlogEvent<T> {

        /**
         * 操作类型(INSERT,UPDATE,DELETE)
         */
        private String operationType;

        /**
         * 来源 Table
         */
        private String table;

        /**
         * 来源 Database
         */
        private String database;

        /**
         * 新数据
         */
        private T data;

        /**
         * 原数据
         */
        private T originalData;

        /**
         * 时间戳
         */
        private Long timestamp;
    }
}


InforMationSchemaMapper:


    /**
     * 根据库,表获取字段信息
     *
     * @param database 数据库
     * @param table    表
     * @return
     */
    @Select(" select COLUMN_NAME        as columnName,\n" +
            "        DATA_TYPE          as dataType,\n" +
            "        CHARACTER_SET_NAME as characterSetName\n" +
            "        from INFORMATION_SCHEMA.COLUMNS\n" +
            " where TABLE_SCHEMA = #{database} and TABLE_NAME = #{table}\n" +
            " order by ORDINAL_POSITION asc")
    List<ColumnMetadata> getColumns(@Param("database") String database, @Param("table") String table);

Tags:

最近发表
标签列表