package com.zmops.zeus.server.transfer.core.task;

import com.zmops.zeus.server.transfer.common.AbstractDaemon;
import com.zmops.zeus.server.transfer.conf.CommonConstants;
import com.zmops.zeus.server.transfer.conf.JobProfile;
import com.zmops.zeus.server.transfer.core.TransferManager;
import com.zmops.zeus.server.transfer.core.db.JobProfileDb;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zmops/zeus/server/transfer/core/task/TaskPositionManager.class */
/**
 * Process-wide singleton that keeps, in memory, the current position of every task
 * of every job and periodically writes those positions back into the job profiles
 * stored in {@link JobProfileDb}.
 *
 * <p>Positions are held as {@code jobInstanceId -> (task key -> position)}. Both levels
 * are {@link ConcurrentHashMap}s, so position updates and the background flush can run
 * on different threads.
 */
public class TaskPositionManager extends AbstractDaemon {
    /** Pause, in seconds, between two flush rounds of the background worker. */
    public static final int DEFAULT_FLUSH_TIMEOUT = 3;

    private static final Logger LOGGER = LoggerFactory.getLogger(TaskPositionManager.class);

    /** Lazily created singleton; volatile so the unsynchronized null checks see a fully built instance. */
    private static volatile TaskPositionManager taskPositionManager = null;

    private final TransferManager transferManager;
    private final JobProfileDb jobConfDb;

    /** jobInstanceId -> (task key -> current position). */
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> jobTaskPositionMap = new ConcurrentHashMap<>();

    private TaskPositionManager(TransferManager transferManager) {
        this.transferManager = transferManager;
        this.jobConfDb = transferManager.getJobManager().getJobConfDb();
    }

    /**
     * Returns the singleton, creating it from {@code transferManager} on the first call.
     *
     * <p>Once the singleton exists the argument is ignored: later calls return the
     * instance built by the first caller.
     *
     * @param transferManager manager whose job manager supplies the job profile database
     * @return the shared instance
     */
    public static TaskPositionManager getTaskPositionManager(TransferManager transferManager) {
        if (taskPositionManager == null) {
            synchronized (TaskPositionManager.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (taskPositionManager == null) {
                    taskPositionManager = new TaskPositionManager(transferManager);
                }
            }
        }
        return taskPositionManager;
    }

    /**
     * Returns the already created singleton.
     *
     * @return the shared instance
     * @throws IllegalStateException if {@link #getTaskPositionManager(TransferManager)}
     *         has not been called yet
     */
    public static TaskPositionManager getTaskPositionManager() {
        if (taskPositionManager == null) {
            throw new IllegalStateException("task position manager has not been initialized by agentManager");
        }
        return taskPositionManager;
    }

    /**
     * Submits the periodic position-flush worker.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // com.zmops.zeus.server.transfer.common.Service
    public void start() throws Exception {
        submitWorker(taskPositionFlushThread());
    }

    /**
     * Builds the background worker: while {@code isRunnable()} holds it flushes every
     * tracked job and then sleeps {@link #DEFAULT_FLUSH_TIMEOUT} seconds.
     *
     * <p>The sleep happens after every round, including rounds in which a job failed,
     * so a persistent failure cannot turn the loop into a busy spin. An interrupt
     * restores the thread's interrupt flag and ends the worker.
     */
    private Runnable taskPositionFlushThread() {
        return () -> {
            while (isRunnable()) {
                flushAllJobs();
                try {
                    TimeUnit.SECONDS.sleep(DEFAULT_FLUSH_TIMEOUT);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread and stop working.
                    Thread.currentThread().interrupt();
                    LOGGER.warn("task position flush thread interrupted, stop flushing", e);
                    return;
                }
            }
        };
    }

    /**
     * Runs one flush round over all jobs currently tracked in memory. A failure while
     * handling one job is logged and does not prevent the remaining jobs from being flushed.
     */
    private void flushAllJobs() {
        // ConcurrentHashMap key-set iteration is weakly consistent, so removing
        // entries from the map while iterating is safe.
        for (String jobInstanceId : this.jobTaskPositionMap.keySet()) {
            try {
                JobProfile jobProfile = this.jobConfDb.getJobProfile(jobInstanceId);
                if (jobProfile == null) {
                    LOGGER.warn("jobProfile {} cannot be found in db, might be deleted by standalone mode, now delete job position in memory", jobInstanceId);
                    deleteJobPosition(jobInstanceId);
                } else {
                    flushJobProfile(jobInstanceId, jobProfile);
                }
            } catch (Exception e) {
                LOGGER.error("error caught", e);
            }
        }
    }

    /**
     * Copies the in-memory positions of one job into {@code jobProfile} and persists the
     * profile, unless the database reports the job as finished, in which case the
     * in-memory positions of that job are dropped instead.
     *
     * @param jobInstanceId id of the job to flush
     * @param jobProfile    profile loaded from the database for that job
     */
    private void flushJobProfile(String jobInstanceId, JobProfile jobProfile) {
        ConcurrentHashMap<String, Long> positions = this.jobTaskPositionMap.get(jobInstanceId);
        if (positions == null) {
            // The job was removed from memory between key iteration and this lookup.
            return;
        }
        positions.forEach((taskKey, position) ->
                jobProfile.setLong(taskKey + CommonConstants.POSITION_SUFFIX, position));
        if (!this.jobConfDb.checkJobfinished(jobProfile)) {
            this.jobConfDb.updateJobProfile(jobProfile);
        } else {
            LOGGER.info("Cannot update job profile {}, delete memory job in jobTaskPosition", jobInstanceId);
            deleteJobPosition(jobInstanceId);
        }
    }

    /** Drops all in-memory positions of the given job. */
    private void deleteJobPosition(String jobInstanceId) {
        this.jobTaskPositionMap.remove(jobInstanceId);
    }

    /**
     * Delegates to {@code waitForTerminate()}.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // com.zmops.zeus.server.transfer.common.Service
    public void stop() throws Exception {
        waitForTerminate();
    }

    /**
     * Advances the in-memory position of one task by {@code size}.
     *
     * <p>The first time a task is seen, its position is seeded from the value stored in
     * the job profile under {@code sourceFilePath + POSITION_SUFFIX}, or 0 when the
     * profile or the value is missing. Subsequent calls add to the in-memory value.
     * The addition itself is atomic, so concurrent updates of the same task are not lost.
     *
     * @param jobInstanceId  id of the job the task belongs to
     * @param sourceFilePath key identifying the task inside the job
     * @param size           amount to add to the task's position
     */
    public void updateFileSinkPosition(String jobInstanceId, String sourceFilePath, long size) {
        ConcurrentHashMap<String, Long> positions =
                this.jobTaskPositionMap.computeIfAbsent(jobInstanceId, key -> new ConcurrentHashMap<>());
        if (!positions.containsKey(sourceFilePath)) {
            // Seed outside of any map-internal lock; putIfAbsent keeps the first seed if threads race.
            positions.putIfAbsent(sourceFilePath, readPersistedPosition(jobInstanceId, sourceFilePath));
        }
        positions.merge(sourceFilePath, size, Long::sum);
    }

    /**
     * Reads the position stored in the job profile for the given task.
     *
     * @return the stored position, or 0 when the job profile or the value does not exist
     */
    private long readPersistedPosition(String jobInstanceId, String sourceFilePath) {
        JobProfile jobProfile = this.jobConfDb.getJobProfile(jobInstanceId);
        if (jobProfile == null) {
            return 0L;
        }
        return jobProfile.getLong(sourceFilePath + CommonConstants.POSITION_SUFFIX, 0L);
    }

    /**
     * Returns the live position map of one job.
     *
     * @param str job instance id
     * @return the internal (not copied) task-position map of that job, or {@code null}
     *         if the job is not tracked in memory
     */
    public ConcurrentHashMap<String, Long> getTaskPositionMap(String str) {
        return this.jobTaskPositionMap.get(str);
    }

    /**
     * Returns the live map of all tracked jobs and their task positions.
     *
     * @return the internal (not copied) map; changes made through it affect this manager
     */
    public ConcurrentHashMap<String, ConcurrentHashMap<String, Long>> getJobTaskPosition() {
        return this.jobTaskPositionMap;
    }
}
