好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Hudi Java Client总结之读取Hive写Hudi代码

前言

Hudi 除了支持Spark、Fink写Hudi外,还支持 Java 客户端。本文总结Hudi Java Client如何使用,主要为代码示例,可以实现读取 Hive 表写Hudi表。当然也支持读取其他数据源,比如mysql,实现读取mysql的历史数据和增量数据写Hudi。

版本

Hudi 0.12.0

功能支持

支持insert/upsert/delete,暂不支持bulkInsert目前仅支持COW表支持完整的写Hudi操作,包括rollback、clean、archive等。

代码

完整代码已上传GitHub:https://github.com/dongkelun/hudi-demo/tree/master/java-client。

其中HoodieJavaWriteClientExample是从Hudi源码里拷贝的,包含了insert/upsert/delte/的代码示例,JavaClientHive2Hudi是我自己的写的代码示例总结,实现了kerberos认证、读取Hive表Schema作为写hudi的Schema、读取Hive表数据写hudi表,并同步hudi元数据至hive元数据,实现自动创建Hive元数据,当然也支持读取其他数据源,比如mysql,实现历史和增量写。

相比于HoodieJavaWriteClientExample,JavaClientHive2Hudi加了很多配置参数,更贴近实际使用,比如HoodieJavaWriteClientExample的payload为HoodieAvroPayload这只能作为示例使用,JavaClientHive2Hudi使用的为DefaultHoodieRecordPayload它支持预合并和历史值比较,关于这一点可以参考我之前写的文章:Hudi preCombinedField 总结(二)-源码分析,如果只需要预合并功能,可以使用OverwriteWithLatestAvroPayload,这俩分别是Spark SQL 和 Spark DF的默认值,当然都不需要的话,也支持HoodieAvroPayload,代码里是根据条件判断需要用哪个payloadClassName。

String payloadClassName  =  shouldOrdering  ?  DefaultHoodieRecordPayload .class  .getName  (  )   :  shouldCombine  ?  OverwriteWithLatestAvroPayload .class  .getName  (  )   :  HoodieAvroPayload .class  .getName  (  )  ; 

然后利用反射构造payload,其实这里反射的逻辑就是Hudi Spark源码里的逻辑。

另一个它更贴近实际使用的原因就是我们项目上就是将Hudi Java Client封装成了一个NIFI processor,然后用NIFI调度,其性能和稳定性都能够满足项目需求,这里的核心逻辑和实际项目中的逻辑是差不多的。关于我们使用Java客户端的原因是由于历史原因造成的,因为我们之前还没有调度Spark、Flink的开发工具(之前用的NIFI),而开发一个新的开发工具的话是需要时间成本的,所以选择了Java客户端,我们现在已经将Apache DolphinScheduler作为自己的开发调度工具了,后面会主要使用Spark/Flink,所以现在总结一下Hudi Java Client的使用以及源码,避免遗忘,也希望对大家有所帮助。

初始化Hudi表

Java Client的代码更贴近源码。

initTable主要是根据一些配置信息,生成.hoodie元数据路径,并生成hoodie.properties元数据文件,该文件里持久化保存了Hudi的一些配置信息。

if  (  !  ( fs .exists  ( path )   &&  fs .exists  ( hoodiePath )  )  )   {   // 根据Hudi路径存不存在,判断Hudi表需不需要初始化
                if  ( Arrays .asList  ( INSERT_OPERATION ,  UPSERT_OPERATION )  .contains  ( writeOperationType )  )   {  HoodieTableMetaClient .withPropertyBuilder  (  )   .setTableType  ( TABLE_TYPE )   .setTableName  ( targetTable )   .setPayloadClassName  ( payloadClassName )   .setRecordKeyFields  ( recordKeyFields )   .setPreCombineField  ( preCombineField )   .setPartitionFields  ( partitionFields )   .setBootstrapIndexClass  ( NoOpBootstrapIndex .class  .getName  (  )  )   .initTable  ( hadoopConf ,  tablePath )  ;   }  else if  ( writeOperationType .equals  ( DELETE_OPERATION )  )   {   //  Delete 操作,Hudi表必须已经存在
                    throw new TableNotFoundException ( tablePath )  ;   }   } 

hoodie.properties

#Properties saved  on   2022  -  10  -  24 T07 :  40  :  36.530 Z
#Mon Oct  24   15  :  40  :  36  CST  2022  hoodie .table  .name  = test_hudi_target
hoodie .archivelog  .folder  = archived
hoodie .table  .type  = COPY_ON_WRITE
hoodie .table  .version  =  5  hoodie .timeline  .layout  .version  =  1  hoodie .datasource  .write  .drop  .partition  .columns  =  false  hoodie .table  .checksum  =  1749434190 

创建HoodieJavaWriteClient

首先要创建HoodieWriteConfig,主要是hudi的一些配置,比如Schema、表名、payload、索引、clean等一些参数,具体可以自己去了解。

HoodieWriteConfig cfg  =  HoodieWriteConfig .newBuilder  (  )  .withPath  ( tablePath )   .withSchema  ( writeSchema .toString  (  )  )   .withParallelism  (  2  ,   2  )  .withDeleteParallelism  (  2  )   .forTable  ( targetTable )   .withWritePayLoad  ( payloadClassName )   .withPayloadConfig  ( HoodiePayloadConfig .newBuilder  (  )  .withPayloadOrderingField  ( orderingField )  .build  (  )  )   .withIndexConfig  ( HoodieIndexConfig .newBuilder  (  )   .withIndexType  ( HoodieIndex .IndexType  .BLOOM  )   //   .bloomIndexPruneByRanges  (  false  )   //   1000 万总体时间提升1分钟  .bloomFilterFPP  (  0.000001  )   //   1000 万总体时间提升3分钟  .fromProperties  ( indexProperties )   .build  (  )  )   .withCompactionConfig  ( HoodieCompactionConfig .newBuilder  (  )   .compactionSmallFileSize  ( smallFileLimit )   .approxRecordSize  ( recordSizeEstimate )  .build  (  )  )   .withArchivalConfig  ( HoodieArchivalConfig .newBuilder  (  )  .archiveCommitsWith  (  150  ,   200  )  .build  (  )  )   .withCleanConfig  ( HoodieCleanConfig .newBuilder  (  )  .retainCommits  (  100  )  .build  (  )  )   .withStorageConfig  ( HoodieStorageConfig .newBuilder  (  )   .parquetMaxFileSize  ( maxFileSize )  .build  (  )  )   .build  (  )  ;  writeClient  =  new HoodieJavaWriteClient <>  ( new HoodieJavaEngineContext ( hadoopConf )  ,  cfg ) 

startCommit

返回commitTime,首先会执行rollback,然后创建一个.commit.request,再将commitTime返回。

String newCommitTime  =  writeClient .startCommit  (  )  ; 

generateRecord

这里主要是构造写hudi需要的数据结构,包含HoodieKey和payLoad,其中delete操作只需要HoodieKey。

public static List < HoodieRecord < HoodieRecordPayload >>  generateRecord ( ResultSet rs ,  Schema writeSchema ,  String payloadClassName ,   boolean  shouldCombine )  throws IOException ,  SQLException  {  List < HoodieRecord < HoodieRecordPayload >>  list  =  new ArrayList <>  (  )  ;  while  ( rs .next  (  )  )   {  GenericRecord rec  =  new GenericData .Record  ( writeSchema )  ;  writeSchema .getFields  (  )  .forEach  ( field  ->   {  try  {  rec .put  ( field .name  (  )  ,  convertValueType ( rs ,  field .name  (  )  ,  field .schema  (  )  .getType  (  )  )  )  ;   }  catch  ( SQLException e )   {  throw new RuntimeException ( e )  ;   }   }  )  ;  String partitionPath  =  partitionFields  ==   null   ?   ""   :  getRecordPartitionPath ( rs ,  writeSchema )  ;  System .out  .println  ( partitionPath )  ;  String rowKey  =  recordKeyFields  ==   null   &&  writeOperationType .equals  ( INSERT_OPERATION )   ?  UUID .randomUUID  (  )  .toString  (  )   :  getRecordKey ( rs ,  writeSchema )  ;  HoodieKey key  =  new HoodieKey ( rowKey ,  partitionPath )  ;  if  ( shouldCombine )   {  Object orderingVal  =  HoodieAvroUtils .getNestedFieldVal  ( rec ,  preCombineField ,   false  ,   false  )  ;  list .add  ( new HoodieAvroRecord <>  ( key ,  createPayload ( payloadClassName ,  rec ,   ( Comparable )  orderingVal )  )  )  ;   }  else  {  list .add  ( new HoodieAvroRecord <>  ( key ,  createPayload ( payloadClassName ,  rec )  )  )  ;   }   }  return list ;   } 

写Hudi

最后执行写Hudi的操作,常用upsert/insert/delete,Java Client也是默认开启clean等操作的,具体的实现是在HoodieJavaCopyOnWriteTable中。目前还不支持bulkInsert等操作,后面如果我有能力的话,会尝试提交PR支持。

writeClient .upsert  ( records ,  newCommitTime )  ;  writeClient .insert  ( records ,  newCommitTime )  ;  writeClient .delete  ( records ,  newCommitTime )  ; 

同步Hive

最后是同步元数据至Hive,实现在hive中建表,这一步是可选的。这样可以利用Hive SQL和Spark SQL查询Hudi表。

 /**    * 利用HiveSyncTool同步Hive元数据    * Spark写Hudi同步hive元数据的源码就是这样同步的    *    * @param properties    * @param hiveConf    */  public static void syncHive ( TypedProperties properties ,  HiveConf hiveConf )   {  HiveSyncTool hiveSyncTool  =  new HiveSyncTool ( properties ,  hiveConf )  ;  hiveSyncTool .syncHoodieTable  (  )  ;   }  public static HiveConf getHiveConf ( String hiveSitePath ,  String coreSitePath ,  String hdfsSitePath )   {  HiveConf configuration  =  new HiveConf (  )  ;  configuration .addResource  ( new Path ( hiveSitePath )  )  ;  configuration .addResource  ( new Path ( coreSitePath )  )  ;  configuration .addResource  ( new Path ( hdfsSitePath )  )  ;  return configuration ;   }   /**    * 同步Hive元数据的一些属性配置    * @param basePath    * @return    */  public static TypedProperties getHiveSyncProperties ( String basePath )   {  TypedProperties properties  =  new TypedProperties (  )  ;  properties .put  ( HiveSyncConfigHolder .HIVE_SYNC_MODE  .key  (  )  ,  HiveSyncMode .HMS  .name  (  )  )  ;  properties .put  ( HiveSyncConfigHolder .HIVE_CREATE_MANAGED_TABLE  .key  (  )  ,   true  )  ;  properties .put  ( HoodieSyncConfig .META_SYNC_DATABASE_NAME  .key  (  )  ,  dbName )  ;  properties .put  ( HoodieSyncConfig .META_SYNC_TABLE_NAME  .key  (  )  ,  targetTable )  ;  properties .put  ( HoodieSyncConfig .META_SYNC_BASE_PATH  .key  (  )  ,  basePath )  ;  properties .put  ( HoodieSyncConfig .META_SYNC_PARTITION_EXTRACTOR_CLASS  .key  (  )  ,  MultiPartKeysValueExtractor .class  .getName  (  )  )  ;  properties .put  ( HoodieSyncConfig .META_SYNC_PARTITION_FIELDS  .key  (  )  ,  partitionFields )  ;  if  ( partitionFields  !=   null   &&   ! partitionFields .isEmpty  (  )  )   {  properties .put  ( HoodieSyncConfig .META_SYNC_PARTITION_FIELDS  .key  (  )  ,  partitionFields )  ;   }  return properties ;   } 

与0.9.0版本差异

之前是基于0.9.0版本开发的,本文代码示例基于0.12.0,核心代码是一样的,差异的地方有两处。

1、0.9.0 clean、archive的参数都是在withCompactionConfig中,现在单独拎出来2、0.9.0 HiveSyncTool的参数为HiveSyncConfig,现在为TypedProperties。

总结

Hudi Java Client和Spark、Flink一样都可以实现完整的写Hudi的逻辑,但是目前功能支持还不完善,比如不支持MOR表,而且性能上也不如Spark、Flink,毕竟Spark、FLink都是集群,但是Hudi Java Client可以集成到其他框架中,比如NIFI,集成起来比较方便,集成到NIFI的好处是,可以通过拖来拽配置参数的形式完成历史数据和增量数据写入Hudi。也可以自己实现多线程,提升性能,我们目前测试的性能是Insert可以达到10000条/s,而upsert因为需要读取索引,还有历史数据的更新,可能需要重写整个表,所以当历史数据比较大且更新占比比较高时,单线程的性能会非常差,但是我们基于源码改造,将布隆索引和写数据的部分改为多线程后,性能就会提升很多,当然这也取决于机器的性能,和CPU、内存有关。对于数据量不是很大的ZF数据,一般大表几十亿,性能还是可以满足要求的。

原文地址:https://mp.weixin.qq.com/s/5raMUByVGDyVYMeAdW-LRw

查看更多关于Hudi Java Client总结之读取Hive写Hudi代码的详细内容...

  阅读:22次