Preface
This post records and summarizes my first attempt at using Flink SQL to read and write Hudi tables and sync them to Hive, along with the problems I ran into and how I solved them.
Versions: Flink 1.14.3, Hudi 0.12.0/0.12.1.
This article uses Flink's yarn-session mode; if you are not familiar with it, see my earlier article.
Hudi bundle download: https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.14-bundle/0.12.1/hudi-flink1.14-bundle-0.12.1.jar
If you want to sync to Hive, you cannot use the bundle downloaded above; you must build it yourself with the flink-bundle-shade-hive profile. See the official docs: https://hudi.apache.org/cn/docs/syncing_metastore. The build commands are:
## Hive3
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12
## Hive2
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive2 -Dflink1.14 -Dscala-2.12
## Hive1
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive1 -Dflink1.14 -Dscala-2.12
To avoid unnecessary trouble, it is best to change the Hive version in the corresponding profile before building. For example, our environment runs the HDP Hive version 3.1.0.3.1.0.0-78, so we set hive.version to 3.1.0.3.1.0.0-78 and then run the build.
Method 1: create the table in memory, without syncing to Hive
With this approach the table metadata lives only in memory, so after exiting the SQL client the table has to be recreated (the table's data files remain on storage).
Create the table
CREATE TABLE test_hudi_flink1 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink1',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true'
);
PRIMARY KEY and hoodie.datasource.write.recordkey.field serve the same purpose. For a composite key, the key can be declared separately at the end as PRIMARY KEY (id1, id2) NOT ENFORCED:
CREATE TABLE test_hudi_flink1 (
  id1 int,
  id2 int,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10),
  PRIMARY KEY (id1, id2) NOT ENFORCED
)
Insert
insert into test_hudi_flink1 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
For update and delete, refer to the official docs: https://hudi.apache.org/cn/docs/flink-quick-start-guide
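In short, an update here is just another insert carrying the same primary key, which Hudi upserts into the existing row; a minimal sketch (the new price/ts values are made up):

-- re-inserting id = 1 with new values updates the existing record via Hudi's upsert
insert into test_hudi_flink1 values (1, 'hudi', 20, 200, '2022-10-31');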
Query
select * from test_hudi_flink1;
The results returned by Flink queries do not include Hudi's metadata columns.
Method 2: create the table in the Hive Catalog, without syncing to Hive
With this approach the table is created in the corresponding Hive metastore. The benefit is that after exiting the SQL client and starting a new one, we can directly use the table stored in the Hive Catalog to read and write data.
Create the table
CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);
use catalog hive_catalog;
use hudi;
CREATE TABLE test_hudi_flink2 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink2',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true'
);
Insert
insert into test_hudi_flink2 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
Query
select * from test_hudi_flink2;
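For example, after exiting and starting a new SQL client, the table can be used again simply by re-registering the catalog; no CREATE TABLE is needed since the definition is stored in the Hive metastore (a small sketch, inserting one made-up row):

CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);
use catalog hive_catalog;
use hudi;
-- the table definition already exists in the catalog, so we can write and read directly
insert into test_hudi_flink2 values (3, 'hudi', 10, 100, '2022-11-01');
select * from test_hudi_flink2;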
However, the Hudi metadata columns still cannot be queried this way, and querying this table from Hive throws an exception, because the table was created with a structure like this:
show create table test_hudi_flink2;

CREATE TABLE `test_hudi_flink2`(
)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://cluster1/warehouse/tablespace/managed/hive/hudi.db/test_hudi_flink2'
TBLPROPERTIES (
  'flink.connector'='hudi',
  'flink.hoodie.datasource.write.hive_style_partitioning'='true',
  'flink.hoodie.datasource.write.recordkey.field'='id',
  'flink.partition.keys.0.name'='dt',
  'flink.path'='/tmp/hudi/test_hudi_flink2',
  'flink.schema.0.data-type'='INT NOT NULL',
  'flink.schema.0.name'='id',
  'flink.schema.1.data-type'='VARCHAR(10)',
  'flink.schema.1.name'='name',
  'flink.schema.2.data-type'='INT',
  'flink.schema.2.name'='price',
  'flink.schema.3.data-type'='INT',
  'flink.schema.3.name'='ts',
  'flink.schema.4.data-type'='VARCHAR(10)',
  'flink.schema.4.name'='dt',
  'flink.schema.primary-key.columns'='id',
  'flink.schema.primary-key.name'='PK_3386',
  'transient_lastDdlTime'='1667129407')
select * from test_hudi_flink2;
Error: Error while compiling statement: FAILED: SemanticException Line 0:-1 Invalid column reference 'TOK_ALLCOLREF' (state=42000, code=40000)

Method 3: create the table in memory, syncing to Hive
The benefit of creating the table this way is that the synced Hive table can be queried with Hive SQL and Spark SQL, and Spark can also be used for insert, update, and so on. However, once the Flink SQL client exits, the Hive table cannot be used to write data from Flink; the table has to be recreated.
MOR table
Create the table
Set the HIVE_CONF_DIR environment variable:
export HIVE_CONF_DIR=/usr/hdp/3.1.0.0-78/hive/conf
CREATE TABLE test_hudi_flink3 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink3',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink3',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);
HIVE_CONF_DIR and hive_sync.conf.dir serve the same purpose: if hive_sync.conf.dir is not set, HIVE_CONF_DIR is used; if neither is set, the sync fails with an exception (see the troubleshooting section below).
As for the Hive sync parameters, the official docs say hive_sync.metastore.uris is required, but in my testing it also works without it, because hive_sync.conf.dir contains a hive-site.xml from which the configuration can be read; Spark SQL's Hive sync likewise reads hive-site.xml. You can look into the other parameters yourself; a reduced example is sketched below.
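For illustration only, a hedged sketch of a sync configuration that omits hive_sync.metastore.uris and relies on hive-site.xml instead (the table name test_hudi_sync_min is made up and the column list is shortened):

CREATE TABLE test_hudi_sync_min (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_sync_min',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  -- no hive_sync.metastore.uris: the URI is taken from hive-site.xml under hive_sync.conf.dir
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_sync_min',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);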
Sync the table
The Hive table sync is only triggered when data is written.
insert into test_hudi_flink3 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
We can then see that two tables, test_hudi_flink3_ro and test_hudi_flink3_rt, have been created in the Hive database. These are the same as the tables synced by Spark SQL: they can be queried with Hive, and both queried and written with Spark.
A MOR table does not produce parquet files at first, so querying it in Hive returns nothing (both RO and RT are empty). We can insert a few more rows from Spark SQL, after which data shows up in the queries:
-- both ro and rt support Spark SQL insert
insert into test_hudi_flink3_ro values (3, 'hudi', 10, 100, '2022-10-31'), (4, 'hudi', 10, 100, '2022-10-31');
insert into test_hudi_flink3_rt values (5, 'hudi', 10, 100, '2022-10-31'), (6, 'hudi', 10, 100, '2022-10-31');
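After those Spark SQL inserts, both views return data; for example (queries run from Hive or Spark SQL):

-- read-optimized view
select * from hudi.test_hudi_flink3_ro;
-- real-time view
select * from hudi.test_hudi_flink3_rt;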
On keeping Flink SQL and Spark SQL configuration consistent:
hoodie.datasource.write.keygenerator.class: set here to ComplexAvroKeyGenerator, i.e. a composite key generator. The reason is that the Flink SQL default is a simple key generator (SimpleKey), while the Spark SQL default is SqlKeyGenerator, which is a ComplexKeyGenerator, i.e. composite keys by default. Since ComplexKeyGenerator lives in hudi-spark-client, which the flink module does not have, Flink needs to be set to ComplexAvroKeyGenerator from hudi-client-common to stay consistent (a mismatched key generator leads to duplicate data).
hoodie.datasource.write.hive_style_partitioning: the Flink SQL default is false but the Spark SQL default is true, so it is set to true here.
hive_sync.partition_extractor_class: the Spark SQL default is MultiPartKeysValueExtractor, which supports the string partition field used in this example, but the Flink SQL default is SlashEncodedDayPartitionValueExtractor, which requires the partition field to be in date format; setting it to HiveStylePartitionValueExtractor solves this.
hoodie.allow.operation.metadata.field: Flink supports this option; when it is true an extra _hoodie_operation column is added to the Hudi metadata fields. Spark does not support it yet, so a Hive table synced from Flink SQL with this enabled can no longer be written through Spark SQL; I plan to submit a PR to support it later.
We can see that many of Flink's option keys or default values differ from those in Spark or Hudi common; if Flink and Spark need to work together, take care to keep them consistent. A small consistency check is sketched right after this list.
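To illustrate why the key generator must match: a hedged sanity check is to upsert an existing key from Spark SQL and confirm that no duplicate row appears (values are made up):

-- with matching key generator settings this updates id = 1 instead of creating a duplicate row
insert into test_hudi_flink3_rt values (1, 'hudi-updated', 11, 101, '2022-10-31');
select * from test_hudi_flink3_rt where id = 1;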
COW table
Let's look at which tables get synced for a COW table.
Create the table
CREATE TABLE test_hudi_flink4 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink4',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink4',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);
Sync the table
Writing data triggers the Hive table sync.
insert into test_hudi_flink4 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
Because a COW table only has the RT view, no _rt suffix is used to distinguish it; the synced table name is the same as the configured table name. For background, see my earlier article summarizing Hudi query types/views.
CREATE EXTERNAL TABLE `test_hudi_flink4`(
  `_hoodie_commit_time` string COMMENT '',
  `_hoodie_commit_seqno` string COMMENT '',
  `_hoodie_record_key` string COMMENT '',
  `_hoodie_partition_path` string COMMENT '',
  `_hoodie_file_name` string COMMENT '',
  `id` int COMMENT '',
  `name` string COMMENT '',
  `price` int COMMENT '',
  `ts` int COMMENT '')
PARTITIONED BY (
  `dt` string COMMENT '')
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'hoodie.query.as.ro.table'='false',
  'path'='/tmp/hudi/test_hudi_flink4')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://cluster1/tmp/hudi/test_hudi_flink4'
TBLPROPERTIES (
  'last_commit_time_sync'='20221030204426056',
  'spark.sql.sources.provider'='hudi',
  'spark.sql.sources.schema.numPartCols'='1',
  'spark.sql.sources.schema.numParts'='1',
  'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"price","type":"integer","nullable":true,"metadata":{}},{"name":"ts","type":"integer","nullable":true,"metadata":{}},{"name":"dt","type":"string","nullable":true,"metadata":{}}]}',
  'spark.sql.sources.schema.partCol.0'='dt',
  'transient_lastDdlTime'='1667133869')

Method 4: create the table in the Hive Catalog, syncing to Hive
The benefit of this approach is that we can write data with Flink SQL through the table in the Hive Catalog, and also query the synced Hive table with Hive SQL and read/write it with Spark SQL.
MOR table
Create the table
Set the HIVE_CONF_DIR environment variable:
export HIVE_CONF_DIR=/usr/hdp/3.1.0.0-78/hive/conf
CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);
use catalog hive_catalog;
use hudi;
CREATE TABLE test_hudi_flink5 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink5',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink5',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);
Sync the table
Again, write a few rows to trigger the Hive sync.
insert into test_hudi_flink5 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
We can then see that three tables were created in the Hive database: test_hudi_flink5, test_hudi_flink5_ro, and test_hudi_flink5_rt. test_hudi_flink5 is the Flink-format table, with the same structure as in Method 2 above; it cannot be queried from Hive, but it can be written to and queried from Flink. test_hudi_flink5_ro and test_hudi_flink5_rt can be queried with Hive, and both queried and written with Spark.
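A short sketch of how each of the three tables is used (queries on the synced views are run from Hive or Spark SQL, values are made up):

-- Flink SQL: read and write through the Hive Catalog table
insert into test_hudi_flink5 values (3, 'hudi', 10, 100, '2022-10-31');
select * from test_hudi_flink5;
-- Hive / Spark SQL: query the synced read-optimized and real-time views
select * from hudi.test_hudi_flink5_ro;
select * from hudi.test_hudi_flink5_rt;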
COW table
For a COW table, however, the synced table name has no _rt suffix, i.e. it is the same as the Hive Catalog table name, which causes a conflict. So we need to keep the Hive Catalog table name and the synced table name apart: one way is to change hive_sync.table, the other is to store the Hive Catalog table and the synced table in different Hive databases, as in the example below.
CREATE CATALOG hive_catalog WITH (
  'type' = 'hive',
  'default-database' = 'default',
  'hive-conf-dir' = '/usr/hdp/3.1.0.0-78/hive/conf'
);
use catalog hive_catalog;
use flink_hudi;
CREATE TABLE test_hudi_flink6 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
) PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi/test_hudi_flink6',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://indata-192-168-44-128.indata.com:9083',
  'hive_sync.conf.dir' = '/usr/hdp/3.1.0.0-78/hive/conf',
  'hive_sync.db' = 'hudi',
  'hive_sync.table' = 'test_hudi_flink6',
  'hive_sync.partition_fields' = 'dt',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor'
);
This way the catalog table is stored in the flink_hudi database and the synced table is stored in the hudi database.
insert into test_hudi_flink6 values (1, 'hudi', 10, 100, '2022-10-31'), (2, 'hudi', 10, 100, '2022-10-31');
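To double-check where each table ended up after the sync, something like this can be run in the Flink SQL client (a sketch; output not shown):

-- the Flink table definition lives in the flink_hudi database
use flink_hudi;
show tables;
-- the tables synced by Hudi land in the hudi database
use hudi;
show tables;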
Troubleshooting
Here I record the exceptions I hit and how I resolved them; since I did not organize them at the time, the order may be a bit jumbled.
Hive table not being synced
At first I used the bundle downloaded from Maven and found that no table was synced in Hive. Later I saw on the official site, https://hudi.apache.org/cn/docs/syncing_metastore, that the bundle has to be built with the flink-bundle-shade-hive profile:
mvn clean package -DskipTests -Drat.skip=true -Pflink-bundle-shade-hive3 -Dflink1.14 -Dscala-2.12
But even with my own build it still did not work, and there was no exception in the Flink SQL client, which was puzzling. I later discovered that the Job Manager page of the Flink yarn-session web UI shows the detailed logs, for example the "Starting Javalin" line from the Hudi write. With that, things became straightforward: just resolve each issue according to the concrete exception message.
Exception 1
2022-10-29 16:02:41,694 WARN hive.metastore [] - set_ugi() not successful, Likely cause: new client talking to old server. Continuing without it.
org.apache.thrift.transport.TTransportException: null
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
Solution: set the HIVE_CONF_DIR environment variable or the hive_sync.conf.dir option. This problem cost me a whole day, because there is nothing online about this configuration; I found the answer in the source code:
public static org.apache.hadoop.conf.Configuration getHiveConf(Configuration conf) {
  // hive_sync.conf.dir takes precedence; otherwise fall back to the HIVE_CONF_DIR environment variable
  String explicitDir = conf.getString(FlinkOptions.HIVE_SYNC_CONF_DIR, System.getenv("HIVE_CONF_DIR"));
  org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
  if (explicitDir != null) {
    // load hive-site.xml from the resolved directory
    hadoopConf.addResource(new Path(explicitDir, "hive-site.xml"));
  }
  return hadoopConf;
}

// StreamWriteOperatorCoordinator
this.hiveConf = new SerializableConfiguration(HadoopConfigurations.getHiveConf(conf));
Exception 2
java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder;
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:177) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convertUnion(AvroSchemaConverter.java:242) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:199) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:152) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:260) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:146) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:137) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.readSchemaFromLogFile(TableSchemaResolver.java:485) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.readSchemaFromLogFile(TableSchemaResolver.java:468) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.fetchSchemaFromFiles(TableSchemaResolver.java:604) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchemaFromDataFile(TableSchemaResolver.java:251) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaFromDataFile(TableSchemaResolver.java:117) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.hasOperationField(TableSchemaResolver.java:537) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.util.Lazy.get(Lazy.java:53) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableSchemaFromLatestCommitMetadata(TableSchemaResolver.java:208) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchemaInternal(TableSchemaResolver.java:176) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableAvroSchema(TableSchemaResolver.java:138) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.common.table.TableSchemaResolver.getTableParquetSchema(TableSchemaResolver.java:156) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sync.common.HoodieSyncClient.getStorageSchema(HoodieSyncClient.java:103) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:206) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HiveSyncTool.doSync(HiveSyncTool.java:157) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:141) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doSyncHive(StreamWriteOperatorCoordinator.java:335) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.syncHive(StreamWriteOperatorCoordinator.java:326) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.handleEndInputEvent(StreamWriteOperatorCoordinator.java:426) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$handleEventFromOperator$3(StreamWriteOperatorCoordinator.java:278) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.14-bundle-0.12.0.jar:0.12.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
The cause is a jar conflict. From the exception we can tell that org.apache.parquet.schema.Types in the Hudi bundle probably conflicts with some other jar on the Flink classpath. After investigating, I found that hive-exec*.jar contains a class with the same name; deleting that jar resolved the problem. (In an earlier article I mentioned copying hive-exec*.jar into the Flink directory because some classes were missing; it turns out that class is no longer missing now, and if something else is still missing, find another, non-conflicting jar to provide it.)
Exception 3
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    ... 20 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/conf/HiveConf
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initHiveSync(StreamWriteOperatorCoordinator.java:315) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:191) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    ... 20 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_181]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_181]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_181]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_181]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initHiveSync(StreamWriteOperatorCoordinator.java:315) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
    at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:191) ~[hudi-flink1.14-bundle-0.12.1.jar:0.12.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:194) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:85) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:585) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:965) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:882) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:389) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:181) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:624) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:623) ~[flink-rpc-akka_0f8ea990-3e27-4639-9ea1-d92b6879facc.jar:1.14.3]
This is the exception produced when syncing Hive with the bundle downloaded from Maven. In this case the logs cannot be viewed in the Flink yarn-session web UI, because the yarn-session application itself crashes; instead, the logs can be viewed with the following command:
yarn logs -applicationId application_1666247158647_0121
Exception 4
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:488) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1612) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1596) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:376) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_181]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_181]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_181]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_181]
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
Caused by: java.io.IOException: unexpected exception type
This is caused by the Hudi bundle used by the yarn-session not matching the version of the Hudi bundle used by the SQL client; making them the same version fixes it.
Other exceptions
For example, exceptions about missing dependency jars: copy the corresponding jar from the Hive directory on the cluster into the Flink directory.
Summary
This article recorded the configuration I used to read and write Hudi tables with Flink SQL and sync them to Hive, including the settings needed to keep Flink SQL and Spark SQL consistent. There is also a HoodieHiveCatalog that can be used for reading and writing Hudi with Flink SQL; once I have studied it properly, I will share that as well.
Original post: https://mp.weixin.qq.com/s/N_SnkJDEEvka2rvtV2RWJg