前言
Hudi 除了支持Spark、Fink写Hudi外,还支持 Java 客户端。本文总结Hudi Java Client如何使用,主要为代码示例,可以实现读取 Hive 表写Hudi表。当然也支持读取其他数据源,比如mysql,实现读取mysql的历史数据和增量数据写Hudi。
版本Hudi 0.12.0
功能支持支持insert/upsert/delete,暂不支持bulkInsert目前仅支持COW表支持完整的写Hudi操作,包括rollback、clean、archive等。
代码完整代码已上传GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。
其中HoodieJavaWriteClientExample是从Hudi源码里拷贝的,包含了insert/upsert/delte/的代码示例,JavaClientHive2Hudi是我自己的写的代码示例总结,实现了kerberos认证、读取Hive表Schema作为写hudi的Schema、读取Hive表数据写hudi表,并同步hudi元数据至hive元数据,实现自动创建Hive元数据,当然也支持读取其他数据源,比如mysql,实现历史和增量写。
相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置参数,更贴近实际使用,比如HoodieJavaWriteClientExample的payload为HoodieAvroPayload这只能作为示例使用,JavaClientHive2Hudi使用的为DefaultHoodieRecordPayload它支持预合并和历史值比较,关于这一点可以参考我之前写的文章:Hudi preCombinedField 总结(二)-源码分析,如果只需要预合并功能,可以使用OverwriteWithLatestAvroPayload,这俩分别是Spark SQL 和 Spark DF的默认值,当然都不需要的话,也支持HoodieAvroPayload,代码里是根据条件判断需要用哪个payloadClassName。
String payloadClassName = shouldOrdering ? DefaultHoodieRecordPayload .class .getName ( ) : shouldCombine ? OverwriteWithLatestAvroPayload .class .getName ( ) : HoodieAvroPayload .class .getName ( ) ;
然后利用反射构造payload,其实这里反射的逻辑就是Hudi Spark源码里的逻辑。
另一个它更贴近实际使用的原因就是我们项目上就是将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求,这里的核心逻辑和实际项目中的逻辑是差不多的。关于我们使用Java客户端的原因是由于历史原因造成的,因为我们之前还没有调度Spark、Flink的开发工具(之前用的NIFI),而开发一个新的开发工具的话是需要时间成本的,所以选择了Java客户端,我们现在已经将Apache DolphinScheduler作为自己的开发调度工具了,后面会主要使用Spark/Flink,所以现在总结一下Hudi Java Client的使用以及源码,避免遗忘,也希望对大家有所帮助。
初始化Hudi表
Java Client的代码更贴近源码。
initTable主要是根据一些配置信息,生成.hoodie元数据路径,并生成hoodie.properties元数据文件,该文件里持久化保存了Hudi的一些配置信息。
if ( ! ( fs .exists ( path ) && fs .exists ( hoodiePath ) ) ) { // 根据Hudi路径存不存在,判断Hudi表需不需要初始化 if ( Arrays .asList ( INSERT_OPERATION , UPSERT_OPERATION ) .contains ( writeOperationType ) ) { HoodieTableMetaClient .withPropertyBuilder ( ) .setTableType ( TABLE_TYPE ) .setTableName ( targetTable ) .setPayloadClassName ( payloadClassName ) .setRecordKeyFields ( recordKeyFields ) .setPreCombineField ( preCombineField ) .setPartitionFields ( partitionFields ) .setBootstrapIndexClass ( NoOpBootstrapIndex .class .getName ( ) ) .initTable ( hadoopConf , tablePath ) ; } else if ( writeOperationType .equals ( DELETE_OPERATION ) ) { // Delete 操作,Hudi表必须已经存在 throw new TableNotFoundException ( tablePath ) ; } }
hoodie.properties
#Properties saved on 2022 - 10 - 24 T07 : 40 : 36.530 Z #Mon Oct 24 15 : 40 : 36 CST 2022 hoodie .table .name = test_hudi_target hoodie .archivelog .folder = archived hoodie .table .type = COPY_ON_WRITE hoodie .table .version = 5 hoodie .timeline .layout .version = 1 hoodie .datasource .write .drop .partition .columns = false hoodie .table .checksum = 1749434190
创建HoodieJavaWriteClient
首先要创建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些参数,具体可以自己去了解。
HoodieWriteConfig cfg = HoodieWriteConfig .newBuilder ( ) .withPath ( tablePath ) .withSchema ( writeSchema .toString ( ) ) .withParallelism ( 2 , 2 ) .withDeleteParallelism ( 2 ) .forTable ( targetTable ) .withWritePayLoad ( payloadClassName ) .withPayloadConfig ( HoodiePayloadConfig .newBuilder ( ) .withPayloadOrderingField ( orderingField ) .build ( ) ) .withIndexConfig ( HoodieIndexConfig .newBuilder ( ) .withIndexType ( HoodieIndex .IndexType .BLOOM ) // .bloomIndexPruneByRanges ( false ) // 1000 万总体时间提升1分钟 .bloomFilterFPP ( 0.000001 ) // 1000 万总体时间提升3分钟 .fromProperties ( indexProperties ) .build ( ) ) .withCompactionConfig ( HoodieCompactionConfig .newBuilder ( ) .compactionSmallFileSize ( smallFileLimit ) .approxRecordSize ( recordSizeEstimate ) .build ( ) ) .withArchivalConfig ( HoodieArchivalConfig .newBuilder ( ) .archiveCommitsWith ( 150 , 200 ) .build ( ) ) .withCleanConfig ( HoodieCleanConfig .newBuilder ( ) .retainCommits ( 100 ) .build ( ) ) .withStorageConfig ( HoodieStorageConfig .newBuilder ( ) .parquetMaxFileSize ( maxFileSize ) .build ( ) ) .build ( ) ; writeClient = new HoodieJavaWriteClient <> ( new HoodieJavaEngineContext ( hadoopConf ) , cfg )
startCommit
返回commitTime,首先会执行rollback,然后创建一个.commit.request,再将commitTime返回。
String newCommitTime = writeClient .startCommit ( ) ;
generateRecord
这里主要是构造写hudi需要的数据结构,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。
public static List < HoodieRecord < HoodieRecordPayload >> generateRecord ( ResultSet rs , Schema writeSchema , String payloadClassName , boolean shouldCombine ) throws IOException , SQLException { List < HoodieRecord < HoodieRecordPayload >> list = new ArrayList <> ( ) ; while ( rs .next ( ) ) { GenericRecord rec = new GenericData .Record ( writeSchema ) ; writeSchema .getFields ( ) .forEach ( field -> { try { rec .put ( field .name ( ) , convertValueType ( rs , field .name ( ) , field .schema ( ) .getType ( ) ) ) ; } catch ( SQLException e ) { throw new RuntimeException ( e ) ; } } ) ; String partitionPath = partitionFields == null ? "" : getRecordPartitionPath ( rs , writeSchema ) ; System .out .println ( partitionPath ) ; String rowKey = recordKeyFields == null && writeOperationType .equals ( INSERT_OPERATION ) ? UUID .randomUUID ( ) .toString ( ) : getRecordKey ( rs , writeSchema ) ; HoodieKey key = new HoodieKey ( rowKey , partitionPath ) ; if ( shouldCombine ) { Object orderingVal = HoodieAvroUtils .getNestedFieldVal ( rec , preCombineField , false , false ) ; list .add ( new HoodieAvroRecord <> ( key , createPayload ( payloadClassName , rec , ( Comparable ) orderingVal ) ) ) ; } else { list .add ( new HoodieAvroRecord <> ( key , createPayload ( payloadClassName , rec ) ) ) ; } } return list ; }
写Hudi
最后执行写Hudi的操作,常用upsert/insert/delete,Java Client也是默认开启clean等操作的,具体的实现是在HoodieJavaCopyOnWriteTable中。目前还不支持bulkInsert等操作,后面如果我有能力的话,会尝试提交PR支持。
writeClient .upsert ( records , newCommitTime ) ; writeClient .insert ( records , newCommitTime ) ; writeClient .delete ( records , newCommitTime ) ;
同步Hive
最后是同步元数据至Hive,实现在hive中建表,这一步是可选的。这样可以利用Hive SQL和Spark SQL查询Hudi表。
/** * 利用HiveSyncTool同步Hive元数据 * Spark写Hudi同步hive元数据的源码就是这样同步的 * * @param properties * @param hiveConf */ public static void syncHive ( TypedProperties properties , HiveConf hiveConf ) { HiveSyncTool hiveSyncTool = new HiveSyncTool ( properties , hiveConf ) ; hiveSyncTool .syncHoodieTable ( ) ; } public static HiveConf getHiveConf ( String hiveSitePath , String coreSitePath , String hdfsSitePath ) { HiveConf configuration = new HiveConf ( ) ; configuration .addResource ( new Path ( hiveSitePath ) ) ; configuration .addResource ( new Path ( coreSitePath ) ) ; configuration .addResource ( new Path ( hdfsSitePath ) ) ; return configuration ; } /** * 同步Hive元数据的一些属性配置 * @param basePath * @return */ public static TypedProperties getHiveSyncProperties ( String basePath ) { TypedProperties properties = new TypedProperties ( ) ; properties .put ( HiveSyncConfigHolder .HIVE_SYNC_MODE .key ( ) , HiveSyncMode .HMS .name ( ) ) ; properties .put ( HiveSyncConfigHolder .HIVE_CREATE_MANAGED_TABLE .key ( ) , true ) ; properties .put ( HoodieSyncConfig .META_SYNC_DATABASE_NAME .key ( ) , dbName ) ; properties .put ( HoodieSyncConfig .META_SYNC_TABLE_NAME .key ( ) , targetTable ) ; properties .put ( HoodieSyncConfig .META_SYNC_BASE_PATH .key ( ) , basePath ) ; properties .put ( HoodieSyncConfig .META_SYNC_PARTITION_EXTRACTOR_CLASS .key ( ) , MultiPartKeysValueExtractor .class .getName ( ) ) ; properties .put ( HoodieSyncConfig .META_SYNC_PARTITION_FIELDS .key ( ) , partitionFields ) ; if ( partitionFields != null && ! partitionFields .isEmpty ( ) ) { properties .put ( HoodieSyncConfig .META_SYNC_PARTITION_FIELDS .key ( ) , partitionFields ) ; } return properties ; }
与0.9.0版本差异
之前是基于0.9.0版本开发的,本文代码示例基于0.12.0,核心代码是一样的,差异的地方有两处。
1、0.9.0 clean、archive的参数都是在withCompactionConfig中,现在单独拎出来2、0.9.0 HiveSyncTool的参数为HiveSyncConfig,现在为TypedProperties。
总结Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到10000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是我们基于源码改造,将布隆索引和写数据的部分改为多线程后,性能就会提升很多,当然这也取决于机器的性能,和CPU、内存有关。对于数据量不是很大的ZF数据,一般大表几十亿,性能还是可以满足要求的。
原文地址:https://mp.weixin.qq.com/s/5raMUByVGDyVYMeAdW-LRw
查看更多关于Hudi Java Client总结之读取Hive写Hudi代码的详细内容...