package com.jxdinfo.hussar.support.job.execution.core.executor;

import akka.actor.ActorSelection;
import com.google.common.base.Stopwatch;
import com.jxdinfo.hussar.support.job.core.enums.ExecuteType;
import com.jxdinfo.hussar.support.job.core.execution.model.InstanceInfo;
import com.jxdinfo.hussar.support.job.core.execution.model.TaskDO;
import com.jxdinfo.hussar.support.job.core.execution.request.ProcessorReportTaskStatusReq;
import com.jxdinfo.hussar.support.job.core.serialize.SerializerUtils;
import com.jxdinfo.hussar.support.job.execution.common.ThreadLocalStore;
import com.jxdinfo.hussar.support.job.execution.common.WorkerRuntime;
import com.jxdinfo.hussar.support.job.execution.common.constants.TaskConstant;
import com.jxdinfo.hussar.support.job.execution.common.constants.TaskStatus;
import com.jxdinfo.hussar.support.job.execution.common.utils.AkkaUtils;
import com.jxdinfo.hussar.support.job.execution.common.utils.WorkflowContextUtils;
import com.jxdinfo.hussar.support.job.execution.core.processor.ProcessResult;
import com.jxdinfo.hussar.support.job.execution.core.processor.TaskContext;
import com.jxdinfo.hussar.support.job.execution.core.processor.TaskResult;
import com.jxdinfo.hussar.support.job.execution.core.processor.WorkflowContext;
import com.jxdinfo.hussar.support.job.execution.core.processor.sdk.BasicProcessor;
import com.jxdinfo.hussar.support.job.execution.core.processor.sdk.BroadcastProcessor;
import com.jxdinfo.hussar.support.job.execution.core.processor.sdk.MapReduceProcessor;
import com.jxdinfo.hussar.support.job.execution.core.tracker.singlemodel.SingleTaskTrackerActor;
import com.jxdinfo.hussar.support.job.execution.log.OmsLogger;
import com.jxdinfo.hussar.support.job.execution.support.ProcessorListener;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/hussar-job-execution-8.3.4-cus-gyzq.23.jar:com/jxdinfo/hussar/support/job/execution/core/executor/ProcessorRunnable.class */
/**
 * Executes one {@link TaskDO} of a job instance with the supplied {@link BasicProcessor} and
 * reports the task's status to the task tracker.
 *
 * <p>Three kinds of task are distinguished by task name and execute type:
 * <ul>
 *   <li>the root task of a {@link ExecuteType#BROADCAST} instance, which runs
 *       {@link BroadcastProcessor#preProcess} when the processor is a {@link BroadcastProcessor};</li>
 *   <li>the last task ({@link TaskConstant#LAST_TASK_NAME}), which runs
 *       {@link BroadcastProcessor#postProcess} or {@link MapReduceProcessor#reduce};</li>
 *   <li>every other task, which runs {@link BasicProcessor#process} surrounded by the
 *       {@link ProcessorListener} callbacks.</li>
 * </ul>
 *
 * <p>Finished statuses that cannot be delivered are placed on the retry queue handed in at
 * construction time.
 */
public class ProcessorRunnable implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ProcessorRunnable.class);

    private final InstanceInfo instanceInfo;
    private final ActorSelection taskTrackerActor;
    private final TaskDO task;
    private final BasicProcessor processor;
    private final OmsLogger omsLogger;
    private final ProcessorListener processorListener;
    private final ClassLoader classLoader;
    private final Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    private final WorkerRuntime workerRuntime;

    /**
     * Creates a runnable for a single task.
     *
     * @param instanceInfo      metadata of the job instance the task belongs to
     * @param actorSelection    task tracker actor that receives status reports when the worker is
     *                          not running in single model
     * @param taskDO            the task to execute
     * @param basicProcessor    the processor that performs the work
     * @param omsLogger         logger exposed to the processor through the {@link TaskContext}
     * @param classLoader       installed as the thread's context class loader in {@link #run()}
     * @param queue             receives finished-status reports whose delivery failed
     * @param workerRuntime     worker runtime (configuration, persistence service, model flag)
     * @param processorListener callbacks invoked around {@link BasicProcessor#process}
     */
    public ProcessorRunnable(InstanceInfo instanceInfo, ActorSelection actorSelection, TaskDO taskDO, BasicProcessor basicProcessor, OmsLogger omsLogger, ClassLoader classLoader, Queue<ProcessorReportTaskStatusReq> queue, WorkerRuntime workerRuntime, ProcessorListener processorListener) {
        this.instanceInfo = instanceInfo;
        this.taskTrackerActor = actorSelection;
        this.task = taskDO;
        this.processor = basicProcessor;
        this.omsLogger = omsLogger;
        this.classLoader = classLoader;
        this.statusReportRetryQueue = queue;
        this.workerRuntime = workerRuntime;
        this.processorListener = processorListener;
    }

    /**
     * Runs the task on the calling thread: publishes the task and runtime to
     * {@link ThreadLocalStore}, builds the contexts, reports {@code WORKER_PROCESSING}, dispatches
     * to the matching handler and reports the final status.
     *
     * <p>{@link ThreadLocalStore} is not cleared here; {@link #run()} does that.
     *
     * @throws InterruptedException declared for callers; kept for interface compatibility
     */
    public void innerRun() throws InterruptedException {
        final String taskId = this.task.getTaskId();
        final Long instanceId = this.task.getInstanceId();
        log.debug("[ProcessorRunnable-{}] start to run task(taskId={}&taskName={})", instanceId, taskId, this.task.getTaskName());

        ThreadLocalStore.setTask(this.task);
        ThreadLocalStore.setRuntimeMeta(this.workerRuntime);

        final WorkflowContext workflowContext = constructWorkflowContext();
        final TaskContext taskContext = constructTaskContext();
        taskContext.setWorkflowContext(workflowContext);

        reportStatus(TaskStatus.WORKER_PROCESSING, null, null, null);

        final ExecuteType executeType = ExecuteType.valueOf(this.instanceInfo.getExecuteType());
        if (TaskConstant.ROOT_TASK_NAME.equals(this.task.getTaskName()) && executeType == ExecuteType.BROADCAST) {
            handleBroadcastRootTask(instanceId, taskContext);
            return;
        }
        if (TaskConstant.LAST_TASK_NAME.equals(this.task.getTaskName())) {
            // The last task never goes through beforeProcess, so no listener callback is owed here.
            handleLastTask(taskId, instanceId, taskContext, executeType);
            return;
        }

        final ProcessResult processResult = executeProcessor(instanceId, taskContext);
        reportStatus(toTaskStatus(processResult), suit(processResult.getMsg()), null, workflowContext.getAppendedContextData());
    }

    /**
     * Invokes {@code beforeProcess}, {@code process} and {@code afterProcess} for an ordinary task.
     *
     * <p>{@code afterProcess} is called exactly once, whether or not the earlier calls threw. Any
     * throwable is logged and converted into a failed {@link ProcessResult}; a failure in
     * {@code afterProcess} overrides the result produced so far.
     *
     * @param instanceId  instance id, used only for logging
     * @param taskContext context handed to the listener and the processor
     * @return the processor's result, or a failed result describing what went wrong; never null
     */
    private ProcessResult executeProcessor(Long instanceId, TaskContext taskContext) {
        ProcessResult result;
        try {
            this.processorListener.beforeProcess(taskContext);
            result = this.processor.process(taskContext);
            if (result == null) {
                result = new ProcessResult(false, "ProcessResult can't be null");
            }
        } catch (Throwable processFailure) {
            log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), processFailure);
            result = new ProcessResult(false, processFailure.toString());
        } finally {
            try {
                this.processorListener.afterProcess(taskContext);
            } catch (Throwable listenerFailure) {
                log.warn("[ProcessorRunnable-{}] task(id={},name={}) process failed. afterProcess failed.", instanceId, taskContext.getTaskId(), taskContext.getTaskName(), listenerFailure);
                result = new ProcessResult(false, listenerFailure.toString());
            }
        }
        return result;
    }

    /**
     * Builds the {@link TaskContext} for this task by copying the task's bean properties and
     * adding instance-level and worker-level values.
     *
     * @return a new, populated task context (the workflow context is set by the caller)
     */
    private TaskContext constructTaskContext() {
        final TaskContext taskContext = new TaskContext();
        BeanUtils.copyProperties(this.task, taskContext);

        taskContext.setJobId(this.instanceInfo.getJobId());
        taskContext.setMaxRetryTimes(this.instanceInfo.getTaskRetryNum());
        taskContext.setCurrentRetryTimes(this.task.getFailedCnt().intValue());
        taskContext.setJobParams(this.instanceInfo.getJobParams());
        taskContext.setInstanceParams(this.instanceInfo.getInstanceParams());
        taskContext.setOmsLogger(this.omsLogger);

        // Only deserialize the sub task when the task actually carries serialized content.
        final byte[] taskContent = this.task.getTaskContent();
        if (taskContent != null && taskContent.length > 0) {
            taskContext.setSubTask(SerializerUtils.deSerialized(taskContent));
        }

        taskContext.setDbName(this.instanceInfo.getDbName());
        taskContext.setTenantCode(this.instanceInfo.getTenantCode());
        taskContext.setTenantId(this.instanceInfo.getTenantId());
        taskContext.setUserContext(this.workerRuntime.getWorkerConfig().getUserContext());
        return taskContext;
    }

    /**
     * Builds the {@link WorkflowContext} from the instance's workflow instance id and instance
     * parameters.
     *
     * @return a new workflow context
     */
    private WorkflowContext constructWorkflowContext() {
        return new WorkflowContext(this.instanceInfo.getWfInstanceId(), this.instanceInfo.getInstanceParams());
    }

    /**
     * Handles the last task: loads all task results of the (sub) instance and hands them to
     * {@link BroadcastProcessor#postProcess} or {@link MapReduceProcessor#reduce}, then reports
     * the outcome.
     *
     * @param taskId      id of the last task, used for logging
     * @param instanceId  id of the instance whose results are aggregated
     * @param taskContext context passed to the processor
     * @param executeType execute type of the instance; only BROADCAST and MAP_REDUCE are expected
     */
    private void handleLastTask(String taskId, Long instanceId, TaskContext taskContext, ExecuteType executeType) {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        log.debug("[ProcessorRunnable-{}] the last task(taskId={}) start to process.", instanceId, taskId);
        final List<TaskResult> taskResults = this.workerRuntime.getTaskPersistenceService().getAllTaskResult(instanceId, this.task.getSubInstanceId());

        ProcessResult processResult;
        try {
            switch (executeType) {
                case BROADCAST:
                    if (this.processor instanceof BroadcastProcessor) {
                        processResult = ((BroadcastProcessor) this.processor).postProcess(taskContext, taskResults);
                    } else {
                        processResult = BroadcastProcessor.defaultResult(taskResults);
                    }
                    break;
                case MAP_REDUCE:
                    if (this.processor instanceof MapReduceProcessor) {
                        processResult = ((MapReduceProcessor) this.processor).reduce(taskContext, taskResults);
                    } else {
                        processResult = new ProcessResult(false, "not implement the MapReduceProcessor");
                    }
                    break;
                default:
                    processResult = new ProcessResult(false, "IMPOSSIBLE OR BUG");
                    break;
            }
        } catch (Throwable failure) {
            processResult = new ProcessResult(false, failure.toString());
            log.warn("[ProcessorRunnable-{}] execute last task(taskId={}) failed.", instanceId, taskId, failure);
        }

        reportStatus(toTaskStatus(processResult), suit(processResult.getMsg()), null, taskContext.getWorkflowContext().getAppendedContextData());
        // Only claim success when the result says so; a failed result used to be logged as success.
        if (processResult.isSuccess()) {
            log.info("[ProcessorRunnable-{}] the last task execute successfully, using time: {}", instanceId, stopwatch);
        } else {
            log.warn("[ProcessorRunnable-{}] the last task execute failed, using time: {}", instanceId, stopwatch);
        }
    }

    /**
     * Handles the root task of a broadcast instance: runs {@link BroadcastProcessor#preProcess}
     * when the processor supports it, otherwise treats the step as successful, and reports the
     * outcome with the {@link ProcessorReportTaskStatusReq#BROADCAST} command.
     *
     * @param instanceId  instance id, used only for logging
     * @param taskContext context passed to {@code preProcess}
     */
    private void handleBroadcastRootTask(Long instanceId, TaskContext taskContext) {
        ProcessResult processResult;
        if (!(this.processor instanceof BroadcastProcessor)) {
            processResult = new ProcessResult(true, "NO_PREPOST_TASK");
        } else {
            try {
                processResult = ((BroadcastProcessor) this.processor).preProcess(taskContext);
            } catch (Throwable failure) {
                log.warn("[ProcessorRunnable-{}] broadcast task preProcess failed.", instanceId, failure);
                processResult = new ProcessResult(false, failure.toString());
            }
        }
        reportStatus(toTaskStatus(processResult), suit(processResult.getMsg()), ProcessorReportTaskStatusReq.BROADCAST, taskContext.getWorkflowContext().getAppendedContextData());
    }

    /**
     * Maps a processor result to the worker-side task status.
     *
     * @param processResult result to translate; must not be null
     * @return {@code WORKER_PROCESS_SUCCESS} when the result is successful, otherwise
     *         {@code WORKER_PROCESS_FAILED}
     */
    private static TaskStatus toTaskStatus(ProcessResult processResult) {
        return processResult.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
    }

    /**
     * Builds a status report for this task and sends it to the task tracker.
     *
     * <p>Statuses contained in {@code TaskStatus.finishedStatus} are sent with a delivery check
     * and queued for retry when delivery fails; all other statuses are sent fire-and-forget.
     *
     * @param status            status to report
     * @param result            result message, may be null
     * @param cmd               optional command attached to the report, may be null
     * @param appendedWfContext workflow context data appended by the processor, may be null
     */
    private void reportStatus(TaskStatus status, String result, Integer cmd, Map<String, String> appendedWfContext) {
        final int statusValue = status.getValue();

        final ProcessorReportTaskStatusReq req = new ProcessorReportTaskStatusReq();
        req.setInstanceId(this.task.getInstanceId());
        req.setSubInstanceId(this.task.getSubInstanceId());
        req.setTaskId(this.task.getTaskId());
        req.setStatus(statusValue);
        req.setResult(result);
        req.setReportTime(System.currentTimeMillis());
        req.setCmd(cmd);

        // For workflow instances, drop appended context data that exceeds the configured limit.
        Map<String, String> wfContextToSend = appendedWfContext;
        if (this.instanceInfo.getWfInstanceId() != null && WorkflowContextUtils.isExceededLengthLimit(appendedWfContext, this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength())) {
            log.warn("[ProcessorRunnable-{}]current length of appended workflow context data is greater than {}, this appended workflow context data will be ignore!", this.instanceInfo.getInstanceId(), Integer.valueOf(this.workerRuntime.getWorkerConfig().getMaxAppendedWfContextLength()));
            wfContextToSend = Collections.emptyMap();
        }
        req.setAppendedWfContext(wfContextToSend);

        final boolean singleModel = this.workerRuntime.isSingleModel();
        if (!TaskStatus.finishedStatus.contains(statusValue)) {
            // Intermediate status: fire-and-forget.
            if (singleModel) {
                SingleTaskTrackerActor.tell(req);
            } else {
                // A null sender means "no sender" for the actor message.
                this.taskTrackerActor.tell(req, null);
            }
            return;
        }

        // Finished status: delivery is checked and retried later on failure.
        final boolean delivered = singleModel
                ? SingleTaskTrackerActor.ask(req).isSuccess()
                : AkkaUtils.reliableTransmit(this.taskTrackerActor, req);
        if (!delivered) {
            this.statusReportRetryQueue.add(req);
            log.warn("[ProcessorRunnable-{}] report task(id={},status={},result={}) failed, will retry later", this.task.getInstanceId(), this.task.getTaskId(), status, result);
        }
    }

    /**
     * Entry point used by the executing thread: installs the configured context class loader,
     * delegates to {@link #innerRun()} and always clears {@link ThreadLocalStore} afterwards.
     *
     * <p>An {@link InterruptedException} ends the run quietly but restores the thread's interrupt
     * flag. Any other throwable is logged and reported as {@code WORKER_PROCESS_FAILED}.
     */
    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setContextClassLoader(this.classLoader);
        try {
            innerRun();
        } catch (InterruptedException interrupted) {
            // Keep the interruption visible to whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
        } catch (Throwable failure) {
            // Log before reporting so the root cause is not lost if the report itself throws.
            log.error("[ProcessorRunnable-{}] execute failed, please contact the author(@KFCFans) to fix the bug!", this.task.getInstanceId(), failure);
            reportStatus(TaskStatus.WORKER_PROCESS_FAILED, failure.toString(), null, null);
        } finally {
            ThreadLocalStore.clear();
        }
    }

    /**
     * Normalizes a result message for reporting: null/empty becomes {@code ""}, and a message
     * longer than the configured maximum result length is cut to that length with {@code "..."}
     * appended.
     *
     * @param str raw result message, may be null
     * @return the message to report, never null
     */
    private String suit(String str) {
        if (!StringUtils.hasLength(str)) {
            return "";
        }
        final int maxResultLength = this.workerRuntime.getWorkerConfig().getMaxResultLength();
        if (str.length() <= maxResultLength) {
            return str;
        }
        log.warn("[ProcessorRunnable-{}] task(taskId={})'s result is too large({}>{}), a part will be discarded.", this.task.getInstanceId(), this.task.getTaskId(), Integer.valueOf(str.length()), Integer.valueOf(maxResultLength));
        return str.substring(0, maxResultLength).concat("...");
    }
}
