网站首页 > 基础教程 正文
本文:例如数据库binlog二进制日志文件,实现增量数据实时订阅。
原理:使用数据库主从复制功能。
一、数据实时同步binlog的作用
Binlog(Binary Log)是MySQL数据库的一种二进制日志文件,用于记录用户对数据库执行的更新操作(如INSERT、UPDATE、DELETE等),但不包括查询操作。数据实时同步binlog的主要作用包括:
- 数据库主从复制:通过binlog,可以实现MySQL数据库的主从复制,即将主数据库上的数据变更实时同步到从数据库上。
- 数据增量恢复:在数据库发生故障时,可以利用binlog进行数据的增量恢复,以恢复丢失的数据。
- 数据实时同步:binlog还可以用于不同系统或数据库之间的数据实时同步,确保数据的一致性和及时性。
二、落地实现
- 开启数据库binlog功能
- 查看是否开启binlog功能:show variables like 'log_bin';
- 编辑配置文件 my.cnf
vi /etc/my.cnf
追加内容:
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能重复,唯一
重启数据库,再次查询:
- 创建用户账号,并授权该用户拥有从机功能权限
#创建 test 的用户,密码为 123456,% 表示任意地址都可远程登录。
CREATE USER 'test'@'%' IDENTIFIED BY '123456';
#给test 用户授权同步复制等的权限(REPLICATION SLAVE)
GRANT REPLICATION SLAVE ON *.* TO 'test'@'%';
FLUSH PRIVILEGES;
- 功能实现:
- mvn引入依赖
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
- 核心代码
@SpringBootTest
public class DataSyncTestApplication {
private final Map<Long, TableMapEventData> tableMap = new HashMap<>();
private final Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();
@Resource
private InforMationSchemaMapper inforMationSchemaMapper;
@Test
public void test() throws IOException {
BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306,
"test", "123456");
client.setServerId(1);
client.setKeepAlive(true);
client.setKeepAliveInterval(1l);
client.setHeartbeatInterval(3l);
client.setConnectTimeout(6l);
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventHeaderV4 headerV4 = event.getHeader();
EventType eventType = headerV4.getEventType();
if (eventType == EventType.TABLE_MAP) {
TableMapEventData eventData = event.getData();
tableMap.put(eventData.getTableId(), eventData);
} else {
if (EventType.isRowMutation(eventType)) {
RowMutationEventData rowMutationEventData = new RowMutationEventData(event.getData());
TableMapEventData tableMapEventData = tableMap.get(rowMutationEventData.getTableId());
if (tableMapEventData != null) {
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
if (EventType.isUpdate(eventType)) {
update(database, table, rowMutationEventData.getUpdateRows());
return;
}
if (EventType.isDelete(eventType)) {
delete(database, table, rowMutationEventData.getDeleteRows());
return;
}
if (EventType.isWrite(eventType)) {
insert(database, table, rowMutationEventData.getInsertRows());
}
}
}
}
}
});
client.connect();
}
@Data
public static class RowMutationEventData {
//表id
private long tableId;
//新增数据
private List<Serializable[]> insertRows;
//删除数据
private List<Serializable[]> deleteRows;
//更新数据
private List<Map.Entry<Serializable[], Serializable[]>> updateRows;
public RowMutationEventData(EventData eventData) {
//更新数据
if (eventData instanceof UpdateRowsEventData) {
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
this.tableId = updateRowsEventData.getTableId();
this.updateRows = updateRowsEventData.getRows();
return;
}
//新增数据
if (eventData instanceof WriteRowsEventData) {
WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
this.tableId = writeRowsEventData.getTableId();
this.insertRows = writeRowsEventData.getRows();
return;
}
//删除数据
if (eventData instanceof DeleteRowsEventData) {
DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
this.tableId = deleteRowsEventData.getTableId();
this.deleteRows = deleteRowsEventData.getRows();
}
}
}
/**
* 新增
* @param databaseName
* @param tableName
* @param data
*/
public void insert(String databaseName, String tableName, List<Serializable[]> data) {
List<ColumnMetadata> columns = getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.INSERT.getDesc());
data.forEach(row -> {
binlogEvent.setData(toEntity(columns, row));
System.out.println("新增数据===" + binlogEvent);
});
}
/**
* 更新
* @param databaseName
* @param tableName
* @param data
*/
public void update(String databaseName, String tableName, List<Map.Entry<Serializable[], Serializable[]>> data) {
List<ColumnMetadata> columns = getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.UPDATE.getDesc());
data.forEach(row -> {
binlogEvent.setData(toEntity(columns, row.getValue()));
binlogEvent.setOriginalData(toEntity(columns, row.getKey()));
System.out.println("更新数据===" + binlogEvent);
});
}
/**
* 删除
* @param databaseName
* @param tableName
* @param data
*/
public void delete(String databaseName, String tableName, List<Serializable[]> data) {
List<ColumnMetadata> columns = getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, OperationType.DELETE.getDesc());
data.forEach(row -> {
binlogEvent.setData(toEntity(columns, row));
System.out.println("删除数据===" + binlogEvent);
});
}
//构建BinlogEvent
private BinlogEvent buildBinlogEvent(String databaseName, String tableName, String operationType) {
BinlogEvent binlogEvent = new BinlogEvent<>();
binlogEvent.setDatabase(databaseName);
binlogEvent.setTable(tableName);
binlogEvent.setOperationType(operationType);
binlogEvent.setTimestamp(System.currentTimeMillis());
return binlogEvent;
}
@SneakyThrows
public Object toEntity(List<ColumnMetadata> columns, Serializable[] data) {
Map<String, Object> obj = new HashMap<>(columns.size());
for (int i = 0; i < data.length; i++) {
ColumnMetadata column = columns.get(i);
Serializable fieldValue = data[i];
if (fieldValue instanceof Date) {
if (fieldValue != null) {
data[i] = new Date(((Date) fieldValue).getTime() + 0);
}
} else if (fieldValue instanceof byte[]) {
if (fieldValue != null) {
data[i] = new String((byte[]) fieldValue, StandardCharsets.UTF_8);
}
} else if (fieldValue instanceof BitSet) {
if (fieldValue != null) {
data[i] = !((BitSet) fieldValue).isEmpty();
}
}
obj.put(column.getColumnName(), data[i]);
}
return obj;
}
//获取当前表字段与类型信息
public List<ColumnMetadata> getColumns(String databaseName, String tableName) {
String tableSchema = String.format("%s.%s", databaseName, tableName);
List<ColumnMetadata> columns = columnMetadataMap.get(tableSchema);
if (columns == null) {
columns = inforMationSchemaMapper.getColumns(databaseName, tableName);
columnMetadataMap.put(tableSchema, columns);
}
return columns;
}
//事件实体
@Data
public class BinlogEvent<T> {
/**
* 操作类型(INSERT,UPDATE,DELETE)
*/
private String operationType;
/**
* 来源 Table
*/
private String table;
/**
* 来源 Database
*/
private String database;
/**
* 新数据
*/
private T data;
/**
* 原数据
*/
private T originalData;
/**
* 时间戳
*/
private Long timestamp;
}
}
InforMationSchemaMapper:
/**
* 根据库,表获取字段信息
*
* @param database 数据库
* @param table 表
* @return
*/
@Select(" select COLUMN_NAME as columnName,\n" +
" DATA_TYPE as dataType,\n" +
" CHARACTER_SET_NAME as characterSetName\n" +
" from INFORMATION_SCHEMA.COLUMNS\n" +
" where TABLE_SCHEMA = #{database} and TABLE_NAME = #{table}\n" +
" order by ORDINAL_POSITION asc")
List<ColumnMetadata> getColumns(@Param("database") String database, @Param("table") String table);
猜你喜欢
- 2024-12-18 吊打 ThreadLocal,谈谈FastThreadLocal为啥能这么快?
- 2024-12-18 分布式锁中的王者方案 - Redisson
- 2024-12-18 你管这玩意儿叫高并发? 什么叫高并发
- 2024-12-18 分布式锁工具:Redisson 分布式锁 redis zookeeper
- 2024-12-18 Spring Cloud Circuit Breaker快速入门Demo
- 2024-12-18 BitMap是啥?脑袋一下空白? bitmap文件头
- 2024-12-18 一亿个8位数字,用什么排序方法 一亿个8位数字,用什么排序方法最好
- 2024-12-18 Java基础-数据类型和数据结构,初阶小白看过来~
- 2024-12-18 10张图带你搞定高并发之网络IO模型
- 2024-12-18 「微服务架构」分布式锁Redission官方文档
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)