/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.job.execution.core.tracker.task;

import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jxdinfo.hussar.platform.core.utils.HussarUtils;
import com.jxdinfo.hussar.support.job.core.JobSerializable;
import com.jxdinfo.hussar.support.job.core.enums.ExecuteType;
import com.jxdinfo.hussar.support.job.core.enums.InstanceStatus;
import com.jxdinfo.hussar.support.job.core.exception.JobRuntimeException;
import com.jxdinfo.hussar.support.job.core.execution.model.TaskDO;
import com.jxdinfo.hussar.support.job.core.model.InstanceDetail;
import com.jxdinfo.hussar.support.job.core.request.ServerScheduleJobReq;
import com.jxdinfo.hussar.support.job.core.request.TaskTrackerReportInstanceStatusReq;
import com.jxdinfo.hussar.support.job.core.response.AskResponse;
import com.jxdinfo.hussar.support.job.execution.common.WorkerRuntime;
import com.jxdinfo.hussar.support.job.execution.common.constants.TaskStatus;
import com.jxdinfo.hussar.support.job.execution.common.utils.AkkaUtils;
import com.jxdinfo.hussar.support.job.execution.core.tracker.singlemodel.SingleServerTrackerActor;
import com.jxdinfo.hussar.support.job.execution.core.tracker.task.TaskTracker;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/**
 * {@link TaskTracker} implementation that persists a root task on start-up and then
 * periodically inspects the instance statistics, reports the instance status to the
 * server, re-dispatches tasks whose dispatch was never acknowledged, and recovers
 * tasks from disconnected ProcessorTrackers.
 *
 * <p>All periodic work runs on {@code scheduledPool}, which is created in
 * {@link #initTaskTracker(ServerScheduleJobReq)}.
 */
public class CommonTaskTracker extends TaskTracker {
    private static final Logger log = LoggerFactory.getLogger(CommonTaskTracker.class);

    /** Task id given to the root task created in {@link #persistenceRootTask()}. */
    public static final String ROOT_TASK_ID = "0";
    /** Task id given to the synthetic last task submitted by the status checker. */
    public static final String LAST_TASK_ID = "9999";

    /** Once failed "finished" reports exceed this count, the tracker gives up and destroys itself. */
    private static final int MAX_REPORT_FAILED_THRESHOLD = 5;
    private static final String ROOT_TASK_NAME = "OMS_ROOT_TASK";
    private static final String LAST_TASK_NAME = "OMS_LAST_TASK";

    /** Number of failed attempts to report the finished status; only touched by the status checker. */
    private int reportFailedCnt = 0;

    protected CommonTaskTracker(ServerScheduleJobReq req, WorkerRuntime workerRuntime) {
        super(req, workerRuntime);
    }

    /**
     * Creates the scheduled pool, persists the root task and schedules the periodic jobs.
     *
     * <p>The status-check period (seconds) is read from the system property
     * {@code hussarjob.worker.status-check.normal.period} and defaults to 13; a
     * non-numeric value makes {@link Integer#parseInt(String)} throw.
     *
     * @param req the schedule request; its instance id names the pool threads and its
     *            execute type decides whether the worker detector is scheduled
     */
    @Override
    protected void initTaskTracker(ServerScheduleJobReq req) {
        // The trailing "-%d" is left for ThreadFactoryBuilder to fill in with the thread index.
        String poolName = String.format("ctttp-%d", req.getInstanceId()) + "-%d";
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat(poolName).build();
        this.scheduledPool = Executors.newScheduledThreadPool(2, factory);

        this.persistenceRootTask();

        int delay = Integer.parseInt(System.getProperty("hussarjob.worker.status-check.normal.period", "13"));
        this.scheduledPool.scheduleWithFixedDelay(new StatusCheckRunnable(), 3L, delay, TimeUnit.SECONDS);

        // Only MAP and MAP_REDUCE executions get the periodic worker detector.
        ExecuteType executeType = ExecuteType.valueOf((String) req.getExecuteType());
        if (executeType == ExecuteType.MAP || executeType == ExecuteType.MAP_REDUCE) {
            this.scheduledPool.scheduleAtFixedRate(new TaskTracker.WorkerDetector(), 1L, 1L, TimeUnit.MINUTES);
        }

        this.scheduledPool.scheduleWithFixedDelay(new TaskTracker.Dispatcher(), 10L, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Builds a snapshot of the instance, always reported with status {@code RUNNING}.
     *
     * @return detail carrying the trigger time, this worker's address and the
     *         succeeded / failed / total task counters
     */
    @Override
    public InstanceDetail fetchRunningStatus() {
        TaskTracker.InstanceStatisticsHolder holder = this.getInstanceStatisticsHolder(this.instanceId);

        InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
        taskDetail.setSucceedTaskNum(holder.succeedNum);
        taskDetail.setFailedTaskNum(holder.failedNum);
        taskDetail.setTotalTaskNum(holder.getTotalTaskNum());

        InstanceDetail detail = new InstanceDetail();
        detail.setActualTriggerTime(Long.valueOf(this.createTime));
        detail.setStatus(Integer.valueOf(InstanceStatus.RUNNING.getV()));
        detail.setTaskTrackerAddress(this.workerRuntime.getWorkerAddress());
        detail.setTaskDetail(taskDetail);
        return detail;
    }

    /**
     * Tells whether more time than the configured instance timeout has elapsed since
     * {@code createTime}.
     *
     * <p>NOTE(review): a timeout of 0 or less makes this return {@code true} almost
     * immediately; presumably the value is normalised before it reaches here — confirm.
     *
     * @return {@code true} when the elapsed time exceeds {@code instanceInfo.getInstanceTimeoutMS()}
     */
    public boolean isTimeout() {
        return System.currentTimeMillis() - this.createTime > this.instanceInfo.getInstanceTimeoutMS();
    }

    /**
     * Persists the root task in {@code WAITING_DISPATCH} state, addressed to this worker.
     *
     * @throws JobRuntimeException when the persistence service reports that the save failed
     */
    private void persistenceRootTask() {
        TaskDO rootTask = new TaskDO();
        rootTask.setStatus(Integer.valueOf(TaskStatus.WAITING_DISPATCH.getValue()));
        rootTask.setInstanceId(this.instanceInfo.getInstanceId());
        rootTask.setTaskId(ROOT_TASK_ID);
        rootTask.setFailedCnt(Integer.valueOf(0));
        rootTask.setAddress(this.workerRuntime.getWorkerAddress());
        rootTask.setTaskName(ROOT_TASK_NAME);
        rootTask.setCreatedTime(Long.valueOf(System.currentTimeMillis()));
        rootTask.setLastModifiedTime(Long.valueOf(System.currentTimeMillis()));
        rootTask.setLastReportTime(Long.valueOf(-1L));
        rootTask.setSubInstanceId(Long.valueOf(this.instanceId));

        if (!this.taskPersistenceService.save(rootTask)) {
            log.error("[TaskTracker-{}] create root task failed.", this.instanceId);
            throw new JobRuntimeException("create root task failed for instance: " + this.instanceId);
        }
        log.info("[TaskTracker-{}] create root task successfully.", this.instanceId);
    }

    @Override
    public String toString() {
        return "CommonTaskTracker{reportFailedCnt=" + this.reportFailedCnt
                + ", createTime=" + this.createTime
                + ", workerRuntime=" + this.workerRuntime
                + ", instanceId=" + this.instanceId
                + ", instanceInfo=" + this.instanceInfo
                + ", ptStatusHolder=" + this.ptStatusHolder
                + ", taskPersistenceService=" + this.taskPersistenceService
                + ", scheduledPool=" + this.scheduledPool
                + ", finished=" + this.finished
                + ", appendedWfContext=" + this.appendedWfContext
                + '}';
    }

    /**
     * Periodic status check: decides whether the instance has finished, reports the
     * status to the server and performs task recovery while the instance is running.
     */
    private class StatusCheckRunnable
    implements Runnable {
        /** A dispatched task not acknowledged within this many milliseconds is dispatched again. */
        private static final long DISPATCH_TIME_OUT_MS = 15000L;
        /** Timeout, in milliseconds, used for both the ask and the wait on its reply. */
        private static final long REPORT_TIME_OUT_MS = 5000L;

        private StatusCheckRunnable() {
        }

        /** Runs one complete status-check cycle; exceptions propagate to {@link #run()}. */
        private void innerRun() {
            TaskTracker.InstanceStatisticsHolder holder = CommonTaskTracker.this.getInstanceStatisticsHolder(CommonTaskTracker.this.instanceId);
            long finishedNum = holder.succeedNum + holder.failedNum;
            long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;
            log.debug("[TaskTracker-{}] status check result: {}", CommonTaskTracker.this.instanceId, holder);

            TaskTrackerReportInstanceStatusReq req = this.buildReport(holder, finishedNum + unfinishedNum);

            boolean success = false;
            String result = null;
            if (unfinishedNum == 0L) {
                if (finishedNum == 0L) {
                    // Nothing pending and nothing finished: no task was ever recorded for this instance.
                    CommonTaskTracker.this.finished.set(true);
                    result = "create root task failed";
                } else {
                    ExecuteType executeType = ExecuteType.valueOf((String) CommonTaskTracker.this.instanceInfo.getExecuteType());
                    switch (executeType) {
                        case STANDALONE: {
                            CommonTaskTracker.this.finished.set(true);
                            List<TaskDO> allTask = CommonTaskTracker.this.taskPersistenceService.getAllTask(CommonTaskTracker.this.instanceId, CommonTaskTracker.this.instanceId);
                            if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                // Exactly one task is expected here; anything else is reported as a failure.
                                result = "unknown bug";
                                log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", CommonTaskTracker.this.instanceId);
                            } else {
                                TaskDO onlyTask = allTask.get(0);
                                result = onlyTask.getResult();
                                success = onlyTask.getStatus().intValue() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                            }
                            break;
                        }
                        case MAP: {
                            CommonTaskTracker.this.finished.set(true);
                            success = holder.failedNum == 0L;
                            result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                            break;
                        }
                        default: {
                            // Every other execute type finishes through a dedicated last task.
                            Optional<TaskDO> lastTaskOptional = CommonTaskTracker.this.taskPersistenceService.getLastTask(CommonTaskTracker.this.instanceId, CommonTaskTracker.this.instanceId);
                            if (lastTaskOptional.isPresent()) {
                                TaskDO resultTask = lastTaskOptional.get();
                                TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());
                                // The instance only finishes once the last task reached a terminal state.
                                if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                    CommonTaskTracker.this.finished.set(true);
                                    success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                    result = resultTask.getResult();
                                }
                            } else {
                                this.submitLastTask();
                            }
                            break;
                        }
                    }
                }
            }

            // A timeout overrides whatever outcome was computed above.
            if (CommonTaskTracker.this.isTimeout()) {
                CommonTaskTracker.this.finished.set(true);
                success = false;
                result = "instance execute timeout";
            }

            String serverPath = AkkaUtils.getServerActorPath(CommonTaskTracker.this.workerRuntime.getServerDiscoveryService().getCurrentServerAddress());

            if (CommonTaskTracker.this.finished.get()) {
                req.setResult(result);
                req.setAppendedWfContext(CommonTaskTracker.this.appendedWfContext);
                req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());

                if (!this.reportFinishedStatus(req, serverPath, result)) {
                    // Keep the tracker alive so the next cycle retries, unless the retry budget is spent.
                    if (++CommonTaskTracker.this.reportFailedCnt > MAX_REPORT_FAILED_THRESHOLD) {
                        log.error("[TaskTracker-{}] try to report finished status(success={}, result={}) lots of times but all failed, it's time to give up, so the process result will be dropped", CommonTaskTracker.this.instanceId, success, result);
                        CommonTaskTracker.this.destroy();
                    }
                    return;
                }
                log.info("[TaskTracker-{}] instance process finished,result = {}, start to release resource...", CommonTaskTracker.this.instanceId, result);
                CommonTaskTracker.this.destroy();
                return;
            }

            req.setInstanceStatus(InstanceStatus.RUNNING.getV());
            this.reportRunningStatus(req, serverPath);
            if (holder.workerUnreceivedNum != 0L) {
                this.redispatchUnreceivedTasks();
            }
            this.recoverTasksOfDisconnectedProcessorTrackers();
        }

        /** Creates the status report pre-filled with ids, counters, timestamps and this worker's address. */
        private TaskTrackerReportInstanceStatusReq buildReport(TaskTracker.InstanceStatisticsHolder holder, long totalTaskNum) {
            TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
            req.setJobId(CommonTaskTracker.this.instanceInfo.getJobId());
            req.setInstanceId(Long.valueOf(CommonTaskTracker.this.instanceId));
            req.setWfInstanceId(CommonTaskTracker.this.instanceInfo.getWfInstanceId());
            req.setTotalTaskNum(totalTaskNum);
            req.setSucceedTaskNum(holder.succeedNum);
            req.setFailedTaskNum(holder.failedNum);
            req.setReportTime(System.currentTimeMillis());
            req.setStartTime(CommonTaskTracker.this.createTime);
            req.setSourceAddress(CommonTaskTracker.this.workerRuntime.getWorkerAddress());
            return req;
        }

        /** Submits the synthetic last task, addressed to this worker. */
        private void submitLastTask() {
            TaskDO newLastTask = new TaskDO();
            newLastTask.setTaskName(LAST_TASK_NAME);
            newLastTask.setTaskId(CommonTaskTracker.LAST_TASK_ID);
            newLastTask.setSubInstanceId(Long.valueOf(CommonTaskTracker.this.instanceId));
            newLastTask.setAddress(CommonTaskTracker.this.workerRuntime.getWorkerAddress());
            CommonTaskTracker.this.submitTask(Lists.newArrayList(newLastTask));
        }

        /**
         * Reports the finished status and tells whether the server accepted it.
         *
         * @return {@code true} when the single-model tell did not throw, or when the
         *         cluster-mode reply reported success; {@code false} otherwise
         */
        private boolean reportFinishedStatus(TaskTrackerReportInstanceStatusReq req, String serverPath, String result) {
            if (CommonTaskTracker.this.workerRuntime.isSingleModel()) {
                try {
                    SingleServerTrackerActor.tell((JobSerializable) req);
                    return true;
                }
                catch (Exception e) {
                    log.warn("[\u5355\u673a\u6a21\u5f0f-TaskTracker-{}] report finished status failed, result={}.", CommonTaskTracker.this.instanceId, result, e);
                    return false;
                }
            }

            ActorSelection serverActor = CommonTaskTracker.this.workerRuntime.getActorSystem().actorSelection(serverPath);
            CompletionStage<Object> askCS = Patterns.ask(serverActor, (Object) req, Duration.ofMillis(REPORT_TIME_OUT_MS));
            try {
                AskResponse askResponse = (AskResponse) askCS.toCompletableFuture().get(REPORT_TIME_OUT_MS, TimeUnit.MILLISECONDS);
                return askResponse.isSuccess();
            }
            catch (Exception e) {
                log.warn("[TaskTracker-{}] report finished status failed, result={}.", CommonTaskTracker.this.instanceId, result, e);
                return false;
            }
        }

        /**
         * Sends the running status without waiting for a reply. Failures are logged and
         * swallowed in both modes so that the recovery steps of the same cycle still run.
         */
        private void reportRunningStatus(TaskTrackerReportInstanceStatusReq req, String serverPath) {
            try {
                if (CommonTaskTracker.this.workerRuntime.isSingleModel()) {
                    SingleServerTrackerActor.tell((JobSerializable) req);
                } else {
                    ActorSelection serverActor = CommonTaskTracker.this.workerRuntime.getActorSystem().actorSelection(serverPath);
                    // No sender is supplied because no reply is awaited.
                    serverActor.tell((Object) req, null);
                }
            }
            catch (Exception e) {
                log.warn("[TaskTracker-{}] report running status failed, req={}.", CommonTaskTracker.this.instanceId, req, e);
            }
        }

        /**
         * Moves tasks that have stayed in {@code DISPATCH_SUCCESS_WORKER_UNCHECK} for longer than
         * {@link #DISPATCH_TIME_OUT_MS} back to {@code WAITING_DISPATCH}; the query is called
         * with a limit argument of 100.
         */
        private void redispatchUnreceivedTasks() {
            long currentMS = System.currentTimeMillis();
            CommonTaskTracker.this.taskPersistenceService.getTaskByStatus(CommonTaskTracker.this.instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {
                long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                if (elapsedTime <= DISPATCH_TIME_OUT_MS) {
                    return;
                }
                TaskDO updateEntity = new TaskDO();
                updateEntity.setStatus(Integer.valueOf(TaskStatus.WAITING_DISPATCH.getValue()));
                // The last task keeps its address; every other task has its address reset to "N/A".
                if (!LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                    updateEntity.setAddress("N/A");
                }
                updateEntity.setFailedCnt(Integer.valueOf(uncheckTask.getFailedCnt() + 1));
                CommonTaskTracker.this.taskPersistenceService.updateTask(CommonTaskTracker.this.instanceId, uncheckTask.getTaskId(), updateEntity);
                log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.", CommonTaskTracker.this.instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
            });
        }

        /**
         * Hands the tasks of disconnected ProcessorTrackers to {@code updateLostTasks} and, when
         * that succeeds, removes those trackers from the status holder.
         */
        private void recoverTasksOfDisconnectedProcessorTrackers() {
            List<String> disconnectedPTs = CommonTaskTracker.this.ptStatusHolder.getAllDisconnectedProcessorTrackers();
            if (disconnectedPTs.isEmpty()) {
                return;
            }
            log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", CommonTaskTracker.this.instanceId, disconnectedPTs);
            if (CommonTaskTracker.this.taskPersistenceService.updateLostTasks(CommonTaskTracker.this.instanceId, disconnectedPTs, true)) {
                CommonTaskTracker.this.ptStatusHolder.remove(disconnectedPTs);
                log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", CommonTaskTracker.this.instanceId, disconnectedPTs);
            }
        }

        /**
         * Runs one check cycle and logs any {@link Exception} instead of letting it escape
         * into the scheduled executor.
         */
        @Override
        public void run() {
            try {
                this.innerRun();
            }
            catch (Exception e) {
                log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", CommonTaskTracker.this.instanceId, e);
            }
        }
    }
}

