• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

记 - 实时数仓开发实践 - doris/pg/flink

武飞扬头像
cg6
帮助1

业务场景描述 - 实时数仓

  1. 查询性能在毫秒级
  2. 业务数据可以随意修改、删除数据等操作
  3. 指标计算需求涉及跨多个表关联查询,4个union all [即5个select],涉及大表的json字符串解析与计算
  4. 存在json字符串解析,且解析后数据做聚合,再做差值

环境描述

doris 1.1

debezium - flink - doris 框架实现

doris - UNIQUE KEY 模式实现

实现思路1: flink 消费kafka明细数据到doris,表模型为UNIQUE KEY 模式,用普通视图包装ETL逻辑,供外部查询;但是经过验证发现查询性能在秒级,尝试进行慢sql优化。

针对慢sql的优化尝试:

  1. 使用rollup单表提前聚合,但是不支持按部分主键聚合;
  2. 优化表前缀索引,查询性能有所提升,但还是在秒级范围内;
  3. 把json字符串解析功能提前完成同时删除表无关字段;

结论: 同步明细数据到doris中ETL,数据查询性能无法达到预期的时间要求。【未解决问题】

实现思路2: flink 消费kafka明细数据经过ETL计算聚合后sink到doris,用普通视图包装ETL逻辑。

结论: 同步聚合数据到doris进行查询,数据查询性能达到预期的时间要求。

doris - AGGREGATE KEY 模式实现

1、flink 消费kafka明细数据经过ETL计算聚合后sink到doris,用普通视图包装ETL逻辑;
2、【flink 计算原理解析】flink 是有状态的计算,每次的输出都是把历史状态 当前状态的数据做逻辑计算后sink到目标表,因此非key字段要的value要设置为replace;此时的模式即是UNIQUE KEY模式的实现。【UNIQUE KEY仅是AGGREGATE KEY的一种特例形式】

结论: 同步聚合数据到doris进行查询,数据查询性能达到预期的时间要求。

debezium - flink - PostgreSQL 框架实现

实现思路:
flink 消费kafka明细数据,flink任务中数据聚合计算后同步到PostgreSQL数据库
结论: 同步聚合数据到doris进行查询,数据查询性能达到预期的时间要求。

doris 使用总结

尽量减少 delete/insert操作

1、delete/insert操作,性能较慢;建议减少deleted/insert操作或者加大批量写入。

作为key字段不能为null

  1. 要注意key字段不能为空,某一些业务场景可能作为key字段为null ,数据会写入失败。

key字段为null报错如下
Column ‘birth_date’ is NOT NULL, however, a null value is being written into it. You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘drop’ to suppress this exception and drop such records silently.

UNIQUE KEY 模式对rollup的支持

CREATE TABLE XXXX (
  event_id bigint(20) NULL COMMENT "",
  animal_id bigint(20) NULL COMMENT "",
  org_id bigint(20) NULL COMMENT "",
  pig_num bigint(20)
) ENGINE=OLAP
UNIQUE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

执行
ALTER TABLE DB.XXX ADD ROLLUP r1(org_id, pig_num);
报错信息: SQL 错误 [1105] [HY000]: errCode = 2, detailMessage = Rollup should contains all unique keys in basetable

加入全部主键,没有问题
ALTER TABLE test_bigdata_realtime_metrics.dwd_fpf_anc_event_list0727 ADD ROLLUP r1(event_id,org_id,animal_id, pig_num);

官网说是支持的,有兴趣可以查看:https://doris.apache.org/zh-CN/docs/data-table/hit-the-rollup/#aggregate-和-unique-模型中的-rollup

结论:UNIQUE KEY 模式不支持单一主键 rollup,必须包含表所有主键

AGGREGATE KEY 模式对rollup的支持

CREATE TABLE db.XXX (
  event_id bigint(20) NULL COMMENT "",
  animal_id bigint(20) NULL COMMENT "",
  org_id bigint(20) NULL COMMENT "",
  pig_num bigint(20) SUM
) ENGINE=OLAP
Aggregate KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

使用单一主键创建 ROLLUP :
ALTER TABLE db.XXX ADD ROLLUP r1(org_id , pig_num);

结论: 执行成功,支持单一主键

AGGREGATE KEY 模式 - 验证insert/delete

建表语句

CREATE TABLE `dws_fmc_piglet_got_confirm_agg` (
  `tenant_id` bigint(20) NULL COMMENT "",
  `got_date` varchar(255) NULL COMMENT "",
  `pig_types` varchar(255) NULL COMMENT "",
  `pig_type` varchar(255) NULL COMMENT "",
  `pig_num` int(11) REPLACE NULL COMMENT "",
  `compute_time` datetime MAX NULL COMMENT "",
) ENGINE=OLAP
AGGREGATE KEY(`tenant_id`, `got_date`, `pig_types`, `pig_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`tenant_id`) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
)
学新通

**结论:**数据可以正常回撤,原理同UNIQUE KEY 模式。【UNIQUE KEY 模式是AGGREGATE KEY模式的特例而已】

doris UNIQUE KEY 模式 - 查询不准确

1、数据查询不准确:不确定是否为后台服务进程不稳定导致,过几分钟后数据变得一致。
2、官网已知查询不准确和解决方法:https://doris.apache.org/zh-CN/docs/faq/sql-faq#q4-unique-key-模型查询结果不一致

不要频繁操作alter

创建多个ROLLUP:

ALTER TABLE db.XXX ADD ROLLUP r1(col1, col4);
ALTER TABLE db.XXX ADD ROLLUP r2(col3, col5);

报错信息:

SQL 错误 [1105] [HY000]: errCode = 2, detailMessage = Table[db.XXX]'s state is not NORMAL. Do not allow doing ALTER ops

string/text/float/double 不能做key,date 做key不报错但数据显示异常和不准确

doris 版本:1.1
1、key字段不能为 text、string , 否则表创建失败
2、key中部分字段是date 类型,flink sink to doris B表,date类型字段大部分数据变成null

建表语句:

CREATE TABLE XXXX (
  t_id bigint(20) NULL COMMENT "",
  f_id bigint(20) NULL COMMENT "",
  a_id bigint(20) NULL COMMENT "",
  birth_str varchar(255) NULL COMMENT "",
  birth_date date NULL COMMENT "",
  pig_types varchar(255) NULL COMMENT "",
  pig_type varchar(255) NULL COMMENT "",
  pig_num int(11) NULL COMMENT "",
  compute_time datetime NULL COMMENT "",
) ENGINE=OLAP
UNIQUE KEY(t_id, f_id, a_id, birth_str, birth_date, pig_types, pig_type)
COMMENT "OLAP"
DISTRIBUTED BY HASH(t_id) BUCKETS 32
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
)
学新通

flink ETL sql:

SELECT
    basic.t_id ,
    basic.f_id,
    basic.a_id,
    cast(basic.birth_date as varchar) as birth_str,
    basic.birth_date,
    'a' AS pig_types,
    'b' AS pig_type,
    1 as pig_num,
    localtimestamp as compute_time
FROM basic
WHERE 
    basic.birth_date is not null

查询结果:
学新通

特殊字符串解析

1、json 字符串解析不是特别快,建议想要更高的性能,提前解析json ,doris只做逻辑计算【可能直接计算是毫秒级,解析特殊字符串再用来计算可能变成秒级,不是绝对】

sql 报错不够精准

有错误sql 如下:

SELECT
	basic.tenant_id AS tenant_id,
	basic.farm_id AS farm_id,
	CASE
		WHEN datediff(now() , cast(birth_time as datetime)) between 0 and 30 THEN '0-30日龄'
		WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60日龄'
		WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90日龄'
		WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120日龄'
		WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150日龄'
		ELSE '150日龄以上'
	END AS pig_sub_type
	,sum(1)
FROM basic
group by 
basic.tenant_id ,
basic.farm_id,
CASE
	WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30日龄'
	WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60日龄'
	WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90日龄'
	WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120日龄'
	WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150日龄'
	ELSE '150日龄以上'
END
学新通

报错信息:select list expression not produced by aggregation output (missing from GROUP BY clause?): CASE WHEN datediff(‘2022-07-27 10:06:44’, CAST(birth_time AS DATETIME)) >= 0

实际错误原因 :是datediff函数中传入的参数类型有错误,两个参数必须是datetime类型;
如:select datediff(CAST(‘2007-12-31 23:59:59’ AS DATETIME), CAST(‘2007-12-30’ AS DATETIME));

sql修正如下:

SELECT
	basic.tenant_id AS tenant_id,
	basic.farm_id AS farm_id,
	CASE
		WHEN datediff(CURRENT_DATE() , cast(birth_time as datetime)) between 0 and 30 THEN '0-30日龄'
		WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60日龄'
		WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90日龄'
		WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120日龄'
		WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150日龄'
		ELSE '150日龄以上'
	END AS pig_sub_type
	,sum(1)
FROM basic
group by 
basic.tenant_id ,
basic.farm_id,
CASE
	WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30日龄'
	WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60日龄'
	WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90日龄'
	WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120日龄'
	WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150日龄'
	ELSE '150日龄以上'
END
学新通

datediff 函数传参说明

  1. DATETIME DATEDIFF(DATETIME expr1,DATETIME expr2) ,注意参数类型,报错不一定是提示函数参数类型错误
  2. 计算expr1 - expr2,结果精确到天。
  3. expr1 和 expr2 参数是合法的日期或日期/时间表达式。
  4. 注释:只有值的日期部分参与计算。
  5. example:select datediff(CAST(‘2007-12-31 23:59:59’ AS DATETIME), CAST(‘2007-12-30’ AS DATETIME));

补充 - flink 相关的

flink ETL中当前日期无法再次触发当前日期函数计算

问题描述:flink etl sql 中使用了当前时间/日期 - 表记录中的某个时间/日期字段,若某个id的记录在历史时间已经被计算过,但是在今天或者每天再取这个时间的差值,实际上这个差值是少了记录时间到当前时间的时间差,会导致数据不一致。
解决: 实际上,解决方法在问题描述中已经写出来了。新增记录计算时间,此时计算的日期差值没有问题,然后随着时间推移当前时间不断变化,然而flink etl 任务的当前时间还是历史的当前时间,不会再被触发计算【记录被修改除外】,此时需要使用**(当前时间 - 记录的计算时间) 已计算好的日期差值 = 当前时间 - 记录日期的差值**

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfkfigg
系列文章
更多 icon
同类精品
更多 icon
继续加载