package com.zmops.zeus.server.transfer.core.sink;

import com.zmops.zeus.server.transfer.api.Message;
import com.zmops.zeus.server.transfer.api.Sink;
import com.zmops.zeus.server.transfer.common.TransferThreadFactory;
import com.zmops.zeus.server.transfer.conf.CommonConstants;
import com.zmops.zeus.server.transfer.conf.JobConstants;
import com.zmops.zeus.server.transfer.conf.JobProfile;
import com.zmops.zeus.server.transfer.core.message.EndMessage;
import com.zmops.zeus.server.transfer.core.message.PackProxyMessage;
import com.zmops.zeus.server.transfer.core.message.ProxyMessage;
import com.zmops.zeus.server.transfer.sender.SenderManager;
import com.zmops.zeus.server.transfer.utils.TransferUtils;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zmops/zeus/server/transfer/core/sink/DataCarrierSink.class */
/**
 * {@link Sink} that groups incoming messages per tid into {@link PackProxyMessage}
 * buffers and hands completed batches to a {@link SenderManager} from a single
 * background flush thread.
 *
 * <p>Lifecycle: {@link #setSourceFile(String)} (optional) then {@link #init(JobProfile)},
 * any number of {@link #write(Message)} calls, and finally {@link #destroy()}, which
 * blocks until every buffer reports empty before stopping the flush thread.
 */
public class DataCarrierSink implements Sink {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataCarrierSink.class);

    /** Single worker thread that runs the flush loop created by {@link #flushCache()}. */
    private final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(), new TransferThreadFactory("ProxySink"));

    /**
     * Per-tid batching buffers. Created eagerly so that {@link #write(Message)} and
     * {@link #destroy()} are safe even if they race with, or precede, {@link #init(JobProfile)}.
     */
    private final ConcurrentHashMap<String, PackProxyMessage> cache = new ConcurrentHashMap<>(10);

    /** Set by {@link #destroy()} to make the flush loop exit. */
    private volatile boolean shutdown = false;

    private SenderManager senderManager;
    private String bid;
    private String tid;
    private String sourceFile;
    private String jobInstanceId;
    private int maxBatchSize;
    private int maxBatchTimeoutMs;
    private int batchFlushInterval;
    private int maxQueueNumber;
    private long dataTime;

    /**
     * Buffers a message for later batched sending.
     *
     * <p>Null messages and messages with a null or empty body are ignored. The configured
     * bid and tid are stamped into the message header; an {@link EndMessage} is stamped
     * but not buffered.
     *
     * @param message the message to buffer, may be {@code null}
     */
    @Override // com.zmops.zeus.server.transfer.api.Sink
    public void write(Message message) {
        if (message == null) {
            return;
        }
        byte[] body = message.getBody();
        if (body == null || body.length <= 0) {
            return;
        }
        message.getHeader().put(CommonConstants.PROXY_KEY_BID, this.bid);
        message.getHeader().put(CommonConstants.PROXY_KEY_TID, this.tid);
        if (message instanceof EndMessage) {
            return;
        }
        ProxyMessage proxyMessage = ProxyMessage.parse(message);
        // compute() keeps "create buffer if absent" and "append" atomic for a given tid.
        this.cache.compute(proxyMessage.getTid(), (key, pack) -> {
            PackProxyMessage target = pack;
            if (target == null) {
                target = new PackProxyMessage(
                        this.maxBatchSize, this.maxQueueNumber, this.maxBatchTimeoutMs, proxyMessage.getTid());
            }
            target.addProxyMessage(proxyMessage);
            return target;
        });
    }

    /**
     * Records the source file name. It is passed to the {@link SenderManager} created in
     * {@link #init(JobProfile)} and used in the {@link #destroy()} log line.
     *
     * @param str source file name
     */
    @Override // com.zmops.zeus.server.transfer.api.Sink
    public void setSourceFile(String str) {
        this.sourceFile = str;
    }

    /**
     * Builds the background task that repeatedly drains every buffer and sends each
     * fetched batch through the {@link SenderManager} until {@link #shutdown} is set.
     *
     * @return the flush loop to run on {@link #executorService}
     */
    private Runnable flushCache() {
        return () -> {
            LOGGER.info("start flush cache thread for {} TDBusSink", this.bid);
            while (!this.shutdown) {
                try {
                    this.cache.forEach((key, pack) -> {
                        Pair<String, List<byte[]>> batch = pack.fetchBatch();
                        if (batch != null) {
                            this.senderManager.sendBatch(
                                    this.jobInstanceId, this.bid, batch.getKey(), batch.getValue(), 0, this.dataTime);
                        }
                    });
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                }
                // Sleep outside the try so a failing round still backs off instead of
                // spinning and flooding the log.
                TransferUtils.silenceSleepInMs(this.batchFlushInterval);
            }
        };
    }

    /**
     * Reads batching settings and identifiers from the job profile, creates the
     * {@link SenderManager}, and then starts the flush thread.
     *
     * @param jobProfile job configuration
     */
    @Override // com.zmops.zeus.server.transfer.api.Stage
    public void init(JobProfile jobProfile) {
        this.maxBatchSize = jobProfile.getInt(
                CommonConstants.PROXY_PACKAGE_MAX_SIZE, CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE);
        this.maxQueueNumber = jobProfile.getInt(CommonConstants.PROXY_TID_QUEUE_MAX_NUMBER, 10000);
        this.maxBatchTimeoutMs = jobProfile.getInt(
                CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS, CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
        this.batchFlushInterval = jobProfile.getInt(CommonConstants.PROXY_BATCH_FLUSH_INTERVAL, 100);
        this.jobInstanceId = jobProfile.get(JobConstants.JOB_INSTANCE_ID);
        this.bid = jobProfile.get(CommonConstants.PROXY_BID);
        this.tid = jobProfile.get(CommonConstants.PROXY_TID);
        // NOTE(review): DEFAULT_JOB_LINE_FILTER is reused here as the fallback for both the
        // data time and the cycle unit; presumably it is just an empty string - confirm.
        this.dataTime = TransferUtils.timeStrConvertToMillSec(
                jobProfile.get(JobConstants.JOB_DATA_TIME, JobConstants.DEFAULT_JOB_LINE_FILTER),
                jobProfile.get(JobConstants.JOB_CYCLE_UNIT, JobConstants.DEFAULT_JOB_LINE_FILTER));
        // The sender must exist before the flush task is submitted: the task dereferences
        // it, and submitting to the executor publishes all fields written above.
        this.senderManager = new SenderManager(jobProfile, this.bid, this.sourceFile);
        this.executorService.execute(flushCache());
    }

    /**
     * Collects proxy attributes from the job profile: the parsed addition string (when
     * non-empty), the retry marker (when the retry flag is set), the job id and the job ip.
     * Not referenced elsewhere in this class.
     *
     * @param jobProfile job configuration
     * @return a new mutable attribute map
     */
    private HashMap<String, String> parseAttrFromJobProfile(JobProfile jobProfile) {
        HashMap<String, String> attrs = new HashMap<>();
        String addition = jobProfile.get(JobConstants.JOB_ADDITION_STR, JobConstants.DEFAULT_JOB_LINE_FILTER);
        if (!addition.isEmpty()) {
            attrs.putAll(TransferUtils.getAdditionAttr(addition));
        }
        if (jobProfile.getBoolean(JobConstants.JOB_RETRY, false)) {
            attrs.put(CommonConstants.PROXY_OCEANUS_F, CommonConstants.PROXY_OCEANUS_BL);
        }
        attrs.put(CommonConstants.PROXY_KEY_ID, jobProfile.get(JobConstants.JOB_ID));
        attrs.put(CommonConstants.PROXY_KEY_AGENT_IP, jobProfile.get(JobConstants.JOB_IP));
        return attrs;
    }

    /**
     * Blocks until every buffer is empty, polling at the flush interval, then signals the
     * flush loop to stop and shuts the executor down.
     */
    @Override // com.zmops.zeus.server.transfer.api.Stage
    public void destroy() {
        LOGGER.info("destroy sink which sink from source file {}", this.sourceFile);
        while (!sinkFinish()) {
            LOGGER.info("job {} wait until cache all flushed to proxy", this.jobInstanceId);
            TransferUtils.silenceSleepInMs(this.batchFlushInterval);
        }
        this.shutdown = true;
        this.executorService.shutdown();
    }

    /**
     * @return {@code true} when every buffer in the cache reports empty (vacuously true
     *         for an empty cache)
     */
    private boolean sinkFinish() {
        return this.cache.values().stream().allMatch(PackProxyMessage::isEmpty);
    }
}
