原副标题:Apache Hudi 在鸵鸟云统计数据湖网络平台的结构设计与课堂教学
在大统计信息处置中,动态统计数据挖掘是两个重要的市场需求。随著统计信息量的急速增长,对动态预测的考验也在急速加强,现代的格式化形式已经不能满足用户动态统计数据处理的市场需求,需要一类更为高工作效率的控制技术来化解这个问题。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是这样一类控制技术,提供更多了高工作效率的动态统计基础构架管理工作机能。
责任编辑将如是说鸵鸟云如前所述 Hudi 构筑统计数据湖的总体计划构架或其在动态统计基础构架处置方面的特征,因此为我们展现两个采用 Apache Hudi 的单纯实例,易于新手里路。
Apache Hudi 如是说
Apache Hudi 是两个开放源码的统计数据湖储存系统,能在 Hadoop 生态系中提供更多动态统计基础构架处置机能。Hudi 最先由 Uber 合作开发,而后成为 Apache 世界顶级工程项目。
Hudi 主要就优点
· 全力支持加速填入和预览操作方式,以期在统计基础构架中动态处置统计数据;
· 提供更多存量查阅机能,可有效率提升统计数据挖掘工作效率;
· 全力支持时间点查阅,以期查阅统计数据在某一关键时刻的状况;
· 与 Apache Spark、Hive 等大统计数据挖掘辅助工具相容。
Hudi 构架
Apache Hudi 的构架包括下列两个主要就模块:
· Hudi 统计数据储存:Hudi 统计数据储存是 Hudi 的核心理念模块,负责管理工作储存统计数据,统计数据储存有三种类别:Copy-On-Write(COW)和 Merge-On-Read(MOR);
· Copy-On-Write:COW 储存类别会在对统计数据展开预览时,建立两个捷伊统计CSV复本,将预览的统计数据载入复本中,之后,捷伊统计CSV复本会代替原始统计CSV;
· Merge-On-Read:MOR 储存类别会在查阅时,将预览的统计数据与原始统计数据展开分拆,这种形式能减少统计数据储存的载入延后,但会减少查阅的排序量;
· Hudi 索引:Hudi 索引用于维护统计数据记录的位置信息,索引有三种类别:内置索引(如 Bloom 过滤器)和外部索引(如 HBase 索引);
· Hudi 查阅引擎:Hudi 查阅引擎负责管理工作处置查阅请求,Hudi 全力支持多种查阅引擎,如 Spark SQL、Hive、Presto 等。
Hudi 的采用场景
Apache Hudi 能帮助企业和组织实现动态统计信息处置和预测。动态统计信息处置需要加速地处置和查阅统计数据,同时还需要保证统计数据的一致性和可靠性。
Apache Hudi 的存量统计信息处置、ACID 事务性保证、写时分拆等控制技术优点能帮助企业更好地实现动态统计信息处置和预测,如前所述 Hudi 的优点能在一定程度上在动态数仓的构筑过程中承担上下游统计数据链路的对接(类似 Kafka 的角色)。既能实现存量的统计信息处置,也能为批流一体的处置提供更多储存基础。
Hudi 的优势和劣势
● 优势
· 高工作效率处置大规模统计数据集;
· 全力支持动态统计数据预览和查阅;
· 实现了存量载入机制,提升了统计数据访问工作效率;
· Hudi 能与流处置管道集成;
· Hudi 提供更多了时间旅行机能,允许回溯统计数据的历史版本。
● 劣势
· 在读写统计数据时需要付出额外的代价;
· 操作方式比较复杂,需要采用专业的编程语言和辅助工具。
Hudi 在鸵鸟云统计数据湖网络平台上的课堂教学
Hudi 在鸵鸟云统计数据湖的控制技术构架
Hudi 在鸵鸟云的统计数据湖网络平台上主要就对统计数据湖管理工作提供更多助力:
· 元统计数据的接入,让用户可以加速的对表展开管理工作;
· 统计数据加速接入,包括对符合条件的原有表统计数据展开转换,加速搭建统计数据湖能力;
· 湖表的管理工作,监控小文档定期展开分拆,提升表的查阅性能,内在丰富的表操作方式机能,包括 time travel ,孤儿文档清理,过期快照清理等;
· 索引构筑,提供更多多种索引包括 bloom filter,zorder 等,提升排序引擎的查阅性能。
Hudi 采用实例
在如是说了 Hudi 的基本信息和鸵鸟云统计数据湖网络平台的结构之后,我们来看两个采用实例,代替 Flink 在内存中的 join 过程。
在 Flink 中对多流 join 往往是比较头疼的场景,需要考虑 state ttl 时间设置,设置太小统计数据经常关联不上,设置太大内存又需要很高才能保留,我们通过 Hudi 的形式来换个思路实现。
● 构筑 catalog
public String createCatalog(){
String createCatalog = “CREATE CATALOG hudi_catalog WITH (\n” +
” type = hudi,\n” +
” mode = hms,\n” +
” default-database = default,\n” +
” hive.conf.dir = /hive_conf_dir,\n” +
” table.external = true\n” +
“)”;
return createCatalog;
}
● 建立 hudi 表
public String createHudiTable(){
String createTable = “CREATE TABLE if not exists hudi_catalog.flink_db.test_hudi_flink_join_2 (\n” +
” id int ,\n” +
” name VARCHAR(10),\n” +
” age int ,\n” +
” address VARCHAR(10),\n” +
” dt VARCHAR(10),\n” +
” primary key(id) not enforced\n” +
“)\n” +
“PARTITIONED BY (dt)\n” +
“WITH (\n” +
” connector = hudi,\n” +
” table.type = MERGE_ON_READ,\n” +
” changelog.enabled = true,\n” +
” index.type = BUCKET,\n” +
” hoodie.bucket.index.num.buckets = 2,\n” +
String.format(” %s = %s,\n”, FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE) +
” write.payload.class = ” + PartialUpdateAvroPayload.class.getName() + “\n” +
“);”;
return createTable;
}
● 预览 hudi 表的 flink_db.test_hudi_flink_join_2 的 id, name, age, dt 列
01 从 kafka 中读取 topic1
public String createKafkaTable1(){
String kafkaSource1 = “CREATE TABLE source1\n” +
“(\n” +
” id INT,\n” +
” name STRING,\n” +
” age INT,\n” +
” dt String,\n” +
” PROCTIME AS PROCTIME()\n” +
“) WITH (\n” +
” connector = kafka\n” +
” ,topic = join_topic1\n” +
” ,properties.bootstrap.servers = localhost:9092\n” +
” ,scan.startup.mode = earliest-offset\n” +
” ,format = json\n” +
” ,json.timestamp-format.standard = SQL\n” +
” )”;
return kafkaSource1;
}
02 从 kafka 中读取 topic2
public String createKafkaTable2(){
String kafkaSource2 = “CREATE TABLE source2\n” +
“(\n” +
” id INT,\n” +
” name STRING,\n” +
” address string,\n” +
” dt String,\n” +
” PROCTIME AS PROCTIME()\n” +
“) WITH (\n” +
” connector = kafka\n” +
” ,topic = join_topic2\n” +
” ,properties.bootstrap.servers = localhost:9092\n” +
” ,scan.startup.mode = earliest-offset\n” +
” ,format = json\n” +
” ,json.timestamp-format.standard = SQL\n” +
” )”;
return kafkaSource2;
}
● 执行填入逻辑1
String insertSQL = “insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,age,dt) ” +
“select id, name,age,dt from source1”;
● 通过 spark 查阅统计数据
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 NULL 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 NULL 1
● 执行填入逻辑2
String insertSQL = “insert into hudi_catalog.flink_db.test_hudi_flink_join_2(id,name,address,dt) ” +
“select id, name, address,dt from source2”;
● 运行成功
运行成功后在 spark 中查阅对应的表统计数据:
20230323090605515 20230323090605515_1_186 45 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 45 xc 45 xc:address45 1
20230323090605515 20230323090605515_1_179 30 1 c990a618-896c-4627-8243-baace65c7ad6-0_0-21-26_20230331101342388.parquet 30 xc 30 xc:address30 1
能发现在第二次统计数据运行之后,表统计数据的对应字段 address 已经预览,达到了类似在 Flink 中直接执行 join 的效果。
insert into hudi_catalog.flink_db.test_hudi_flink_join_2
select a.id, a.name, a.age,b.address a.dt from source1 a left join source2 b on a.id = b.id
《统计数据治理行业课堂教学白皮书》下载地址:https://fs80.cn/380a4b
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关鸵鸟云大统计数据产品、行业化解计划、客户案例的朋友,浏览鸵鸟云官网:https://www.dtstack.com/?src=szsohu
同时,欢迎对大统计数据开放源码工程项目有兴趣的同学加入「鸵鸟云开放源码框架钉钉控制技术 qun」,交流最新开放源码控制技术信息,qun 号码:30537511,工程项目地址:https://github.com/DTStack