package com.alibaba.schedulerx.worker.processor;

import akka.actor.Address;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.schedulerx.common.constants.CommonConstants;
import com.alibaba.schedulerx.common.domain.InstanceStatus;
import com.alibaba.schedulerx.common.util.ConfigUtil;
import com.alibaba.schedulerx.common.util.JsonUtil;
import com.alibaba.schedulerx.common.util.StringUtils;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.shade.com.google.common.collect.Lists;
import com.alibaba.schedulerx.shade.org.apache.commons.collections.CollectionUtils;
import com.alibaba.schedulerx.shade.org.apache.commons.configuration.Configuration;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.batch.ContainerStatusReqHandlerPool;
import com.alibaba.schedulerx.worker.domain.DataworksProcessorProfile;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.domain.WorkerConstants;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import com.alibaba.schedulerx.worker.logcollector.LogCollector;
import com.alibaba.schedulerx.worker.logcollector.LogCollectorFactory;
import com.alibaba.schedulerx.worker.util.WorkerIdGenerator;
import com.aliyun.dataworks_public20200518.Client;
import com.aliyun.dataworks_public20200518.models.GetDagRequest;
import com.aliyun.dataworks_public20200518.models.GetInstanceLogRequest;
import com.aliyun.dataworks_public20200518.models.GetInstanceLogResponse;
import com.aliyun.dataworks_public20200518.models.GetInstanceRequest;
import com.aliyun.dataworks_public20200518.models.GetInstanceResponse;
import com.aliyun.dataworks_public20200518.models.ListManualDagInstancesRequest;
import com.aliyun.dataworks_public20200518.models.ListManualDagInstancesResponseBody;
import com.aliyun.dataworks_public20200518.models.RestartInstanceRequest;
import com.aliyun.dataworks_public20200518.models.RunManualDagNodesRequest;
import com.aliyun.dataworks_public20200518.models.RunManualDagNodesResponse;
import com.aliyun.dataworks_public20200518.models.StopInstanceRequest;
import com.aliyun.teaopenapi.models.Config;
import java.util.List;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/alibaba/schedulerx/worker/processor/DataWorksProcessor.class */
/**
 * Job processor that runs a DataWorks manual DAG and waits for it to finish.
 *
 * <p>On a first run the processor triggers the configured manual DAG nodes, reports the
 * resulting DAG id / node instance id back to the instance master as a RUNNING status and then
 * polls the DAG status every {@link #CHECK_TIME} seconds. When the instance parameters already
 * carry a {@code nodeInstanceId}, the existing node instance is restarted (if it has already
 * finished) instead of triggering a new DAG run.
 */
public class DataWorksProcessor extends JavaProcessor {
    /** Regions that are addressed through the region-less endpoint {@code dataworks.aliyuncs.com}. */
    private static final List<String> COMMON_REGIONS = Lists.newArrayList("cn-qingdao", "cn-huhehaote");
    /** Number of seconds to sleep between two DAG status polls. */
    private static final long CHECK_TIME = 30L;
    protected static final Logger logger = LogFactory.getLogger(DataWorksProcessor.class);
    private final LogCollector logCollector = LogCollectorFactory.get();

    /**
     * Result payload of a DataWorks execution; {@link #toString()} renders it as JSON so it can be
     * used as the textual result of a {@link ProcessResult}.
     */
    class DataWorksResultInfo {
        private Long dataWorksDagId;
        private Long nodeInstanceId;
        /** Not populated by the constructor; only set through {@link #setUrl(String)}. */
        private String url;
        private String message;

        /**
         * @param l   DataWorks DAG id, may be {@code null}
         * @param l2  DataWorks node instance id, may be {@code null}
         * @param str human readable message, may be {@code null}
         */
        public DataWorksResultInfo(Long l, Long l2, String str) {
            this.dataWorksDagId = l;
            this.nodeInstanceId = l2;
            this.message = str;
        }

        public Long getDataWorksDagId() {
            return this.dataWorksDagId;
        }

        public void setDataWorksDagId(Long l) {
            this.dataWorksDagId = l;
        }

        public Long getNodeInstanceId() {
            return this.nodeInstanceId;
        }

        public void setNodeInstanceId(Long l) {
            this.nodeInstanceId = l;
        }

        public String getUrl() {
            return this.url;
        }

        public void setUrl(String str) {
            this.url = str;
        }

        public String getMessage() {
            return this.message;
        }

        public void setMessage(String str) {
            this.message = str;
        }

        /** @return this object serialized as a JSON string */
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Status names that are compared against the status strings returned by DataWorks. */
    enum DataWorksStatus {
        FAILURE,
        SUCCESS
    }

    /**
     * Parses the job content into the DataWorks profile.
     *
     * @param jobContext context whose content holds the JSON profile
     * @return the parsed profile, never {@code null}
     * @throws RuntimeException if the content does not yield a profile
     */
    private DataworksProcessorProfile parseDataWorksProfile(JobContext jobContext) throws Exception {
        DataworksProcessorProfile profile = (DataworksProcessorProfile) JsonUtil.fromJson(jobContext.getContent(), DataworksProcessorProfile.class);
        if (profile == null) {
            throw new RuntimeException("execute dataworks task profile can't be null.");
        }
        return profile;
    }

    /**
     * Maps a region id to the DataWorks endpoint host.
     *
     * @param region region id taken from the job profile
     * @return the region-less host for {@link #COMMON_REGIONS}, otherwise the per-region host
     */
    private static String resolveDataWorksEndpoint(String region) {
        if (COMMON_REGIONS.contains(region)) {
            return "dataworks.aliyuncs.com";
        }
        return "dataworks." + region + ".aliyuncs.com";
    }

    /**
     * Creates a DataWorks client from the worker-level credentials and the profile's region.
     *
     * @param profile parsed job profile providing the region
     * @return a client bound to the resolved endpoint
     * @throws RuntimeException if the access key, secret key or region is missing
     */
    private Client createClient(DataworksProcessorProfile profile) throws Exception {
        Configuration workerConfig = ConfigUtil.getWorkerConfig();
        String accessKeyId = workerConfig.getString(WorkerConstants.ALIYUN_ACESSKEY);
        if (StringUtils.isEmpty(accessKeyId)) {
            throw new RuntimeException("execute dataworks task ak can't be null.");
        }
        String accessKeySecret = workerConfig.getString(WorkerConstants.ALIYUN_SECRETKEY);
        if (StringUtils.isEmpty(accessKeySecret)) {
            throw new RuntimeException("execute dataworks task sk can't be null.");
        }
        String region = profile.getRegion();
        // Without this check a missing region would silently produce "dataworks.null.aliyuncs.com".
        if (StringUtils.isEmpty(region)) {
            throw new RuntimeException("execute dataworks task region can't be null.");
        }
        Config config = new Config().setAccessKeyId(accessKeyId).setAccessKeySecret(accessKeySecret);
        config.endpoint = resolveDataWorksEndpoint(region);
        return new Client(config);
    }

    /**
     * Parses the instance parameters as JSON, tolerating malformed input.
     *
     * @return the parsed object, or {@code null} when the parameters are absent or not valid JSON
     */
    private JSONObject parseInstanceParametersQuietly(JobContext jobContext) {
        try {
            return JSON.parseObject(jobContext.getInstanceParameters());
        } catch (Exception e) {
            // Malformed parameters are treated like "no parameters": a fresh DAG run is triggered.
            logger.warn("dataworks job instance parameters parse warning. " + e);
            return null;
        }
    }

    /** Builds a failed {@link ProcessResult} whose result text is the JSON of a {@link DataWorksResultInfo}. */
    private ProcessResult dataWorksFailure(Long dagId, Long nodeInstanceId, String message) {
        return new ProcessResult(false, new DataWorksResultInfo(dagId, nodeInstanceId, message).toString());
    }

    /** Fetches the log of the given node instance from DataWorks. */
    private GetInstanceLogResponse fetchNodeInstanceLog(Client client, DataworksProcessorProfile profile, Long nodeInstanceId) throws Exception {
        return client.getInstanceLog(new GetInstanceLogRequest().setInstanceId(nodeInstanceId).setProjectEnv(profile.getProjectEnv()));
    }

    /** Forwards the fetched log text to the log collector; does nothing when no response body is available. */
    private void collectNodeInstanceLog(JobContext jobContext, GetInstanceLogResponse logResponse) throws Exception {
        if (logResponse == null || logResponse.getBody() == null) {
            return;
        }
        this.logCollector.collect(jobContext.getUniqueId(), logResponse.getBody().getData());
    }

    /**
     * Picks the log line used as the failure cause: the 8th line from the end, or the first line
     * when the log has fewer than 8 lines.
     *
     * @return the selected line, or {@code null} when the log is unavailable, unsuccessful or empty
     */
    private String extractFailureCause(GetInstanceLogResponse logResponse) {
        if (logResponse == null || logResponse.getBody() == null) {
            return null;
        }
        if (!Boolean.TRUE.equals(logResponse.getBody().success)) {
            return null;
        }
        String logText = logResponse.getBody().getData();
        if (StringUtils.isEmpty(logText)) {
            return null;
        }
        String[] logLines = logText.split("\n");
        return logLines[Math.max(0, logLines.length - 8)];
    }

    /**
     * Triggers (or restarts) the DataWorks execution and blocks until the DAG reports SUCCESS or
     * FAILURE.
     *
     * @param jobContext job context; its content is the DataWorks profile and its instance
     *                   parameters may carry {@code dataWorksDagId} / {@code nodeInstanceId}
     * @return a successful result when the DAG status is SUCCESS, otherwise a failed result whose
     *         text is the JSON form of a {@link DataWorksResultInfo}
     * @throws Exception on client errors or when the polling sleep is interrupted
     */
    @Override
    public ProcessResult process(JobContext jobContext) throws Exception {
        DataworksProcessorProfile profile = parseDataWorksProfile(jobContext);
        Client client = createClient(profile);
        JSONObject instanceParameters = parseInstanceParametersQuietly(jobContext);

        Long dagId;
        Long nodeInstanceId;
        if (instanceParameters == null || instanceParameters.getLong("nodeInstanceId") == null) {
            // No known node instance yet: trigger a new manual DAG run.
            RunManualDagNodesResponse runResponse = client.runManualDagNodes(new RunManualDagNodesRequest()
                    .setProjectEnv(profile.getProjectEnv())
                    .setProjectName(profile.getProjectName())
                    .setBizDate(jobContext.getDataTime().minusDays(1).toString("yyyy-MM-dd HH:mm:ss"))
                    .setIncludeNodeIds(profile.getTaskIds())
                    .setNodeParameters(jobContext.getJobParameters())
                    .setFlowName(profile.getFlowName()));
            if (runResponse.getBody() == null) {
                // Without a body there is no DAG id to poll; polling with a null id can never succeed.
                return dataWorksFailure(null, null, "Execute dataworks task failed, instance id return null.");
            }
            dagId = runResponse.getBody().getDagId();
            if (dagId == null || dagId.longValue() <= 0) {
                return dataWorksFailure(dagId, null, "Execute dataworks task failed, instance id return null.");
            }
            List<ListManualDagInstancesResponseBody.ListManualDagInstancesResponseBodyInstances> instances = client.listManualDagInstances(new ListManualDagInstancesRequest()
                    .setProjectEnv(profile.getProjectEnv())
                    .setProjectName(profile.getProjectName())
                    .setDagId(dagId.toString())).getBody().getInstances();
            if (!CollectionUtils.isNotEmpty(instances)) {
                return dataWorksFailure(dagId, null, "Execute dataworks task failed, node instance not exists.");
            }
            nodeInstanceId = instances.get(0).getInstanceId();
            // Publish the ids as RUNNING progress before the (potentially long) polling starts.
            String runningInfo = new DataWorksResultInfo(dagId, nodeInstanceId, null).toString();
            reportTaskStatus(jobContext, new ProcessResult(InstanceStatus.RUNNING, runningInfo), runningInfo);
        } else {
            // A node instance is already known: re-use it and restart it if it has finished.
            dagId = instanceParameters.getLong("dataWorksDagId");
            nodeInstanceId = instanceParameters.getLong("nodeInstanceId");
            if (dagId == null) {
                return dataWorksFailure(null, nodeInstanceId, "Execute dataworks task restart failed. dataWorksDagId is missing from instance parameters.");
            }
            GetInstanceResponse instanceResponse = client.getInstance(new GetInstanceRequest().setInstanceId(nodeInstanceId).setProjectEnv(profile.getProjectEnv()));
            if (!Boolean.TRUE.equals(instanceResponse.getBody().getSuccess())) {
                return dataWorksFailure(dagId, nodeInstanceId, "Execute dataworks task restart failed. " + instanceResponse.getBody().getErrorMessage());
            }
            String instanceStatus = instanceResponse.getBody().getData().getStatus();
            boolean finished = DataWorksStatus.FAILURE.name().equals(instanceStatus) || DataWorksStatus.SUCCESS.name().equals(instanceStatus);
            if (finished) {
                Boolean restarted = client.restartInstance(new RestartInstanceRequest().setInstanceId(nodeInstanceId).setProjectEnv(profile.getProjectEnv())).getBody().getData();
                if (!Boolean.TRUE.equals(restarted)) {
                    return dataWorksFailure(dagId, nodeInstanceId, "Execute dataworks task restart failed.");
                }
            }
        }

        // Poll until the DAG reaches one of the two statuses this processor treats as final.
        while (true) {
            TimeUnit.SECONDS.sleep(CHECK_TIME);
            String dagStatus = client.getDag(new GetDagRequest().setDagId(dagId).setProjectEnv(profile.getProjectEnv())).getBody().getData().getStatus();
            if (DataWorksStatus.FAILURE.name().equals(dagStatus)) {
                GetInstanceLogResponse logResponse = fetchNodeInstanceLog(client, profile, nodeInstanceId);
                String cause = extractFailureCause(logResponse);
                collectNodeInstanceLog(jobContext, logResponse);
                return dataWorksFailure(dagId, nodeInstanceId, "Execute dataworks task failed. case by:" + cause);
            }
            if (DataWorksStatus.SUCCESS.name().equals(dagStatus)) {
                collectNodeInstanceLog(jobContext, fetchNodeInstanceLog(client, profile, nodeInstanceId));
                return new ProcessResult(true, new DataWorksResultInfo(dagId, nodeInstanceId, "Execute dataworks task success.").toString());
            }
        }
    }

    /**
     * Asks DataWorks to stop the node instance referenced by the instance parameters.
     *
     * <p>Best effort: failures are logged and never propagated. The node instance id is read
     * from the instance parameters only, so nothing is stopped when they do not carry one.
     *
     * @param jobContext job context carrying the profile and the instance parameters
     */
    @Override
    public void kill(JobContext jobContext) {
        try {
            DataworksProcessorProfile profile = parseDataWorksProfile(jobContext);
            JSONObject instanceParameters = JSON.parseObject(jobContext.getInstanceParameters());
            Long nodeInstanceId = instanceParameters == null ? null : instanceParameters.getLong("nodeInstanceId");
            if (nodeInstanceId == null) {
                logger.warn("dataworks job instance stop skipped, nodeInstanceId not found in instance parameters.");
                return;
            }
            createClient(profile).stopInstance(new StopInstanceRequest().setInstanceId(nodeInstanceId).setProjectEnv(profile.getProjectEnv()));
        } catch (Exception e) {
            // Keep kill() non-throwing, but do not lose the reason for the failure.
            logger.error("dataworks job instance stop failed. " + e);
        }
    }

    /**
     * Reports an intermediate task status to the instance master.
     *
     * <p>The request is first handed to {@link ContainerStatusReqHandlerPool}; when the pool does
     * not accept it, the request is sent directly to the instance master actor.
     *
     * @param jobContext    context providing the job / instance / task identifiers
     * @param processResult status and optional result text to report
     * @param str           progress text attached to the report
     */
    private void reportTaskStatus(JobContext jobContext, ProcessResult processResult, String str) {
        Worker.ContainerReportTaskStatusRequest.Builder builder = Worker.ContainerReportTaskStatusRequest.newBuilder();
        builder.setJobId(jobContext.getJobId());
        builder.setJobInstanceId(jobContext.getJobInstanceId());
        builder.setTaskId(jobContext.getTaskId());
        builder.setStatus(processResult.getStatus().getValue());
        Address workerAddress = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        builder.setWorkerAddr(workerAddress.host().get() + CommonConstants.ADDRESS_SEPARATOR + workerAddress.port().get());
        builder.setWorkerId(WorkerIdGenerator.get());
        builder.setSerialNum(jobContext.getSerialNum());
        builder.setInstanceMasterActorPath(jobContext.getInstanceMasterActorPath());
        if (jobContext.getTaskName() != null) {
            builder.setTaskName(jobContext.getTaskName());
        }
        if (processResult.getResult() != null) {
            builder.setResult(processResult.getResult());
        }
        builder.setProgress(str);

        // Build the message once and re-use it for the pool submission and the direct fallback.
        Worker.ContainerReportTaskStatusRequest request = builder.build();
        boolean sharedPool = ConfigUtil.getWorkerConfig().getBoolean(WorkerConstants.SHARE_CONTAINER_POOL, false);
        boolean submitted = sharedPool
                ? ContainerStatusReqHandlerPool.INSTANCE.submitReq(0L, request)
                : ContainerStatusReqHandlerPool.INSTANCE.submitReq(jobContext.getJobInstanceId(), request);
        logger.info("reportTaskStatus instanceId={} submitResult={}, processResult={}", jobContext.getUniqueId(), Boolean.valueOf(submitted), processResult);
        if (!submitted) {
            SchedulerxWorker.actorSystem.actorSelection(jobContext.getInstanceMasterActorPath()).tell(request, null);
        }
    }
}
