diff --git a/dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java b/dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
index 9757b42619..c057c49ec9 100644
--- a/dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
+++ b/dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
@@ -63,7 +63,8 @@ public class Job2MysqlHandler implements JobHandler {
     static {
         historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
         clusterService = SpringContextUtils.getBean("clusterServiceImpl", ClusterService.class);
-        clusterConfigurationService = SpringContextUtils.getBean("clusterConfigurationServiceImpl", ClusterConfigurationService.class);
+        clusterConfigurationService = SpringContextUtils.getBean("clusterConfigurationServiceImpl",
+                ClusterConfigurationService.class);
         jarService = SpringContextUtils.getBean("jarServiceImpl", JarService.class);
         jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
         jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
@@ -132,12 +133,12 @@ public boolean success() {
         final Integer clusterConfigurationId = job.getJobConfig().getClusterConfigurationId();
         if (job.isUseGateway()) {
             cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(
-                job.getJobManagerAddress(),
-                job.getJobId(),
-                job.getJobConfig().getJobName() + LocalDateTime.now(),
-                job.getType().getLongValue(),
-                clusterConfigurationId,
-                taskId));
+                    job.getJobManagerAddress(),
+                    job.getJobId(),
+                    job.getJobConfig().getJobName() + LocalDateTime.now(),
+                    job.getType().getLongValue(),
+                    clusterConfigurationId,
+                    taskId));
             if (Asserts.isNotNull(cluster)) {
                 clusterId = cluster.getId();
             }
@@ -174,11 +175,12 @@ public boolean success() {
         jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));
 
         jobHistory.setJarJson(Asserts.isNotNull(job.getJobConfig().getJarId())
-                ? JSONUtil.toJsonString(jarService.getById(job.getJobConfig().getJarId())) : null);
+                ? JSONUtil.toJsonString(jarService.getById(job.getJobConfig().getJarId()))
+                : null);
 
         jobHistory.setClusterConfigurationJson(Asserts.isNotNull(clusterConfigurationId)
-            ? JSONUtil.toJsonString(clusterConfigurationService.getClusterConfigById(clusterConfigurationId))
-            : null);
+                ? JSONUtil.toJsonString(clusterConfigurationService.getClusterConfigById(clusterConfigurationId))
+                : null);
         jobHistoryService.save(jobHistory);
 
         DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()));
diff --git a/dlink-admin/src/main/java/com/dlink/mapper/JobHistoryMapper.java b/dlink-admin/src/main/java/com/dlink/mapper/JobHistoryMapper.java
index 4a1f36393c..2742527d24 100644
--- a/dlink-admin/src/main/java/com/dlink/mapper/JobHistoryMapper.java
+++ b/dlink-admin/src/main/java/com/dlink/mapper/JobHistoryMapper.java
@@ -37,7 +37,4 @@ public interface JobHistoryMapper extends SuperMapper<JobHistory> {
 
     @InterceptorIgnore(tenantLine = "true")
     JobHistory getByIdWithoutTenant(Integer id);
-
-    int insert(JobHistory jobHistory);
-
 }
diff --git a/dlink-admin/src/main/java/com/dlink/mapper/JobInstanceMapper.java b/dlink-admin/src/main/java/com/dlink/mapper/JobInstanceMapper.java
index afd031f04e..f934095acc 100644
--- a/dlink-admin/src/main/java/com/dlink/mapper/JobInstanceMapper.java
+++ b/dlink-admin/src/main/java/com/dlink/mapper/JobInstanceMapper.java
@@ -45,6 +45,7 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {
 
     List<JobInstanceCount> countHistoryStatus();
 
+    @InterceptorIgnore(tenantLine = "true")
     List<JobInstance> listJobInstanceActive();
 
     JobInstance getJobInstanceByTaskId(Integer id);
diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java
index a12f04c485..047e15e985 100644
--- a/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java
+++ b/dlink-admin/src/main/java/com/dlink/service/impl/JobHistoryServiceImpl.java
@@ -45,6 +45,8 @@
 @Service
 public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, JobHistory> implements JobHistoryService {
 
+    private static final Logger log = LoggerFactory.getLogger(JobHistoryServiceImpl.class);
+
     @Override
     public JobHistory getByIdWithoutTenant(Integer id) {
         return baseMapper.getByIdWithoutTenant(id);
@@ -52,11 +54,9 @@ public JobHistory getByIdWithoutTenant(Integer id) {
 
     @Override
     public JobHistory getJobHistory(Integer id) {
-        return getJobHistoryInfo(getById(id));
+        return getJobHistoryInfo(baseMapper.getByIdWithoutTenant(id));
     }
 
-    private static final Logger log = LoggerFactory.getLogger(JobHistoryServiceImpl.class);
-
     @Override
     public JobHistory getJobHistoryInfo(JobHistory jobHistory) {
         if (Asserts.isNotNull(jobHistory)) {
diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
index e7334f556a..b66f98447a 100644
--- a/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
+++ b/dlink-admin/src/main/java/com/dlink/service/impl/StudioServiceImpl.java
@@ -103,12 +103,12 @@ public class StudioServiceImpl implements StudioService {
     private final UDFService udfService;
 
     public StudioServiceImpl(ClusterService clusterService,
-                             ClusterConfigurationService clusterConfigurationService,
-                             SavepointsService savepointsService,
-                             DataBaseService dataBaseService,
-                             TaskService taskService,
-                             FragmentVariableService fragmentVariableService,
-                             UDFService udfService) {
+            ClusterConfigurationService clusterConfigurationService,
+            SavepointsService savepointsService,
+            DataBaseService dataBaseService,
+            TaskService taskService,
+            FragmentVariableService fragmentVariableService,
+            UDFService udfService) {
         this.clusterService = clusterService;
         this.clusterConfigurationService = clusterConfigurationService;
         this.savepointsService = savepointsService;
@@ -120,17 +120,17 @@ public StudioServiceImpl(ClusterService clusterService,
 
     private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
         ProcessEntity process = ProcessContextHolder.getProcess();
-        process.info("Start Initialize FlinkSQLEnv:");
+        process.info("Start initialize FlinkSQLEnv:");
         if (statementDTO.isFragment()) {
             process.config("Variable opened.");
 
             // initialize global variables
-            process.info("Initializing Global Variables...");
+            process.info("Initializing global variables...");
             statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
             process.infoSuccess();
 
             // initialize database variables
-            process.info("Initializing Database Variables...");
+            process.info("Initializing database variables...");
             String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
             if (Asserts.isNotNullString(flinkWithSql)) {
                 statementDTO.setStatement(flinkWithSql + "\n" + statementDTO.getStatement());
@@ -152,7 +152,7 @@ private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
                 process.info("No FlinkSQLEnv are loaded.");
             }
         }
-        process.info("Finish Initialize FlinkSQLEnv.");
+        process.info("Finish initialize FlinkSQLEnv.");
     }
 
     private void buildSession(JobConfig config) {
@@ -178,16 +178,15 @@ private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
         ProcessEntity process = ProcessContextHolder.registerProcess(
                 ProcessEntity.init(ProcessType.FLINKEXECUTE, StpUtil.getLoginIdAsInt()));
         addFlinkSQLEnv(studioExecuteDTO);
-        process.info("Initializing Flink Job Config...");
+        process.info("Initializing Flink job config...");
         JobConfig config = studioExecuteDTO.getJobConfig();
         buildSession(config);
         // init UDF
         udfService.init(studioExecuteDTO.getStatement(), config);
         JobManager jobManager = JobManager.build(config);
-        process.infoSuccess();
         process.start();
         JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
-        process.finish();
+        process.finish("Execute Flink SQL succeed.");
         RunTimeUtil.recovery(jobManager);
         return jobResult;
     }
@@ -208,7 +207,7 @@ public JobResult executeCommonSql(SqlDTO sqlDTO) {
         JobResult result = new JobResult();
         result.setStatement(sqlDTO.getStatement());
         result.setStartTimeNow();
-        process.info("Initializing Database Connection...");
+        process.info("Initializing database connection...");
         if (Asserts.isNull(sqlDTO.getDatabaseId())) {
             result.setSuccess(false);
             result.setError("请指定数据源");
@@ -229,7 +228,7 @@ public JobResult executeCommonSql(SqlDTO sqlDTO) {
             process.start();
             selectResult = driver.executeSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
         }
-        process.finish();
+        process.finish("Execute sql succeed.");
         result.setResult(selectResult);
         if (selectResult.isSuccess()) {
             result.setSuccess(true);
@@ -265,7 +264,7 @@ private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO
         ProcessEntity process = ProcessContextHolder.registerProcess(
                 ProcessEntity.init(ProcessType.FLINKEXPLAIN, StpUtil.getLoginIdAsInt()));
         addFlinkSQLEnv(studioExecuteDTO);
-        process.info("Initializing Flink Job Config...");
+        process.info("Initializing Flink job config...");
         JobConfig config = studioExecuteDTO.getJobConfig();
         // If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
         config.buildLocal();
@@ -273,10 +272,9 @@ private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO
         // init UDF
         udfService.init(studioExecuteDTO.getStatement(), config);
         JobManager jobManager = JobManager.build(config);
-        process.infoSuccess();
         process.start();
-        List<SqlExplainResult> sqlExplainResults =
-                jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
+        List<SqlExplainResult> sqlExplainResults = jobManager.explainSql(studioExecuteDTO.getStatement())
+                .getSqlExplainResults();
         process.finish();
         return sqlExplainResults;
     }
@@ -284,7 +282,7 @@ private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
         ProcessEntity process = ProcessContextHolder.registerProcess(
                 ProcessEntity.init(ProcessType.SQLEXPLAIN, StpUtil.getLoginIdAsInt()));
-        process.info("Initializing Flink Job Config...");
+        process.info("Initializing database connection...");
         if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
             process.error("The database does not exist.");
             return Collections.singletonList(
@@ -417,8 +415,8 @@ public boolean cancel(Integer clusterId, String jobId) {
         JobConfig jobConfig = new JobConfig();
         jobConfig.setAddress(cluster.getJobManagerHost());
         if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
-            Map<String, Object> gatewayConfig =
-                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
+            Map<String, Object> gatewayConfig = clusterConfigurationService
+                    .getGatewayConfig(cluster.getClusterConfigurationId());
             jobConfig.buildGatewayConfig(gatewayConfig);
         }
         JobManager jobManager = JobManager.build(jobConfig);
@@ -436,8 +434,8 @@ public boolean savepoint(Integer taskId, Integer clusterId, String jobId, String
         jobConfig.setType(cluster.getType());
         if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
             // 如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的
-            Map<String, Object> gatewayConfig =
-                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
+            Map<String, Object> gatewayConfig = clusterConfigurationService
+                    .getGatewayConfig(cluster.getClusterConfigurationId());
             jobConfig.buildGatewayConfig(gatewayConfig);
             jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
             jobConfig.setTaskId(cluster.getTaskId());
diff --git a/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java b/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
index 6c7ba081e5..48df1e28d7 100644
--- a/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
+++ b/dlink-admin/src/main/java/com/dlink/service/impl/TaskServiceImpl.java
@@ -79,6 +79,9 @@
 import com.dlink.model.TaskOperatingStatus;
 import com.dlink.model.TaskVersion;
 import com.dlink.model.UDFTemplate;
+import com.dlink.process.context.ProcessContextHolder;
+import com.dlink.process.model.ProcessEntity;
+import com.dlink.process.model.ProcessType;
 import com.dlink.result.SqlExplainResult;
 import com.dlink.result.TaskOperatingResult;
 import com.dlink.service.AlertGroupService;
@@ -137,6 +140,7 @@
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 
+import cn.dev33.satoken.stp.StpUtil;
 import cn.hutool.core.bean.BeanUtil;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.convert.Convert;
@@ -211,15 +215,22 @@ public JobResult submitTask(Integer id) {
             return executeCommonSql(SqlDTO.build(task.getStatement(), task.getDatabaseId(), null));
         }
 
+        ProcessEntity process = ProcessContextHolder.registerProcess(
+                ProcessEntity.init(ProcessType.FLINKSUBMIT, StpUtil.getLoginIdAsInt()));
+        process.info("Initializing Flink job config...");
         JobConfig config = buildJobConfig(task);
         // init UDF
         udfService.init(task.getStatement(), config);
         JobManager jobManager = JobManager.build(config);
-
+        process.start();
         if (!config.isJarTask()) {
-            return jobManager.executeSql(task.getStatement());
+            JobResult jobResult = jobManager.executeSql(task.getStatement());
+            process.finish("Submit Flink SQL succeed.");
+            return jobResult;
         } else {
-            return jobManager.executeJar();
+            JobResult jobResult = jobManager.executeJar();
+            process.finish("Submit Flink Jar succeed.");
+            return jobResult;
         }
     }
 
@@ -492,8 +503,8 @@ public Task getTaskByNameAndTenantId(String name, Integer tenantId) {
     @Override
     public JobStatus checkJobStatus(JobInfoDetail jobInfoDetail) {
         JobConfig jobConfig = new JobConfig();
-        Map<String, Object> gatewayConfigMap =
-                clusterConfigurationService.getGatewayConfig(jobInfoDetail.getClusterConfiguration().getId());
+        Map<String, Object> gatewayConfigMap = clusterConfigurationService
+                .getGatewayConfig(jobInfoDetail.getClusterConfiguration().getId());
         jobConfig.buildGatewayConfig(gatewayConfigMap);
         GatewayConfig gatewayConfig = jobConfig.getGatewayConfig();
         gatewayConfig.setType(GatewayType.get(jobInfoDetail.getCluster().getType()));
@@ -530,9 +541,8 @@ public Task getUDFByClassName(String className) {
     @Override
     public List<Task> getAllUDF() {
-        List<Task> tasks =
-                list(new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON)
-                        .eq("enabled", 1).isNotNull("save_point_path"));
+        List<Task> tasks = list(new QueryWrapper<Task>().in("dialect", Dialect.JAVA, Dialect.SCALA, Dialect.PYTHON)
+                .eq("enabled", 1).isNotNull("save_point_path"));
         return tasks.stream().peek(task -> {
             Assert.check(task);
             task.setStatement(statementService.getById(task.getId()).getStatement());
@@ -568,8 +578,8 @@ public Result releaseTask(Integer id) {
     public Task createTaskVersionSnapshot(Task task) {
         List<TaskVersion> taskVersions = taskVersionService.getTaskVersionByTaskId(task.getId());
         List<Integer> versionIds = taskVersions.stream().map(TaskVersion::getVersionId).collect(Collectors.toList());
-        Map<Integer, TaskVersion> versionMap =
-                taskVersions.stream().collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
+        Map<Integer, TaskVersion> versionMap = taskVersions.stream()
+                .collect(Collectors.toMap(TaskVersion::getVersionId, t -> t));
 
         TaskVersion taskVersion = new TaskVersion();
         BeanUtil.copyProperties(task, taskVersion);
@@ -755,13 +765,13 @@ private boolean savepointJobInstance(Integer jobInstanceId, String savePointType
         Task task = this.getTaskInfoById(cluster.getTaskId());
 
         if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
-            Map<String, Object> gatewayConfig =
-                    clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
+            Map<String, Object> gatewayConfig = clusterConfigurationService
+                    .getGatewayConfig(cluster.getClusterConfigurationId());
             // 如果是k8s application 模式,且不是sql任务,则需要补齐statement 内的自定义配置
             if (Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())) {
                 Statement statement = statementService.getById(cluster.getTaskId());
-                Map<String, Object> statementConfig =
-                        JSONUtil.toMap(statement.getStatement(), String.class, Object.class);
+                Map<String, Object> statementConfig = JSONUtil.toMap(statement.getStatement(), String.class,
+                        Object.class);
                 gatewayConfig.putAll(statementConfig);
             }
             jobConfig.buildGatewayConfig(gatewayConfig);
@@ -821,14 +831,14 @@ private JobConfig buildJobConfig(Task task) {
         else if (Dialect.KUBERNETES_APPLICATION.equalsVal(task.getDialect())
                 && GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
             Map<String, Object> taskConfig = JSONUtil.toMap(task.getStatement(), String.class, Object.class);
-            Map<String, Object> clusterConfiguration =
-                    clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
+            Map<String, Object> clusterConfiguration = clusterConfigurationService
+                    .getGatewayConfig(task.getClusterConfigurationId());
             clusterConfiguration.putAll((Map) taskConfig.get("appConfig"));
             clusterConfiguration.put("taskCustomConfig", taskConfig);
             config.buildGatewayConfig(clusterConfiguration);
         } else {
-            Map<String, Object> gatewayConfig =
-                    clusterConfigurationService.getGatewayConfig(task.getClusterConfigurationId());
+            Map<String, Object> gatewayConfig = clusterConfigurationService
+                    .getGatewayConfig(task.getClusterConfigurationId());
             // submit application type with clusterConfiguration
             if (GatewayType.YARN_APPLICATION.equalsValue(config.getType())
                     || GatewayType.KUBERNETES_APPLICATION.equalsValue(config.getType())) {
@@ -903,9 +913,9 @@ public JobInstance refreshJobInstance(Integer id, boolean isCoercive) {
         if (!isCoercive && !inRefreshPlan(jobInfoDetail.getInstance())) {
             return jobInfoDetail.getInstance();
         }
-        JobHistory jobHistoryJson =
-                jobHistoryService.refreshJobHistory(id, jobInfoDetail.getCluster().getJobManagerHost(),
-                        jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
+        JobHistory jobHistoryJson = jobHistoryService.refreshJobHistory(id,
+                jobInfoDetail.getCluster().getJobManagerHost(),
+                jobInfoDetail.getInstance().getJid(), jobInfoDetail.isNeedSave());
         JobHistory jobHistory = jobHistoryService.getJobHistoryInfo(jobHistoryJson);
         jobInfoDetail.setJobHistory(jobHistory);
         JobStatus checkStatus = null;
@@ -977,7 +987,6 @@ public String getTaskAPIAddress() {
 
     @Override
     public Integer queryAllSizeByName(String name) {
-
         return baseMapper.queryAllSizeByName(name);
     }
 
@@ -996,8 +1005,8 @@ public String exportJsonByTaskId(Integer taskId) {
         ((ObjectNode) jsonNode).put("path", getTaskPathByTaskId(taskId));
         // clusterConfigurationName
         if (Asserts.isNotNull(task.getClusterConfigurationId())) {
-            ClusterConfiguration clusterConfiguration =
-                    clusterConfigurationService.getById(task.getClusterConfigurationId());
+            ClusterConfiguration clusterConfiguration = clusterConfigurationService
+                    .getById(task.getClusterConfigurationId());
             ((ObjectNode) jsonNode).put("clusterConfigurationName",
                     Asserts.isNotNull(clusterConfiguration) ? clusterConfiguration.getName() : null);
         }
@@ -1082,8 +1091,8 @@ public Result buildTaskByJsonNode(JsonNode jsonNode, ObjectMapper mapper) throws
             }
         }
         if (Asserts.isNotNull(task.getDatabaseName())) {
-            DataBase dataBase =
-                    dataBaseService.getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
+            DataBase dataBase = dataBaseService
+                    .getOne(new QueryWrapper<DataBase>().eq("name", task.getDatabaseName()));
             if (Asserts.isNotNull(dataBase)) {
                 task.setDatabaseId(dataBase.getId());
             }
@@ -1099,8 +1108,8 @@ public Result buildTaskByJsonNode(JsonNode jsonNode, ObjectMapper mapper) throws
          * task.getEnvName())); if(Asserts.isNotNull(task1)){ task.setEnvId(task1.getId()); } }
          */
         if (Asserts.isNotNull(task.getAlertGroupName())) {
-            AlertGroup alertGroup =
-                    alertGroupService.getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
+            AlertGroup alertGroup = alertGroupService
+                    .getOne(new QueryWrapper<AlertGroup>().eq("name", task.getAlertGroupName()));
             if (Asserts.isNotNull(alertGroup)) {
                 task.setAlertGroupId(alertGroup.getId());
             }
@@ -1205,7 +1214,7 @@ public void handleJobDone(JobInstance jobInstance) {
             return;
         }
         Integer jobInstanceId = jobInstance.getId();
-        JobHistory jobHistory = jobHistoryService.getById(jobInstanceId); // 获取任务历史信息
+        JobHistory jobHistory = jobHistoryService.getJobHistory(jobInstanceId); // 获取任务历史信息
         String jobJson = jobHistory.getJobJson(); // 获取任务历史信息的jobJson
         ObjectNode jsonNodes = JSONUtil.parseObject(jobJson);
         if (jsonNodes.has("errors")) {
@@ -1313,7 +1322,7 @@ private List<Tree<Integer>> dealWithCatalogue(List<Catalogue> catalogueList)
 
     @Override
     public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses,
-                                                          boolean includeNull, Integer catalogueId) {
+            boolean includeNull, Integer catalogueId) {
         final Tree<Integer> node = ((Tree<Integer>) queryAllCatalogue().getDatas())
                 .getNode(Objects.isNull(catalogueId) ? 0 : catalogueId);
         final List<Integer> parentIds = new ArrayList<>(0);
@@ -1324,7 +1333,7 @@ public Result<List<Task>> queryOnLineTaskByDoneStatus(List<JobLifeCycle> jobLife
     }
 
     private List<Task> getTasks(List<JobLifeCycle> jobLifeCycle, List<JobStatus> jobStatuses, boolean includeNull,
-                                List<Integer> parentIds) {
+            List<Integer> parentIds) {
         return this.baseMapper.queryOnLineTaskByDoneStatus(parentIds,
                 jobLifeCycle.stream().filter(Objects::nonNull).map(JobLifeCycle::getValue).collect(Collectors.toList()),
                 includeNull, jobStatuses.stream().map(JobStatus::name).collect(Collectors.toList()));
@@ -1346,8 +1355,8 @@ private void childrenNodeParse(Tree<Integer> node, List<Integer> parentIds) {
 
     @Override
     public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) {
-        final JobInstance jobInstanceByTaskId =
-                jobInstanceService.getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
+        final JobInstance jobInstanceByTaskId = jobInstanceService
+                .getJobInstanceByTaskId(taskOperatingResult.getTask().getId());
         if (jobInstanceByTaskId == null) {
             startGoingLiveTask(taskOperatingResult, null);
             return;
@@ -1364,11 +1373,8 @@ public void selectSavepointOnLineTask(TaskOperatingResult taskOperatingResult) {
     }
 
     private void findTheConditionSavePointToOnline(TaskOperatingResult taskOperatingResult,
-                                                   JobInstance jobInstanceByTaskId) {
-        final LambdaQueryWrapper<JobHistory> queryWrapper = new LambdaQueryWrapper<JobHistory>()
-                .select(JobHistory::getId, JobHistory::getCheckpointsJson)
-                .eq(JobHistory::getId, jobInstanceByTaskId.getId());
-        final JobHistory jobHistory = jobHistoryService.getOne(queryWrapper);
+            JobInstance jobInstanceByTaskId) {
+        final JobHistory jobHistory = jobHistoryService.getJobHistory(jobInstanceByTaskId.getId());
         if (jobHistory != null && StringUtils.isNotBlank(jobHistory.getCheckpointsJson())) {
             final ObjectNode jsonNodes = JSONUtil.parseObject(jobHistory.getCheckpointsJson());
             final ArrayNode history = jsonNodes.withArray("history");
diff --git a/dlink-admin/src/main/java/com/dlink/utils/UDFUtils.java b/dlink-admin/src/main/java/com/dlink/utils/UDFUtils.java
index 61580f680c..b594b3908a 100644
--- a/dlink-admin/src/main/java/com/dlink/utils/UDFUtils.java
+++ b/dlink-admin/src/main/java/com/dlink/utils/UDFUtils.java
@@ -43,8 +43,7 @@
  */
 public class UDFUtils extends UDFUtil {
 
-    private static final String FUNCTION_SQL_REGEX =
-            "create\\s+.*function\\s+(.*)\\s+as\\s+'(.*)'(\\s+language (.*))?;";
+    private static final String FUNCTION_SQL_REGEX = "create\\s+.*function\\s+(.*)\\s+as\\s+'(.*)'(\\s+language (.*))?;";
 
     public static List<UDF> getUDF(String statement) {
         ProcessEntity process = ProcessContextHolder.getProcess();
@@ -65,7 +64,9 @@ public static List<UDF> getUDF(String statement) {
                     .build();
         }).collect(Collectors.toList());
         List<String> classNameList = udfList.stream().map(UDF::getClassName).collect(Collectors.toList());
-        process.info(StringUtils.join(",", classNameList));
+        if (classNameList.size() > 0) {
+            process.info(StringUtils.join(classNameList, ","));
+        }
         process.info(CharSequenceUtil.format("A total of {} UDF have been Parsed.", classNameList.size()));
         return udfList;
     }
diff --git a/dlink-admin/src/main/resources/mapper/JobHistoryMapper.xml b/dlink-admin/src/main/resources/mapper/JobHistoryMapper.xml
index 4133fad3f9..1c407eb6d1 100644
--- a/dlink-admin/src/main/resources/mapper/JobHistoryMapper.xml
+++ b/dlink-admin/src/main/resources/mapper/JobHistoryMapper.xml
@@ -9,13 +9,6 @@
         limit 1
     </select>
 
-    <insert id="insert">
-        insert into dlink_job_history (id,job_json,exceptions_json,checkpoints_json,checkpoints_config_json,config_json,
-        jar_json,cluster_json,cluster_configuration_json,update_time)
-        values (#{id},#{jobJson},#{exceptionsJson},#{checkpointsJson},#{checkpointsConfigJson},#{configJson},
-        #{jarJson},#{clusterJson},#{clusterConfigurationJson},#{updateTime})
-    </insert>
-
 </mapper>