Spark 通过 spark-launcher 的 Java API 提交任务到 YARN 集群
通过spark-launcher提交
/**
 * Demo entry point: assembles the parameters for the WordCount example job
 * and delegates the actual submission to {@link #submitApplication}.
 *
 * @throws Exception propagated from {@code submitApplication}
 */
public void crateBatchTaskByLauncher() throws Exception {
    SparkApplicationParam params = new SparkApplicationParam();
    params.setJarPath("/home/bd/SPARK/spark-test-1.0.0.jar");
    params.setMainClass("com.starnet.server.bigdata.spark.wordcount.WordCount");
    submitApplication(params);
}
/**
 * Submits a Spark application through {@code SparkLauncher} and monitors it on a
 * background thread until it reaches a final state.
 *
 * <p>A listener logs every state/info change; a {@code CountDownLatch} released on
 * the first final state lets the monitor thread block without polling. The only
 * polling loop — waiting for YARN to assign an application id — now sleeps between
 * probes and bails out if the app terminates before an id ever appears (the
 * original spun a full CPU core and could loop forever on an early failure).
 *
 * @param sparkAppParams jar path, main class, master/deploy-mode and resource settings
 * @param otherParams    optional program arguments forwarded to the Spark job
 * @throws Exception if launcher construction fails before the monitor thread starts
 */
public void submitApplication(SparkApplicationParam sparkAppParams, String... otherParams) throws Exception {
    log.info("spark任务传入参数:{}", sparkAppParams.toString());
    Map<String, String> confParams = sparkAppParams.getOtherConfParams();
    SparkLauncher launcher = new SparkLauncher()
            .setSparkHome("/opt/module/spark-yarn")
            .setAppResource(sparkAppParams.getJarPath())
            .setMainClass(sparkAppParams.getMainClass())
            .setMaster(sparkAppParams.getMaster())
            .setDeployMode(sparkAppParams.getDeployMode())
            .setAppName("testAppName")
            .setConf("spark.driver.memory", sparkAppParams.getDriverMemory())
            .setConf("spark.executor.memory", sparkAppParams.getExecutorMemory())
            .setConf("spark.eventLog.dir", "hdfs://hadoop113:8020/spark")
            .setConf("spark.yarn.jars", "hdfs://hadoop113:8020/jar/spark/*.jar")
            .setConf("spark.executor.cores", sparkAppParams.getExecutorCores())
            // Redirect launcher stdout/stderr to dedicated files so the Spark
            // submission logs don't interleave with this application's own logs.
            .redirectError(new File("/home/bd/server/sparkErr"))
            .redirectOutput(new File("/home/bd/server/sparkInput"))
            .setVerbose(true);
    // Apply any extra spark.* settings supplied by the caller.
    if (confParams != null && !confParams.isEmpty()) {
        log.info("开始设置spark job运行参数:{}", JSONObject.toJSONString(confParams));
        for (Map.Entry<String, String> conf : confParams.entrySet()) {
            log.info("{}:{}", conf.getKey(), conf.getValue());
            launcher.setConf(conf.getKey(), conf.getValue());
        }
    }
    if (otherParams.length != 0) {
        log.info("开始设置spark job参数:{}", Arrays.toString(otherParams));
        launcher.addAppArgs(otherParams);
    }
    log.info("参数设置完成,开始提交spark任务");
    // Submit and monitor on a separate thread so this method returns immediately.
    new Thread(() -> {
        try {
            // Released by the listener the first time the handle reports a final state.
            CountDownLatch countDownLatch = new CountDownLatch(1);
            SparkAppHandle handle = launcher.startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle sparkAppHandle) {
                    // TODO: hook for downstream notifications on state transitions.
                    if (sparkAppHandle.getState().isFinal()) {
                        countDownLatch.countDown();
                    }
                    if (sparkAppHandle.getAppId() != null) {
                        log.info("{} stateChanged: {}", sparkAppHandle.getAppId(), sparkAppHandle.getState().toString());
                    } else {
                        log.info("stateChanged: {}", sparkAppHandle.getState().toString());
                    }
                }

                @Override
                public void infoChanged(SparkAppHandle sparkAppHandle) {
                    if (sparkAppHandle.getAppId() != null) {
                        log.info("{} infoChanged: {}, err info: {}",
                                sparkAppHandle.getAppId(), sparkAppHandle.getState().toString(), sparkAppHandle.getError());
                    } else {
                        log.info("infoChanged: {}, err info: {}",
                                sparkAppHandle.getState().toString(), sparkAppHandle.getError());
                    }
                }
            });
            log.warn("The task is executing, current is get application id before, please wait ....");
            // Possible states reported by SparkAppHandle:
            //   UNKNOWN   - nothing reported back yet
            //   CONNECTED - launcher connected
            //   SUBMITTED - submitted to the cluster
            //   RUNNING   - running
            //   FINISHED  - final, succeeded
            //   FAILED    - final, failed
            //   KILLED    - final, killed
            //   LOST      - final, exited with unknown status
            String applicationId = null;
            // Wait for YARN to assign an application id. Sleep between probes
            // instead of busy-spinning, and stop if the app already terminated
            // (e.g. submission failed before an id was assigned) — otherwise
            // this loop would never exit.
            while (!SparkAppHandle.State.RUNNING.equals(handle.getState())) {
                applicationId = handle.getAppId();
                if (applicationId != null) {
                    log.warn("handle current state is {}, app id is {}", handle.getState().toString(), applicationId);
                    break;
                }
                if (handle.getState().isFinal()) {
                    break;
                }
                Thread.sleep(200L);
            }
            log.warn("The task is executing, current has been get application id before, please wait ....");
            // Asks the application to stop.
            // handle.stop();
            // Tries to kill the underlying application.
            // handle.kill();
            // Block until the listener observes a final state.
            countDownLatch.await();
            log.warn("The task is over, over state is {}", handle.getState().toString());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers of this thread can observe it.
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }).start();
}
}
/**
 * Parameter holder for a Spark application submission.
 *
 * <p>As pasted, this class had private fields but no accessors, while the
 * submission code calls {@code getJarPath()}, {@code setMainClass(...)}, etc.
 * (a Lombok {@code @Data} annotation was presumably lost). Explicit getters,
 * setters and {@code toString()} are provided here so the class is usable
 * without Lombok. Defaults target a YARN cluster with modest resources.
 */
public class SparkApplicationParam {
    /**
     * 任务的主类 (fully-qualified main class of the job)
     */
    private String mainClass;
    /**
     * jar包路径 (path to the application jar)
     */
    private String jarPath;
    private String master = "yarn";
    private String deployMode = "cluster";
    private String driverMemory = "1g";
    private String executorMemory = "1g";
    private String executorCores = "1";
    /**
     * 其他配置:传递给spark job的参数 (extra spark.* conf entries; may be null)
     */
    private Map<String, String> otherConfParams;

    public String getMainClass() {
        return mainClass;
    }

    public void setMainClass(String mainClass) {
        this.mainClass = mainClass;
    }

    public String getJarPath() {
        return jarPath;
    }

    public void setJarPath(String jarPath) {
        this.jarPath = jarPath;
    }

    public String getMaster() {
        return master;
    }

    public void setMaster(String master) {
        this.master = master;
    }

    public String getDeployMode() {
        return deployMode;
    }

    public void setDeployMode(String deployMode) {
        this.deployMode = deployMode;
    }

    public String getDriverMemory() {
        return driverMemory;
    }

    public void setDriverMemory(String driverMemory) {
        this.driverMemory = driverMemory;
    }

    public String getExecutorMemory() {
        return executorMemory;
    }

    public void setExecutorMemory(String executorMemory) {
        this.executorMemory = executorMemory;
    }

    public String getExecutorCores() {
        return executorCores;
    }

    public void setExecutorCores(String executorCores) {
        this.executorCores = executorCores;
    }

    public Map<String, String> getOtherConfParams() {
        return otherConfParams;
    }

    public void setOtherConfParams(Map<String, String> otherConfParams) {
        this.otherConfParams = otherConfParams;
    }

    @Override
    public String toString() {
        return "SparkApplicationParam{"
                + "mainClass='" + mainClass + '\''
                + ", jarPath='" + jarPath + '\''
                + ", master='" + master + '\''
                + ", deployMode='" + deployMode + '\''
                + ", driverMemory='" + driverMemory + '\''
                + ", executorMemory='" + executorMemory + '\''
                + ", executorCores='" + executorCores + '\''
                + ", otherConfParams=" + otherConfParams
                + '}';
    }
}
spark-launcher 提交任务产生的日志默认与主程序的日志混在一起,可以通过重定向将其单独输出;之后若要正式使用 Spark,建议将这部分日志分开存放,便于问题回溯。此外还可以自定义监听器,在任务信息或状态变更时执行相应操作;SparkAppHandle 支持暂停、停止、断开连接、获取 AppId、获取 State 等多种功能。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgkhekc
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13