/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.job.execution.core.tracker.processor;

import akka.actor.ActorSelection;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jxdinfo.hussar.platform.core.utils.SpringContextUtil;
import com.jxdinfo.hussar.support.job.core.JobSerializable;
import com.jxdinfo.hussar.support.job.core.enums.ExecuteType;
import com.jxdinfo.hussar.support.job.core.enums.ProcessorType;
import com.jxdinfo.hussar.support.job.core.enums.TimeExpressionType;
import com.jxdinfo.hussar.support.job.core.exception.JobRuntimeException;
import com.jxdinfo.hussar.support.job.core.execution.model.InstanceInfo;
import com.jxdinfo.hussar.support.job.core.execution.model.TaskDO;
import com.jxdinfo.hussar.support.job.core.execution.request.ProcessorReportTaskStatusReq;
import com.jxdinfo.hussar.support.job.core.execution.request.ProcessorTrackerStatusReportReq;
import com.jxdinfo.hussar.support.job.core.execution.request.TaskTrackerStartTaskReq;
import com.jxdinfo.hussar.support.job.core.utils.CommonUtils;
import com.jxdinfo.hussar.support.job.execution.common.WorkerRuntime;
import com.jxdinfo.hussar.support.job.execution.common.constants.TaskStatus;
import com.jxdinfo.hussar.support.job.execution.common.utils.AkkaUtils;
import com.jxdinfo.hussar.support.job.execution.common.utils.SpringUtils;
import com.jxdinfo.hussar.support.job.execution.container.OmsContainer;
import com.jxdinfo.hussar.support.job.execution.container.OmsContainerFactory;
import com.jxdinfo.hussar.support.job.execution.core.ProcessorBeanFactory;
import com.jxdinfo.hussar.support.job.execution.core.executor.ProcessorRunnable;
import com.jxdinfo.hussar.support.job.execution.core.processor.sdk.BasicProcessor;
import com.jxdinfo.hussar.support.job.execution.core.tracker.processor.ProcessorTrackerPool;
import com.jxdinfo.hussar.support.job.execution.core.tracker.singlemodel.SingleTaskTrackerActor;
import com.jxdinfo.hussar.support.job.execution.log.OmsLogger;
import com.jxdinfo.hussar.support.job.execution.log.impl.OmsServerLogger;
import com.jxdinfo.hussar.support.job.execution.support.ProcessorListener;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * Runs the tasks of a single job instance on this worker.
 *
 * <p>Each tracker owns a bounded thread pool that executes {@link ProcessorRunnable}s and a
 * single-threaded scheduler that periodically checks for instance timeout / idleness, retries
 * failed task-status reports and sends a load report to the TaskTracker.
 *
 * <p>If construction fails, the tracker is kept in a "lethal" state: every task submitted to it is
 * immediately reported back as {@code WORKER_PROCESS_FAILED} together with the failure reason.
 */
public class ProcessorTracker {
    private static final Logger log = LoggerFactory.getLogger(ProcessorTracker.class);

    /** Capacity of the bounded queue in front of the task thread pool. */
    private static final int THREAD_POOL_QUEUE_MAX_SIZE = 128;
    /** Idle duration (ms) after which the tracker reports itself idle and destroys itself. */
    private static final long MAX_IDLE_TIME = 120000L;

    private long startTime;
    private WorkerRuntime workerRuntime;
    private InstanceInfo instanceInfo;
    private Long instanceId;
    private BasicProcessor processor;
    private OmsContainer omsContainer;
    private OmsLogger omsLogger;
    /** Task-status reports that could not be delivered and are retried by the timing job. */
    private Queue<ProcessorReportTaskStatusReq> statusReportRetryQueue;
    /** Timestamp (ms) at which the pool was first observed idle, or -1 while it is busy. */
    private long lastIdleTime;
    private long lastCompletedTaskCount;
    private String taskTrackerAddress;
    /** Remote TaskTracker actor; stays {@code null} in single model and after {@link #destroy()}. */
    private ActorSelection taskTrackerActorRef;
    private ThreadPoolExecutor threadPool;
    private ScheduledExecutorService timingPool;
    /** Set when construction failed; all submitted tasks are then rejected with {@link #lethalReason}. */
    private boolean lethal = false;
    private String lethalReason;

    /**
     * Creates the tracker, its thread pools and the processor described by the request.
     *
     * <p>Never throws: any failure is logged and turns the tracker into a lethal one.
     *
     * @param request       start request carrying the instance info and the TaskTracker address
     * @param workerRuntime runtime of the current worker
     */
    public ProcessorTracker(TaskTrackerStartTaskReq request, WorkerRuntime workerRuntime) {
        try {
            this.startTime = System.currentTimeMillis();
            this.workerRuntime = workerRuntime;
            this.instanceInfo = request.getInstanceInfo();
            this.instanceId = request.getInstanceInfo().getInstanceId();
            this.taskTrackerAddress = request.getTaskTrackerAddress();
            String akkaRemotePath = AkkaUtils.getAkkaWorkerPath(this.taskTrackerAddress, "task_tracker");
            if (!workerRuntime.isSingleModel()) {
                this.taskTrackerActorRef = workerRuntime.getActorSystem().actorSelection(akkaRemotePath);
            }
            this.omsLogger = new OmsServerLogger(this.instanceId, workerRuntime.getOmsLogHandler());
            this.statusReportRetryQueue = Queues.newLinkedBlockingQueue();
            this.lastIdleTime = -1L;
            this.lastCompletedTaskCount = 0L;
            this.initThreadPool();
            this.initTimingJob();
            this.initProcessor();
            log.info("[ProcessorTracker-{}] ProcessorTracker was successfully created!", this.instanceId);
        } catch (Throwable t) {
            log.error("[ProcessorTracker-{}] create ProcessorTracker failed, all tasks submitted here will fail.", this.instanceId, t);
            this.lethal = true;
            this.lethalReason = ExceptionUtils.getMessage(t);
        }
    }

    /**
     * Submits a task for execution.
     *
     * <p>A lethal tracker reports the task as failed right away. Otherwise the task is handed to the
     * thread pool and, if accepted, a {@code WORKER_RECEIVED} report is sent. When the pool rejects
     * the task nothing is reported; the failure is only logged.
     *
     * @param newTask task to execute; its instance id and address are overwritten here
     */
    public void submitTask(TaskDO newTask) {
        if (this.lethal) {
            ProcessorReportTaskStatusReq failedReport = new ProcessorReportTaskStatusReq()
                    .setInstanceId(this.instanceId)
                    .setSubInstanceId(newTask.getSubInstanceId())
                    .setTaskId(newTask.getTaskId())
                    .setStatus(TaskStatus.WORKER_PROCESS_FAILED.getValue())
                    .setResult(this.lethalReason)
                    .setReportTime(System.currentTimeMillis());
            this.tellTaskTracker(failedReport);
            return;
        }

        newTask.setInstanceId(this.instanceInfo.getInstanceId());
        newTask.setAddress(this.taskTrackerAddress);
        ClassLoader classLoader = this.omsContainer == null
                ? this.getClass().getClassLoader()
                : this.omsContainer.getContainerClassLoader();
        ProcessorListener processorListener = (ProcessorListener) SpringContextUtil.getBean(ProcessorListener.class, true);
        ProcessorRunnable processorRunnable = new ProcessorRunnable(this.instanceInfo, this.taskTrackerActorRef, newTask, this.processor, this.omsLogger, classLoader, this.statusReportRetryQueue, this.workerRuntime, processorListener);

        try {
            this.threadPool.submit(processorRunnable);
        } catch (RejectedExecutionException ignored) {
            // Deliberately not reported: the message below documents the expected follow-up.
            log.warn("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed due to ThreadPool has too much task waiting to process, this task will dispatch to other ProcessorTracker.", this.instanceId, newTask.getTaskId(), newTask.getTaskName());
            return;
        } catch (Exception e) {
            log.error("[ProcessorTracker-{}] submit task(taskId={},taskName={}) to ThreadPool failed.", this.instanceId, newTask.getTaskId(), newTask.getTaskName(), e);
            return;
        }

        ProcessorReportTaskStatusReq receivedReport = new ProcessorReportTaskStatusReq();
        receivedReport.setInstanceId(this.instanceId);
        receivedReport.setSubInstanceId(newTask.getSubInstanceId());
        receivedReport.setTaskId(newTask.getTaskId());
        receivedReport.setStatus(TaskStatus.WORKER_RECEIVED.getValue());
        receivedReport.setReportTime(System.currentTimeMillis());
        this.tellTaskTracker(receivedReport);
        log.debug("[ProcessorTracker-{}] submit task(taskId={}, taskName={}) success, current queue size: {}.", this.instanceId, newTask.getTaskId(), newTask.getTaskName(), this.threadPool.getQueue().size());
    }

    /**
     * Releases the container, stops both pools, drops pending retry reports and removes this
     * tracker from {@link ProcessorTrackerPool}.
     */
    public void destroy() {
        if (this.omsContainer != null) {
            this.omsContainer.tryRelease();
        }
        CommonUtils.executeIgnoreException(() -> {
            List<Runnable> tasks = this.threadPool.shutdownNow();
            if (!CollectionUtils.isEmpty(tasks)) {
                log.warn("[ProcessorTracker-{}] shutdown threadPool now and stop {} tasks.", this.instanceId, tasks.size());
            }
        });
        this.taskTrackerActorRef = null;
        // The queue does not exist when the constructor failed early; without this guard the NPE
        // would skip the pool removal below.
        if (this.statusReportRetryQueue != null) {
            this.statusReportRetryQueue.clear();
        }
        ProcessorTrackerPool.removeProcessorTracker(this.instanceId);
        log.info("[ProcessorTracker-{}] ProcessorTracker destroyed successfully!", this.instanceId);
        // Kept last: destroy() can run on the timing thread itself and shutdownNow() interrupts it.
        CommonUtils.executeIgnoreException(() -> this.timingPool.shutdownNow());
    }

    /**
     * Sends a fire-and-forget message to the TaskTracker: through {@link SingleTaskTrackerActor}
     * in single model, through the remote actor selection otherwise. The message is dropped with
     * a warning when no actor reference is available (construction failed early or the tracker
     * has already been destroyed).
     */
    private void tellTaskTracker(JobSerializable message) {
        if (this.workerRuntime != null && this.workerRuntime.isSingleModel()) {
            SingleTaskTrackerActor.tell(message);
            return;
        }
        // Read once: destroy() may null the field concurrently.
        ActorSelection target = this.taskTrackerActorRef;
        if (target == null) {
            log.warn("[ProcessorTracker-{}] no TaskTracker actor available, drop message: {}", this.instanceId, message);
            return;
        }
        target.tell(message, null);
    }

    /** Builds the task pool: fixed size, bounded queue, rejects with an exception when full. */
    private void initThreadPool() {
        int poolSize = this.calThreadPoolSize();
        ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(THREAD_POOL_QUEUE_MAX_SIZE);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPP-%d").build();
        ThreadPoolExecutor.AbortPolicy rejectionHandler = new ThreadPoolExecutor.AbortPolicy();
        this.threadPool = new ThreadPoolExecutor(poolSize, poolSize, 60L, TimeUnit.SECONDS, queue, threadFactory, rejectionHandler);
        this.threadPool.allowCoreThreadTimeOut(true);
    }

    /** Starts the single-threaded scheduler that runs {@link CheckerAndReporter} every 10 seconds. */
    private void initTimingJob() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("PPT-%d").build();
        this.timingPool = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.timingPool.scheduleAtFixedRate(new CheckerAndReporter(), 0L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Resolves the processor named by the instance info.
     *
     * <p>{@code BUILT_IN}: Spring bean lookup first (when supported), then the local processor
     * factory. {@code EXTERNAL}: {@code processorInfo} is split on {@code #}; the first part is the
     * container id, the second identifies the processor inside that container.
     *
     * @throws Exception if the processor type is unsupported, the processor info is malformed or
     *                   no processor could be obtained
     */
    private void initProcessor() throws Exception {
        ProcessorType processorType = ProcessorType.valueOf(this.instanceInfo.getProcessorType());
        String processorInfo = this.instanceInfo.getProcessorInfo();
        switch (processorType) {
            case BUILT_IN: {
                if (SpringUtils.supportSpringBean()) {
                    try {
                        this.processor = (BasicProcessor) SpringUtils.getBean(processorInfo);
                    } catch (Exception e) {
                        log.warn("[ProcessorTracker-{}] no spring bean of processor(className={}), reason is {}.", this.instanceId, processorInfo, ExceptionUtils.getMessage(e));
                    }
                }
                if (this.processor == null) {
                    this.processor = ProcessorBeanFactory.getInstance().getLocalProcessor(processorInfo);
                }
                break;
            }
            case EXTERNAL: {
                String[] split = processorInfo.split("#");
                if (split.length < 2) {
                    // Fail with a readable reason instead of an ArrayIndexOutOfBoundsException.
                    throw new JobRuntimeException("invalid processorInfo for external processor, expected 'containerId#processor' but got: " + processorInfo);
                }
                log.info("[ProcessorTracker-{}] try to load processor({}) in container({})", this.instanceId, split[1], split[0]);
                String serverPath = AkkaUtils.getServerActorPath(this.workerRuntime.getServerDiscoveryService().getCurrentServerAddress());
                ActorSelection actorSelection = this.workerRuntime.getActorSystem().actorSelection(serverPath);
                this.omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(split[0]), actorSelection);
                if (this.omsContainer != null) {
                    this.processor = this.omsContainer.getProcessor(split[1]);
                } else {
                    log.warn("[ProcessorTracker-{}] load container failed.", this.instanceId);
                }
                break;
            }
            default: {
                log.warn("[ProcessorTracker-{}] unknown processor type: {}.", this.instanceId, processorType);
                throw new JobRuntimeException("unknown processor type of " + processorType);
            }
        }
        if (this.processor == null) {
            log.warn("[ProcessorTracker-{}] fetch Processor(type={},info={}) failed.", this.instanceId, processorType, processorInfo);
            throw new JobRuntimeException("fetch Processor failed, please check your processorType and processorInfo config");
        }
    }

    /**
     * Decides the task pool size: 1 for PYTHON / SHELL processors, the configured thread
     * concurrency for MAP / MAP_REDUCE executions and for frequent time-expression types,
     * 2 otherwise.
     */
    private int calThreadPoolSize() {
        ExecuteType executeType = ExecuteType.valueOf(this.instanceInfo.getExecuteType());
        ProcessorType processorType = ProcessorType.valueOf(this.instanceInfo.getProcessorType());
        if (processorType == ProcessorType.PYTHON || processorType == ProcessorType.SHELL) {
            return 1;
        }
        if (executeType == ExecuteType.MAP_REDUCE || executeType == ExecuteType.MAP) {
            return this.instanceInfo.getThreadConcurrency();
        }
        if (TimeExpressionType.frequentTypes.contains(this.instanceInfo.getTimeExpressionType())) {
            return this.instanceInfo.getThreadConcurrency();
        }
        return 2;
    }

    /**
     * Periodic job: self-destroys on instance timeout or prolonged idleness, retries undelivered
     * task-status reports and finally sends a load report to the TaskTracker.
     */
    private class CheckerAndReporter
    implements Runnable {
        private CheckerAndReporter() {
        }

        @Override
        public void run() {
            try {
                this.checkAndReport();
            } catch (Exception e) {
                // scheduleAtFixedRate suppresses all further runs once a run throws, which would
                // silently disable heartbeat, idle detection and timeout self-destroy.
                log.warn("[ProcessorTracker-{}] check and report failed, will try again at next schedule.", ProcessorTracker.this.instanceId, e);
            }
        }

        /** One full round of timeout check, idle check, report retry and load report. */
        private void checkAndReport() {
            if (this.isInstanceTimedOut()) {
                log.warn("[ProcessorTracker-{}] detected instance timeout, maybe TaskTracker's destroy request missed, so try to kill self now.", ProcessorTracker.this.instanceId);
                ProcessorTracker.this.destroy();
                return;
            }
            if (this.destroyIfIdleTooLong()) {
                return;
            }
            // Skip the load report while older status reports are still undeliverable.
            if (!this.retryFailedStatusReports()) {
                return;
            }
            long waitingNum = ProcessorTracker.this.threadPool.getQueue().size();
            ProcessorTrackerStatusReportReq loadReport = ProcessorTrackerStatusReportReq.buildLoadReport(ProcessorTracker.this.instanceId, waitingNum);
            loadReport.setAddress(ProcessorTracker.this.workerRuntime.getWorkerAddress());
            ProcessorTracker.this.tellTaskTracker(loadReport);
            log.debug("[ProcessorTracker-{}] send heartbeat to TaskTracker, current waiting task num is {}.", ProcessorTracker.this.instanceId, waitingNum);
        }

        /** Frequent time-expression types are exempt from the instance timeout. */
        private boolean isInstanceTimedOut() {
            if (TimeExpressionType.frequentTypes.contains(ProcessorTracker.this.instanceInfo.getTimeExpressionType())) {
                return false;
            }
            long elapsed = System.currentTimeMillis() - ProcessorTracker.this.startTime;
            return elapsed > ProcessorTracker.this.instanceInfo.getInstanceTimeoutMS();
        }

        /**
         * Updates the idle bookkeeping and, once the pool has been idle for more than
         * {@link #MAX_IDLE_TIME}, reports idleness to the TaskTracker and destroys the tracker.
         *
         * @return {@code true} if the tracker was destroyed
         */
        private boolean destroyIfIdleTooLong() {
            ThreadPoolExecutor pool = ProcessorTracker.this.threadPool;
            if (pool.getActiveCount() > 0 || pool.getCompletedTaskCount() > ProcessorTracker.this.lastCompletedTaskCount) {
                ProcessorTracker.this.lastIdleTime = -1L;
                ProcessorTracker.this.lastCompletedTaskCount = pool.getCompletedTaskCount();
                return false;
            }
            if (ProcessorTracker.this.lastIdleTime == -1L) {
                ProcessorTracker.this.lastIdleTime = System.currentTimeMillis();
                return false;
            }
            long idleTime = System.currentTimeMillis() - ProcessorTracker.this.lastIdleTime;
            if (idleTime <= MAX_IDLE_TIME) {
                return false;
            }
            log.warn("[ProcessorTracker-{}] ProcessorTracker have been idle for {}ms, it's time to tell TaskTracker and then destroy self.", ProcessorTracker.this.instanceId, idleTime);
            ProcessorTrackerStatusReportReq idleReport = ProcessorTrackerStatusReportReq.buildIdleReport(ProcessorTracker.this.instanceId);
            idleReport.setAddress(ProcessorTracker.this.workerRuntime.getWorkerAddress());
            ProcessorTracker.this.tellTaskTracker(idleReport);
            ProcessorTracker.this.destroy();
            return true;
        }

        /**
         * Re-sends queued task-status reports until the queue is empty or one delivery fails.
         * A failed report (including one whose transmission threw) is put back into the queue.
         *
         * @return {@code true} if the queue was drained completely
         */
        private boolean retryFailedStatusReports() {
            Queue<ProcessorReportTaskStatusReq> retryQueue = ProcessorTracker.this.statusReportRetryQueue;
            ProcessorReportTaskStatusReq req;
            while ((req = retryQueue.poll()) != null) {
                req.setReportTime(System.currentTimeMillis());
                Boolean delivered;
                try {
                    if (ProcessorTracker.this.workerRuntime.isSingleModel()) {
                        delivered = SingleTaskTrackerActor.ask((JobSerializable) req).isSuccess();
                    } else {
                        delivered = AkkaUtils.reliableTransmit(ProcessorTracker.this.taskTrackerActorRef, req);
                    }
                } catch (Exception e) {
                    // Treat as a failed delivery so the already-polled report is not lost.
                    log.warn("[ProcessorTracker-{}] retry report task status threw an exception.", ProcessorTracker.this.instanceId, e);
                    delivered = Boolean.FALSE;
                }
                if (!Boolean.TRUE.equals(delivered)) {
                    retryQueue.add(req);
                    log.warn("[ProcessorTracker-{}] retry report finished task status failed: {}", ProcessorTracker.this.instanceId, req);
                    return false;
                }
            }
            return true;
        }
    }
}

