网站首页 > 基础教程 正文
1. 介绍
前面分析了基于过滤器的索引,接着分析基于外部存储系统的索引实现:HBaseIndex。对于想自定义实现Index具有一定的借鉴作用。
2. 分析
HBaseIndex也是HoodieIndex的子类实现,其实现了父类的两个核心方法。
// 给输入记录RDD打位置标签
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);
// 更新位置信息
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc, HoodieTable<T> hoodieTable);
在写入数据过程中,会调用tagLocation给输入记录打位置标签,其核心代码如下
public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
return recordRDD.mapPartitionsWithIndex(locationTagFunction(hoodieTable.getMetaClient()), true);
}
可以看到该方法主要使用了locationTagFunctionFunction来处理原始记录,其核心代码如下
private Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>> locationTagFunction(
HoodieTableMetaClient metaClient) {
return (Function2<Integer, Iterator<HoodieRecord<T>>, Iterator<HoodieRecord<T>>>) (partitionNum,
hoodieRecordIterator) -> {
// 每次取的批次大小
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
// 获取HBase连接
synchronized (HBaseIndex.class) {
if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
List<HoodieRecord<T>> taggedRecords = new ArrayList<>();
HTable hTable = null;
try {
// 获取配置的表
hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
List<Get> statements = new ArrayList<>();
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
// 遍历该分区上的记录
while (hoodieRecordIterator.hasNext()) {
HoodieRecord rec = hoodieRecordIterator.next();
// 根据recordKey生成Get
statements.add(generateStatement(rec.getRecordKey()));
currentBatchOfRecords.add(rec);
// 达到批量大小或者遍历完记录
if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
// 获取结果
Result[] results = doGet(hTable, statements);
// 清空便于GC回收
statements.clear();
for (Result result : results) {
// 移除结果对应的的HoodieRecord
HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
if (result.getRow() != null) {
// 取出key, commit时间,文件ID和分区路径
String keyFromResult = Bytes.toString(result.getRow());
String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
// 检查是否为合法的提交(包含在timeline或者小于最新的一次commit)
if (checkIfValidCommit(metaClient, commitTs)) {
// 重新生成HoodieRecord
currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
currentRecord.getData());
currentRecord.unseal();
// 设置位置信息
currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
currentRecord.seal();
taggedRecords.add(currentRecord);
// the key from Result and the key being processed should be same
assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
} else { // 非法提交,也标记为已打完标签
taggedRecords.add(currentRecord);
}
} else { // 标记为已打完标签
taggedRecords.add(currentRecord);
}
}
}
}
}
return taggedRecords.iterator();
};
}
可以看到从HBase中取位置信息流程非常简单,即遍历指定分区上所有记录,然后批量生成recordKey从HBase索引表(表名自定义配置)取对应的信息,然后生成位置信息。
当写完数据后,需要调用updateLocation更新记录的位置信息,其核心代码如下
public JavaRDD<WriteStatus> updateLocation(JavaRDD<WriteStatus> writeStatusRDD, JavaSparkContext jsc,
HoodieTable<T> hoodieTable) {
// 根据配置(hoodie.index.hbase.qps.allocator.class)生成Allocator
final HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
// 根据Allocator进行初始化
setPutBatchSize(writeStatusRDD, hBaseIndexQPSResourceAllocator, jsc);
// 使用Function处理
JavaRDD<WriteStatus> writeStatusJavaRDD = writeStatusRDD.mapPartitionsWithIndex(updateLocationFunction(), true);
// 缓存状态RDD
writeStatusJavaRDD = writeStatusJavaRDD.persist(config.getWriteStatusStorageLevel());
return writeStatusJavaRDD;
}
其中updateLocationFunction核心代码如下
private Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>> updateLocationFunction() {
return (Function2<Integer, Iterator<WriteStatus>, Iterator<WriteStatus>>) (partition, statusIterator) -> {
List<WriteStatus> writeStatusList = new ArrayList<>();
// 获取HBase连接
synchronized (HBaseIndex.class) {
if (hbaseConnection == null || hbaseConnection.isClosed()) {
hbaseConnection = getHBaseConnection();
}
}
try (BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
// 遍历状态信息
while (statusIterator.hasNext()) {
WriteStatus writeStatus = statusIterator.next();
List<Mutation> mutations = new ArrayList<>();
try {
for (HoodieRecord rec : writeStatus.getWrittenRecords()) {
if (!writeStatus.isErrored(rec.getKey())) {
// 获取新的位置信息
Option<HoodieRecordLocation> loc = rec.getNewLocation();
if (loc.isPresent()) { // 新的位置信息存在
if (rec.getCurrentLocation() != null) { // 当前位置信息存在
// 表示更新,无需更新
continue;
}
// 根据HoodieRecord信息初始化Put
Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
mutations.add(put);
} else { // 新的位置不存在
// 表示删除了该记录
Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
mutations.add(delete);
}
}
if (mutations.size() < multiPutBatchSize) {
continue;
}
// 更新
doMutations(mutator, mutations);
}
// 处理剩余的更新
doMutations(mutator, mutations);
}
writeStatusList.add(writeStatus);
}
}
return writeStatusList.iterator();
};
}
可以看到当写完数据后,会更新位置信息,通过WriteStatus中的HoodieRecord的位置信息判断是否需要更新位置信息,对于更新无需要更新,对于新插入需要更新,对于删除需要删除HBase中存储的信息。
3. 总结
Hudi内置了HBase外置存储系统索引的实现,用户可直接配置HBase索引,将记录索引信息存入HBase,当然用户也可自定义实现其他类型索引。
猜你喜欢
- 2024-11-08 你居然只知道蓝绿发布?今天教你全链路灰度
- 2024-11-08 redis 分布式锁的 5个坑,真是又大又深
- 2024-11-08 Blazor OIDC 单点登录授权实例7 - Blazor hybird app 端授权
- 2024-11-08 Spring Boot利用filter实现xss防御
- 2024-11-08 Spring连环CVE-2015-5211和CVE-2020-5421漏洞升级教程
- 2024-11-08 如何进行权限系统设计,一文吃透 如何设计一个权限系统
- 2024-11-08 基于Spring Boot的注解驱动式公众号极速开发框架FastBootWeixin
- 2024-11-08 教育平台项目前端:项目前后端接口联调,项目上线部署发布
- 2024-11-08 HTTP通讯框架选型HttpClient/OkHttp
- 2024-11-08 微信公众号自动回复功能开发 微信公众号平台自动回复功能
- 最近发表
- 标签列表
-
- gitpush (61)
- pythonif (68)
- location.href (57)
- tail-f (57)
- pythonifelse (59)
- deletesql (62)
- c++模板 (62)
- css3动画 (57)
- c#event (59)
- linuxgzip (68)
- 字符串连接 (73)
- nginx配置文件详解 (61)
- html标签 (69)
- c++初始化列表 (64)
- exec命令 (59)
- canvasfilltext (58)
- mysqlinnodbmyisam区别 (63)
- arraylistadd (66)
- node教程 (59)
- console.table (62)
- c++time_t (58)
- phpcookie (58)
- mysqldatesub函数 (63)
- window10java环境变量设置 (66)
- c++虚函数和纯虚函数的区别 (66)