网站首页 > 基础教程 正文
前言:
数据发生变更实时订阅在多种场景中发挥着重要作用。Binlog是MySQL等数据库用来记录所有数据库表变化的日志,通过订阅Binlog,可以实时获取数据库的所有变更,并将这些变更同步到其他系统或组件中。
一: Binlog(Binary Log)
Binlog(Binary Log)是MySQL数据库的一种二进制日志文件,用于记录用户对数据库执行的更新操作(如INSERT、UPDATE、DELETE等),但不包括查询操作。以下是一些实时订阅Binlog的主要应用场景:
- 数据异构:通过订阅Binlog,可以将数据从一个系统或数据库异构到另一个系统或数据库中,例如将数据从MySQL同步到Elasticsearch或其他NoSQL数据库中,以满足不同的数据访问需求。
- 缓存更新:当数据库中的数据发生变化时,通过订阅Binlog可以实时更新缓存中的数据,从而确保缓存与数据库之间的一致性。
- 任务分发和消息通知:在一些业务场景中,当数据发生变化时,需要触发相应的任务或发送消息通知其他系统。通过订阅Binlog,可以在数据变更时实时触发这些任务或发送消息通知,从而实现业务逻辑的自动化处理。
- 数据审计和监控:通过订阅Binlog,可以实时监控数据的更改情况,从而实现数据审计的需求。例如,可以记录用户对数据库的修改操作,以便在需要时进行追溯和审查。
二、binlog配置
- 1.查看是否开启binlog
#查看是否开启binlog功能 ,OFF表示没开启,ON表示开启
show variables like 'log_bin'
- 2.开启binlog
1.编辑配置文件
vi /etc/my.cnf
2.添加内容
log-bin=mysql-bin #binlog文件名
binlog_format=ROW #选择row模式
server_id=1 #mysql实例id,不能重复,唯一
3.重启数据库:
- 3.创建用户账号,并授权该用户拥有功能权限
#创建 test 的用户,密码为 123456,% 表示任意地址都可远程登录。
CREATE USER 'test'@'%' IDENTIFIED BY '123456';
#给test 用户授权同步复制等的权限(REPLICATION SLAVE)
GRANT REPLICATION SLAVE ON *.* TO 'test'@'%';
#刷新
FLUSH PRIVILEGES;
三:代码实现
- 1.mvn依赖
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.21.0</version>
</dependency>
- 2.定义事件类
@Data
public class BinlogEvent<T> {
/**
* 操作类型(INSERT,UPDATE,DELETE)
*/
private String operationType;
/**
* 来源 表
*/
private String table;
/**
* 来源 库
*/
private String database;
/**
* 新数据
*/
private T data;
/**
* 原始数据(针对update事件)
*/
private T originalData;
}
- 3.定义列数据信息
这里对应的是数据库表中列的信息
@Data
public class ColumnMetadata {
//列名称
private String columnName;
//列类型
private String dataType;
//字符集
private String characterSetName;
}
- 4.定义jdbc工具
这里是获取库表的列详情信息,这里可以优化成使用数据源,提高性能。
public class JdbcUtil {
private static Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();
public static List<ColumnMetadata> getColumns(String database, String table) {
String tableSchema = String.format("%s.%s", database, table);
List<ColumnMetadata> columns = columnMetadataMap.get(tableSchema);
try {
if (columns == null) {
Class.forName("org.mariadb.jdbc.Driver");
Connection connection = DriverManager.getConnection("jdbc:mariadb://192.168.180.128.7:3306/dsmp_center?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&autoReconnect=true&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true",
"root", "123456");
//查询表中列信息
PreparedStatement statement = connection.prepareStatement(
"select COLUMN_NAME, DATA_TYPE, CHARACTER_SET_NAME from INFORMATION_SCHEMA.COLUMNS " +
"where TABLE_SCHEMA=? and TABLE_NAME=? order by ORDINAL_POSITION asc;");
statement.setString(1, database);
statement.setString(2, table);
ResultSet resultSet = statement.executeQuery();
List<ColumnMetadata> columnMetadataList = new ArrayList<>();
while (resultSet.next()) {
ColumnMetadata column = new ColumnMetadata();
column.setColumnName(resultSet.getString("COLUMN_NAME"));
column.setDataType(resultSet.getString("DATA_TYPE"));
column.setCharacterSetName(resultSet.getString("CHARACTER_SET_NAME"));
columnMetadataList.add(column);
}
columns = columnMetadataList;
columnMetadataMap.put(tableSchema, columnMetadataList);
}
return columns;
} catch (SQLException e) {
} catch (Exception e) {
}
return columns;
}
}
5.定义转换成事件实体工具
这个工具类的作用是将binlog监听到表中的数据变更,进行与表的列一一对应。
public class ConverEntity {
@SneakyThrows
public static Object toEntity(List<ColumnMetadata> columns, Serializable[] data) {
Map<String, Object> obj = new HashMap<>(columns.size());
for (int i = 0; i < data.length; i++) {
ColumnMetadata column = columns.get(i);
Serializable fieldValue = data[i];
if (fieldValue instanceof Date) {
if (fieldValue != null) {
data[i] = new Date(((Date) fieldValue).getTime() + 0);
}
} else if (fieldValue instanceof byte[]) {
if (fieldValue != null) {
data[i] = new String((byte[]) fieldValue, StandardCharsets.UTF_8);
}
} else if (fieldValue instanceof BitSet) {
if (fieldValue != null) {
data[i] = !((BitSet) fieldValue).isEmpty();
}
}
obj.put(column.getColumnName(), data[i]);
}
return obj;
}
}
6.核心代码
@Slf4j
@Component
@RequiredArgsConstructor
public class BinlogClient implements CommandLineRunner {
private final Map<Long, TableMapEventData> tableMap = new HashMap<>();
private final Map<String, List<ColumnMetadata>> columnMetadataMap = new HashMap<>();
@Override
public void run(String... args) throws Exception {
log.info("启动 Binlog 客户端 - 连接服务端");
//这里对应上面创建的用户test/123456
BinaryLogClient client = new BinaryLogClient("192.168.160.128", 3306, "test", "123456");
// #mysql实例id
client.setServerId(1l);
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventHeaderV4 headerV4 = event.getHeader();
EventType eventType = headerV4.getEventType();
//每张表有一个表id,对应事件信息(表名,库名等其他信息)
if (eventType == EventType.TABLE_MAP) {
TableMapEventData eventData = event.getData();
tableMap.put(eventData.getTableId(), eventData);
} else {
//insert,update,delete事件
if (EventType.isRowMutation(eventType)) {
EventData eventData = event.getData();
//update
if (eventData instanceof UpdateRowsEventData) {
UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) eventData;
long tableId = updateRowsEventData.getTableId();
List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows();
TableMapEventData tableMapEventData = tableMap.get(tableId);
if (tableMapEventData != null) {
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
if (EventType.isUpdate(eventType)) {
update(database, table, rows);
}
}
}
//insert
if (eventData instanceof WriteRowsEventData) {
WriteRowsEventData writeRowsEventData = (WriteRowsEventData) eventData;
long tableId = writeRowsEventData.getTableId();
List<Serializable[]> rows = writeRowsEventData.getRows();
TableMapEventData tableMapEventData = tableMap.get(tableId);
if (tableMapEventData != null) {
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
if (EventType.isWrite(eventType)) {
insert(database, table, rows);
}
}
}
//delete
if (eventData instanceof DeleteRowsEventData) {
DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) eventData;
long tableId = deleteRowsEventData.getTableId();
List<Serializable[]> rows = deleteRowsEventData.getRows();
TableMapEventData tableMapEventData = tableMap.get(tableId);
if (tableMapEventData != null) {
String database = tableMapEventData.getDatabase();
String table = tableMapEventData.getTable();
if (EventType.isDelete(eventType)) {
delete(database, table, rows);
}
}
}
}
}
}
});
//连接
client.connect();
}
//insert操作
public void insert(String databaseName, String tableName, List<Serializable[]> data) {
List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "insert");
data.forEach(row -> {
binlogEvent.setData(ConverEntity.toEntity(columns, row));
});
log.info("insert操作:库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
}
//update操作
public void update(String databaseName, String tableName, List<Map.Entry<Serializable[], Serializable[]>> data) {
List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "update");
data.forEach(row -> {
binlogEvent.setData(ConverEntity.toEntity(columns, row.getValue()));
binlogEvent.setOriginalData(ConverEntity.toEntity(columns, row.getKey()));
});
log.info("库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
}
//delete操作
public void delete(String databaseName, String tableName, List<Serializable[]> data) {
List<ColumnMetadata> columns = JdbcUtil.getColumns(databaseName, tableName);
BinlogEvent binlogEvent = buildBinlogEvent(databaseName, tableName, "delete");
data.forEach(row -> {
binlogEvent.setData(ConverEntity.toEntity(columns, row));
});
log.info("库名:{},表名:{},数据:{}", databaseName, tableName, binlogEvent);
}
//构建BinlogEvent
private BinlogEvent buildBinlogEvent(String databaseName, String tableName, String operationType) {
BinlogEvent binlogEvent = new BinlogEvent<>();
binlogEvent.setDatabase(databaseName);
binlogEvent.setTable(tableName);
binlogEvent.setOperationType(operationType);
return binlogEvent;
}
}
总结:
需要注意的是,虽然实时订阅Binlog带来了很多便利和优势,但也存在一些挑战和限制。例如,订阅Binlog会增加系统的复杂度和资源消耗;在极端情况下,可能会出现数据丢失或不一致的问题;此外,还需要合理管理Redis和数据库的连接以及消费者数量等。因此,在实际应用中需要根据具体场景和需求进行权衡和选择。
猜你喜欢
- 2024-12-18 吊打 ThreadLocal,谈谈FastThreadLocal为啥能这么快?
- 2024-12-18 分布式锁中的王者方案 - Redisson
- 2024-12-18 你管这玩意儿叫高并发? 什么叫高并发
- 2024-12-18 系统数据实时同步方案一落地 系统间数据同步解决方案
- 2024-12-18 分布式锁工具:Redisson 分布式锁 redis zookeeper
- 2024-12-18 Spring Cloud Circuit Breaker快速入门Demo
- 2024-12-18 BitMap是啥?脑袋一下空白? bitmap文件头
- 2024-12-18 一亿个8位数字,用什么排序方法 一亿个8位数字,用什么排序方法最好
- 2024-12-18 Java基础-数据类型和数据结构,初阶小白看过来~
- 2024-12-18 10张图带你搞定高并发之网络IO模型
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)