Notes: Real-Time Data Warehouse Development in Practice (Doris / PostgreSQL / Flink)
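Business scenario: a real-time data warehouse
- Query latency must be in the millisecond range.
- Business data can be freely updated and deleted.
- Metric computation joins across multiple tables, uses 4 UNION ALLs (i.e., 5 SELECTs), and involves parsing and computing over JSON strings in large tables.
- The JSON strings must be parsed, the parsed data aggregated, and then differences computed.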
Environment
Doris 1.1
Debezium - Flink - Doris implementation
Doris: UNIQUE KEY model implementation
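Approach 1: Flink consumes detail data from Kafka into Doris using the UNIQUE KEY table model, and an ordinary view wraps the ETL logic for external queries. Testing showed query latency in the seconds range, so we tried to optimize the slow SQL.
Attempts to optimize the slow SQL:
- Pre-aggregating a single table with a rollup: not possible, because the rollup cannot aggregate on only part of the key columns;
- Optimizing the table's prefix index: query performance improved, but stayed in the seconds range;
- Parsing the JSON strings in advance and dropping columns the table does not need.
Conclusion: syncing detail data into Doris and running the ETL there could not meet the required query latency. [Unresolved]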
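Approach 2: Flink consumes detail data from Kafka, performs the ETL computation and aggregation, and sinks the results into Doris; an ordinary view wraps the ETL logic.
Conclusion: syncing aggregated data into Doris for querying met the required query latency.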
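Doris: AGGREGATE KEY model implementation
1. Flink consumes detail data from Kafka, performs the ETL computation and aggregation, and sinks the results into Doris; an ordinary view wraps the ETL logic.
2. [How Flink computes] Flink computation is stateful: each output is the result of combining the historical state with the current data, and that result is sunk into the target table. The value (non-key) columns should therefore use the REPLACE aggregation type, and in that configuration the table behaves exactly like the UNIQUE KEY model. [UNIQUE KEY is simply a special case of AGGREGATE KEY.]
Conclusion: syncing aggregated data into Doris for querying met the required query latency.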
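A minimal Python simulation of point 2 shows why REPLACE, not SUM, fits a stateful Flink aggregation. It assumes the Flink job emits the full updated total for a key every time a new detail row arrives (an upsert stream). It also assumes rows reach the table in arrival order. The keys and numbers are made up, and load_into_table is a toy model of one Doris value column, not Doris itself.

```python
from collections import defaultdict


def flink_running_totals(events):
    """Simulate a stateful Flink GROUP BY: for every incoming detail row,
    emit the updated total for its key (historical state + current row)."""
    state = defaultdict(int)
    for key, qty in events:
        state[key] += qty
        yield key, state[key]


def load_into_table(rows, agg):
    """Toy model of a Doris AGGREGATE KEY table with one value column whose
    aggregation type is 'REPLACE' or 'SUM'."""
    table = {}
    for key, value in rows:
        if agg == "SUM":
            table[key] = table.get(key, 0) + value
        elif agg == "REPLACE":
            table[key] = value  # the newest row for a key wins
        else:
            raise ValueError(f"unsupported aggregation type: {agg}")
    return table


detail = [("org_1", 3), ("org_1", 2), ("org_2", 5), ("org_1", 1)]
emitted = list(flink_running_totals(detail))
print(load_into_table(emitted, "REPLACE"))  # {'org_1': 6, 'org_2': 5}
print(load_into_table(emitted, "SUM"))      # {'org_1': 14, 'org_2': 5}
```

With REPLACE the table holds the latest total (6). With SUM every intermediate total is added again (3 + 5 + 6 = 14).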
Debezium - Flink - PostgreSQL implementation
Approach:
Flink consumes detail data from Kafka, aggregates it inside the Flink job, and syncs the results into PostgreSQL.
Conclusion: syncing aggregated data into PostgreSQL for querying met the required query latency.
Doris usage summary
Minimize delete/insert operations
1. delete/insert operations are slow; reduce them where possible, or write in larger batches.
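The "larger batches" advice can be sketched as a small buffer that hands rows over in groups instead of one at a time. BatchWriter and flush_fn are hypothetical names. flush_fn stands for whatever actually submits a load, for example a Stream Load request. The Doris Flink connector also has its own batching settings, so check its documentation before hand-rolling this.

```python
from typing import Callable, List, Sequence


class BatchWriter:
    """Hypothetical helper: buffer rows and pass them to flush_fn in batches
    of batch_size instead of writing one row at a time."""

    def __init__(self, flush_fn: Callable[[Sequence[dict]], None], batch_size: int = 1000):
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        self.flush_fn = flush_fn
        self.batch_size = batch_size
        self.buffer: List[dict] = []

    def write(self, row: dict) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        # Hand over a copy, then reuse the buffer for the next batch.
        if self.buffer:
            self.flush_fn(list(self.buffer))
            self.buffer.clear()


collected = []
writer = BatchWriter(collected.append, batch_size=3)
for i in range(7):
    writer.write({"id": i})
writer.flush()  # push the final partial batch
print([len(batch) for batch in collected])  # [3, 3, 1]
```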
Key columns must not be NULL
- Key columns cannot be NULL. In some business scenarios a key column can be NULL, and writing such data fails.
The error, raised on the Flink side, when a key column is NULL:
Column 'birth_date' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
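One way to avoid this error is to guarantee non-NULL keys before the sink. You can filter the rows out, as the ETL SQL later in this note does with WHERE basic.birth_date is not null, or substitute a default value. The Python sketch below models both options. guard_null_keys, the key list, and the default value are hypothetical.

```python
from typing import Iterable, Iterator, Mapping, Optional, Sequence


def guard_null_keys(
    rows: Iterable[dict],
    key_columns: Sequence[str],
    defaults: Optional[Mapping[str, object]] = None,
) -> Iterator[dict]:
    """Yield only rows whose key columns are all non-NULL.

    If defaults is given, NULL key columns that have a default are filled in
    (on a copy of the row); rows that still have a NULL key are dropped."""
    for row in rows:
        missing = [c for c in key_columns if row.get(c) is None]
        if not missing:
            yield row
        elif defaults is not None and all(c in defaults for c in missing):
            yield {**row, **{c: defaults[c] for c in missing}}
        # otherwise the row is dropped, similar in effect to the 'drop' setting


rows = [
    {"t_id": 1, "a_id": 3, "birth_date": "2022-07-01"},
    {"t_id": 1, "a_id": 4, "birth_date": None},
]
keys = ("t_id", "a_id", "birth_date")
print(len(list(guard_null_keys(rows, keys))))  # 1
print(list(guard_null_keys(rows, keys, {"birth_date": "1970-01-01"}))[1]["birth_date"])  # 1970-01-01
```

Filling a sentinel default makes all rows that had a NULL key share one key value, so whether to drop or to fill is a business decision.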
Rollup support in the UNIQUE KEY model
```sql
CREATE TABLE XXXX (
    event_id bigint(20) NULL COMMENT "",
    animal_id bigint(20) NULL COMMENT "",
    org_id bigint(20) NULL COMMENT "",
    pig_num bigint(20)
) ENGINE=OLAP
UNIQUE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```
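Run:
```sql
ALTER TABLE DB.XXX ADD ROLLUP r1(org_id, pig_num);
```
Error message: SQL Error [1105] [HY000]: errCode = 2, detailMessage = Rollup should contains all unique keys in basetable
Including all key columns works:
```sql
ALTER TABLE test_bigdata_realtime_metrics.dwd_fpf_anc_event_list0727 ADD ROLLUP r1(event_id, org_id, animal_id, pig_num);
```
The official documentation says this is supported; for details see: https://doris.apache.org/zh-CN/docs/data-table/hit-the-rollup/#aggregate-和-unique-模型中的-rollup
Conclusion: in the UNIQUE KEY model a rollup cannot be built on only part of the key (here a single key column); it must contain all of the table's key columns.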
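One plausible reason for this restriction is our own interpretation, not something the Doris documentation states. A UNIQUE KEY value column behaves like REPLACE, so collapsing several unique keys into one rollup row would have to keep an arbitrary "last" value. The Python sketch below uses made-up rows of the table above to show that the result then depends on arrival order.

```python
from itertools import permutations

# (event_id, animal_id, org_id) -> pig_num; made-up rows for illustration.
rows = [((1, 10, 100), 4), ((2, 11, 100), 7), ((3, 12, 200), 5)]


def replace_rollup(ordered_rows):
    """Collapse rows onto org_id only, with REPLACE semantics (last row wins)."""
    rollup = {}
    for (_event_id, _animal_id, org_id), pig_num in ordered_rows:
        rollup[org_id] = pig_num
    return rollup


# Different, equally valid arrival orders give different answers for org 100.
results = {tuple(sorted(replace_rollup(p).items())) for p in permutations(rows)}
print(sorted(results))  # [((100, 4), (200, 5)), ((100, 7), (200, 5))]
```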
Rollup support in the AGGREGATE KEY model
```sql
CREATE TABLE db.XXX (
    event_id bigint(20) NULL COMMENT "",
    animal_id bigint(20) NULL COMMENT "",
    org_id bigint(20) NULL COMMENT "",
    pig_num bigint(20) SUM
) ENGINE=OLAP
AGGREGATE KEY(event_id, animal_id, org_id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(org_id) BUCKETS 32
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```
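Creating a ROLLUP on a single key column:
```sql
ALTER TABLE db.XXX ADD ROLLUP r1(org_id, pig_num);
```
Conclusion: the statement succeeds; a rollup on a single key column is supported.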
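The SUM value column is what makes a partial-key rollup well defined. Summing is order-independent, so rollup r1(org_id, pig_num) always equals SELECT org_id, SUM(pig_num) ... GROUP BY org_id on the base table. The Python sketch below checks this with made-up rows. It models only the aggregation semantics, not how Doris stores or selects rollups.

```python
from collections import defaultdict
from itertools import permutations

# (event_id, animal_id, org_id) -> pig_num; made-up rows for illustration.
rows = [((1, 10, 100), 4), ((2, 11, 100), 7), ((3, 12, 200), 5)]


def sum_rollup(ordered_rows):
    """Collapse rows onto org_id with SUM, like rollup r1(org_id, pig_num)."""
    rollup = defaultdict(int)
    for (_event_id, _animal_id, org_id), pig_num in ordered_rows:
        rollup[org_id] += pig_num
    return dict(rollup)


# Every arrival order yields the same rollup.
results = {tuple(sorted(sum_rollup(p).items())) for p in permutations(rows)}
print(results)  # {((100, 11), (200, 5))}
```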
AGGREGATE KEY model: verifying insert/delete
DDL:
```sql
CREATE TABLE `dws_fmc_piglet_got_confirm_agg` (
    `tenant_id` bigint(20) NULL COMMENT "",
    `got_date` varchar(255) NULL COMMENT "",
    `pig_types` varchar(255) NULL COMMENT "",
    `pig_type` varchar(255) NULL COMMENT "",
    `pig_num` int(11) REPLACE NULL COMMENT "",
    `compute_time` datetime MAX NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`tenant_id`, `got_date`, `pig_types`, `pig_type`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`tenant_id`) BUCKETS 32
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```
**Conclusion:** data can be retracted normally, on the same principle as the UNIQUE KEY model. [The UNIQUE KEY model is just a special case of the AGGREGATE KEY model.]
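The Python sketch below shows how an update from Flink lands in the table above, with REPLACE on pig_num and MAX on compute_time. It is only a model, and it assumes update-before ('-U') rows are skipped as an upsert sink would skip them. How the Doris connector actually handles '-U' and '-D' is not covered here, so deletes simply raise an error. The key values are made up.

```python
from datetime import datetime

KEY_COLUMNS = ("tenant_id", "got_date", "pig_types", "pig_type")
# Aggregation types of the value columns, taken from the DDL above.
VALUE_AGG = {"pig_num": "REPLACE", "compute_time": "MAX"}


def apply_changelog(changelog):
    """Merge (op, row) pairs into an in-memory model of the AGGREGATE KEY table.

    Assumption: '-U' rows are skipped; '+I' / '+U' rows are merged column by
    column; any other change type (e.g. '-D') is outside this sketch."""
    table = {}
    for op, row in changelog:
        if op == "-U":
            continue
        if op not in ("+I", "+U"):
            raise ValueError(f"unsupported change type: {op}")
        key = tuple(row[c] for c in KEY_COLUMNS)
        current = table.setdefault(key, {c: row[c] for c in VALUE_AGG})
        for col, agg in VALUE_AGG.items():
            if agg == "REPLACE":
                current[col] = row[col]
            elif agg == "MAX":
                current[col] = max(current[col], row[col])
    return table


key = {"tenant_id": 1, "got_date": "2022-07-27", "pig_types": "a", "pig_type": "b"}
log = [
    ("+I", {**key, "pig_num": 10, "compute_time": datetime(2022, 7, 27, 10, 0)}),
    ("-U", {**key, "pig_num": 10, "compute_time": datetime(2022, 7, 27, 10, 0)}),
    ("+U", {**key, "pig_num": 8, "compute_time": datetime(2022, 7, 27, 10, 5)}),
]
result = apply_changelog(log)
print(result[(1, "2022-07-27", "a", "b")]["pig_num"])  # 8
```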
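Doris UNIQUE KEY model: inaccurate query results
1. Inaccurate query results: it is unclear whether this was caused by unstable backend service processes; the data became consistent again after a few minutes.
2. The official documentation lists a known cause of inconsistent query results and its fix: https://doris.apache.org/zh-CN/docs/faq/sql-faq#q4-unique-key-模型查询结果不一致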
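Avoid frequent ALTER operations
Creating multiple ROLLUPs back to back:
```sql
ALTER TABLE db.XXX ADD ROLLUP r1(col1, col4);
ALTER TABLE db.XXX ADD ROLLUP r2(col3, col5);
```
Error message:
SQL Error [1105] [HY000]: errCode = 2, detailMessage = Table[db.XXX]'s state is not NORMAL. Do not allow doing ALTER ops
While the first rollup job is still running, the table is not in NORMAL state, so the second ALTER is rejected. Submit the next ALTER only after the previous job has finished.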
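A minimal sketch of serializing ALTER statements so that each one waits for the table to return to NORMAL. execute and table_state are hypothetical callbacks around a real database connection. In Doris the progress of rollup jobs can be inspected with SHOW ALTER TABLE ROLLUP. How table_state maps that output to "NORMAL" is an assumption left to the caller.

```python
import time
from typing import Callable, Iterable


def run_alters_sequentially(
    statements: Iterable[str],
    execute: Callable[[str], None],
    table_state: Callable[[], str],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 3600.0,
) -> None:
    """Submit ALTER statements one at a time, polling table_state() until it
    reports "NORMAL" before each submission; give up after timeout_seconds."""
    for stmt in statements:
        deadline = time.monotonic() + timeout_seconds
        while table_state() != "NORMAL":
            if time.monotonic() > deadline:
                raise TimeoutError(f"table still busy, not submitting: {stmt}")
            time.sleep(poll_seconds)
        execute(stmt)
```

For example, run_alters_sequentially([alter_r1, alter_r2], cursor.execute, get_state) would submit alter_r2 only after the r1 job has released the table. Here cursor and get_state stand in for your own connection code.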
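STRING/TEXT/FLOAT/DOUBLE cannot be key columns; DATE as a key raises no error, but the data displays abnormally and inaccurately
Doris version: 1.1
1. Key columns cannot be TEXT or STRING; otherwise table creation fails.
2. Some key columns were of DATE type; when Flink sank the data into Doris table B, most values of the DATE column became NULL.
DDL: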
```sql
CREATE TABLE XXXX (
    t_id bigint(20) NULL COMMENT "",
    f_id bigint(20) NULL COMMENT "",
    a_id bigint(20) NULL COMMENT "",
    birth_str varchar(255) NULL COMMENT "",
    birth_date date NULL COMMENT "",
    pig_types varchar(255) NULL COMMENT "",
    pig_type varchar(255) NULL COMMENT "",
    pig_num int(11) NULL COMMENT "",
    compute_time datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(t_id, f_id, a_id, birth_str, birth_date, pig_types, pig_type)
COMMENT "OLAP"
DISTRIBUTED BY HASH(t_id) BUCKETS 32
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2"
);
```
Flink ETL SQL:
```sql
SELECT
    basic.t_id,
    basic.f_id,
    basic.a_id,
    cast(basic.birth_date as varchar) as birth_str,
    basic.birth_date,
    'a' AS pig_types,
    'b' AS pig_type,
    1 as pig_num,
    localtimestamp as compute_time
FROM basic
WHERE
    basic.birth_date is not null
```
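Query results:
Parsing special strings
1. JSON string parsing is not particularly fast. For higher performance, parse the JSON in advance and let Doris do only the computation. [A direct computation may take milliseconds, while parsing special strings before computing may push it into seconds; this is not absolute.]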
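This is a minimal Python sketch of "parse the JSON in advance": parse once upstream, keep only the needed fields as plain columns, then aggregate and take the difference. That is the same parse-aggregate-subtract shape as the business requirement at the top of this note. In the real pipeline this step would run in the Flink job. The column ext and the fields in_num and out_num are hypothetical.

```python
import json
from collections import defaultdict


def preparse(rows, json_col, fields):
    """Parse the JSON column once and flatten the needed fields into ordinary
    columns; the raw JSON column is dropped."""
    for row in rows:
        payload = json.loads(row[json_col]) if row.get(json_col) else {}
        flat = {k: v for k, v in row.items() if k != json_col}
        for field in fields:
            flat[field] = payload.get(field)
        yield flat


def net_change(rows):
    """Aggregate the parsed fields per org_id, then take the difference."""
    totals = defaultdict(lambda: [0, 0])
    for r in rows:
        totals[r["org_id"]][0] += r["in_num"] or 0
        totals[r["org_id"]][1] += r["out_num"] or 0
    return {org: inflow - outflow for org, (inflow, outflow) in totals.items()}


raw = [
    {"org_id": 1, "ext": '{"in_num": 5, "out_num": 2}'},
    {"org_id": 1, "ext": '{"in_num": 3}'},
    {"org_id": 2, "ext": None},
]
print(net_change(preparse(raw, "ext", ["in_num", "out_num"])))  # {1: 6, 2: 0}
```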
SQL error messages are not precise
The following SQL contains an error:
```sql
SELECT
    basic.tenant_id AS tenant_id,
    basic.farm_id AS farm_id,
    CASE
        WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
        ELSE 'over 150 days old'
    END AS pig_sub_type,
    sum(1)
FROM basic
GROUP BY
    basic.tenant_id,
    basic.farm_id,
    CASE
        WHEN datediff(now(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
        WHEN datediff(now(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
        ELSE 'over 150 days old'
    END
```
Error message: select list expression not produced by aggregation output (missing from GROUP BY clause?): CASE WHEN datediff('2022-07-27 10:06:44', CAST(birth_time AS DATETIME)) >= 0
Actual cause: the arguments passed to datediff had the wrong type; both arguments must be of DATETIME type.
For example: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));
Corrected SQL:
```sql
SELECT
    basic.tenant_id AS tenant_id,
    basic.farm_id AS farm_id,
    CASE
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
        ELSE 'over 150 days old'
    END AS pig_sub_type,
    sum(1)
FROM basic
GROUP BY
    basic.tenant_id,
    basic.farm_id,
    CASE
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 0 and 30 THEN '0-30 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 31 and 60 THEN '31-60 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 61 and 90 THEN '61-90 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 91 and 120 THEN '91-120 days old'
        WHEN datediff(CURRENT_DATE(), cast(birth_time as datetime)) between 121 and 150 THEN '121-150 days old'
        ELSE 'over 150 days old'
    END
```
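The CASE expression above buckets an age in days. This Python sketch reproduces its ranges (labels in English) so that the boundaries are easy to test outside the database. age_bucket is a hypothetical helper. today is passed in explicitly rather than read from the clock, which matters for the Flink current-date issue described at the end of this note.

```python
from datetime import date

# Same ranges as the CASE expression above (inclusive bounds, in days).
AGE_BUCKETS = [
    (0, 30, "0-30 days old"),
    (31, 60, "31-60 days old"),
    (61, 90, "61-90 days old"),
    (91, 120, "91-120 days old"),
    (121, 150, "121-150 days old"),
]


def age_bucket(birth: date, today: date) -> str:
    """Return the age label for a birth date, evaluated against today.
    Like the SQL ELSE branch, anything outside 0-150 (including negative
    ages from future dates) falls into the last bucket."""
    days = (today - birth).days
    for low, high, label in AGE_BUCKETS:
        if low <= days <= high:
            return label
    return "over 150 days old"


print(age_bucket(date(2022, 7, 1), date(2022, 7, 27)))  # 0-30 days old
print(age_bucket(date(2022, 1, 1), date(2022, 7, 27)))  # over 150 days old
```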
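datediff parameters
- INT DATEDIFF(DATETIME expr1, DATETIME expr2): pay attention to the argument types; an error caused by them is not necessarily reported as an argument-type error.
- Computes expr1 - expr2, with the result in days.
- expr1 and expr2 are valid date or date/time expressions.
- Note: only the date part of each value takes part in the calculation.
- Example: select datediff(CAST('2007-12-31 23:59:59' AS DATETIME), CAST('2007-12-30' AS DATETIME));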
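This Python sketch models the documented semantics described in the list above: expr1 - expr2 in whole days, using only the date part of each value. It is a model for checking expectations, not Doris's implementation.

```python
from datetime import date, datetime
from typing import Union


def datediff(expr1: Union[date, datetime], expr2: Union[date, datetime]) -> int:
    """expr1 - expr2 in days, ignoring the time-of-day part of both values."""
    # datetime is a subclass of date, so check for datetime first.
    d1 = expr1.date() if isinstance(expr1, datetime) else expr1
    d2 = expr2.date() if isinstance(expr2, datetime) else expr2
    return (d1 - d2).days


print(datediff(datetime(2007, 12, 31, 23, 59, 59), datetime(2007, 12, 30)))  # 1
```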
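Appendix: Flink-related notes
In Flink ETL, current-date functions are not re-evaluated as the date changes
Problem: the Flink ETL SQL computes the current time/date minus a time/date column of the record. Suppose the record for some id was computed at a past time. When that difference is read today, or on any later day, it is missing the time elapsed between the computation and now, so the data becomes inconsistent.
Solution: the fix is already implied by the problem description. Record the time at which each row was computed; at that moment the date difference is correct. As time passes the current time keeps moving, but the Flink ETL job's "current time" stays at the historical value and is not re-evaluated (unless the record is modified). Therefore compute: **(current time - record's compute time) + precomputed date difference = current time - record date**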
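The correction formula can be checked with a small Python sketch. stored_diff_days is the difference Flink wrote when it last processed the record (compute date minus record date). Adding the days elapsed since that computation recovers today's true difference. In practice this addition would happen at query time, for example inside the Doris view. The function name and dates are illustrative.

```python
from datetime import date


def refresh_day_diff(stored_diff_days: int, compute_date: date, today: date) -> int:
    """(today - compute_date) + stored_diff_days == today - record_date,
    where stored_diff_days = compute_date - record_date was written by Flink."""
    return (today - compute_date).days + stored_diff_days


record_date = date(2022, 7, 1)
compute_date = date(2022, 7, 20)            # when Flink last processed the record
stored = (compute_date - record_date).days  # 19, frozen in the sink table
today = date(2022, 7, 27)
print(refresh_day_diff(stored, compute_date, today))  # 26
print((today - record_date).days)                     # 26
```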
This article was reposted from: 学新通技术网
- Article URL: /boutique/detail/tanhfkfigg