package com.zmops.zeus.server.transfer.core.job;

import com.zmops.zeus.server.transfer.common.AbstractDaemon;
import com.zmops.zeus.server.transfer.common.TransferThreadFactory;
import com.zmops.zeus.server.transfer.conf.JobConstants;
import com.zmops.zeus.server.transfer.conf.JobProfile;
import com.zmops.zeus.server.transfer.conf.TransferConfiguration;
import com.zmops.zeus.server.transfer.conf.TransferConstants;
import com.zmops.zeus.server.transfer.core.TransferManager;
import com.zmops.zeus.server.transfer.core.db.JobProfileDb;
import com.zmops.zeus.server.transfer.core.db.StateSearchKey;
import com.zmops.zeus.server.transfer.utils.TransferUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zmops/zeus/server/transfer/core/job/JobManager.class */
public class JobManager extends AbstractDaemon {
    private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class);
    private final TransferManager transferManager;
    private final int monitorInterval;
    private final long jobDbCacheTime;
    private final long jobDbCacheCheckInterval;
    private final JobProfileDb jobConfDB;
    private final JobMetrics jobMetrics;
    private final AtomicLong index = new AtomicLong(0);
    private final ThreadPoolExecutor runningPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), new TransferThreadFactory("job"));
    private ConcurrentHashMap<String, JobWrapper> jobs = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Job> pendingJobs = new ConcurrentHashMap<>();

    public JobManager(TransferManager transferManager, JobProfileDb jobProfileDb) {
        this.jobConfDB = jobProfileDb;
        this.transferManager = transferManager;
        TransferConfiguration agentConf = TransferConfiguration.getAgentConf();
        this.monitorInterval = agentConf.getInt(TransferConstants.JOB_MONITOR_INTERVAL, 5);
        this.jobDbCacheTime = agentConf.getLong(TransferConstants.JOB_DB_CACHE_TIME, TransferConstants.DEFAULT_JOB_DB_CACHE_TIME);
        this.jobDbCacheCheckInterval = agentConf.getLong(TransferConstants.JOB_DB_CACHE_CHECK_INTERVAL, 3600L);
        this.jobMetrics = JobMetrics.create();
    }

    public void addJob(Job job) {
        try {
            JobWrapper jobWrapper = new JobWrapper(this.transferManager, job);
            this.runningPool.execute(jobWrapper);
            if (this.jobs.putIfAbsent(jobWrapper.getJob().getJobInstanceId(), jobWrapper) != null) {
                LOGGER.warn("{} has been added to running pool, cannot be added repeatedly", job.getJobInstanceId());
            } else {
                this.jobMetrics.runningJobs.incr();
            }
        } catch (Exception e) {
            LOGGER.debug("reject job {}", job.getJobInstanceId(), e);
            this.pendingJobs.putIfAbsent(job.getJobInstanceId(), job);
        }
    }

    public boolean submitJobProfile(JobProfile jobProfile) {
        if (jobProfile == null || !jobProfile.allRequiredKeyExist()) {
            LOGGER.error("profile is null or not all required key exists {}", jobProfile == null ? null : jobProfile.toJsonStr());
            return false;
        }
        jobProfile.set(JobConstants.JOB_INSTANCE_ID, TransferUtils.getUniqId(JobConstants.JOB_ID_PREFIX, jobProfile.get(JobConstants.JOB_ID), this.index.incrementAndGet()));
        LOGGER.info("submit job profile {}", jobProfile.toJsonStr());
        getJobConfDb().storeJobFirstTime(jobProfile);
        addJob(new Job(jobProfile));
        return true;
    }

    public void deleteJob(String str) {
        JobWrapper remove = this.jobs.remove(str);
        if (remove != null) {
            LOGGER.info("delete job instance with job id {}", str);
            remove.cleanup();
            getJobConfDb().deleteJob(str);
        }
    }

    private void startJobs() {
        for (JobProfile jobProfile : getJobConfDb().getAcceptedJobs()) {
            LOGGER.info("init starting job from db {}", jobProfile.toJsonStr());
            addJob(new Job(jobProfile));
        }
    }

    public Runnable jobStateCheckThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    Iterator it = this.pendingJobs.keySet().iterator();
                    while (it.hasNext()) {
                        Job remove = this.pendingJobs.remove((String) it.next());
                        if (remove != null) {
                            addJob(remove);
                        }
                    }
                    TimeUnit.SECONDS.sleep(this.monitorInterval);
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                }
            }
        };
    }

    public Runnable dbStorageCheckThread() {
        return () -> {
            while (isRunnable()) {
                try {
                    this.jobConfDB.removeExpireJobs(this.jobDbCacheTime);
                    TimeUnit.SECONDS.sleep(this.jobDbCacheCheckInterval);
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                }
            }
        };
    }

    public void markJobAsSuccess(String str) {
        if (this.jobs.remove(str) != null) {
            this.jobMetrics.runningJobs.decr();
            LOGGER.info("job instance {} is success", str);
            this.jobConfDB.updateJobState(str, StateSearchKey.SUCCESS);
        }
    }

    public void markJobAsFailed(String str) {
        if (this.jobs.remove(str) != null) {
            LOGGER.info("job instance {} is failed", str);
            this.jobMetrics.runningJobs.decr();
            this.jobMetrics.fatalJobs.incr();
            this.jobConfDB.updateJobState(str, StateSearchKey.FAILED);
        }
    }

    public JobProfileDb getJobConfDb() {
        return this.jobConfDB;
    }

    public boolean checkJobExsit(String str) {
        return this.jobConfDB.getJob(str) != null;
    }

    public Map<String, JobWrapper> getJobs() {
        return this.jobs;
    }

    @Override // com.zmops.zeus.server.transfer.common.Service
    public void start() {
        submitWorker(jobStateCheckThread());
        submitWorker(dbStorageCheckThread());
        startJobs();
    }

    @Override // com.zmops.zeus.server.transfer.common.Service
    public void stop() throws Exception {
        waitForTerminate();
        this.runningPool.shutdown();
    }
}
