• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

ETL系列二、SpringBoot集成DataX

武飞扬头像
幽幽之心
帮助1

一、引言

有的项目可能存在一些需求,项目需要使用自己的定时任务调度工具(如xxl-job等)来调度datax任务脚本,这个时候就需要在SpringBoot工程中集成Datax来使用。

二、集成方案

一般有两个比较简单的集成方案:

(1) 执行command命令方式

(2) 调用datax任务执行器方式

三、集成实战

1、执行command命令方式

此方案只需要编写一个工具类即可,但是应用运行环境需要支持python。

  1.  
    import org.slf4j.Logger;
  2.  
    import org.slf4j.LoggerFactory;
  3.  
    import org.springframework.beans.factory.annotation.Value;
  4.  
    import org.springframework.stereotype.Component;
  5.  
     
  6.  
    import java.io.BufferedInputStream;
  7.  
    import java.io.BufferedReader;
  8.  
    import java.io.InputStreamReader;
  9.  
    import java.util.Arrays;
  10.  
     
  11.  
    /**
  12.  
    * 命令执行工具类
  13.  
    */
  14.  
    @Component
  15.  
    public class ExecCommandUtil {
  16.  
    private static Logger log = LoggerFactory.getLogger(ExecCommandUtil.class);
  17.  
    private static String CHARSET;
  18.  
    @Value("${spring.datax.command.charset:GBK}")
  19.  
    public void setCharset(String charset) {
  20.  
    this.CHARSET = charset;
  21.  
    }
  22.  
     
  23.  
    public static void execCommand(String param) throws Exception {
  24.  
    int exitValue = -1;
  25.  
    String[] command = param.split(" ");
  26.  
    log.info(Arrays.toString(command));
  27.  
    BufferedReader bufferedReader = null;
  28.  
    try {
  29.  
    long startTime = System.currentTimeMillis();
  30.  
    // command process
  31.  
    ProcessBuilder processBuilder = new ProcessBuilder();
  32.  
    processBuilder.command(command);
  33.  
    processBuilder.redirectErrorStream(true);
  34.  
     
  35.  
    Process process = processBuilder.start();
  36.  
     
  37.  
    BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
  38.  
     
  39.  
    // 指定读取流编码
  40.  
    bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, CHARSET));
  41.  
     
  42.  
    // command log
  43.  
    String line;
  44.  
    while ((line = bufferedReader.readLine()) != null) {
  45.  
    log.info(line);
  46.  
    }
  47.  
     
  48.  
    // command exit
  49.  
    process.waitFor();
  50.  
    long endTime = System.currentTimeMillis();
  51.  
    log.debug("command execute spend time: {} ms", endTime - startTime);
  52.  
    exitValue = process.exitValue();
  53.  
    } finally {
  54.  
    if (bufferedReader != null) {
  55.  
    bufferedReader.close();
  56.  
    }
  57.  
    }
  58.  
     
  59.  
    // 命令退出值exitValue不等于0且不等于3,代表命令未成功执行
  60.  
    if (exitValue != 0 && exitValue != 3) {
  61.  
    throw new Exception(String.format("command is failed, exit value=%s.", exitValue));
  62.  
    }
  63.  
    }
  64.  
    }
学新通

2、调用datax任务执行器方式

(1) 添加依赖

注意:添加依赖前,需要将如下的这些包上传到私有仓库。

  1.  
    <dependency>
  2.  
    <groupId>org.springframework.boot</groupId>
  3.  
    <artifactId>spring-boot-starter-web</artifactId>
  4.  
    </dependency>
  5.  
    <dependency>
  6.  
    <groupId>org.springframework.boot</groupId>
  7.  
    <artifactId>spring-boot-starter-test</artifactId>
  8.  
    <scope>test</scope>
  9.  
    </dependency>
  10.  
    <dependency>
  11.  
    <groupId>com.alibaba.datax</groupId>
  12.  
    <artifactId>datax-common</artifactId>
  13.  
    <version>0.0.1-SNAPSHOT</version>
  14.  
    </dependency>
  15.  
    <dependency>
  16.  
    <groupId>com.alibaba.datax</groupId>
  17.  
    <artifactId>datax-core</artifactId>
  18.  
    <version>0.0.1-SNAPSHOT</version>
  19.  
    </dependency>
  20.  
    <dependency>
  21.  
    <groupId>junit</groupId>
  22.  
    <artifactId>junit</artifactId>
  23.  
    <scope>test</scope>
  24.  
    </dependency>
学新通

(2) 添加配置

  1.  
    ## DataX插件安装路径设置
  2.  
    spring.datax.homepath=/data/datax/datax

(3) 编码

  • datax工作目录系统变量设置工具类DataxHomePathUtil
  1.  
    import org.slf4j.Logger;
  2.  
    import org.slf4j.LoggerFactory;
  3.  
    import org.springframework.beans.factory.annotation.Value;
  4.  
    import org.springframework.stereotype.Component;
  5.  
     
  6.  
     
  7.  
    /**
  8.  
    * datax工作目录工具类
  9.  
    */
  10.  
    @Component
  11.  
    public class DataxHomePathUtil {
  12.  
    private static Logger logger = LoggerFactory.getLogger(DataxHomePathUtil.class);
  13.  
    /**
  14.  
    * datax工作目录
  15.  
    * 存放插件与job定义文件
  16.  
    */
  17.  
    private static String DATAX_PLUGIN_PATH;
  18.  
     
  19.  
    @Value("${spring.datax.homepath:}")
  20.  
    public void setDataxPluginPath(String dataxPluginPath)
  21.  
    {
  22.  
    this.DATAX_PLUGIN_PATH = dataxPluginPath;
  23.  
    }
  24.  
     
  25.  
    public static void setDataxHomePath() {
  26.  
    logger.debug("---datax插件安装目录:{}", DATAX_PLUGIN_PATH);
  27.  
    System.setProperty("datax.home", DATAX_PLUGIN_PATH);
  28.  
    }
  29.  
     
  30.  
    }
学新通
  • DataX任务引擎调用工具类EngineHelper
  1.  
    import com.alibaba.datax.core.Engine;
  2.  
    import org.springframework.stereotype.Component;
  3.  
     
  4.  
    /**
  5.  
    * job引擎执行工具类
  6.  
    */
  7.  
    @Component
  8.  
    public class EngineHelper {
  9.  
    /**
  10.  
    * datax任务引擎
  11.  
    * @param jobJson json配置文件路径
  12.  
    * @throws Throwable
  13.  
    */
  14.  
    public static void entry(String jobJson) throws Throwable {
  15.  
    DataxHomePathUtil.setDataxHomePath();
  16.  
    String[] datxArgs2 = {"-job", jobJson, "-mode", "standalone", "-jobid", "-1"};
  17.  
    Engine.entry(datxArgs2);
  18.  
    }
  19.  
    }
学新通

3、测试

(1) 添加配置

添加配置前,请准备好数据同步任务脚本,并上传至对应路径。

  1.  
    ## datax数据同步任务脚本
  2.  
    spring.datax.job.balfund=/data/datax/datax/job/balfund-1.json
  3.  
    ## datax数据同步命令
  4.  
    spring.datax.command.py-balfund=python /data/datax/datax/bin/datax.py -p"-Dversion='8'" /data/datax/datax/job/balfund-clickhouse2.json

(2)编写测试类

  1.  
    import com.***.datax.util.EngineHelper;
  2.  
    import com.***.datax.util.ExecCommandUtil;
  3.  
    import org.slf4j.Logger;
  4.  
    import org.slf4j.LoggerFactory;
  5.  
    import org.springframework.beans.factory.annotation.Value;
  6.  
    import org.springframework.stereotype.Controller;
  7.  
    import org.springframework.web.bind.annotation.GetMapping;
  8.  
    import org.springframework.web.bind.annotation.RequestMapping;
  9.  
     
  10.  
     
  11.  
    @Controller
  12.  
    @RequestMapping("/datax")
  13.  
    public class DataxController {
  14.  
    Logger log = LoggerFactory.getLogger(DataxController.class);
  15.  
    @Value("${spring.datax.job.balfund}")
  16.  
    private String jobJsonBalfund;
  17.  
     
  18.  
    @Value("${spring.datax.command.py-balfund}")
  19.  
    private String pyJobBalfund;
  20.  
     
  21.  
    @GetMapping("/test-1")
  22.  
    public String test1() {
  23.  
    log.info("------------{}", jobJsonBalfund);
  24.  
    try {
  25.  
    EngineHelper.entry(jobJsonBalfund);
  26.  
    } catch (Throwable e) {
  27.  
    throw new RuntimeException(e);
  28.  
    }
  29.  
    return "执行完成";
  30.  
    }
  31.  
     
  32.  
    @GetMapping("/test-2")
  33.  
    public String test2() {
  34.  
    log.info("------------{}", jobJsonBalfund);
  35.  
    try {
  36.  
    ExecCommandUtil.execCommand(pyJobBalfund);
  37.  
    } catch (Exception e) {
  38.  
    throw new RuntimeException(e);
  39.  
    }
  40.  
    return "执行完成";
  41.  
    }
  42.  
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbhfcb
系列文章
更多 icon
同类精品
更多 icon
继续加载