好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

Apache Hudi异步Clustering部署操作的掌握

1. 摘要

在 之前的一篇博客 中,我们介绍了Clustering(聚簇)的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clustering,本篇博客中,我们将讨论近期社区做的一些改进以及如何通过 HoodieClusteringJob 和 DeltaStreamer 工具来部署 异步 Clustering。

2. 介绍

通常讲, Clustering 根据可配置的策略创建一个计划,根据特定规则对符合条件的文件进行分组,然后执行该计划。 Hudi 支持 并发写入 ,并在多个表服务之间提供快照隔离,从而允许写入程序在后台运行 Clustering 时继续摄取。有关 Clustering 的体系结构的更详细概述请查看上一篇博文。

3. Clustering策略

如前所述 Clustering 计划和执行取决于可插拔的配置策略。这些策略大致可分为三类:计划策略、执行策略和更新策略。

3.1 计划策略

该策略在创建Clustering计划时发挥作用。它有助于决定应该对哪些文件组进行Clustering。让我们看一下Hudi提供的不同计划策略。请注意,使用 此配置 可以轻松地插拔这些策略。

SparkSizeBasedClusteringPlanStrategy:根据基本文件的 小文件限制 选择文件切片并创建 Clustering 组,最大大小为每个组允许的最大文件大小。可以使用 此配置 指定最大大小。此策略对于将中等大小的文件合并成大文件非常有用,以减少跨冷分区分布的大量文件。 SparkRecentDaysClusteringPlanStrategy:根据以前的 N 天分区创建一个计划,将这些分区中的小文件片进行 Clustering ,这是默认策略,当工作负载是可预测的并且数据是按时间划分时,它可能很有用。 SparkSelectedPartitionsClusteringPlanStrategy:如果只想对某个范围内的特定分区进行 Clustering ,那么无论这些分区是新分区还是旧分区,此策略都很有用,要使用此策略,还需要在下面设置两个配置(包括开始和结束分区):

?

1

2

hoodie.clustering.plan.strategy.cluster.begin.partition

hoodie.clustering.plan.strategy.cluster.end.partition

注意:所有策略都是分区感知的,后两种策略仍然受到第一种策略的大小限制的约束。

3.2 执行策略

在计划阶段构建 Clustering 组后,Hudi主要根据排序列和大小为每个组应用执行策略,可以使用 此配置 指定策略。

SparkSortAndSizeExecutionStrategy 是默认策略。使用此配置进行 Clustering 时,用户可以指定数据排序列。除此之外我们还可以为 Clustering 产生的Parquet文件设置 最大文件大小 。该策略使用 bulk_insert 将数据写入新文件,在这种情况下,Hudi隐式使用一个分区器,该分区器根据指定列进行排序。通过这种策略改变数据布局,不仅提高了查询性能,而且自动平衡了重写开销。

现在该策略可以作为单个Spark作业或多个作业执行,具体取决于在计划阶段创建的 Clustering 组的数量。默认情况下Hudi将提交多个Spark作业并合并结果。如果要强制Hudi使用单Spark作业,请将执行策略类配置设置为 SingleSparkJobExecutionStrategy 。

3.3 更新策略

目前只能为未接收任何并发更新的表/分区调度 Clustering 。默认情况下更新策略的配置设置为 SparkRejectUpdateStrategy 。如果某个文件组在 Clustering 期间有更新,则它将拒绝更新并引发异常。然而在某些用例中,更新是非常稀疏的,并且不涉及大多数文件组。简单拒绝更新的默认策略似乎不公平。在这种用例中用户可以将配置设置为 SparkAllowUpdateStregy 。

我们讨论了关键策略配置,下面列出了与 Clustering 相关的所有其他配置。在此列表中一些非常有用的配置包括:

配置项 解释 默认值
hoodie.clustering.async.enabled 启用在表上的异步运行Clustering服务。 false
hoodie.clustering.async.max.commits 通过指定应触发多少次提交来控制异步Clustering的频率。 4
hoodie.clustering.preserve.commit.metadata 重写数据时保留现有的_hoodie_commit_time。这意味着用户可以在Clustering数据上运行增量查询,而不会产生任何副作用。 false

4. 异步Clustering

之前我们已经了解了用户如何设置 同步Clustering 。此外用户可以利用 HoodiecClusteringJob 设置两步异步Clustering。

4.1 HoodieClusteringJob

随着Hudi版本0.9.0的发布,我们可以在同一步骤中调度和执行 Clustering 。我们只需要指定 -mode 或 -m 选项。有如下三种模式:

schedule (调度):制定一个Clustering计划。这提供了一个可以在执行模式下传递的 instant 。

execute (执行):在给定的 instant 执行Clustering计划,这意味着这里需要 instant 。

scheduleAndExecute (调度并执行):首先制定Clustering计划并立即执行该计划。

请注意要在原始写入程序仍在运行时运行作业请启用多写入:

?

1

2

hoodie.write.concurrency.mode=optimistic_concurrency_control

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider

使用 spark submit 命令提交 HoodieClusteringJob 示例如下:

?

1

2

3

4

5

6

7

8

spark-submit \

--class org.apache.hudi.utilities.HoodieClusteringJob \

/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2 .12-0.9.0-SNAPSHOT.jar \

--props /path/to/config/clusteringjob .properties \

--mode scheduleAndExecute \

--base-path /path/to/hudi_table/basePath \

--table-name hudi_table_schedule_clustering \

--spark-memory 1g

clusteringjob.properties 配置文件示例如下

?

1

2

3

4

5

6

hoodie.clustering.async.enabled= true

hoodie.clustering.async.max.commits=4

hoodie.clustering.plan.strategy.target. file .max.bytes=1073741824

hoodie.clustering.plan.strategy.small. file .limit=629145600

hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy

hoodie.clustering.plan.strategy. sort .columns=column1,column2

4.2 HoodieDeltaStreamer

接着看下如何使用 HudiDeltaStreamer 。现在我们可以使用 DeltaStreamer 触发异步Clustering。只需将hoodie.clustering.async.enabled为 true ,并在属性文件中指定其他Clustering配置,在启动 Deltastreamer 时可以将其位置设为 -props (与 HoodieClusteringJob 配置类似)。

使用 spark submit 命令提交 HoodieDeltaStreamer 示例如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

spark-submit \

--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \

/path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2 .12-0.9.0-SNAPSHOT.jar \

--props /path/to/config/clustering_kafka .properties \

--schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \

-- source -class org.apache.hudi.utilities.sources.AvroKafkaSource \

-- source -ordering-field impresssiontime \

--table- type COPY_ON_WRITE \

--target-base-path /path/to/hudi_table/basePath \

--target-table impressions_cow_cluster \

-- op INSERT \

--hoodie-conf hoodie.clustering.async.enabled= true \

--continuous

4.3 Spark Structured Streaming

我们还可以使用Spark结构化流启用异步Clustering,如下所示。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

val commonOpts = Map(

    "hoodie.insert.shuffle.parallelism" -> "4" ,

    "hoodie.upsert.shuffle.parallelism" -> "4" ,

    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key" ,

    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition" ,

    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp" ,

    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"

)

def getAsyncClusteringOpts(isAsyncClustering: String,

                            clusteringNumCommit: String,

                            executionStrategy: String):Map[String, String] = {

    commonOpts + (DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,

            HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key -> clusteringNumCommit,

            HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key -> executionStrategy

    )

}

def initStreamingWriteFuture(hudiOptions: Map[String, String]): Future[Unit] = {

    val streamingInput = // define the source of streaming

    Future {

       println( "streaming starting" )

       streamingInput

               .writeStream

               .format( "org.apache.hudi" )

               .options(hudiOptions)

               .option( "checkpointLocation" , basePath + "/checkpoint" )

               .mode(Append)

               .start()

               .awaitTermination(10000)

       println( "streaming ends" )

    }

}

def structuredStreamingWithClustering(): Unit = {

    val df = //generate data frame

    val hudiOptions = getClusteringOpts( "true" , "1" , "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy" )

    val f1 = initStreamingWriteFuture(hudiOptions)

    Await.result(f1, Duration.Inf)

}

5. 总结和未来工作

在这篇文章中,我们讨论了不同的Clustering策略以及如何设置异步Clustering。未来的工作包括:

Clustering支持更新。

支持Clustering的CLI工具。

另外Flink支持Clustering已经有相应 Pull Request ,有兴趣的小伙伴可以关注该PR。

以上就是 Apache Hudi异步Clustering部署操作的掌握的详细内容,更多关于Apache Hudi异步Clustering部署的资料请关注其它相关文章!

原文链接:https://www.cnblogs.com/leesf456/p/15340428.html

查看更多关于Apache Hudi异步Clustering部署操作的掌握的详细内容...

  阅读:23次