• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

通过Flink-SqlKafka数据写入HDFS

武飞扬头像
IT_xhf
帮助1

系列文章目录

提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
例如:第一章 Python 机器学习入门之pandas的使用


提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

近期,公司有需求将Kafka的数据写入到Hive表中,当时看到Flink有一个File Connector可以将文件写入到HDFS,所以开始了解Flink-Sql写入到HDFS的使用。


一、创建Hive Catalog

将Flink-Sql的元数据通过hive catalog保存起来。这样通过Flink Sql创建的表都会保存到Hive中。

CREATE CATALOG myhive_default WITH (
    'type' = 'hive',
    'default-database' = 'default',
    'hive-conf-dir' = '/etc/hive/conf'
);
use catalog myhive_default;

二、创建表

1.创建Kafka表

CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka2hive2',
  'properties.bootstrap.servers' = 'ip1:9092,ip2:9092,ip3:9092',
  'properties.group.id' = 'kafka2hive',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

2.创建Hive表

CREATE external TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE
) partitioned by (dt string,h string,m string) 
  stored as ORC 
  TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore');

3. 执行同步语句

set execution.checkpointing.interval=10sec;
insert into  fs_table 
SELECT user_id, order_amount,DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH'), DATE_FORMAT(log_ts, 'mm') FROM kafka_table;

总结

不开启checkpoint, 写入到Hive的文件都是inprogress状态, 所以在执行之前要开启checkpoint。任务checkpoint后,临时文件会变成正式文件。

参考资料

Flink-File-Connector

Flink-Hive-Connector

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhghfiae
系列文章
更多 icon
同类精品
更多 icon
继续加载