ETL系列二、SpringBoot集成DataX
一、引言
有的项目可能存在一些需求,项目需要使用自己的定时任务调度工具(如xxl-job等)来调度datax任务脚本,这个时候就需要在SpringBoot工程中集成Datax来使用。
二、集成方案
一般有两个比较简单的集成方案:
(1) 执行command命令方式
(2) 调用datax任务执行器方式
三、集成实战
1、执行command命令方式
此方案只需要编写一个工具类即可,但是应用运行环境需要支持python。
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.stereotype.Component;
-
-
import java.io.BufferedInputStream;
-
import java.io.BufferedReader;
-
import java.io.InputStreamReader;
-
import java.util.Arrays;
-
-
/**
-
* 命令执行工具类
-
*/
-
-
public class ExecCommandUtil {
-
private static Logger log = LoggerFactory.getLogger(ExecCommandUtil.class);
-
private static String CHARSET;
-
-
public void setCharset(String charset) {
-
this.CHARSET = charset;
-
}
-
-
public static void execCommand(String param) throws Exception {
-
int exitValue = -1;
-
String[] command = param.split(" ");
-
log.info(Arrays.toString(command));
-
BufferedReader bufferedReader = null;
-
try {
-
long startTime = System.currentTimeMillis();
-
// command process
-
ProcessBuilder processBuilder = new ProcessBuilder();
-
processBuilder.command(command);
-
processBuilder.redirectErrorStream(true);
-
-
Process process = processBuilder.start();
-
-
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
-
-
// 指定读取流编码
-
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, CHARSET));
-
-
// command log
-
String line;
-
while ((line = bufferedReader.readLine()) != null) {
-
log.info(line);
-
}
-
-
// command exit
-
process.waitFor();
-
long endTime = System.currentTimeMillis();
-
log.debug("command execute spend time: {} ms", endTime - startTime);
-
exitValue = process.exitValue();
-
} finally {
-
if (bufferedReader != null) {
-
bufferedReader.close();
-
}
-
}
-
-
// 命令退出值exitValue不等于0且不等于3,代表命令未成功执行
-
if (exitValue != 0 && exitValue != 3) {
-
throw new Exception(String.format("command is failed, exit value=%s.", exitValue));
-
}
-
}
-
}
2、调用datax任务执行器方式
(1) 添加依赖
注意:添加依赖前,需要将如下的这些包上传到私有仓库。
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-web</artifactId>
-
</dependency>
-
<dependency>
-
<groupId>org.springframework.boot</groupId>
-
<artifactId>spring-boot-starter-test</artifactId>
-
<scope>test</scope>
-
</dependency>
-
<dependency>
-
<groupId>com.alibaba.datax</groupId>
-
<artifactId>datax-common</artifactId>
-
<version>0.0.1-SNAPSHOT</version>
-
</dependency>
-
<dependency>
-
<groupId>com.alibaba.datax</groupId>
-
<artifactId>datax-core</artifactId>
-
<version>0.0.1-SNAPSHOT</version>
-
</dependency>
-
<dependency>
-
<groupId>junit</groupId>
-
<artifactId>junit</artifactId>
-
<scope>test</scope>
-
</dependency>
(2) 添加配置
-
## DataX插件安装路径设置
-
spring.datax.homepath=/data/datax/datax
(3) 编码
- datax工作目录系统变量设置工具类DataxHomePathUtil
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.stereotype.Component;
-
-
-
/**
-
* datax工作目录工具类
-
*/
-
-
public class DataxHomePathUtil {
-
private static Logger logger = LoggerFactory.getLogger(DataxHomePathUtil.class);
-
/**
-
* datax工作目录
-
* 存放插件与job定义文件
-
*/
-
private static String DATAX_PLUGIN_PATH;
-
-
-
public void setDataxPluginPath(String dataxPluginPath)
-
{
-
this.DATAX_PLUGIN_PATH = dataxPluginPath;
-
}
-
-
public static void setDataxHomePath() {
-
logger.debug("---datax插件安装目录:{}", DATAX_PLUGIN_PATH);
-
System.setProperty("datax.home", DATAX_PLUGIN_PATH);
-
}
-
-
}
- DataX任务引擎调用工具类EngineHelper
-
import com.alibaba.datax.core.Engine;
-
import org.springframework.stereotype.Component;
-
-
/**
-
* job引擎执行工具类
-
*/
-
-
public class EngineHelper {
-
/**
-
* datax任务引擎
-
* @param jobJson json配置文件路径
-
* @throws Throwable
-
*/
-
public static void entry(String jobJson) throws Throwable {
-
DataxHomePathUtil.setDataxHomePath();
-
String[] datxArgs2 = {"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
-
Engine.entry(datxArgs2);
-
}
-
}
3、测试
(1) 添加配置
添加配置前,请准备好数据同步任务脚本,并上传至对应路径。
-
## datax数据同步任务脚本
-
spring.datax.job.balfund=/data/datax/datax/job/balfund-1.json
-
## datax数据同步命令
-
spring.datax.command.py-balfund=python /data/datax/datax/bin/datax.py -p"-Dversion='8'" /data/datax/datax/job/balfund-clickhouse2.json
(2)编写测试类
-
import com.***.datax.util.EngineHelper;
-
import com.***.datax.util.ExecCommandUtil;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
import org.springframework.beans.factory.annotation.Value;
-
import org.springframework.stereotype.Controller;
-
import org.springframework.web.bind.annotation.GetMapping;
-
import org.springframework.web.bind.annotation.RequestMapping;
-
-
-
-
-
public class DataxController {
-
Logger log = LoggerFactory.getLogger(DataxController.class);
-
-
private String jobJsonBalfund;
-
-
-
private String pyJobBalfund;
-
-
-
public String test1() {
-
log.info("------------{}", jobJsonBalfund);
-
try {
-
EngineHelper.entry(jobJsonBalfund);
-
} catch (Throwable e) {
-
throw new RuntimeException(e);
-
}
-
return "执行完成";
-
}
-
-
-
public String test2() {
-
log.info("------------{}", jobJsonBalfund);
-
try {
-
ExecCommandUtil.execCommand(pyJobBalfund);
-
} catch (Exception e) {
-
throw new RuntimeException(e);
-
}
-
return "执行完成";
-
}
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgbhfcb
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13