Skip to content

0.7.0-rc2 #1334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions dlink-admin/src/main/java/com/dlink/job/Job2MysqlHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public class Job2MysqlHandler implements JobHandler {
static {
historyService = SpringContextUtils.getBean("historyServiceImpl", HistoryService.class);
clusterService = SpringContextUtils.getBean("clusterServiceImpl", ClusterService.class);
clusterConfigurationService = SpringContextUtils.getBean("clusterConfigurationServiceImpl", ClusterConfigurationService.class);
clusterConfigurationService = SpringContextUtils.getBean("clusterConfigurationServiceImpl",
ClusterConfigurationService.class);
jarService = SpringContextUtils.getBean("jarServiceImpl", JarService.class);
jobInstanceService = SpringContextUtils.getBean("jobInstanceServiceImpl", JobInstanceService.class);
jobHistoryService = SpringContextUtils.getBean("jobHistoryServiceImpl", JobHistoryService.class);
Expand Down Expand Up @@ -132,12 +133,12 @@ public boolean success() {
final Integer clusterConfigurationId = job.getJobConfig().getClusterConfigurationId();
if (job.isUseGateway()) {
cluster = clusterService.registersCluster(Cluster.autoRegistersCluster(
job.getJobManagerAddress(),
job.getJobId(),
job.getJobConfig().getJobName() + LocalDateTime.now(),
job.getType().getLongValue(),
clusterConfigurationId,
taskId));
job.getJobManagerAddress(),
job.getJobId(),
job.getJobConfig().getJobName() + LocalDateTime.now(),
job.getType().getLongValue(),
clusterConfigurationId,
taskId));
if (Asserts.isNotNull(cluster)) {
clusterId = cluster.getId();
}
Expand Down Expand Up @@ -174,11 +175,12 @@ public boolean success() {
jobHistory.setClusterJson(JSONUtil.toJsonString(cluster));

jobHistory.setJarJson(Asserts.isNotNull(job.getJobConfig().getJarId())
? JSONUtil.toJsonString(jarService.getById(job.getJobConfig().getJarId())) : null);
? JSONUtil.toJsonString(jarService.getById(job.getJobConfig().getJarId()))
: null);

jobHistory.setClusterConfigurationJson(Asserts.isNotNull(clusterConfigurationId)
? JSONUtil.toJsonString(clusterConfigurationService.getClusterConfigById(clusterConfigurationId))
: null);
? JSONUtil.toJsonString(clusterConfigurationService.getClusterConfigById(clusterConfigurationId))
: null);
jobHistoryService.save(jobHistory);

DaemonFactory.addTask(DaemonTaskConfig.build(FlinkJobTask.TYPE, jobInstance.getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,4 @@ public interface JobHistoryMapper extends SuperMapper<JobHistory> {

@InterceptorIgnore(tenantLine = "true")
JobHistory getByIdWithoutTenant(Integer id);

int insert(JobHistory jobHistory);

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public interface JobInstanceMapper extends SuperMapper<JobInstance> {

List<JobInstanceCount> countHistoryStatus();

@InterceptorIgnore(tenantLine = "true")
List<JobInstance> listJobInstanceActive();

JobInstance getJobInstanceByTaskId(Integer id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@
@Service
public class JobHistoryServiceImpl extends SuperServiceImpl<JobHistoryMapper, JobHistory> implements JobHistoryService {

private static final Logger log = LoggerFactory.getLogger(JobHistoryServiceImpl.class);

@Override
public JobHistory getByIdWithoutTenant(Integer id) {
return baseMapper.getByIdWithoutTenant(id);
}

@Override
public JobHistory getJobHistory(Integer id) {
return getJobHistoryInfo(getById(id));
return getJobHistoryInfo(baseMapper.getByIdWithoutTenant(id));
}

private static final Logger log = LoggerFactory.getLogger(JobHistoryServiceImpl.class);

@Override
public JobHistory getJobHistoryInfo(JobHistory jobHistory) {
if (Asserts.isNotNull(jobHistory)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ public class StudioServiceImpl implements StudioService {
private final UDFService udfService;

public StudioServiceImpl(ClusterService clusterService,
ClusterConfigurationService clusterConfigurationService,
SavepointsService savepointsService,
DataBaseService dataBaseService,
TaskService taskService,
FragmentVariableService fragmentVariableService,
UDFService udfService) {
ClusterConfigurationService clusterConfigurationService,
SavepointsService savepointsService,
DataBaseService dataBaseService,
TaskService taskService,
FragmentVariableService fragmentVariableService,
UDFService udfService) {
this.clusterService = clusterService;
this.clusterConfigurationService = clusterConfigurationService;
this.savepointsService = savepointsService;
Expand All @@ -120,17 +120,17 @@ public StudioServiceImpl(ClusterService clusterService,

private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
ProcessEntity process = ProcessContextHolder.getProcess();
process.info("Start Initialize FlinkSQLEnv:");
process.info("Start initialize FlinkSQLEnv:");
if (statementDTO.isFragment()) {
process.config("Variable opened.");

// initialize global variables
process.info("Initializing Global Variables...");
process.info("Initializing global variables...");
statementDTO.setVariables(fragmentVariableService.listEnabledVariables());
process.infoSuccess();

// initialize database variables
process.info("Initializing Database Variables...");
process.info("Initializing database variables...");
String flinkWithSql = dataBaseService.getEnabledFlinkWithSql();
if (Asserts.isNotNullString(flinkWithSql)) {
statementDTO.setStatement(flinkWithSql + "\n" + statementDTO.getStatement());
Expand All @@ -152,7 +152,7 @@ private void addFlinkSQLEnv(AbstractStatementDTO statementDTO) {
process.info("No FlinkSQLEnv are loaded.");
}
}
process.info("Finish Initialize FlinkSQLEnv.");
process.info("Finish initialize FlinkSQLEnv.");
}

private void buildSession(JobConfig config) {
Expand All @@ -178,16 +178,15 @@ private JobResult executeFlinkSql(StudioExecuteDTO studioExecuteDTO) {
ProcessEntity process = ProcessContextHolder.registerProcess(
ProcessEntity.init(ProcessType.FLINKEXECUTE, StpUtil.getLoginIdAsInt()));
addFlinkSQLEnv(studioExecuteDTO);
process.info("Initializing Flink Job Config...");
process.info("Initializing Flink job config...");
JobConfig config = studioExecuteDTO.getJobConfig();
buildSession(config);
// init UDF
udfService.init(studioExecuteDTO.getStatement(), config);
JobManager jobManager = JobManager.build(config);
process.infoSuccess();
process.start();
JobResult jobResult = jobManager.executeSql(studioExecuteDTO.getStatement());
process.finish();
process.finish("Execute Flink SQL succeed.");
RunTimeUtil.recovery(jobManager);
return jobResult;
}
Expand All @@ -208,7 +207,7 @@ public JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
result.setStartTimeNow();
process.info("Initializing Database Connection...");
process.info("Initializing database connection...");
if (Asserts.isNull(sqlDTO.getDatabaseId())) {
result.setSuccess(false);
result.setError("请指定数据源");
Expand All @@ -229,7 +228,7 @@ public JobResult executeCommonSql(SqlDTO sqlDTO) {
process.start();
selectResult = driver.executeSql(sqlDTO.getStatement(), sqlDTO.getMaxRowNum());
}
process.finish();
process.finish("Execute sql succeed.");
result.setResult(selectResult);
if (selectResult.isSuccess()) {
result.setSuccess(true);
Expand Down Expand Up @@ -265,26 +264,25 @@ private List<SqlExplainResult> explainFlinkSql(StudioExecuteDTO studioExecuteDTO
ProcessEntity process = ProcessContextHolder.registerProcess(
ProcessEntity.init(ProcessType.FLINKEXPLAIN, StpUtil.getLoginIdAsInt()));
addFlinkSQLEnv(studioExecuteDTO);
process.info("Initializing Flink Job Config...");
process.info("Initializing Flink job config...");
JobConfig config = studioExecuteDTO.getJobConfig();
// If you are using explainSql | getStreamGraph | getJobPlan, make the dialect change to local.
config.buildLocal();
buildSession(config);
// init UDF
udfService.init(studioExecuteDTO.getStatement(), config);
JobManager jobManager = JobManager.build(config);
process.infoSuccess();
process.start();
List<SqlExplainResult> sqlExplainResults =
jobManager.explainSql(studioExecuteDTO.getStatement()).getSqlExplainResults();
List<SqlExplainResult> sqlExplainResults = jobManager.explainSql(studioExecuteDTO.getStatement())
.getSqlExplainResults();
process.finish();
return sqlExplainResults;
}

private List<SqlExplainResult> explainCommonSql(StudioExecuteDTO studioExecuteDTO) {
ProcessEntity process = ProcessContextHolder.registerProcess(
ProcessEntity.init(ProcessType.SQLEXPLAIN, StpUtil.getLoginIdAsInt()));
process.info("Initializing Flink Job Config...");
process.info("Initializing database connection...");
if (Asserts.isNull(studioExecuteDTO.getDatabaseId())) {
process.error("The database does not exist.");
return Collections.singletonList(
Expand Down Expand Up @@ -417,8 +415,8 @@ public boolean cancel(Integer clusterId, String jobId) {
JobConfig jobConfig = new JobConfig();
jobConfig.setAddress(cluster.getJobManagerHost());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig = clusterConfigurationService
.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
}
JobManager jobManager = JobManager.build(jobConfig);
Expand All @@ -436,8 +434,8 @@ public boolean savepoint(Integer taskId, Integer clusterId, String jobId, String
jobConfig.setType(cluster.getType());
if (Asserts.isNotNull(cluster.getClusterConfigurationId())) {
// 如果用户选择用dlink平台来托管集群信息 说明任务一定是从dlink发起提交的
Map<String, Object> gatewayConfig =
clusterConfigurationService.getGatewayConfig(cluster.getClusterConfigurationId());
Map<String, Object> gatewayConfig = clusterConfigurationService
.getGatewayConfig(cluster.getClusterConfigurationId());
jobConfig.buildGatewayConfig(gatewayConfig);
jobConfig.getGatewayConfig().getClusterConfig().setAppId(cluster.getName());
jobConfig.setTaskId(cluster.getTaskId());
Expand Down
Loading