package com.zmops.zeus.server.transfer.core.task;

import com.zmops.zeus.server.transfer.common.AbstractDaemon;
import com.zmops.zeus.server.transfer.common.TransferThreadFactory;
import com.zmops.zeus.server.transfer.conf.TransferConfiguration;
import com.zmops.zeus.server.transfer.conf.TransferConstants;
import com.zmops.zeus.server.transfer.core.TransferManager;
import com.zmops.zeus.server.transfer.core.job.JobManager;
import com.zmops.zeus.server.transfer.utils.TransferUtils;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zmops/zeus/server/transfer/core/task/TaskManager.class */
/**
 * Registers tasks by id, runs them on an unbounded cached thread pool and
 * re-submits failed tasks through a bounded retry queue.
 *
 * <p>A background monitor (see {@link #createTaskMonitorThread()}) periodically
 * scans the registered tasks, moves the failed ones that are eligible for retry
 * into the retry queue and then submits everything in that queue again.
 */
public class TaskManager extends AbstractDaemon {
    // Log under this class's own name (previously attributed to JobManager).
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskManager.class);

    private final TransferManager transferManager;
    /** Failed tasks waiting to be re-submitted by the monitor. */
    private final BlockingQueue<TaskWrapper> retryTasks;
    /** Pause between two monitor passes, in seconds. */
    private final int monitorInterval;
    /** Capacity of {@link #retryTasks}; kept for diagnostics. */
    private final int taskMaxCapacity;
    /** Maximum time, in seconds, to wait for room in the retry queue. */
    private final int taskRetryMaxTime;
    /** Pause, in milliseconds, before retrying a rejected pool submission. */
    private final long waitTime;
    // SynchronousQueue + unbounded maximum size: every submitted task is handed
    // straight to an idle or newly created thread; idle threads die after 60s.
    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<>(), new TransferThreadFactory("task"));
    private final TaskMetrics taskMetrics = TaskMetrics.create();
    /** Currently registered tasks, keyed by task id. */
    private final ConcurrentHashMap<String, TaskWrapper> tasks = new ConcurrentHashMap<>();

    /**
     * Creates a manager configured from the agent configuration.
     *
     * @param transferManager manager handed to every {@link TaskWrapper} created here
     */
    public TaskManager(TransferManager transferManager) {
        this.transferManager = transferManager;
        TransferConfiguration agentConf = TransferConfiguration.getAgentConf();
        // The same setting sizes the retry queue and is reported in error logs.
        this.taskMaxCapacity = agentConf.getInt(TransferConstants.TASK_RETRY_MAX_CAPACITY, 10000);
        this.retryTasks = new LinkedBlockingQueue<>(this.taskMaxCapacity);
        this.monitorInterval = agentConf.getInt(TransferConstants.TASK_MONITOR_INTERVAL, 6);
        this.taskRetryMaxTime = agentConf.getInt(TransferConstants.TASK_RETRY_SUBMIT_WAIT_SECONDS, 5);
        this.waitTime = agentConf.getLong(TransferConstants.THREAD_POOL_AWAIT_TIME, 300L);
    }

    /**
     * @return the metrics object updated by this manager
     */
    public TaskMetrics getTaskMetrics() {
        return this.taskMetrics;
    }

    /**
     * Looks up a registered task.
     *
     * @param str task id
     * @return the wrapper registered under that id, or {@code null} if there is none
     */
    public TaskWrapper getTaskWrapper(String str) {
        return this.tasks.get(str);
    }

    /**
     * Wraps the task and submits it for execution.
     *
     * @param task task to run
     */
    public void submitTask(Task task) {
        submitTask(new TaskWrapper(this.transferManager, task));
    }

    /**
     * Registers the wrapper under its task id and hands it to the thread pool.
     * Nothing happens when a task with the same id is already registered.
     *
     * <p>A rejected submission is retried after {@code waitTime} milliseconds.
     * If the pool has been shut down the submission can never succeed, so the
     * task is unregistered again and the method returns instead of looping
     * forever.
     *
     * @param taskWrapper wrapper to run
     */
    public void submitTask(TaskWrapper taskWrapper) {
        String taskId = taskWrapper.getTask().getTaskId();
        if (this.tasks.putIfAbsent(taskId, taskWrapper) != null) {
            return;
        }
        while (true) {
            try {
                this.runningPool.submit(taskWrapper);
                break;
            } catch (RejectedExecutionException e) {
                if (this.runningPool.isShutdown()) {
                    LOGGER.error("task pool is shut down, give up task {}", taskId, e);
                    // Roll back the registration: the task will never run.
                    this.tasks.remove(taskId, taskWrapper);
                    return;
                }
                LOGGER.warn("reject task {}", taskId, e);
                TransferUtils.silenceSleepInMs(this.waitTime);
            }
        }
        this.taskMetrics.runningTasks.incr();
    }

    /**
     * Offers a task to the retry queue, waiting up to {@code taskRetryMaxTime}
     * seconds for free capacity.
     *
     * @param taskWrapper task to retry
     * @return {@code true} if the task was queued
     */
    private boolean addRetryTask(TaskWrapper taskWrapper) {
        LOGGER.info("retry submit task {}", taskWrapper.getTask().getTaskId());
        try {
            boolean queued = this.retryTasks.offer(taskWrapper, this.taskRetryMaxTime, TimeUnit.SECONDS);
            if (queued) {
                this.taskMetrics.retryingTasks.incr();
            } else {
                LOGGER.error("cannot submit to retry queue, max {}, current {}",
                        this.taskMaxCapacity, this.retryTasks.size());
            }
            return queued;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            LOGGER.error("error while offer task", e);
            return false;
        }
    }

    /**
     * @param str task id
     * @return the wrapper's finished state, or {@code false} for an unknown id
     */
    public boolean isTaskFinished(String str) {
        TaskWrapper taskWrapper = this.tasks.get(str);
        return taskWrapper != null && taskWrapper.isFinished();
    }

    /**
     * @param str task id
     * @return the wrapper's success state, or {@code false} for an unknown id
     */
    public boolean isTaskSuccess(String str) {
        TaskWrapper taskWrapper = this.tasks.get(str);
        return taskWrapper != null && taskWrapper.isSuccess();
    }

    /**
     * Unregisters a task and waits for it to finish. The running-task metric is
     * decremented only when a task was actually registered under the id, which
     * mirrors the increment performed by {@link #submitTask(TaskWrapper)}.
     *
     * @param str task id
     */
    public void removeTask(String str) {
        TaskWrapper removed = this.tasks.remove(str);
        if (removed != null) {
            this.taskMetrics.runningTasks.decr();
            removed.waitForFinish();
        }
    }

    /**
     * Requests termination of a registered task.
     *
     * @param task task to kill
     * @return {@code false} if the task is not registered, {@code true} otherwise
     */
    public boolean killTask(Task task) {
        TaskWrapper taskWrapper = this.tasks.get(task.getTaskId());
        if (taskWrapper == null) {
            return false;
        }
        taskWrapper.kill();
        return true;
    }

    /**
     * Builds the monitor loop. While the daemon is runnable, each pass:
     * <ol>
     *   <li>moves every failed, retryable task into the retry queue and
     *       unregisters it,</li>
     *   <li>drains the retry queue and submits each task again,</li>
     *   <li>sleeps for {@code monitorInterval} seconds.</li>
     * </ol>
     * The loop ends when the thread is interrupted.
     *
     * @return the monitor runnable
     */
    public Runnable createTaskMonitorThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    // ConcurrentHashMap iteration tolerates the removals done below.
                    for (String taskId : this.tasks.keySet()) {
                        TaskWrapper candidate = this.tasks.get(taskId);
                        if (candidate != null && candidate.isFailed() && candidate.shouldRetry()
                                && addRetryTask(candidate)) {
                            removeTask(taskId);
                        }
                    }
                    // poll() alone is enough: it returns null once the queue is empty.
                    TaskWrapper retry;
                    while ((retry = this.retryTasks.poll()) != null) {
                        this.taskMetrics.retryingTasks.decr();
                        submitTask(retry);
                    }
                    TimeUnit.SECONDS.sleep(this.monitorInterval);
                } catch (InterruptedException e) {
                    // Interrupted while sleeping: restore the flag and stop monitoring.
                    Thread.currentThread().interrupt();
                    LOGGER.info("task monitor interrupted, exiting");
                    return;
                } catch (Exception e) {
                    LOGGER.error("Exception caught", e);
                }
            }
        };
    }

    /** Starts the task monitor as a daemon worker. */
    @Override // com.zmops.zeus.server.transfer.common.Service
    public void start() {
        submitWorker(createTaskMonitorThread());
    }

    /**
     * Terminates the daemon workers, then shuts the task pool down so that no
     * further tasks are accepted.
     */
    @Override // com.zmops.zeus.server.transfer.common.Service
    public void stop() throws Exception {
        waitForTerminate();
        this.runningPool.shutdown();
    }
}
