package org.apache.doris.flink.sink.batch;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisBatchLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Preconditions;
import org.apache.http.impl.client.CloseableHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.class */
public class DorisBatchStreamLoad implements Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DorisBatchStreamLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
    private final LabelGenerator labelGenerator;
    private final byte[] lineDelimiter;
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
    private String loadUrl;
    private String hostPort;
    private final String username;
    private final String password;
    private final Properties loadProps;
    private DorisExecutionOptions executionOptions;
    private ExecutorService loadExecutorService;
    private LoadAsyncExecutor loadAsyncExecutor;
    private BlockingQueue<BatchRecordBuffer> flushQueue;
    private final AtomicBoolean started;
    private BackendUtil backendUtil;
    private Map<String, BatchRecordBuffer> bufferMap = new ConcurrentHashMap();
    private volatile boolean loadThreadAlive = false;
    private AtomicReference<Throwable> exception = new AtomicReference<>(null);
    private CloseableHttpClient httpClient = new HttpUtil().getHttpClient();

    /* loaded from: input_file:org/apache/doris/flink/sink/batch/DorisBatchStreamLoad$DefaultThreadFactory.class */
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory(String str) {
            this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-" + str + "-";
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, this.namePrefix + this.threadNumber.getAndIncrement());
            thread.setDaemon(false);
            return thread;
        }
    }

    /* loaded from: input_file:org/apache/doris/flink/sink/batch/DorisBatchStreamLoad$LoadAsyncExecutor.class */
    class LoadAsyncExecutor implements Runnable {
        LoadAsyncExecutor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            DorisBatchStreamLoad.LOG.info("LoadAsyncExecutor start");
            DorisBatchStreamLoad.this.loadThreadAlive = true;
            while (DorisBatchStreamLoad.this.started.get()) {
                try {
                    BatchRecordBuffer batchRecordBuffer = (BatchRecordBuffer) DorisBatchStreamLoad.this.flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
                    if (batchRecordBuffer != null) {
                        if (batchRecordBuffer.getLabelName() != null) {
                            load(batchRecordBuffer.getLabelName(), batchRecordBuffer);
                        }
                    }
                } catch (Exception e) {
                    DorisBatchStreamLoad.LOG.error("worker running error", (Throwable) e);
                    DorisBatchStreamLoad.this.exception.set(e);
                    DorisBatchStreamLoad.this.flushQueue.clear();
                }
            }
            DorisBatchStreamLoad.LOG.info("LoadAsyncExecutor stop");
            DorisBatchStreamLoad.this.loadThreadAlive = false;
        }

        /* JADX WARN: Removed duplicated region for block: B:73:0x01f6  */
        /* JADX WARN: Removed duplicated region for block: B:75:0x01e9 A[SYNTHETIC] */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void load(java.lang.String r7, org.apache.doris.flink.sink.batch.BatchRecordBuffer r8) throws java.io.IOException {
            /*
                Method dump skipped, instructions count: 560
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.doris.flink.sink.batch.DorisBatchStreamLoad.LoadAsyncExecutor.load(java.lang.String, org.apache.doris.flink.sink.batch.BatchRecordBuffer):void");
        }

        private void refreshLoadUrl(String str, String str2) {
            DorisBatchStreamLoad.this.hostPort = DorisBatchStreamLoad.this.backendUtil.getAvailableBackend();
            DorisBatchStreamLoad.this.loadUrl = String.format(DorisBatchStreamLoad.LOAD_URL_PATTERN, DorisBatchStreamLoad.this.hostPort, str, str2);
        }
    }

    public DorisBatchStreamLoad(DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions, LabelGenerator labelGenerator) {
        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(dorisOptions.getBenodes()) : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
        this.hostPort = this.backendUtil.getAvailableBackend();
        this.username = dorisOptions.getUsername();
        this.password = dorisOptions.getPassword();
        this.loadProps = dorisExecutionOptions.getStreamLoadProp();
        this.labelGenerator = labelGenerator;
        this.lineDelimiter = EscapeHandler.escapeString(this.loadProps.getProperty(LoadConstants.LINE_DELIMITER_KEY, "\n")).getBytes();
        this.executionOptions = dorisExecutionOptions;
        this.flushQueue = new LinkedBlockingDeque(dorisExecutionOptions.getFlushQueueSize());
        if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
            String[] split = dorisOptions.getTableIdentifier().split("\\.");
            Preconditions.checkState(split.length == 2, "tableIdentifier input error, the format is database.table");
            this.loadUrl = String.format(LOAD_URL_PATTERN, this.hostPort, split[0], split[1]);
        }
        this.loadAsyncExecutor = new LoadAsyncExecutor();
        this.loadExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1), new DefaultThreadFactory("streamload-executor"), new ThreadPoolExecutor.AbortPolicy());
        this.started = new AtomicBoolean(true);
        this.loadExecutorService.execute(this.loadAsyncExecutor);
    }

    public synchronized void writeRecord(String str, String str2, byte[] bArr) throws InterruptedException {
        checkFlushException();
        String tableIdentifier = getTableIdentifier(str, str2);
        BatchRecordBuffer computeIfAbsent = this.bufferMap.computeIfAbsent(tableIdentifier, str3 -> {
            return new BatchRecordBuffer(str, str2, this.lineDelimiter, this.executionOptions.getBufferFlushMaxBytes());
        });
        computeIfAbsent.insert(bArr);
        if (computeIfAbsent.getBufferSizeBytes() >= this.executionOptions.getBufferFlushMaxBytes() * 0.8d || (this.executionOptions.getBufferFlushMaxRows() != 0 && computeIfAbsent.getNumOfRecords() >= this.executionOptions.getBufferFlushMaxRows())) {
            flush(tableIdentifier, false);
        }
    }

    public synchronized void flush(String str, boolean z) throws InterruptedException {
        checkFlushException();
        if (null == str) {
            Iterator<String> it = this.bufferMap.keySet().iterator();
            while (it.hasNext()) {
                flushBuffer(it.next());
            }
        } else if (this.bufferMap.containsKey(str)) {
            flushBuffer(str);
        }
        if (z) {
            waitAsyncLoadFinish();
        }
    }

    private synchronized void flushBuffer(String str) {
        BatchRecordBuffer batchRecordBuffer = this.bufferMap.get(str);
        batchRecordBuffer.setLabelName(this.labelGenerator.generateBatchLabel(batchRecordBuffer.getTable()));
        putRecordToFlushQueue(batchRecordBuffer);
        this.bufferMap.remove(str);
    }

    private void putRecordToFlushQueue(BatchRecordBuffer batchRecordBuffer) {
        checkFlushException();
        if (!this.loadThreadAlive) {
            throw new RuntimeException("load thread already exit, write was interrupted");
        }
        try {
            this.flushQueue.put(batchRecordBuffer);
        } catch (InterruptedException e) {
            throw new RuntimeException("Failed to put record buffer to flush queue");
        }
    }

    private void checkFlushException() {
        if (this.exception.get() != null) {
            throw new DorisBatchLoadException(this.exception.get());
        }
    }

    private void waitAsyncLoadFinish() {
        for (int i = 0; i < this.executionOptions.getFlushQueueSize() + 1; i++) {
            putRecordToFlushQueue(new BatchRecordBuffer());
        }
    }

    private String getTableIdentifier(String str, String str2) {
        return str + "." + str2;
    }

    public void close() {
        this.loadExecutorService.shutdown();
        this.started.set(false);
        this.flushQueue.clear();
    }
}
