• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Flink—读Hive表数据写入Kafka

武飞扬头像
Andya_net
帮助3


学新通
关注微信公众号:CodingTechWork,一起学习进步。

引言

场景

  数仓Hive中的数据需要读取后写入Kafka中进行数据服务输出。

选型

  选用Flink进行读Hive写Kafka,因为其拥有丰富的connector可选择。

开发

pom依赖

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_2.11</artifactId>
		<version>1.13.2</version>
	</dependency>
		<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-hive_2.11</artifactId>
		<version>2.2.0</version>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-source-plugin</artifactId>
			<executions>
				<execution>
					<id>attach-sources</id>
					<goals>
						<goal>jar</goal>
					</goals>
				</execution>
			</executions>
			<configuration>
				<skipSource>true</skipSource>
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
			<version>3.2.0</version>
			<configuration>
				<archive>
				<!--确定主类-->
					<manifest>
						<mainClass>com.test.demo.flinkhive2kafka.job.Hive2Kafka</mainClass>
					</manifest>
				</archive>
			</configuration>
		</plugin>
	</plugins>
</build>
学新通

job类


import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.catalog.hive.HiveCatalog;
import lombok.extern.slf4j.SLf4j;

@Slf4j
public class Hive2Kafka {
	public static void main(String[] args) {
		// 设置flink sql环境
		EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();

		// 创建table环境
		TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);

		// 设置配置
		tableEnvironment.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-reader", "true")
		
		// 获取外部配置
		ParameterTool parameterTool = ParameterTool.fromArgs(args);
		log.info("parameters size: {}", parameterTool.getNumberOfParameters());
		
		// 获取所有配置
		String hiveCatalogName = parameterTool.get("hive.catalog.name");
		String hiveConfDir = parameterTool.get("hive.conf.dir");
		String hiveDatabaseName = parameterTool.get("hive.db.name");
		String hiveKafakaTable = parameterTool.get("hive.kafka.tb");
		String kafkaBootstrapServer = parameterTool.get("kafka.bootstrap.server");
		String kafkaTopic = parameterTool.get("kafka.topic");
		String kafkaGroupId = parameterTool.get("kafka.group.id");
		String kafkaUsername = parameterTool.get("kafka.username");
		String kafkaPassword = parameterTool.get("kafka.password");
		String insertKafkaTableSql = parameterTool.get("insert.kafka.table.sql");

		// 创建hive catalog
		HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, hiveDatabaseName, hiveConfDir);
		// 注册catalog
		tableEnvironment.registerCatalog(hiveCatalogName, hiveCatalog);
		// 使用catalog
		tableEnvironment.useCatalog(hiveCatalogName);
		
		String createKafkaTableSql = String.format("CREATE TABLE IF NOT EXISTS %s(`field01` STRING) \n"  
		"WITH('connector' = 'kafka', \n"  
		"'topic' = '%s', \n"   
		"'properties.group.id' = '%s', \n"  
		"'properties.bootstrap.servers' = '%s', \n"  
		"'scan.startup.mode' = 'group-offsets', \n"  
		"'properties.auto.offset.reset' = 'earliest', \n"  
		"'format' = 'raw', \n"  
		"'properties.security.protocol' = 'SASL_PLAINTEXT', \n"  
		"'properties.sasl.mechanism' = 'PLAIN', \n"  
		"'properties.sasl.mechanism' = 'org.apache.kafka.common.security.plain.PlainLoginModule "  
		"required username = \"%s\" password=\"%s\";'\n"  
		")",hiveKafkaTable, kafkaTopic, kafkaGroupId, kafkaBootstrapServer, kafkaUsername, kafkaPassword);
		// 创建kafka表	
		tableEnvironment.executeSql(createKafkaTableSql).print();
		// 执行flink sql
		tableEnvironment.executeSql(insertKafkaTableSql).print();
	}
}
学新通

执行

使用yarn-application模式

./fkink run-application -t yarn-application flink-hive-2-kafka-1.0.jar --hive.db.name xxx --hive.kafka.tb xxx --kafka.bootstrap.server xxx:9092,xxx:9092 --kafka.topic xxx --kafka.group.id xxx --kafka.username xxx --kafka.password 'xxx' --sql.insert.kafka.table 'xxxxxxx'

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfehegi
系列文章
更多 icon
同类精品
更多 icon
继续加载