阿里开源数据同步神器DataX异构数据源间数据同步同步MySQL和HDFS相互
Datax 实战使用
1.MySQL-To-HDFS
环境 & 准备说明:
描述: 为了快速搭建测试的数据库环境, 本系列将采用docker进行搭建部署, 如没有安装请参照本博客中的 Docker 系列课程。
(1) MySQL
# docker-compose.yml
version: '3.1'
services:
db8:
image: mysql
container_name: mysql8.x
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: www.Young.top
MYSQL_DATABASE: test
MYSQL_USER: test
MYSQL_PASSWORD: www.Young.top
volumes:
- "/app/mysql8:/var/lib/mysql"
ports:
- 3306:3306
# 部署流程 #
docker pull singularities/hadoop
docker-compose up -d
# 创建测试表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
`uid` int(0) NOT NULL AUTO_INCREMENT COMMENT '用户id',
`name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户名称',
`age` int(0) NOT NULL COMMENT '用户年龄',
`hobby` varchar(255) CHARACTER SET utf8mb4 NOT NULL COMMENT '用户爱好',
`operation_time` datetime(0) NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '插入时间',
PRIMARY KEY (`uid`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (1, 'Young', 20, 'Network,Computer', '2022-10-12 14:34:03');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (2, 'Elastic', 18, '数据分析,数据采集,数据处理', '2022-10-12 17:16:34');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (3, 'Logstash', 20, '日志采集,日志过滤', '2022-10-12 17:16:59');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (4, 'Beats', 10, '通用日志采集', '2022-10-12 17:17:06');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (5, 'Kibana', 19, '数据分析,日志搜寻,日志数据展示,可视化', '2022-10-12 17:27:38');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (6, 'C', 25, '面向过程编程语言', '2022-10-13 02:43:30');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (7, 'C ', 25, '面向对象', '2022-10-13 10:44:59');
INSERT INTO `test`.`user`(`uid`, `name`, `age`, `hobby`, `operation_time`) VALUES (8, 'Python', 26, '编程语言', '2022-10-13 10:48:45');
# 此时test表中有如下数据。
mysql> select * from test.user;
----- ----------- ----- --------------------------------------- ---------------------
| uid | name | age | hobby | operation_time |
----- ----------- ----- --------------------------------------- ---------------------
| 1 | Young | 20 | Network,Computer | 2022-10-12 14:34:03 |
| 2 | Elastic | 18 | 数据分析,数据采集,数据处理 | 2022-10-12 17:16:34 |
| 3 | Logstash | 20 | 日志采集,日志过滤 | 2022-10-12 17:16:59 |
| 4 | Beats | 10 | 通用日志采集 | 2022-10-12 17:17:06 |
| 5 | Kibana | 19 | 数据分析,日志搜寻,日志数据展示,可视化 | 2022-10-12 17:27:38 |
| 6 | C | 25 | 面向过程编程语言 | 2022-10-13 02:43:30 |
| 7 | C | 25 | 面向对象 | 2022-10-13 10:44:59 |
| 8 | Python | 26 | 编程语言 | 2022-10-13 10:48:45 |
----- ----------- ----- --------------------------------------- ---------------------
8 rows in set (0.04 sec)
# 表中字段类型
mysql> desc test.user;
---------------- -------------- ------ ----- --------- -----------------------------
| Field | Type | Null | Key | Default | Extra |
---------------- -------------- ------ ----- --------- -----------------------------
| uid | int | NO | PRI | NULL | auto_increment |
| name | varchar(32) | NO | | NULL | |
| age | int | NO | | NULL | |
| hobby | varchar(255) | NO | | NULL | |
| operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP |
---------------- -------------- ------ ----- --------- -----------------------------
5 rows in set (0.04 sec)
(2) HDFS
Docker HDFS 镜像参考地址: https://registry.hub.docker.com/r/gradiant/hdfs
# hdfs-site 配置文件
tee /tmp/hdfs-site.xml <<'EOF'
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="https://blog.csdn.net/agonie201218/article/details/configuration.xsl"?>
<configuration>
<property><name>dfs.namenode.name.dir</name><value>file:///hadoop/dfs/name</value></property>
<property><name>dfs.namenode.rpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.servicerpc-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.http-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.namenode.https-bind-host</name><value>0.0.0.0</value></property>
<property><name>dfs.client.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.datanode.use.datanode.hostname</name><value>true</value></property>
<property><name>dfs.namenode.datanode.registration.ip-hostname-check</name><value>false</value></property>
<property><name>dfs.permissions.enabled</name><value>false</value></property>
</configuration>
EOF
# 部署流程 #
docker pull gradiant/hdfs-namenode
docker pull gradiant/hdfs-datanode
docker run -d --name hdfs-namenode -v /tmp/hdfs-site.xml:/opt/hadoop/etc/hadoop/hdfs-site.xml \
-p "8020:8020" \
-p "14000:14000" \
-p "50070:50070" \
-p "50075:50075" \
-p "10020:10020" \
-p "13562:13562" \
-p "19888:19888" gradiant/hdfs-namenode
# 此处需要等待 hdfs-namenode 启动完毕后才执行如下命令
docker run -d --link hdfs-namenode --name hdfs-datanode1 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode
docker run -d --link hdfs-namenode --name hdfs-datanode2 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode
docker run -d --link hdfs-namenode --name hdfs-datanode3 -e CORE_CONF_fs_defaultFS=hdfs://hdfs-namenode:8020 gradiant/hdfs-datanode
# 测试:在 hdfs 中创建和列出示例文件夹
docker exec -ti hdfs-namenode hadoop fs -mkdir /hdfs # 在根目录下创建hdfs文件夹
docker exec -ti hdfs-namenode hadoop fs -mkdir -p /hdfs/d1/d2 # 创建多级目录
docker exec -ti hdfs-namenode hadoop fs -ls / # 列出根目录下的文件列表
# Found 1 items
# drwxr-xr-x - hdfs supergroup 0 2022-10-27 08:42 /hdfs
# HDFS 常规命令帮助
hadoop fs
# 创建单级、多级目录
hadoop fs -mkdir /hdfs
hadoop fs -mkdir -p /hdfs/d1/d2
# 上传文件到HDFS
echo "hello world" >> local.txt #创建文件
hadoop fs -put local.txt /hdfs/ #上传文件到hdfs
# 下载hdfs文件
hadoop fs -get /hdfs/local.txt
# 删除hdfs中的文件
hadoop fs -rm /hdfs/local.txt
# 删除hdfs中的目录
hadoop fs -rmdir /hdfs/d1/d2
mysqlreader 快速使用说明与配置样例
1 快速介绍
MysqlReader插件实现了从Mysql读取数据。在底层实现上,MysqlReader通过JDBC连接远程Mysql数据库,并执行相应的sql语句将数据从mysql库中SELECT出来。
不同于其他关系型数据库,MysqlReader不支持FetchSize.
2 实现原理
简而言之,MysqlReader通过JDBC连接器连接到远程的Mysql数据库,并根据用户配置的信息生成查询SELECT SQL语句,然后发送到远程Mysql数据库,并将该SQL执行返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。
对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。
3 功能说明
3.1 配置样例
- 配置一个从Mysql数据库同步抽取数据到本地的作业:
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
- 配置一个自定义SQL的数据库同步任务到本地内容的作业:
{
"job": {
"setting": {
"speed": {
"channel":1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"connection": [
{
"querySql": [
"select db_id,on_line_flag from db_info where db_id < 10;"
],
"jdbcUrl": [
"jdbc:mysql://bad_ip:3306/database",
"jdbc:mysql://127.0.0.1:bad_port/database",
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false,
"encoding": "UTF-8"
}
}
}
]
}
}
3.2 参数说明
-
jdbcUrl
-
描述:描述的是到对端数据库的JDBC连接信息,使用JSON的数组描述,并支持一个库填写多个连接地址。之所以使用JSON数组描述连接信息,是因为阿里集团内部支持多个IP探测,如果配置了多个,MysqlReader可以依次探测ip的可连接性,直到选择一个合法的IP。如果全部连接失败,MysqlReader报错。 注意,jdbcUrl必须包含在connection配置单元中。对于阿里集团外部使用情况,JSON数组填写一个JDBC连接即可。
jdbcUrl按照Mysql官方规范,并可以填写连接附件控制信息。具体请参看Mysql官方文档。
-
必选:是
-
默认值:无
-
-
username
-
描述:数据源的用户名
-
必选:是
-
默认值:无
-
-
password
-
描述:数据源指定用户名的密码
-
必选:是
-
默认值:无
-
-
table
-
描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,MysqlReader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
-
必选:是
-
默认值:无
-
-
column
-
描述:所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。用户使用*代表默认使用所有列配置,例如[‘*’]。
支持列裁剪,即列可以挑选部分列进行导出。
支持列换序,即列可以不按照表schema信息进行导出。
支持常量配置,用户需要按照Mysql SQL语法格式:
["id", "\`table\`", "1", "'bazhen.csy'", "null", "to_char(a 1)", "2.3" , "true"]
id为普通列名,`table`为包含保留字的列名,1为整形数字常量,'bazhen.csy’为字符串常量,null为空指针,to_char(a 1)为表达式,2.3为浮点数,true为布尔值。 -
必选:是
-
默认值:无
-
-
splitPk
-
描述:MysqlReader进行数据抽取时,如果指定splitPk,表示用户希望使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能。
推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
目前splitPk仅支持整形数据切分,
不支持浮点、字符串、日期等其他类型
。如果用户指定其他非支持类型,MysqlReader将报错!如果splitPk不填写,包括不提供splitPk或者splitPk值为空,DataX视作使用单通道同步该表数据。
-
必选:否
-
默认值:空
-
-
where
-
描述:筛选条件,MysqlReader根据指定的column、table、where条件拼接SQL,并根据这个SQL进行数据抽取。在实际业务场景中,往往会选择当天的数据进行同步,可以将where条件指定为gmt_create > $bizdate 。注意:不可以将where条件指定为limit 10,limit不是SQL的合法where子句。
where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或者value,DataX均视作同步全量数据。
-
必选:否
-
默认值:无
-
-
querySql
- 描述:在有些业务场景下,where这一配置项不足以描述所筛选的条件,用户可以通过该配置型来自定义筛选SQL。当用户配置了这一项之后,DataX系统就会忽略table,column这些配置型,直接使用这个配置项的内容对数据进行筛选,例如需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id
当用户配置querySql时,MysqlReader直接忽略table、column、where条件的配置
,querySql优先级大于table、column、where选项。- 必选:否
- 默认值:无
3.3 类型转换
目前MysqlReader支持大部分Mysql类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。
下面列出MysqlReader针对Mysql类型转换列表:
DataX 内部类型 | Mysql 数据类型 |
---|---|
Long | int, tinyint, smallint, mediumint, int, bigint |
Double | float, double, decimal |
String | varchar, char, tinytext, text, mediumtext, longtext, year |
Date | date, datetime, timestamp, time |
Boolean | bit, bool |
Bytes | tinyblob, mediumblob, blob, longblob, varbinary |
重点注意:
- 除上述罗列字段类型外,其他类型均不支持。
- tinyint(1) DataX视作为整形。
- year DataX视作为字符串类型
- bit DataX属于未定义行为。
4.配置样例
配置一个从 Mysql 数据库同步抽取数据到本地的作业:
tee job/mysql2stream.json <<'EOF'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"uid",
"name",
"operation_time"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"table": ["user"]
}
],
"username": "test5",
"password": "Young.top",
"where": "uid > 0"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
],
"setting": {
"speed": {
"channel": "2"
}
}
}
}
EOF
执行结果:
2022-10-26 21:43:04.419 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,operation_time from user where (uid > 0)]
jdbcUrl:[jdbc:mysql://********&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true].
1 Young 2022-10-12 14:34:03
2 Elastic 2022-10-12 17:16:34
3 Logstash 2022-10-12 17:16:59
4 Beats 2022-10-12 17:17:06
5 Kibana 2022-10-12 17:27:38
6 C 2022-10-13 02:43:30
7 C 2022-10-13 10:44:59
8 Python 2022-10-13 10:48:45
.....
2022-10-26 21:43:04.497 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[103]ms
2022-10-26 21:43:14.393 [job-0] INFO StandAloneJobContainerCommunicator - Total 8 records, 117 bytes | Speed 11B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-10-26 21:43:14.394 [job-0] INFO JobContainer -
任务启动时刻 : 2022-10-26 21:43:04
任务结束时刻 : 2022-10-26 21:43:14
任务总计耗时 : 10s
任务平均流量 : 11B/s
记录写入速度 : 0rec/s
读出记录总数 : 8
读写失败总数 : 0
(2.2) 配置一个自定义 SQL 的数据库同步任务到本地内容的作业:
cat job/mysql2stream1.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"querySql": ["select uid,name,hobby,operation_time from user where (uid > 3);"],
}
],
"username": "test5",
"password": "Young.top",
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
执行结果:
# 执行SQL与Jdbc URL
2022-10-26 21:50:58.904 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql: [select uid,name,hobby,operation_time from user where (uid > 3);]
jdbcUrl:[jdbc:mysql://*******&rewriteBatchedStatements=true].
# 打印查询到的数据到终端中
4 Beats 通用日志采集 2022-10-12 17:17:06
5 Kibana 数据分析,日志搜寻,日志数据展示,可视化 2022-10-12 17:27:38
6 C 面向过程编程语言 2022-10-13 02:43:30
7 C 面向对象 2022-10-13 10:44:59
8 Python 编程语言 2022-10-13 10:48:45
# 任务执行结果统计
2022-10-26 21:51:08.877 [job-0] INFO JobContainer -
任务启动时刻 : 2022-10-26 21:50:58
任务结束时刻 : 2022-10-26 21:51:08
任务总计耗时 : 10s
任务平均流量 : 10B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0
hdfswriter 快速使用说明与配置样例
- (1) 关键参数 & 类型转换: HdfsWriter 提供向 HDFS 文件系统指定路径中写入 TEXTFile 文件和 ORCFile 文件, 文件内容可与 hive 中表关联。
defaultFS:Hadoop hdfs 文件系统 namenode 节点地址。格式:hdfs://ip: 端口
fileType: 文件的类型,目前只支持用户配置为 “text”(textfile 文件格式) 或 “orc”(orc 表示 orcfile 文件格式)。
path: 存储到 Hadoop hdfs 文件系统的路径信息,HdfsWriter 会根据并发配置在 Path 目录下写入多个文件
fileName: HdfsWriter 写入时的文件名,实际执行时会在该文件名后添加随机的后缀作为每个线程写入实际文件名。
column: 写入数据的字段,不支持对部分列写入。为与 hive 中表关联,需要指定表中所有字段名和字段类型。
writeMode: 写入前数据清理处理模式, APPend 写入前不做任何处理, onConflict,如果目录下有 fileName 前缀的文件,直接报错。
fieldDelimiter: 写入时的字段分隔符, 需要用户保证与创建的 Hive 表的字段分隔符一致,否则无法在 Hive 表中查到数据
compress: 写入文件压缩类型,默认没有压缩。其中:text 类型文件支持压缩类型有 gzip、bzip2;orc 类型文件支持的压缩类型有 NONE、SNAPPY(需要用户安装 SnAPPyCodec)。
encoding: 写文件的编码配置, 默认值 utf-8 慎重修改。
haveKerberos : 是否有 Kerberos 认证默认 false, 如果为 True 则配置项``为必填。
kerberosKeytabFilePath: Kerberos 认证 keytab 文件路径,绝对路径
kerberosPrincipal: Kerberos 认证 Principal 名,如 xxxx/hadoopclient@xxx.xxx
hadoopConfig: HadoopConfig 高级 HA 配置:
// 名称空间: testDfs
"hadoopConfig":{
"dfs.nameservices": "testDfs",
"dfs.ha.namenodes.testDfs": "namenode1,namenode2",
"dfs.namenode.rpc-address.aliDfs.namenode1": "主机名:端口",
"dfs.namenode.rpc-address.aliDfs.namenode2": "主机名:端口",
"dfs.client.failover.proxy.provider.testDfs": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
HdfsWriter 针对 Hive 数据类型转换列表:
- (2) 配置样例
(2.1) 从 MySQL 同步数据 HDFS 中实例演示:
// 示例生成
bin/datax.py -r mysqlreader -w hdfswriter > mysql2hdfs.json
// 最终示例
tee job/mysql2hdfs3.json <<'EOF'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"querySql": ["select uid,name,hobby,operation_time from user where (uid > 3);"],
}
],
"username": "test5",
"password": "Young.top",
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{"name":"uid","type":"Long"},
{"name":"name","type":"string"},
{"name":"hobby","type":"string"},
{"name":"operation_time","type":"Date"},
],
"compress": "gzip",
"defaultFS": "hdfs://10.10.107.225:8020",
"fieldDelimiter": "|",
"fileName": "mysql-test-user",
"fileType": "test",
"path": "/hdfs",
"writeMode": "append",
"encoding": "UTF-8"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
EOF
执行结果:
/usr/local/datax# ./bin/datax.py job/mysql2hdfsjson
# 插件加载
2022-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Reader.Job [mysqlreader] do prepare work .
2022-10-27 18:25:25.794 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2022-10-27 18:25:25.853 [job-0] INFO HdfsWriter$Job - 由于您配置了writeMode append, 写入前不做清理工作, [/] 目录下写入相应文件名前缀 [mysql-test-user] 的文件
# mysqlreader 开始执行SQL读取数据
2022-10-27 18:25:25.895 [0-0-0-reader] INFO CommonRdbmsReader$Task - Begin to read record by Sql、
# hdfswriter 准备将数据写入到临时目录和文件中
2022-10-27 18:25:25.910 [0-0-0-writer] INFO HdfsWriter$Task - begin do write...
2022-10-27 18:25:25.911 [0-0-0-writer] INFO HdfsWriter$Task - write to file : [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8]
# mysqlreader 读取完成
2022-10-27 18:25:25.923 [0-0-0-reader] INFO CommonRdbmsReader$Task - Finished read record by Sql
# hdfswriter 将写入的临时文件进行重命名`mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz`并删除临时目录和文件
2022-10-27 18:25:35.885 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do post work.
2022-10-27 18:25:35.885 [job-0] INFO HdfsWriter$Job - start rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz].
2022-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - finish rename file [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz] to file [hdfs://10.10.107.225:8020/mysql-test-user__f9ea0b07_9023_46f5_9f3a_dd42244bfdd8.gz].
2022-10-27 18:25:35.902 [job-0] INFO HdfsWriter$Job - start delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] .
2022-10-27 18:25:35.911 [job-0] INFO HdfsWriter$Job - finish delete tmp dir [hdfs://10.10.107.225:8020/__5a361c9b_5a1b_413f_aec9_d074058f0c82] .
2022-10-27 18:25:35.912 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
# 结果统计
任务启动时刻 : 2022-10-27 18:25:24
任务结束时刻 : 2022-10-27 18:25:36
任务总计耗时 : 11s
任务平均流量 : 6B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0
# 从HDFS中读取插入的text数据
$ docker inspect hdfs-datanode1 | grep '"IPAddress"' | head -n 1
"IPAddress": "172.17.0.3",
$ curl "http://172.17.0.3:50075/webhdfs/v1/mysql-test-user__7630c0e0_d169_43cf_a808_272ad7d907bc?op=OPEN&namenoderpcaddress=0f3e052efe21:8020&offset=0"
4|Beats|通用日志采集
5|Kibana|数据分析,日志搜寻,日志数据展示,可视化
6|C|面向过程编程语言
7|C |面向对象
8|Python|编程语言
Tips : 总结 Datax 针对 HDFS 的写入流程,首先将数据写入到一个临时文件,如果全部成功``临时文件名、并删除临时目录。如果个别数据失败则 Job 任务失败,删除临时目录和临时文件。
Tips : 从上面结果可以看出 HDFS 实际执行时会在该文件名后添加随机的后缀作为每个线程的实际写入文件名。
hdfsreader 快速使用说明与配置样例
- (1) 快速介绍和参数说明
HdfsReader 支持的文件格式有``类型格式的文件,且文件内容存放的必须是一张逻辑意义上的二维表。
参数说明:
path: 要读取的文件路径,如果要读取多个文件,可以使用正则表达式 “*”,注意这里可以支持填写多个路径, 比如需要读取表名叫 mytable01 下分区 day 为 20150820 这一天的所有数据,则配置如下: defaultFS: Hadoop hdfs 文件系统 namenode 节点地址 fileType: 文件的类型,目前只支持用户配置为
column: 读取字段列表,type 指定源数据的类型,index 指定当前列来自于文本第几列 (以 0 开始),value 指定当前类型为常量,如``
fieldDelimiter: 读取的字段分隔符
encoding: 读取文件的编码配置
nullFormat: 文本文件中无法使用标准字符串定义 null(空指针),DataX 提供 nullFormat 定义哪些字符串可以表示为 null。
haveKerberos: 是否有 Kerberos 认证,默认 false, 例如如果用户配置 true,则配置项 kerberosKeytabFilePath,kerberosPrincipal 为必填。
kerberosKeytabFilePath: Kerberos 认证 keytab 文件路径,绝对路径
kerberosPrincipal: Kerberos 认证 Principal 名,如 xxxx/hadoopclient@xxx.xxx
compress: 当 fileType(文件类型)为 csv 下的文件压缩方式,目前仅支持 gzip、bz2、zip、lzo、lzo_deflate、hadoop-snAPPy、framing-snAPPy 压缩;
csvReaderConfig: 取 CSV 类型文件参数配置,Map 类型。读取 CSV 类型文件使用的 CsvReader 进行读取,会有很多配置,不配置则使用默认值。
# CsvReader进行读取,会有很多配置,不配置则使用默认值。
"csvReaderConfig":{
"safetySwitch": false,
"skipEmptyRecords": false,
"useTextQualifier": false
}
# 所有配置项及默认值,配置时 csvReaderConfig 的map中请严格按照以下字段名字进行配置:
boolean caseSensitive = true;
char textQualifier = 34;
boolean trimWhitespace = true;
boolean useTextQualifier = true;//是否使用csv转义字符
char delimiter = 44;//分隔符
char recordDelimiter = 0;
char comment = 35;
boolean useComments = false;
int escapeMode = 1;
boolean safetySwitch = true;//单列长度是否限制100000字符
boolean skipEmptyRecords = true;//是否跳过空行
boolean captureRawRecord = true;
HdfsReader 提供了类型转换的建议表如下:
其中:
- Long 是指 Hdfs 文件文本中使用整形的字符串表示形式,例如 “123456789”。
- Double 是指 Hdfs 文件文本中使用 Double 的字符串表示形式,例如 “3.1415”。
- Boolean 是指 Hdfs 文件文本中使用 Boolean 的字符串表示形式,例如 “true”、“false”。不区分大小写。
- Date 是指 Hdfs 文件文本中使用 Date 的字符串表示形式,例如 “2014-12-31”。
特别提醒:
- Hive 支持的数据类型 TIMESTAMP 可以精确到纳秒级别,所以 textfile、orcfile 中 TIMESTAMP 存放的数据类似于 “2015-08-21 22:40:47.397898389”,如果转换的类型配置为 DataX 的 Date,转换之后会导致纳秒部分丢失,所以如果需要保留纳秒部分的数据,请配置转换类型为 DataX 的 String 类型。
Tips: 目前 HdfsReader 不支持对 Hive 元数据数据库进行访问查询,因此用户在进行类型转换的时候,必须指定数据类型,``。
mysqlwriter 快速使用说明与配置样例
- (1) 描述: 我们使用 MysqlWriter 从数仓导入数据到 Mysql,同时 MysqlWriter 亦可以作为数据迁移工具为 DBA 等用户提供服务。
MysqlWriter 通过 DataX 框架获取 Reader 生成的协议数据,根据你配置的 writeMode 生成或者
,与 insert into 行为一致,冲突时会用新行替换原有行所有字段) 的语句写入数据到 Mysql。
MysqlWriter 可用参数:
jdbcUrl: 目的数据库的 JDBC 连接信息。
username: 目的数据库的用户名。
password: 目的数据库的密码。
table: 目的表的表名称。column: 目的表需要写入数据的字段, 字段之间用英文逗号分隔。例如: "column": ["id","name","age"]。如果要依次写入全部列,使用 * 表示, 例如: "column":
。
session: DataX 在获取 Mysql 连接时,执行 session 指定的 SQL 语句,修改当前 connection session 属性。
preSql: 写入数据到目的表前,会先执行这里的标准语句
postSql: 写入数据到目的表后,会执行这里的标准语句。(原理同 preSql )
writeMode: 控制写入数据到目标表采用 insert into 或者 replace into 或者 ON DUPLICATE KEY UPDATE 语句
batchSize: 一次性批量提交的记录数大小,该值可以极大减少 DataX 与 Mysql 的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成 DataX 运行进程 OOM 情况。
MysqlWriter 针对 Mysql 类型转换列表: `` 与 Mysqlreader 插件是一致的。
配置示例演示
- (1) 从 HDFS 同步数据到 MySQL 中实例演示:
# 操作重命名hadoop中指定文件名称
/usr/local/datax# docker exec -it hdfs-namenode hadoop fs -mv /mysql-test-user__7630c0e0_d169_43cf_a808_272ad7d907bc /mysql-test-user.txt
# 数据库表窗口
mysql> CREATE TABLE IF NOT EXISTS test.hdfsreader like test.user;
Query OK, 0 rows affected (0.48 sec)
# 表结构
mysql> DESC test.hdfsreader;
---------------- -------------- ------ ----- --------- -----------------------------
| Field | Type | Null | Key | Default | Extra |
---------------- -------------- ------ ----- --------- -----------------------------
| uid | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(32) | NO | | NULL | |
| hobby | varchar(255) | NO | | NULL | |
| operation_time | datetime | YES | | NULL | on update CURRENT_TIMESTAMP |
---------------- -------------- ------ ----- --------- -----------------------------
4 rows in set (0.03 sec)
# hdfsreader =>> mysqlwriter 示例文件生成
$ bin/datax.py -r hdfsreader -w mysqlwriter
# 最终的Job配置
tee job/hdfs2mysql.json <<'EOF'
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": [
{"index":1,"type":"string"},
{"index":2,"type":"string"}
],
"defaultFS": "hdfs://10.10.107.225:8020",
"encoding": "UTF-8",
"fieldDelimiter": "|",
"fileType": "text",
"path": "/mysql-test-user.txt"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"name",
"hobby"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai",
"table": ["hdfsreader"]
}
],
"username": "test5",
"password": "Young.top",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
EOF
执行结果:
$ bin/datax.py job/hdfs2mysql.json
# - 获取的列
2022-10-27 23:20:44.255 [job-0] INFO OriginalConfPretreatmentUtil - table:[hdfsreader] all columns:[uid,name,hobby,operation_time].
# - SQL语句生成
2022-10-27 23:20:44.277 [job-0] INFO OriginalConfPretreatmentUtil - Write data [insert INTO %s (name,hobby) VALUES(?,?)], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]
# - 读取HDFS中指定的mysql-test-user.txt文件
2022-10-27 23:20:44.763 [job-0] INFO HdfsReader$Job - [hdfs://10.10.107.225:8020/mysql-test-user.txt]是[text]类型的文件, 将该文件加入source files列表
2022-10-27 23:20:44.764 [job-0] INFO HdfsReader$Job - 您即将读取的文件数为: [1], 列表为: [hdfs://10.10.107.225:8020/mysql-test-user.txt]
# - 读取开始
2022-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - read start
2022-10-27 23:20:44.818 [0-0-0-reader] INFO Reader$Task - reading file : [hdfs://10.10.107.225:8020/mysql-test-user.txt]
2022-10-27 23:20:44.833 [0-0-0-reader] INFO UnstructuredStorageReaderUtil - CsvReader使用默认值[{"captureRawRecord":true,"columnCount":0,"comment":"#","currentRecord":-1,"delimiter":"|","escapeMode":1,"headerCount":0,"rawRecord":"","recordDelimiter":"\u0000","safetySwitch":false,"skipEmptyRecords":true,"textQualifier":"\"","trimWhitespace":true,"useComments":false,"useTextQualifier":true,"values":[]}],csvReaderConfig值为[null]
2022-10-27 23:20:44.836 [0-0-0-reader] INFO Reader$Task - end read source files...
# - 执行结果
任务启动时刻 : 2022-10-27 23:20:43
任务结束时刻 : 2022-10-27 23:20:54
任务总计耗时 : 10s
任务平均流量 : 6B/s
记录写入速度 : 0rec/s
读出记录总数 : 5
读写失败总数 : 0
# - 查看写入表的数据。
mysql> select * from test.hdfsreader;
----- -------- --------------------------------------- ----------------
| uid | name | hobby | operation_time |
----- -------- --------------------------------------- ----------------
| 1 | Beats | 通用日志采集 | NULL |
| 2 | Kibana | 数据分析,日志搜寻,日志数据展示,可视化 | NULL |
| 3 | C | 面向过程编程语言 | NULL |
| 4 | C | 面向对象 | NULL |
| 5 | Python | 编程语言 | NULL |
----- -------- --------------------------------------- ----------------
5 rows in set (0.04 sec)
- (2) 从 MySQL 中读取并写入到指定表之中。
# 表示创建
create TABLE if not EXISTS mysqlwriter LIKE hdfsreader;
# 示例生成
bin/datax.py -r mysqlreader -w mysqlwriter
# job任务配置
tee job/mysql2mysql.json <<'EOF'
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": ["jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai"],
"querySql": ["select name,hobby,operation_time from user where (uid > 5);"],
}
],
"username": "test5",
"password": "Young.top",
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"name",
"hobby",
"operation_time"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai",
"table": ["mysqlwriter"]
}
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from mysqlwriter"
],
"username": "test5",
"password": "Young.top",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
EOF
执行结果:
# -MysqlWriter 写入的字段以及SQL
2022-10-28 09:57:05.554 [job-0] INFO OriginalConfPretreatmentUtil - table:[mysqlwriter] all columns:[uid,name,hobby,operation_time].
2022-10-28 09:57:05.582 [job-0] INFO OriginalConfPretreatmentUtil - Write data [
insert INTO %s (name,hobby,operation_time) VALUES(?,?,?)
], which jdbcUrl like:[jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true]
# -写入前的SQL执行处理
2022-10-28 09:57:05.628 [job-0] INFO CommonRdbmsWriter$Job - Begin to execute preSqls:[delete from mysqlwriter]. context info:jdbc:mysql://10.20.172.248:3305/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&serverTimezone=Asia/Shanghai&yearIsDateType=false&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&rewriteBatchedStatements=true.
# -执行统计
任务启动时刻 : 2022-10-28 09:57:05
任务结束时刻 : 2022-10-28 09:57:15
任务总计耗时 : 10s
任务平均流量 : 5B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
# -查询插入到数据库的中的表数据
SELECT * from test.mysqlwriter;
----- -------- ------------------ ---------------------
| uid | name | hobby | operation_time |
----- -------- ------------------ ---------------------
| 1 | C | 面向过程编程语言 | 2022-10-13 02:43:30 |
| 2 | C | 面向对象 | 2022-10-13 10:44:59 |
| 3 | Python | 编程语言 | 2022-10-13 10:48:45 |
----- -------- ------------------ ---------------------
3 rows in set (0.03 sec)
Tips : 非常注意读取和写入的字段数需要一致。
DataX 使用优化
关键参数
➢ job.setting.speed.channel : channel 并发数
➢ job.setting.speed.record : 2 全局配置 channel 的 record 限速
➢ job.setting.speed.byte:全局配置 channel 的 byte 限速
➢ core.transport.channel.speed.record:单个 channel 的 record 限速
➢ core.transport.channel.speed.byte:单个 channel 的 byte 限速
优化 1:提升每个 channel 的速度
在 DataX 内部对每个 Channel 会有严格的速度控制,分两种,一种是控制每秒同步的记录数,另外一种是每秒同步的字节数,默认的速度限制是 1MB/s,可以根据具体硬件情况设置这个 byte 速度或者 record 速度,一般设置 byte 速度,比如:我们可以把单个 Channel 的速度上限配置为 5MB
优化 2:提升 DataX Job 内 Channel 并发数
并发数 = taskGroup 的数量 * 每个 TaskGroup 并发执行的 Task 数 (默认为 5)。提升 job 内 Channel 并发有三种配置方式:
配置全局 Byte 限速以及单 Channel Byte 限速
Channel 个数 = 全局 Byte 限速 / 单 Channel Byte 限速
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880
}
}, ...
}
}
core.transport.channel.speed.byte=1048576,job.setting.speed.byte=5242880,所以 Channel
个数 = 全局 Byte 限速 / 单 Channel Byte 限速=5242880/1048576=5 个
配置全局 Record 限速以及单 Channel Record 限速
Channel 个数 = 全局 Record 限速 / 单 Channel Record 限速
{
"core": {
"transport": {
"channel": {
"speed": {
"record": 100
}
}
}
},
"job": {
"setting": {
"speed": {
"record" : 500
}
}, ...
}
}
core.transport.channel.speed.record=100 , job.setting.speed.record=500, 所以配置全局
Record 限速以及单 Channel Record 限速,Channel 个数 = 全局 Record 限速 / 单 Channel
Record 限速=500/100=5
直接配置 Channel 个数
只有在上面两种未设置才生效,上面两个同时设置是取值小的作为最终的 channel 数。
{
"job": {
"setting": {
"speed": {
"channel" : 5
}
}, ...
}
}
直接配置 job.setting.speed.channel=5,所以 job 内 Channel 并发=5 个
优化 3:提高 JVM 堆内存
当提升 DataX Job 内 Channel 并发数时,内存的占用会显著增加,因为 DataX 作为数据交换通道,在内存中会缓存较多的数据。例如 Channel 中会有一个 Buffer,作为临时的数据交换的缓冲区,而在部分 Reader 和 Writer 的中,也会存在一些 Buffer,为了防止 OOM 等错误,调大 JVM 的堆内存。建议将内存设置为 4G 或者 8G,这个也可以根据实际情况来调整。
调整 JVM xms xmx 参数的两种方式:一种是直接更改 datax.py 脚本;另一种是在启动
的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" XXX.json
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhfhegef
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
photoshop蒙版画笔没反应怎么办
PHP中文网 06-24