Flink: Reading Hive Table Data and Writing It to Kafka
Follow the WeChat official account CodingTechWork to learn and improve together.
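Introduction
Scenario
Data in the Hive data warehouse needs to be read and written to Kafka so it can be served to downstream consumers.
Technology choice
Flink was chosen to read from Hive and write to Kafka because it offers a rich set of connectors, including both a Hive connector and a Kafka connector.
Development
pom dependencies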
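<dependencies>
<!-- The maven-jar-plugin below builds a thin jar: the Hive and Kafka connector jars (plus hive-exec for the cluster's Hive version) must be available on the cluster, e.g. in Flink's lib directory -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- Blink planner, needed by useBlinkPlanner() in Flink 1.13 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- The Hive connector version must match the Flink version -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- Kafka connector used by the 'connector' = 'kafka' table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<!-- Lombok generates the log field behind @Slf4j at compile time -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
</dependencies>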
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<skipSource>true</skipSource>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<!-- Specify the main class -->
<manifest>
<mainClass>com.test.demo.flinkhive2kafka.job.Hive2Kafka</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
Job class
package com.test.demo.flinkhive2kafka.job;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

@Slf4j
public class Hive2Kafka {
    public static void main(String[] args) {
        // Set up the Flink SQL environment with the Blink planner
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        // Create the table environment
        TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
        // Read Hive ORC data with the Hadoop mapred record reader instead of Flink's vectorized reader
        tableEnvironment.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-reader", "true");
        // Parse external parameters passed as --key value
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        log.info("parameters size: {}", parameterTool.getNumberOfParameters());
        // Read all parameters
        String hiveCatalogName = parameterTool.get("hive.catalog.name");
        String hiveConfDir = parameterTool.get("hive.conf.dir");
        String hiveDatabaseName = parameterTool.get("hive.db.name");
        String hiveKafkaTable = parameterTool.get("hive.kafka.tb");
        String kafkaBootstrapServer = parameterTool.get("kafka.bootstrap.server");
        String kafkaTopic = parameterTool.get("kafka.topic");
        String kafkaGroupId = parameterTool.get("kafka.group.id");
        String kafkaUsername = parameterTool.get("kafka.username");
        String kafkaPassword = parameterTool.get("kafka.password");
        String insertKafkaTableSql = parameterTool.get("insert.kafka.table.sql");
        // Create the Hive catalog
        HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, hiveDatabaseName, hiveConfDir);
        // Register the catalog
        tableEnvironment.registerCatalog(hiveCatalogName, hiveCatalog);
        // Use the catalog
        tableEnvironment.useCatalog(hiveCatalogName);
        // DDL of the Kafka table: one STRING column, raw format, SASL/PLAIN authentication
        String createKafkaTableSql = String.format("CREATE TABLE IF NOT EXISTS %s (`field01` STRING) \n"
                + "WITH ('connector' = 'kafka', \n"
                + "'topic' = '%s', \n"
                + "'properties.group.id' = '%s', \n"
                + "'properties.bootstrap.servers' = '%s', \n"
                + "'scan.startup.mode' = 'group-offsets', \n"
                + "'properties.auto.offset.reset' = 'earliest', \n"
                + "'format' = 'raw', \n"
                + "'properties.security.protocol' = 'SASL_PLAINTEXT', \n"
                + "'properties.sasl.mechanism' = 'PLAIN', \n"
                + "'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule "
                + "required username=\"%s\" password=\"%s\";'\n"
                + ")", hiveKafkaTable, kafkaTopic, kafkaGroupId, kafkaBootstrapServer, kafkaUsername, kafkaPassword);
        // Create the Kafka table
        tableEnvironment.executeSql(createKafkaTableSql).print();
        // Run the INSERT statement that reads from Hive and writes to Kafka
        tableEnvironment.executeSql(insertKafkaTableSql).print();
    }
}
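In the job, the INSERT statement arrives as an opaque parameter, and the DDL interpolates the password straight into a SQL string literal. The following is a minimal, stdlib-only sketch of how the two SQL strings could be built. The class and method names (`Hive2KafkaSqlSketch`, `sqlLiteral`, `buildKafkaDdl`, `buildInsertSql`) and all table and column names are hypothetical. Single quotes are doubled so they survive inside Flink SQL literals. Because the raw format writes exactly one STRING column, the Hive columns are cast to STRING and joined with `CONCAT_WS`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helpers (not from the original article) that build the two SQL strings the job runs
public class Hive2KafkaSqlSketch {

    // Flink SQL string literals use single quotes; a literal quote inside is written as ''
    static String sqlLiteral(String value) {
        return value.replace("'", "''");
    }

    // Builds the Kafka sink DDL; double quotes inside the password are NOT handled by this sketch
    static String buildKafkaDdl(String table, String topic, String bootstrapServers,
                                String username, String password) {
        String jaasConfig = String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                username, password);
        return String.format("CREATE TABLE IF NOT EXISTS %s (`field01` STRING) WITH (\n"
                        + "'connector' = 'kafka',\n"
                        + "'topic' = '%s',\n"
                        + "'properties.bootstrap.servers' = '%s',\n"
                        + "'format' = 'raw',\n"
                        + "'properties.security.protocol' = 'SASL_PLAINTEXT',\n"
                        + "'properties.sasl.mechanism' = 'PLAIN',\n"
                        + "'properties.sasl.jaas.config' = '%s'\n"
                        + ")",
                table, sqlLiteral(topic), sqlLiteral(bootstrapServers), sqlLiteral(jaasConfig));
    }

    // The raw format carries a single STRING column, so each Hive column is cast and joined with commas
    static String buildInsertSql(String kafkaTable, String hiveTable, List<String> hiveColumns) {
        String projected = hiveColumns.stream()
                .map(column -> "CAST(`" + column + "` AS STRING)")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + kafkaTable + " SELECT CONCAT_WS(',', " + projected + ") FROM " + hiveTable;
    }

    public static void main(String[] args) {
        // Placeholder names and credentials, for illustration only
        System.out.println(buildKafkaDdl("kafka_sink", "demo_topic", "host1:9092,host2:9092", "user", "pa'ss"));
        System.out.println(buildInsertSql("kafka_sink", "ods_demo", Arrays.asList("id", "name")));
    }
}
```

Joining with a comma is only one possible encoding. If the values can contain commas themselves, a JSON encoding is safer. The sketch also leaves out the consumer-side options (group id, startup mode, offset reset), which only take effect when the table is read as a source.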
Execution
Submit the job in yarn-application mode:
./flink run-application -t yarn-application flink-hive-2-kafka-1.0.jar --hive.catalog.name xxx --hive.conf.dir xxx --hive.db.name xxx --hive.kafka.tb xxx --kafka.bootstrap.server xxx:9092,xxx:9092 --kafka.topic xxx --kafka.group.id xxx --kafka.username xxx --kafka.password 'xxx' --insert.kafka.table.sql 'xxxxxxx'
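`ParameterTool.get` returns null for a key that was never passed. As a result, a mismatch between the keys on the command line and the keys the job reads only surfaces later as a confusing failure, such as a null catalog name. The sketch below is a hypothetical, simplified stand-in for `ParameterTool.fromArgs` that only handles `--key value` pairs. The names `Hive2KafkaArgsSketch`, `parse` and `missingKeys` are illustrative. It lists the keys the job expects but the command line did not supply. Fed the key `--sql.insert.kafka.table`, for example, it reports `insert.kafka.table.sql` as missing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ParameterTool.fromArgs: handles only "--key value" pairs
public class Hive2KafkaArgsSketch {

    // The keys Hive2Kafka reads via parameterTool.get(...)
    static final String[] REQUIRED_KEYS = {
            "hive.catalog.name", "hive.conf.dir", "hive.db.name", "hive.kafka.tb",
            "kafka.bootstrap.server", "kafka.topic", "kafka.group.id",
            "kafka.username", "kafka.password", "insert.kafka.table.sql"
    };

    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            String key = args[i].substring(2);
            // The next token is the value unless it is another --key
            if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                params.put(key, args[++i]);
            } else {
                params.put(key, "");
            }
        }
        return params;
    }

    // Lists the keys the job expects but the command line did not provide
    static List<String> missingKeys(Map<String, String> params) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (!params.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // The shell has already stripped the single quotes, so the whole SQL arrives as one token
        String[] commandLine = {"--kafka.topic", "demo_topic",
                "--sql.insert.kafka.table", "INSERT INTO kafka_sink SELECT ..."};
        System.out.println(missingKeys(parse(commandLine)));
    }
}
```

In the real job, `parameterTool.getRequired(key)` serves the same purpose: it throws as soon as a key is absent instead of returning null.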
This article was reposted from 学新通技术网.
- Article path: /boutique/detail/tanhfehegi