package com.jxdinfo.hussar.support.job.execution.core.tracker.processor;

import akka.actor.ActorSelection;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jxdinfo.hussar.platform.core.utils.SpringContextUtil;
import com.jxdinfo.hussar.support.job.core.RemoteConstant;
import com.jxdinfo.hussar.support.job.core.enums.ExecuteType;
import com.jxdinfo.hussar.support.job.core.enums.ProcessorType;
import com.jxdinfo.hussar.support.job.core.enums.TimeExpressionType;
import com.jxdinfo.hussar.support.job.core.exception.JobRuntimeException;
import com.jxdinfo.hussar.support.job.core.execution.model.InstanceInfo;
import com.jxdinfo.hussar.support.job.core.execution.model.TaskDO;
import com.jxdinfo.hussar.support.job.core.execution.request.ProcessorReportTaskStatusReq;
import com.jxdinfo.hussar.support.job.core.execution.request.TaskTrackerStartTaskReq;
import com.jxdinfo.hussar.support.job.core.utils.CommonUtils;
import com.jxdinfo.hussar.support.job.core.utils.SupplierPlus;
import com.jxdinfo.hussar.support.job.execution.common.WorkerRuntime;
import com.jxdinfo.hussar.support.job.execution.common.constants.TaskStatus;
import com.jxdinfo.hussar.support.job.execution.common.utils.AkkaUtils;
import com.jxdinfo.hussar.support.job.execution.common.utils.SpringUtils;
import com.jxdinfo.hussar.support.job.execution.container.OmsContainer;
import com.jxdinfo.hussar.support.job.execution.container.OmsContainerFactory;
import com.jxdinfo.hussar.support.job.execution.core.ProcessorBeanFactory;
import com.jxdinfo.hussar.support.job.execution.core.executor.ProcessorRunnable;
import com.jxdinfo.hussar.support.job.execution.core.processor.sdk.BasicProcessor;
import com.jxdinfo.hussar.support.job.execution.core.tracker.singlemodel.SingleTaskTrackerActor;
import com.jxdinfo.hussar.support.job.execution.log.OmsLogger;
import com.jxdinfo.hussar.support.job.execution.log.impl.OmsServerLogger;
import com.jxdinfo.hussar.support.job.execution.support.ProcessorListener;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/*  JADX ERROR: NullPointerException in pass: ProcessKotlinInternals
    java.lang.NullPointerException
    */
/* loaded from: input_file:BOOT-INF/lib/hussar-job-execution-9.0.0-poc-donghang-beta.2.jar:com/jxdinfo/hussar/support/job/execution/core/tracker/processor/ProcessorTracker.class */
public class ProcessorTracker {
    /** Class-wide SLF4J logger; never reassigned, hence final. */
    private static final Logger log = LoggerFactory.getLogger(ProcessorTracker.class);
    /** Wall-clock time (System.currentTimeMillis) captured when the constructor started. */
    private long startTime;
    /** Worker runtime handed to the constructor; supplies the actor system, log handler and single-model flag. */
    private WorkerRuntime workerRuntime;
    /** Instance description taken from the start-task request. */
    private InstanceInfo instanceInfo;
    /** Id of the job instance served by this tracker; used as log prefix and pool key. */
    private Long instanceId;
    /** Processor resolved by initProcessor(); stays null when resolution failed. */
    private BasicProcessor processor;
    /** Container the processor was loaded from; only set for EXTERNAL processors. */
    private OmsContainer omsContainer;
    /** Logger passed to every ProcessorRunnable created by submitTask. */
    private OmsLogger omsLogger;
    /** Queue handed to every ProcessorRunnable; cleared on destroy(). */
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    /** Initialised to -1 by the constructor; later written through the synthetic accessor access$702. */
    private long lastIdleTime;
    /** Initialised to 0 by the constructor; later written through the synthetic accessor access$602. */
    private long lastCompletedTaskCount;
    /** Address of the TaskTracker, copied from the start-task request and stamped onto submitted tasks. */
    private String taskTrackerAddress;
    /** Actor selection for the remote TaskTracker; left null in single-model mode and nulled by destroy(). */
    private ActorSelection taskTrackerActorRef;
    /** Pool that executes ProcessorRunnable instances. */
    private ThreadPoolExecutor threadPool;
    /** Single-threaded scheduler that runs CheckerAndReporter periodically. */
    private ScheduledExecutorService timingPool;
    /** Capacity of the bounded work queue behind threadPool. */
    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
    // NOTE(review): presumably milliseconds (two minutes) of tolerated idleness; the consumer is not visible here - confirm.
    private static final long MAX_IDLE_TIME = 120000;
    /** True when the constructor failed; every submitted task is then reported as failed. */
    private boolean lethal;
    /** Message of the throwable that made this tracker lethal. */
    private String lethalReason;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hussar-job-execution-9.0.0-poc-donghang-beta.2.jar:com/jxdinfo/hussar/support/job/execution/core/tracker/processor/ProcessorTracker$CheckerAndReporter.class */
    /**
     * Periodic task of the enclosing tracker: initTimingJob() schedules one instance on the
     * timing pool at a fixed rate of 10 seconds with no initial delay.
     *
     * NOTE(review): the body of run() was not recovered by the decompiler. As written it throws
     * on every invocation, so the logic that lived here must be restored from the original
     * sources or bytecode before this class is usable. The decompiler error below shows run()
     * used access$702, i.e. it wrote lastIdleTime; anything beyond that cannot be told from here.
     */
    public class CheckerAndReporter implements Runnable {
        // Only the enclosing ProcessorTracker instantiates this class.
        private CheckerAndReporter() {
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker.access$702(com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        @Override // java.lang.Runnable
        public void run() {
            /*
                Method dump skipped, instructions count: 510
                To view this dump add '--comments-level debug' option
            */
            // Placeholder emitted by the decompiler: the real implementation is missing.
            throw new UnsupportedOperationException("Method not decompiled: com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker.CheckerAndReporter.run():void");
        }
    }

    /**
     * Creates the tracker for one job instance and brings up its thread pool, its periodic
     * checker and its processor.
     *
     * The constructor never throws: any failure is logged and the tracker is marked lethal,
     * which makes {@link #submitTask(TaskDO)} report every task as failed with the recorded reason.
     *
     * @param taskTrackerStartTaskReq request carrying the instance info and the TaskTracker address
     * @param workerRuntime           runtime of the current worker
     */
    public ProcessorTracker(TaskTrackerStartTaskReq taskTrackerStartTaskReq, WorkerRuntime workerRuntime) {
        this.lethal = false;
        try {
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;

            InstanceInfo requestedInstance = taskTrackerStartTaskReq.getInstanceInfo();
            this.instanceInfo = requestedInstance;
            this.instanceId = requestedInstance.getInstanceId();
            this.taskTrackerAddress = taskTrackerStartTaskReq.getTaskTrackerAddress();

            // The actor path is resolved in every mode, but a selection is only created
            // when the TaskTracker is not handled in single-model mode.
            String trackerActorPath = AkkaUtils.getAkkaWorkerPath(this.taskTrackerAddress, RemoteConstant.TASK_TRACKER_ACTOR_NAME);
            boolean remoteTracker = !workerRuntime.isSingleModel();
            if (remoteTracker) {
                this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(trackerActorPath);
            }

            this.omsLogger = new OmsServerLogger(this.instanceId.longValue(), workerRuntime.getOmsLogHandler());
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;

            // Order matters: the worker pool exists before the periodic job starts,
            // and the processor is resolved last.
            initThreadPool();
            initTimingJob();
            initProcessor();

            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", this.instanceId);
        } catch (Throwable failure) {
            log.error("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", this.instanceId, failure);
            this.lethal = true;
            this.lethalReason = ExceptionUtils.getMessage(failure);
        }
    }

    /**
     * Hands a task to the worker thread pool and acknowledges it to the TaskTracker.
     *
     * <ul>
     *   <li>If this tracker is lethal (its constructor failed), the task is not executed and a
     *       WORKER_PROCESS_FAILED report carrying the construction failure is sent instead.</li>
     *   <li>If the pool rejects the task or submission fails for another reason, the problem is
     *       logged and nothing is reported.</li>
     *   <li>Otherwise a WORKER_RECEIVED report is sent.</li>
     * </ul>
     *
     * @param taskDO task to execute; its instance id and address are overwritten with this
     *               tracker's values before submission
     */
    public void submitTask(TaskDO taskDO) {
        if (this.lethal) {
            ProcessorReportTaskStatusReq failedReport = new ProcessorReportTaskStatusReq()
                    .setInstanceId(this.instanceId)
                    .setSubInstanceId(taskDO.getSubInstanceId())
                    .setTaskId(taskDO.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(this.lethalReason)
                    .setReportTime(System.currentTimeMillis());
            tellTaskTracker(failedReport);
            return;
        }

        taskDO.setInstanceId(this.instanceInfo.getInstanceId());
        taskDO.setAddress(this.taskTrackerAddress);
        try {
            // Processors loaded from a container must run with that container's class loader.
            ClassLoader processorClassLoader = this.omsContainer == null
                    ? getClass().getClassLoader()
                    : this.omsContainer.getContainerClassLoader();
            ProcessorListener listener = (ProcessorListener) SpringContextUtil.getBean(ProcessorListener.class, true);
            this.threadPool.submit(new ProcessorRunnable(this.instanceInfo, this.taskTrackerActorRef, taskDO, this.processor, this.omsLogger, processorClassLoader, this.statusReportRetryQueue, this.workerRuntime, listener));
        } catch (RejectedExecutionException e) {
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", this.instanceId, taskDO.getTaskId(), taskDO.getTaskName());
            return;
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", this.instanceId, taskDO.getTaskId(), taskDO.getTaskName(), e);
            return;
        }

        ProcessorReportTaskStatusReq receivedReport = new ProcessorReportTaskStatusReq();
        receivedReport.setInstanceId(this.instanceId);
        receivedReport.setSubInstanceId(taskDO.getSubInstanceId());
        receivedReport.setTaskId(taskDO.getTaskId());
        receivedReport.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        receivedReport.setReportTime(System.currentTimeMillis());
        tellTaskTracker(receivedReport);
        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", this.instanceId, taskDO.getTaskId(), taskDO.getTaskName(), this.threadPool.getQueue().size());
    }

    /**
     * Delivers a status report to the TaskTracker: through SingleTaskTrackerActor in
     * single-model mode, through the remote actor selection otherwise.
     *
     * The remote reference is null when construction failed before it was resolved, or after
     * destroy(); in that case the report is dropped with an error log instead of failing with
     * a NullPointerException.
     */
    private void tellTaskTracker(ProcessorReportTaskStatusReq statusReq) {
        if (this.workerRuntime.isSingleModel()) {
            SingleTaskTrackerActor.tell(statusReq);
            return;
        }
        // Read the field once: destroy() may null it concurrently.
        ActorSelection trackerRef = this.taskTrackerActorRef;
        if (trackerRef == null) {
            log.error("[ProcessorTracker-{}] unable to report task status because no TaskTracker actor reference is available, dropped report: {}", this.instanceId, statusReq);
            return;
        }
        trackerRef.tell(statusReq, null);
    }

    /**
     * Releases everything this tracker holds: the container reference, the worker thread pool,
     * the TaskTracker reference, the retry queue, its entry in ProcessorTrackerPool and finally
     * the timing pool.
     *
     * Safe to call on a tracker whose constructor failed part-way: resources that were never
     * created are skipped, so the tracker is always removed from the pool.
     */
    public void destroy() {
        if (this.omsContainer != null) {
            this.omsContainer.tryRelease();
        }

        // Failures here (including a pool that was never created) are deliberately ignored.
        CommonUtils.executeIgnoreException(() -> {
            List<Runnable> cancelled = this.threadPool.shutdownNow();
            if (!CollectionUtils.isEmpty(cancelled)) {
                log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", this.instanceId, cancelled.size());
            }
        });

        this.taskTrackerActorRef = null;
        // The queue is null when the constructor failed before creating it; without this guard
        // the NullPointerException would skip the pool removal and the timing-pool shutdown below.
        if (this.statusReportRetryQueue != null) {
            this.statusReportRetryQueue.clear();
        }
        ProcessorTrackerPool.removeProcessorTracker(this.instanceId);
        log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", this.instanceId);

        // The timing pool is stopped last, after all other cleanup has been done.
        CommonUtils.executeIgnoreException((SupplierPlus<?>) () -> this.timingPool.shutdownNow());
    }

    /**
     * Creates the worker pool that executes submitted tasks.
     *
     * Core and maximum size are both the value of calThreadPoolSize(); work is buffered in a
     * bounded queue of THREAD_POOL_QUEUE_MAX_SIZE entries and further submissions are rejected
     * with RejectedExecutionException (AbortPolicy). Threads are named "PPP-n" and core threads
     * are allowed to time out after 60 seconds of idleness.
     */
    private void initThreadPool() {
        int poolSize = calThreadPoolSize();
        this.threadPool = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(THREAD_POOL_QUEUE_MAX_SIZE),
                new ThreadFactoryBuilder().setNameFormat("PPP-%d").build(),
                new ThreadPoolExecutor.AbortPolicy());
        this.threadPool.allowCoreThreadTimeOut(true);
    }

    /**
     * Starts the single-threaded scheduler (threads named "PPT-n") and registers a
     * CheckerAndReporter on it, first run immediately and then at a fixed rate of 10 seconds.
     */
    private void initTimingJob() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("PPT-%d").build());
        // Publish the scheduler on the field before the first run can be triggered.
        this.timingPool = scheduler;
        scheduler.scheduleAtFixedRate(new CheckerAndReporter(), 0L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Resolves the processor described by the instance info and stores it in {@code processor}.
     *
     * <ul>
     *   <li>BUILT_IN: looked up as a Spring bean when Spring is available, otherwise (or when the
     *       lookup fails) obtained from ProcessorBeanFactory.</li>
     *   <li>EXTERNAL: processorInfo must have the form {@code <containerId>#<processor>}; the
     *       container is fetched and asked for the processor.</li>
     * </ul>
     *
     * @throws JobRuntimeException if the processor type is unknown, the EXTERNAL processorInfo is
     *                             malformed, or no processor could be obtained
     * @throws Exception           anything raised by the container or bean lookups
     */
    private void initProcessor() throws Exception {
        ProcessorType processorType = ProcessorType.valueOf(this.instanceInfo.getProcessorType());
        String processorInfo = this.instanceInfo.getProcessorInfo();
        switch (processorType) {
            case BUILT_IN:
                if (SpringUtils.supportSpringBean()) {
                    try {
                        this.processor = (BasicProcessor) SpringUtils.getBean(processorInfo);
                    } catch (Exception e) {
                        // Not fatal: fall back to the local processor factory below.
                        log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", this.instanceId, processorInfo, ExceptionUtils.getMessage(e));
                    }
                }
                if (this.processor == null) {
                    this.processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo);
                }
                break;
            case EXTERNAL:
                String[] containerAndProcessor = processorInfo.split("#");
                // Fail with a readable reason instead of a bare ArrayIndexOutOfBoundsException.
                if (containerAndProcessor.length < 2) {
                    throw new JobRuntimeException("invalid processorInfo for EXTERNAL processor, expected '<containerId>#<processor>' but got: " + processorInfo);
                }
                String containerId = containerAndProcessor[0];
                String processorName = containerAndProcessor[1];
                log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", this.instanceId, processorName, containerId);
                this.omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerId), this.workerRuntime.getActorSystem().actorSelection(AkkaUtils.getServerActorPath(this.workerRuntime.getServerDiscoveryService().getCurrentServerAddress())));
                if (this.omsContainer == null) {
                    log.warn("[ProcessorTracker-{}] load container failed.", this.instanceId);
                } else {
                    this.processor = this.omsContainer.getProcessor(processorName);
                }
                break;
            default:
                log.warn("[ProcessorTracker-{}] unknown processor type: {}.", this.instanceId, processorType);
                throw new JobRuntimeException("unknown processor type of " + processorType);
        }
        if (this.processor == null) {
            log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", this.instanceId, processorType, processorInfo);
            throw new JobRuntimeException("fetch Processor failed, please check your processorType and processorInfo config");
        }
    }

    /**
     * Decides how many worker threads this tracker gets.
     *
     * @return 1 for PYTHON and SHELL processors; the instance's configured thread concurrency for
     *         MAP_REDUCE / MAP execution or for time-expression types listed in
     *         TimeExpressionType.frequentTypes; 2 in every other case
     */
    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf(this.instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf(this.instanceInfo.getProcessorType());

        boolean scriptProcessor = processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL;
        if (scriptProcessor) {
            return 1;
        }

        // The frequent-type lookup is only evaluated when the execute type did not already decide.
        boolean useConfiguredConcurrency = executeType == ExecuteType.MAP_REDUCE
                || executeType == ExecuteType.MAP
                || TimeExpressionType.frequentTypes.contains(Integer.valueOf(this.instanceInfo.getTimeExpressionType()));
        return useConfiguredConcurrency ? this.instanceInfo.getThreadConcurrency() : 2;
    }

    /**
     * Compiler-generated accessor that lets the inner class assign {@code lastIdleTime}.
     *
     * The decompiler could not decode this method (it failed on the dup2_x1 instruction) and
     * left a stub that always threw. Its partial dump shows the single field store
     * {@code r0.lastIdleTime = r1}. The return register in that dump was garbled; returning the
     * stored value follows the usual javac shape for assignment accessors.
     *
     * @param r6 tracker whose field is written
     * @param r7 new value of lastIdleTime
     * @return the value that was stored
     */
    static /* synthetic */ long access$702(com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker r6, long r7) {
        r6.lastIdleTime = r7;
        return r7;
    }

    /**
     * Compiler-generated accessor that lets the inner class assign {@code lastCompletedTaskCount}.
     *
     * The decompiler could not decode this method (it failed on the dup2_x1 instruction) and
     * left a stub that always threw. Its partial dump shows the single field store
     * {@code r0.lastCompletedTaskCount = r1}. The return register in that dump was garbled;
     * returning the stored value follows the usual javac shape for assignment accessors.
     *
     * @param r6 tracker whose field is written
     * @param r7 new value of lastCompletedTaskCount
     * @return the value that was stored
     */
    static /* synthetic */ long access$602(com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTracker r6, long r7) {
        r6.lastCompletedTaskCount = r7;
        return r7;
    }

    /** Compiler-generated accessor exposing the private {@code workerRuntime} field. */
    static /* synthetic */ WorkerRuntime access$800(ProcessorTracker processorTracker) {
        WorkerRuntime runtime = processorTracker.workerRuntime;
        return runtime;
    }

    /** Compiler-generated accessor exposing the private {@code taskTrackerActorRef} field (may be null). */
    static /* synthetic */ ActorSelection access$900(ProcessorTracker processorTracker) {
        ActorSelection trackerRef = processorTracker.taskTrackerActorRef;
        return trackerRef;
    }

    /**
     * Compiler-generated accessor exposing the private {@code statusReportRetryQueue} field.
     * The return type carries the field's element type instead of the raw {@code Queue}; the
     * erased signature is unchanged.
     */
    static /* synthetic */ Queue<ProcessorReportTaskStatusReq> access$1000(ProcessorTracker processorTracker) {
        return processorTracker.statusReportRetryQueue;
    }

    // Empty static initializer: it performs no work and sets up no static state.
    static {
    }
}
