Flink Hudi 测试
组件版本
将hdfs-site.xml,core-site.xml,hive-site.xml放入resources下
测试代码
-
/**
 * Walk-through of Hudi via Flink SQL.
 *
 * Only step 4 (streaming read of `hudi_test_table`) is active. The earlier steps
 * (datagen source, Kafka sink, table creation, inserts/upserts) are kept as
 * commented-out snippets; several of them reuse the same `val` names, so enable
 * them one at a time.
 */
object TestFlinkSQLOptHudi {

  // Initialised once at object load instead of through a `var` assigned in main.
  private val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

  def main(args: Array[String]): Unit = {
    // Quieten the noisiest loggers so the query output stays readable.
    Logger.getLogger("org.apache").setLevel(Level.WARN)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.INFO)

    // Project helper (not shown in this file); presumably returns a streaming TableEnvironment.
    val tableEnv = FlinkUtils.initStreamTableEnvironment()

    // datagen source: generates random rows
    //    val sourceDDL =
    //      """
    //        |create table datagen_source (
    //        | id int,
    //        | data string,
    //        | ts as localtimestamp,
    //        | watermark for ts as ts
    //        |) with (
    //        | 'connector' = 'datagen',
    //        | 'rows-per-second'='10',
    //        | 'fields.id.kind'='sequence',
    //        | 'fields.id.start'='1',
    //        | 'fields.id.end'='100000',
    //        | 'fields.data.length'='5'
    //        |)
    //        |""".stripMargin
    //    tableEnv.executeSql(sourceDDL)

    // kafka sink
    //    val sinkDDL =
    //      """
    //        |create table kafka_sink (
    //        | id int,
    //        | data string,
    //        | ts timestamp
    //        |) with (
    //        | 'connector' = 'kafka',
    //        | 'topic' = 'kafka_sink',
    //        | 'properties.bootstrap.servers' = 'test-lakehouse:9092',
    //        | 'properties.group.id' = 'lakehouse',
    //        | 'scan.startup.mode' = 'earliest-offset',
    //        | 'format' = 'json',
    //        | 'json.fail-on-missing-field' = 'false',
    //        | 'json.ignore-parse-errors' = 'true'
    //        |)
    //        |""".stripMargin
    //    tableEnv.executeSql(sinkDDL)

    // insert into kafka
    //    val insertDML =
    //      """
    //        |insert into kafka_sink
    //        |select * from datagen_source
    //        |""".stripMargin
    //    tableEnv.executeSql(insertDML)

    // 1. Create the Hudi table used for writing
    //    val writeTableDDL =
    //      """
    //        |create table if not exists hudi_test_table (
    //        | id int,
    //        | data string,
    //        | ts timestamp(3),
    //        | `time` string,
    //        | `date` string
    //        |) partitioned by (`date`)
    //        | with (
    //        | 'connector' = 'hudi',
    //        | 'table.type' = 'MERGE_ON_READ',
    //        | 'path' = 'hdfs://test-lakehouse:9000/lakehouse/hudi_test_table',
    //        // | 'path'='file:///F:\workspace\lakehouse-hudi\test\src\main\resources\hudi_test_table',
    //        | 'write.tasks' = '1',
    //        | 'hoodie.datasource.write.recordkey.field' = 'id',
    //        | 'write.precombine.field' = 'ts',
    //        | 'compaction.tasks' = '1',
    //        | 'compaction.trigger.strategy' = 'num_or_time',
    //        | 'compaction.delta_commits' = '2',
    //        | 'compaction.delta_seconds' = '300'
    //        |)
    //        |""".stripMargin
    //    tableEnv.executeSql(writeTableDDL)

    // 2. Insert literal rows directly
    //    val insertDML =
    //      """
    //        |insert into hudi_test_table values
    //        |(1, 'a', timestamp '2021-09-26 00:00:01', '2021-09-26 00:00:01', '2021-09-26'),
    //        |(2, 'b', timestamp '2021-09-26 00:00:02', '2021-09-26 00:00:02', '2021-09-26'),
    //        |(3, 'c', timestamp '2021-09-26 00:00:03', '2021-09-26 00:00:03', '2021-09-26'),
    //        |(4, 'd', timestamp '2021-09-27 00:00:04', '2021-09-27 00:00:04', '2021-09-27'),
    //        |(5, 'e', timestamp '2021-09-27 00:00:05', '2021-09-27 00:00:05', '2021-09-27'),
    //        |(6, 'f', timestamp '2021-09-27 00:00:06', '2021-09-27 00:00:06', '2021-09-27')
    //        |""".stripMargin
    //    tableEnv.executeSql(insertDML)
    // Query result:
    // +----+----+------+---------------------+---------------------+------------+
    // | op | id | data |                  ts |                time |       date |
    // +----+----+------+---------------------+---------------------+------------+
    // |  I |  1 |    a | 2021-09-26T00:00:01 | 2021-09-26 00:00:01 | 2021-09-26 |
    // |  I |  2 |    b | 2021-09-26T00:00:02 | 2021-09-26 00:00:02 | 2021-09-26 |
    // |  I |  3 |    c | 2021-09-26T00:00:03 | 2021-09-26 00:00:03 | 2021-09-26 |
    // |  I |  4 |    d | 2021-09-27T00:00:04 | 2021-09-27 00:00:04 | 2021-09-27 |
    // |  I |  5 |    e | 2021-09-27T00:00:05 | 2021-09-27 00:00:05 | 2021-09-27 |
    // |  I |  6 |    f | 2021-09-27T00:00:06 | 2021-09-27 00:00:06 | 2021-09-27 |

    // Update a row: insert again with an existing record key (id = 1)
    //    val updateDML =
    //      """
    //        |insert into hudi_test_table values
    //        |(1, 'update', timestamp '2021-09-26 12:12:12', '2021-09-26 12:12:12', '2021-09-26')
    //        |""".stripMargin
    //    tableEnv.executeSql(updateDML)
    // Query result:
    // +----+----+--------+---------------------+---------------------+------------+
    // | op | id |   data |                  ts |                time |       date |
    // +----+----+--------+---------------------+---------------------+------------+
    // |  I |  1 | update | 2021-09-26T12:12:12 | 2021-09-26 12:12:12 | 2021-09-26 |
    // |  I |  2 |      b | 2021-09-26T00:00:02 | 2021-09-26 00:00:02 | 2021-09-26 |
    // |  I |  3 |      c | 2021-09-26T00:00:03 | 2021-09-26 00:00:03 | 2021-09-26 |
    // |  I |  4 |      d | 2021-09-27T00:00:04 | 2021-09-27 00:00:04 | 2021-09-27 |
    // |  I |  5 |      e | 2021-09-27T00:00:05 | 2021-09-27 00:00:05 | 2021-09-27 |
    // |  I |  6 |      f | 2021-09-27T00:00:06 | 2021-09-27 00:00:06 | 2021-09-27 |

    // 3. Write the datagen source into Hudi
    //    val insertDML =
    //      """
    //        |insert into hudi_test_table
    //        |select
    //        | id,
    //        | data,
    //        | ts,
    //        | date_format(ts, 'yyyy-MM-dd HH:mm:ss') as `time`,
    //        | date_format(ts, 'yyyy-MM-dd') as `date`
    //        |from datagen_source
    //        |""".stripMargin
    //    tableEnv.executeSql(insertDML)

    // 4. Streaming read of the Hudi table
    // Drop any earlier catalog entry so the read-side options below take effect.
    tableEnv.executeSql("drop table if exists hudi_test_table")

    // NOTE(review): the `//` line inside the literal is part of the SQL text sent to
    // Flink (stripMargin does not remove it); it relies on the SQL parser accepting
    // `//` line comments - confirm before copying this elsewhere.
    val readTableDDL =
      """
        |create table if not exists hudi_test_table (
        | id int,
        | data string,
        | ts timestamp(3),
        | `time` string,
        | `date` string
        |) partitioned by (`date`)
        | with (
        | 'connector' = 'hudi',
        | 'table.type' = 'MERGE_ON_READ',
        | 'path' = 'hdfs://test-lakehouse:9000/lakehouse/hudi_test_table',
        | 'hoodie.datasource.write.recordkey.field' = 'id',
        // | 'hoodie.datasource.query.type' = 'snapshot',
        | 'read.tasks' = '1',
        | 'read.streaming.enabled' = 'true',
        | 'read.streaming.check-interval' = '5',
        | 'read.streaming.start-commit' = '000',
        | 'read.utc-timezone' = 'false'
        |)
        |""".stripMargin
    tableEnv.executeSql(readTableDDL)

    val queryDML =
      """
        |select count(distinct id) from hudi_test_table where `date` = '2021-09-30'
        |""".stripMargin
    // Prints the result of the query to stdout.
    tableEnv.executeSql(queryDML).print()

    // Alternatively, route the rows to a print sink.
    // (The original left an uncommented `val sinkDDL =` with no right-hand side here,
    // which does not compile; the full snippet stays commented out below.)
    //    val sinkDDL =
    //      """
    //        |create table print_sink (
    //        | id int,
    //        | data string,
    //        | ts timestamp(3),
    //        | `time` string,
    //        | `date` string
    //        |) with (
    //        | 'connector' = 'print'
    //        |)
    //        |""".stripMargin
    //    tableEnv.executeSql(sinkDDL)
    //
    //    val queryDML =
    //      """
    //        |insert into print_sink
    //        |select * from hudi_test_table
    //        |""".stripMargin
    //    tableEnv.executeSql(queryDML)
  }

}
sql-client 测试
本地测试模式
将 hudi-flink-bundle_2.11-0.9.0.jar 放入 $FLINK_HOME/lib 下,启动 sql-client 即可。
sql-client embedded
yarn-cluster模式
选择一台服务器创建 session
# yarn-session.sh takes the un-prefixed options (-jm/-tm/-s); the y-prefixed forms
# (-yjm/-ytm/-ys) belong to `flink run -m yarn-cluster`.
./bin/yarn-session.sh -d -nm dwd_test -jm 3g -tm 10g -s 2
检查session是否正常
yarn application -list | grep application_1632050203454_167491
查看 applicationId
在该服务器上每次创建 session,applicationId 都会改变。
-
# cat /tmp/.yarn-properties-user,比如 hdfs用户启动的session
-
-
cat /tmp/.yarn-properties-hdfs
-
-
#Generated YARN properties file
-
#Wed Sep 29 17:00:09 CST 2021
-
dynamicPropertiesString=
-
applicationID=application_1632050203454_167491
切换 hdfs 用户启动 sql-client 进行测试,如果 /tmp/.yarn-properties-hdfs 中 applicationId 发生改变,该会话将不可用,所以测试时只创建一次 session,可启动多个 sql-client 多个 Job 往该session提交即可。
-
bin/sql-client.sh embedded -s application_1632050203454_167491
-
-
# 检查会话是否可用
-
select now();
部分参数设置
-
set execution.checkpointing.interval = 10sec;
-
-
set execution.result-mode = tableau;
-
-
set execution.restart-strategy.type = fixed-delay;
-
set execution.restart-strategy.attempts = 10;
-
set execution.restart-strategy.delay = 10000;
建表
-
-- Test source backed by the datagen connector: 10 rows per second, `id` taken
-- from the sequence 1..100000, `data` a random 5-character string.
-- `ts` is a computed column (localtimestamp) and also serves as the watermark.
create table if not exists datagen_source (
  id int,
  data string,
  ts as localtimestamp,
  watermark for ts as ts
) with (
  'connector' = 'datagen',
  --'number-of-rows' = '10000',
  'rows-per-second'='10',
  'fields.id.kind'='sequence',
  'fields.id.start'='1',
  'fields.id.end'='100000',
  'fields.data.length'='5'
);
-
-
-- Hudi MERGE_ON_READ table, partitioned by `date`, used for both reading and writing.
-- Every option is followed by a comma except the last; the original listing had
-- dropped several commas, which made the statement unparsable.
create table if not exists hudi_test_table(
  id int,
  data string,
  ts timestamp(3),
  `time` string,
  `date` string
) comment 'flink hudi ods table : 用户维系记录表'
partitioned by (`date`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs:///user/hive/datalake/hudi_test_db/hudi_test_table',
  -- read options
  'read.tasks' = '1',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '10',
  'read.streaming.start-commit' = '000',
  'read.utc-timezone' = 'false',
  -- write options
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'write.bucket_assign.tasks' = '3',
  'write.precombine.field' = 'ts',
  'write.index_bootstrap.tasks' = '3',
  -- index options
  'index.global.enabled' = 'true',
  'index.bootstrap.enabled' = 'true',
  'index.state.ttl' = '15',
  'clean.retain_commits' = '60',
  -- record key / partition path / query type
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.partitionpath.field' = 'date',
  'hoodie.datasource.query.type' = 'snapshot',
  'hoodie.datasource.merge.type' = 'payload_combine',
  -- compaction options
  'compaction.tasks' = '1',
  'compaction.trigger.strategy' = 'num_or_time',
  'compaction.delta_commits' = '5',
  'compaction.delta_seconds' = '300',
  'compaction.max_memory' = '1024',
  -- hive sync options (replace `ip` and the credentials with real values)
  'hive_sync.enable' = 'true',
  'hive_sync.metastore.uris' = 'thrift://ip:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://ip:10000',
  'hive_sync.use_jdbc' = 'true',
  'hive_sync.db' = 'hudi_test_db',
  'hive_sync.table' = 'hudi_test_table',
  'hive_sync.partition_fields' = 'date',
  --'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.NonPartitionedExtractor',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'hive_sync.file_format' = 'PARQUET',
  'hive_sync.support_timestamp' = 'true',
  'hive_sync.username' = 'hive',
  'hive_sync.password' = '123456'
);
写入
-
-- Insert literal rows directly
insert into hudi_test_table values
  (1, 'a', timestamp '2021-09-26 00:00:01', '2021-09-26 00:00:01', '2021-09-26'),
  (2, 'b', timestamp '2021-09-26 00:00:02', '2021-09-26 00:00:02', '2021-09-26'),
  (3, 'c', timestamp '2021-09-26 00:00:03', '2021-09-26 00:00:03', '2021-09-26'),
  (4, 'd', timestamp '2021-09-27 00:00:04', '2021-09-27 00:00:04', '2021-09-27'),
  (5, 'e', timestamp '2021-09-27 00:00:05', '2021-09-27 00:00:05', '2021-09-27'),
  (6, 'f', timestamp '2021-09-27 00:00:06', '2021-09-27 00:00:06', '2021-09-27');

-- Single-row upsert: same id as an existing record
insert into hudi_test_table values
  (1, 'update', timestamp '2021-09-26 12:12:12', '2021-09-26 12:12:12', '2021-09-26');

-- Write from the datagen source, deriving `time` and `date` from `ts`
insert into hudi_test_table
select
  id,
  data,
  ts,
  date_format(ts, 'yyyy-MM-dd HH:mm:ss') as `time`,
  date_format(ts, 'yyyy-MM-dd') as `date`
from datagen_source;
流读
-
-- Dynamic table options must be enabled for the OPTIONS hint below to be accepted.
set table.dynamic-table-options.enabled = true;

-- The hint must use the `/*+ ... */` form directly after the table name;
-- a plain `/* ... */` is an ordinary comment and is ignored.
select count(0) from hudi_test_table
/*+ OPTIONS('read.tasks' = '10', 'read.streaming.enabled' = 'true') */;
pom文件
-
<properties>
    <!-- project compiler -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <!-- maven plugin versions -->
    <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
    <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
    <!-- sdk -->
    <java.version>1.8</java.version>
    <scala.version>2.12.13</scala.version>
    <!-- suffix used for every Scala-versioned artifact below -->
    <scala.binary.version>2.12</scala.binary.version>
    <!-- engine -->
    <hadoop.version>2.9.2</hadoop.version>
    <flink.version>1.12.2</flink.version>
    <hoodie.version>0.9.0</hoodie.version>
    <hive.version>2.3.9</hive.version>

    <!-- dependency scope switch: `compile` here; the `provided` alternative is kept commented out -->
    <!-- <scope.type>provided</scope.type>-->
    <scope.type>compile</scope.type>
</properties>
-
-
<dependencies>
    <!-- scala -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>${scope.type}</scope>
    </dependency>

    <!-- flink Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- Declared once, with the project's Scala suffix: the original hard-coded _2.11
         (mixing Scala binary versions on the classpath) and listed it twice. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
        <scope>${scope.type}</scope>
    </dependency>

    <!-- hudi Dependency -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
        <version>${hoodie.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-client</artifactId>
        <version>${hoodie.version}</version>
        <scope>${scope.type}</scope>
    </dependency>

    <!-- hadoop Dependency -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>

    <!-- hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-llap-tez</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
-
-
<build>
    <plugins>
        <!-- Compiles the Scala sources -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>${scala.maven.plugin.version}</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Builds a jar-with-dependencies during the package phase -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven.assembly.plugin.version}</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgkhgfi
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13