package org.apache.doris.flink.sink.writer;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.LoadStatus;
import org.apache.doris.flink.sink.ResponseUtil;
import org.apache.doris.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/writer/DorisStreamLoad.class */
public class DorisStreamLoad implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DorisStreamLoad.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private final LabelGenerator labelGenerator;
    private final byte[] lineDelimiter;
    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
    private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
    private static final String JOB_EXIST_FINISHED = "FINISHED";
    private String loadUrlStr;
    private String hostPort;
    private String abortUrlStr;
    private final String user;
    private final String passwd;
    private final String db;
    private final String table;
    private final boolean enable2PC;
    private final boolean enableDelete;
    private final Properties streamLoadProp;
    private final RecordStream recordStream;
    private volatile Future<CloseableHttpResponse> pendingLoadFuture;
    private final CloseableHttpClient httpClient;
    private final ExecutorService executorService;
    private boolean loadBatchFirstRecord;

    public DorisStreamLoad(String str, DorisOptions dorisOptions, DorisExecutionOptions dorisExecutionOptions, LabelGenerator labelGenerator, CloseableHttpClient closeableHttpClient) {
        this.hostPort = str;
        String[] split = dorisOptions.getTableIdentifier().split("\\.");
        this.db = split[0];
        this.table = split[1];
        this.user = dorisOptions.getUsername();
        this.passwd = dorisOptions.getPassword();
        this.labelGenerator = labelGenerator;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, this.db, this.table);
        this.abortUrlStr = String.format(ABORT_URL_PATTERN, str, this.db);
        this.enable2PC = dorisExecutionOptions.enabled2PC().booleanValue();
        this.streamLoadProp = dorisExecutionOptions.getStreamLoadProp();
        this.enableDelete = dorisExecutionOptions.getDeletable().booleanValue();
        this.httpClient = closeableHttpClient;
        this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new ExecutorThreadFactory("stream-load-upload"));
        this.recordStream = new RecordStream(dorisExecutionOptions.getBufferSize(), dorisExecutionOptions.getBufferCount(), dorisExecutionOptions.isUseCache());
        this.lineDelimiter = EscapeHandler.escapeString(this.streamLoadProp.getProperty(LoadConstants.LINE_DELIMITER_KEY, "\n")).getBytes();
        this.loadBatchFirstRecord = true;
    }

    public String getDb() {
        return this.db;
    }

    public String getTable() {
        return this.table;
    }

    public String getHostPort() {
        return this.hostPort;
    }

    public void setHostPort(String str) {
        this.hostPort = str;
        this.loadUrlStr = String.format(LOAD_URL_PATTERN, str, this.db, this.table);
        this.abortUrlStr = String.format(ABORT_URL_PATTERN, str, this.db);
    }

    public Future<CloseableHttpResponse> getPendingLoadFuture() {
        return this.pendingLoadFuture;
    }

    public void abortPreCommit(String str, long j) throws Exception {
        long j2 = j;
        LOG.info("abort for labelSuffix {}. start chkId {}.", str, Long.valueOf(j));
        while (true) {
            try {
                String generateTableLabel = this.labelGenerator.generateTableLabel(j2);
                HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
                httpPutBuilder.setUrl(this.loadUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().enable2PC().setLabel(generateTableLabel).setEmptyEntity().addProperties(this.streamLoadProp);
                RespContent handlePreCommitResponse = handlePreCommitResponse(this.httpClient.execute((HttpUriRequest) httpPutBuilder.build()));
                Preconditions.checkState(BooleanUtils.TRUE.equals(handlePreCommitResponse.getTwoPhaseCommit()));
                if (!LoadStatus.LABEL_ALREADY_EXIST.equals(handlePreCommitResponse.getStatus())) {
                    LOG.info("abort {} for check label {}.", Long.valueOf(handlePreCommitResponse.getTxnId()), generateTableLabel);
                    abortTransaction(handlePreCommitResponse.getTxnId());
                    LOG.info("abort for labelSuffix {} finished", str);
                    return;
                } else {
                    if (JOB_EXIST_FINISHED.equals(handlePreCommitResponse.getExistingJobStatus())) {
                        throw new DorisException("Load status is Label Already Exists and load job finished, change you label prefix or restore from latest savepoint!");
                    }
                    Matcher matcher = ResponseUtil.LABEL_EXIST_PATTERN.matcher(handlePreCommitResponse.getMessage());
                    if (!matcher.find()) {
                        LOG.error("response: {}", handlePreCommitResponse.toString());
                        throw new DorisException("Load Status is Label Already Exists, but no txnID associated with it!");
                    }
                    Preconditions.checkState(generateTableLabel.equals(matcher.group(1)));
                    long parseLong = Long.parseLong(matcher.group(2));
                    LOG.info("abort {} for exist label {}", Long.valueOf(parseLong), generateTableLabel);
                    abortTransaction(parseLong);
                    j2++;
                }
            } catch (Exception e) {
                LOG.warn("failed to stream load data", (Throwable) e);
                throw e;
            }
        }
    }

    public void writeRecord(byte[] bArr) throws IOException {
        if (this.loadBatchFirstRecord) {
            this.loadBatchFirstRecord = false;
        } else {
            this.recordStream.write(this.lineDelimiter);
        }
        this.recordStream.write(bArr);
    }

    @VisibleForTesting
    public RecordStream getRecordStream() {
        return this.recordStream;
    }

    public RespContent handlePreCommitResponse(CloseableHttpResponse closeableHttpResponse) throws Exception {
        if (closeableHttpResponse.getStatusLine().getStatusCode() != 200 || closeableHttpResponse.getEntity() == null) {
            throw new StreamLoadException("stream load error: " + closeableHttpResponse.getStatusLine().toString());
        }
        String entityUtils = EntityUtils.toString(closeableHttpResponse.getEntity());
        LOG.info("load Result {}", entityUtils);
        return (RespContent) OBJECT_MAPPER.readValue(entityUtils, RespContent.class);
    }

    public RespContent stopLoad(String str) throws IOException {
        this.recordStream.endInput();
        LOG.info("table {} stream load stopped for {} on host {}", this.table, str, this.hostPort);
        Preconditions.checkState(this.pendingLoadFuture != null);
        try {
            return handlePreCommitResponse(this.pendingLoadFuture.get());
        } catch (Exception e) {
            throw new DorisRuntimeException(e);
        }
    }

    public void startLoad(String str, boolean z) throws IOException {
        this.loadBatchFirstRecord = !z;
        HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
        this.recordStream.startInput(z);
        LOG.info("table {} stream load started for {} on host {}", this.table, str, this.hostPort);
        try {
            httpPutBuilder.setUrl(this.loadUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().addHiddenColumns(this.enableDelete).setLabel(str).setEntity(new InputStreamEntity(this.recordStream)).addProperties(this.streamLoadProp);
            if (this.enable2PC) {
                httpPutBuilder.enable2PC();
            }
            this.pendingLoadFuture = this.executorService.submit(() -> {
                LOG.info("table {} start execute load", this.table);
                return this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
            });
        } catch (Exception e) {
            LOG.warn("failed to stream load data with label: " + str, (Throwable) e);
            throw e;
        }
    }

    public void abortTransaction(long j) throws Exception {
        HttpPutBuilder httpPutBuilder = new HttpPutBuilder();
        httpPutBuilder.setUrl(this.abortUrlStr).baseAuth(this.user, this.passwd).addCommonHeader().addTxnId(j).setEmptyEntity().abort();
        CloseableHttpResponse execute = this.httpClient.execute((HttpUriRequest) httpPutBuilder.build());
        if (execute.getStatusLine().getStatusCode() != 200 || execute.getEntity() == null) {
            LOG.warn("abort transaction response: " + execute.getStatusLine().toString());
            throw new DorisRuntimeException("Fail to abort transaction " + j + " with url " + this.abortUrlStr);
        }
        Map map = (Map) new ObjectMapper().readValue(EntityUtils.toString(execute.getEntity()), new TypeReference<HashMap<String, String>>() { // from class: org.apache.doris.flink.sink.writer.DorisStreamLoad.1
        });
        if (LoadStatus.SUCCESS.equals(map.get("status"))) {
            return;
        }
        if (ResponseUtil.isCommitted((String) map.get("msg"))) {
            throw new DorisException("try abort committed transaction, do you recover from old savepoint?");
        }
        LOG.warn("Fail to abort transaction. txnId: {}, error: {}", Long.valueOf(j), map.get("msg"));
    }

    public void close() throws IOException {
        if (null != this.httpClient) {
            try {
                this.httpClient.close();
            } catch (IOException e) {
                throw new IOException("Closing httpClient failed.", e);
            }
        }
        if (null != this.executorService) {
            this.executorService.shutdownNow();
        }
    }
}
