package org.apache.doris.flink.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/writer/DorisWriter.class */
/**
 * Sink writer that pushes records to Doris through one {@link DorisStreamLoad} per table.
 *
 * <p>Records are serialized with the supplied {@link DorisRecordSerializer} and routed to the
 * loader of the table named by the record (or, when the record carries no table, the table
 * configured in {@link DorisOptions}). A load is started lazily on the first record a table
 * receives after a checkpoint and is stopped in {@link #prepareCommit()}.
 *
 * <p>A single-threaded scheduler periodically runs {@link #checkDone()} to detect loads whose
 * pending future completed while the writer still considers the table to be loading. Depending
 * on {@code executionOptions.isUseCache()} the load is either restarted or the failure is
 * recorded and surfaced on the next {@link #write} call.
 *
 * @param <IN> type of the incoming elements
 */
public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, DorisWriterState>, TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.class);

    /** Stream load statuses that {@link #prepareCommit()} accepts as a successful load. */
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));

    /** Checkpoint id restored from the init context, or 0 when nothing was restored. */
    private final long lastCheckpointId;
    /** Checkpoint id used when generating labels for the loads currently in flight. */
    private long curCheckpointId;
    /** True while at least one table has a started load; read by the checker thread. */
    volatile boolean globalLoading;
    private final DorisOptions dorisOptions;
    private final DorisReadOptions dorisReadOptions;
    private final DorisExecutionOptions executionOptions;
    private final String labelPrefix;
    private final int subtaskId;
    /** Delay in milliseconds between two runs of the load checker. */
    private final int intervalTime;
    private final DorisRecordSerializer<IN> serializer;
    private final transient ScheduledExecutorService scheduledExecutorService;
    /** Thread that ran {@link #initializeLoad}; interrupted by the checker when a load fails. */
    private transient Thread executorThread;
    private BackendUtil backendUtil;
    /** Stream loaders keyed by table identifier. */
    private Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
    /** Label generators keyed by table identifier. */
    private Map<String, LabelGenerator> labelGeneratorMap = new ConcurrentHashMap<>();
    /** Tables that have a started load in the current checkpoint interval. */
    private Map<String, Boolean> loadingMap = new ConcurrentHashMap<>();
    /** Failure recorded by the checker thread; rethrown by {@link #checkLoadException()}. */
    private volatile transient Exception loadException = null;

    /**
     * Creates the writer, aborts lingering transactions when 2PC is enabled and starts the
     * periodic load checker.
     *
     * @param initContext sink init context providing the restored checkpoint id and subtask id
     * @param collection writer states recovered from the previous execution
     * @param dorisRecordSerializer serializer turning elements into {@link DorisRecord}s
     * @param dorisOptions connection/table options
     * @param dorisReadOptions read options, passed to {@link BackendUtil}
     * @param dorisExecutionOptions execution options (label prefix, 2PC, check interval, cache)
     */
    public DorisWriter(Sink.InitContext initContext, Collection<DorisWriterState> collection, DorisRecordSerializer<IN> dorisRecordSerializer, DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions) {
        this.lastCheckpointId = initContext.getRestoredCheckpointId().orElse(0L);
        this.curCheckpointId = this.lastCheckpointId + 1;
        LOG.info("restore checkpointId {}", this.lastCheckpointId);
        LOG.info("labelPrefix {}", dorisExecutionOptions.getLabelPrefix());
        this.labelPrefix = dorisExecutionOptions.getLabelPrefix();
        this.subtaskId = initContext.getSubtaskId();
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
        this.serializer = dorisRecordSerializer;
        this.dorisOptions = dorisOptions;
        this.dorisReadOptions = dorisReadOptions;
        this.executionOptions = dorisExecutionOptions;
        this.intervalTime = dorisExecutionOptions.checkInterval().intValue();
        this.globalLoading = false;
        initializeLoad(collection);
    }

    /**
     * Resolves the backend helper, aborts lingering pre-committed transactions when 2PC is
     * enabled, and schedules the periodic load checker.
     *
     * @param collection writer states recovered from the previous execution
     * @throws DorisRuntimeException if aborting transactions or scheduling fails
     */
    public void initializeLoad(Collection<DorisWriterState> collection) {
        this.backendUtil = BackendUtil.getInstance(this.dorisOptions, this.dorisReadOptions, LOG);
        try {
            if (this.executionOptions.enabled2PC()) {
                abortLingeringTransactions(collection);
            }
            // Remember the calling thread so the checker can interrupt it on load failure.
            this.executorThread = Thread.currentThread();
            this.scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200L, this.intervalTime, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            LOG.error("Failed to abort transaction.", e);
            throw new DorisRuntimeException(e);
        }
    }

    /**
     * Aborts pre-committed transactions for every recovered state and, if the current label
     * prefix was not among them, for the configured table under the current prefix as well.
     */
    private void abortLingeringTransactions(Collection<DorisWriterState> collection) throws Exception {
        List<String> abortedPrefixes = new ArrayList<>();
        for (DorisWriterState recovered : collection) {
            String recoveredPrefix = recovered.getLabelPrefix();
            if (!recoveredPrefix.equals(this.labelPrefix)) {
                LOG.warn("Label prefix from previous execution {} has changed to {}.", recoveredPrefix, this.executionOptions.getLabelPrefix());
            }
            String tableKey = recovered.getDatabase() + "." + recovered.getTable();
            getStreamLoader(tableKey).abortPreCommit(recoveredPrefix, this.curCheckpointId);
            abortedPrefixes.add(recoveredPrefix);
        }
        if (!abortedPrefixes.contains(this.labelPrefix) && StringUtils.isNotEmpty(this.dorisOptions.getTableIdentifier()) && StringUtils.isNotEmpty(this.labelPrefix)) {
            getStreamLoader(this.dorisOptions.getTableIdentifier()).abortPreCommit(this.labelPrefix, this.curCheckpointId);
        }
    }

    /**
     * Serializes one element and appends it to the stream load of its target table, starting
     * that load first if the table has none running.
     *
     * @param in element to write
     * @param context sink writer context (unused)
     * @throws IOException declared for the sink contract; may come from the serializer or loader
     * @throws RuntimeException if the checker thread recorded a load failure
     */
    public void write(IN in, SinkWriter.Context context) throws IOException {
        checkLoadException();
        String tableIdentifier = this.dorisOptions.getTableIdentifier();
        DorisRecord record = this.serializer.serialize(in);
        if (record == null || record.getRow() == null) {
            // Nothing to load for this element.
            return;
        }
        if (record.getTableIdentifier() != null) {
            // The record names its own target table; it overrides the configured one.
            tableIdentifier = record.getTableIdentifier();
        }
        DorisStreamLoad streamLoader = getStreamLoader(tableIdentifier);
        if (!this.loadingMap.containsKey(tableIdentifier)) {
            streamLoader.startLoad(getLabelGenerator(tableIdentifier).generateTableLabel(this.curCheckpointId), false);
            this.loadingMap.put(tableIdentifier, true);
            this.globalLoading = true;
        }
        streamLoader.writeRecord(record.getRow());
    }

    /** No-op: data is handed to the stream loaders directly in {@link #write}. */
    public void flush(boolean z) throws IOException, InterruptedException {
    }

    /**
     * Stops the load of every table that received data and collects the committables.
     *
     * @return one committable per stopped load when 2PC is enabled; otherwise an empty collection
     * @throws DorisRuntimeException if a stopped load reports a non-successful status
     */
    public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
        if (!this.globalLoading && this.loadingMap.values().stream().noneMatch(Boolean::booleanValue)) {
            return Collections.emptyList();
        }
        // Tell the checker we are no longer loading before the loads are stopped.
        this.globalLoading = false;
        List<DorisCommittable> committables = new ArrayList<>();
        for (Map.Entry<String, DorisStreamLoad> entry : this.dorisStreamLoadMap.entrySet()) {
            String tableIdentifier = entry.getKey();
            if (!this.loadingMap.getOrDefault(tableIdentifier, false)) {
                LOG.debug("skip table {}, no data need to load.", tableIdentifier);
                continue;
            }
            DorisStreamLoad streamLoader = entry.getValue();
            RespContent response = streamLoader.stopLoad(getLabelGenerator(tableIdentifier).generateTableLabel(this.curCheckpointId));
            if (!DORIS_SUCCESS_STATUS.contains(response.getStatus())) {
                // String.format needs %s placeholders; the previous "{}" shifted every argument
                // by one position and dropped the error URL from the message.
                throw new DorisRuntimeException(String.format("table %s stream load error: %s, see more in %s", tableIdentifier, response.getMessage(), response.getErrorURL()));
            }
            if (this.executionOptions.enabled2PC()) {
                committables.add(new DorisCommittable(streamLoader.getHostPort(), streamLoader.getDb(), response.getTxnId()));
            }
        }
        this.loadingMap.clear();
        return committables;
    }

    /**
     * Builds one state entry per known stream loader, re-points each loader at an available
     * backend, and advances the current checkpoint id to {@code j + 1}.
     *
     * @param j id of the checkpoint being taken
     * @return writer states to persist
     */
    public List<DorisWriterState> snapshotState(long j) throws IOException {
        List<DorisWriterState> states = new ArrayList<>();
        for (DorisStreamLoad streamLoader : this.dorisStreamLoadMap.values()) {
            streamLoader.setHostPort(this.backendUtil.getAvailableBackend());
            states.add(new DorisWriterState(this.labelPrefix, streamLoader.getDb(), streamLoader.getTable(), this.subtaskId));
        }
        this.curCheckpointId = j + 1;
        return states;
    }

    /** Returns the label generator for the table, creating it on first use. */
    private LabelGenerator getLabelGenerator(String str) {
        return this.labelGeneratorMap.computeIfAbsent(str, key -> new LabelGenerator(this.labelPrefix, this.executionOptions.enabled2PC(), str, this.subtaskId));
    }

    /** Returns the stream loader for the table, creating it on first use. */
    private DorisStreamLoad getStreamLoader(String str) {
        LabelGenerator labelGenerator = getLabelGenerator(str);
        // NOTE(review): this mutates the shared options object on every call, so the "default"
        // table later read via dorisOptions.getTableIdentifier() becomes the last table passed
        // here. Presumably the DorisStreamLoad constructor reads the table from the options —
        // confirm before changing.
        this.dorisOptions.setTableIdentifier(str);
        return this.dorisStreamLoadMap.computeIfAbsent(str, key -> new DorisStreamLoad(this.backendUtil.getAvailableBackend(), this.dorisOptions, this.executionOptions, labelGenerator, new HttpUtil().getHttpClient()));
    }

    /** Scheduler entry point: runs the completion check for every known stream loader. */
    private void checkDone() {
        for (Map.Entry<String, DorisStreamLoad> entry : this.dorisStreamLoadMap.entrySet()) {
            checkAllDone(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Handles a load whose pending future completed while the table is still marked as loading.
     *
     * <p>Without cache the failure message is stored in {@code loadException} and the thread
     * recorded in {@code executorThread} is interrupted. With cache the loader is pointed at an
     * available backend, the pre-commit is aborted when 2PC is enabled, and the load is started
     * again under the current checkpoint's label.
     */
    private void checkAllDone(String str, DorisStreamLoad dorisStreamLoad) {
        LOG.debug("start timer checker, interval {} ms", this.intervalTime);
        if (dorisStreamLoad.getPendingLoadFuture() == null || !dorisStreamLoad.getPendingLoadFuture().isDone()) {
            return;
        }
        // getOrDefault: a table may have no entry (prepareCommit() clears the map, and loaders
        // created only for aborting transactions never get one). A plain get() would throw a
        // NullPointerException here, which cancels every later run of this scheduled task.
        if (!this.globalLoading || !this.loadingMap.getOrDefault(str, false)) {
            LOG.debug("not loading, skip timer checker for table {}", str);
            return;
        }
        // Checked again after the loading flags were read.
        // NOTE(review): presumably guards against acting on a future from an earlier load — confirm.
        if (dorisStreamLoad.getPendingLoadFuture() == null || !dorisStreamLoad.getPendingLoadFuture().isDone()) {
            return;
        }
        if (!this.executionOptions.isUseCache()) {
            String message;
            try {
                message = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get()).getMessage();
            } catch (Exception e) {
                message = e.getMessage();
            }
            this.loadException = new StreamLoadException(message);
            LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", str, message);
            // Interrupt the writer thread; the stored exception is rethrown by checkLoadException().
            this.executorThread.interrupt();
            return;
        }
        try {
            dorisStreamLoad.setHostPort(this.backendUtil.getAvailableBackend());
            if (this.executionOptions.enabled2PC()) {
                dorisStreamLoad.abortPreCommit(this.labelPrefix, this.curCheckpointId);
            }
            LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", this.curCheckpointId, str);
            dorisStreamLoad.startLoad(getLabelGenerator(str).generateTableLabel(this.curCheckpointId), true);
        } catch (Exception e2) {
            // An exception escaping a scheduled task is only stored in its (unobserved) future,
            // so log it here before rethrowing.
            LOG.error("Failed to restart stream load for table {}", str, e2);
            throw new DorisRuntimeException(e2);
        }
    }

    /** Rethrows, wrapped in a {@link RuntimeException}, any failure recorded by the checker. */
    private void checkLoadException() {
        if (this.loadException != null) {
            throw new RuntimeException("error while loading data.", this.loadException);
        }
    }

    /** Returns whether any table currently has a started load. */
    @VisibleForTesting
    public boolean isLoading() {
        return this.globalLoading;
    }

    /** Replaces the table-to-loader map; intended for tests. */
    @VisibleForTesting
    public void setDorisStreamLoadMap(Map<String, DorisStreamLoad> map) {
        this.dorisStreamLoadMap = map;
    }

    /**
     * Stops the load checker and closes every stream loader.
     *
     * <p>All loaders are attempted even if one fails; the first failure is rethrown with any
     * later ones attached as suppressed exceptions.
     *
     * @throws Exception the first exception raised while closing a loader
     */
    public void close() throws Exception {
        LOG.info("Close DorisWriter.");
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
        if (this.dorisStreamLoadMap == null || this.dorisStreamLoadMap.isEmpty()) {
            return;
        }
        Exception firstFailure = null;
        for (DorisStreamLoad streamLoader : this.dorisStreamLoadMap.values()) {
            try {
                streamLoader.close();
            } catch (Exception e) {
                // Keep closing the remaining loaders so one failure does not leak the others.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}
