package org.apache.doris.flink.sink.batch;

import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/batch/DorisBatchWriter.class */
public class DorisBatchWriter<IN> implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DorisBatchWriter.class);
    private DorisBatchStreamLoad batchStreamLoad;
    private final DorisOptions dorisOptions;
    private final DorisReadOptions dorisReadOptions;
    private final DorisExecutionOptions executionOptions;
    private final String labelPrefix;
    private final LabelGenerator labelGenerator;
    private final long flushIntervalMs;
    private final DorisRecordSerializer<IN> serializer;
    private final transient ScheduledExecutorService scheduledExecutorService;
    private volatile transient Exception flushException = null;
    private String database;
    private String table;

    public DorisBatchWriter(Sink.InitContext initContext, DorisRecordSerializer<IN> dorisRecordSerializer, DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, DorisExecutionOptions dorisExecutionOptions) {
        if (!StringUtils.isNullOrWhitespaceOnly(dorisOptions.getTableIdentifier())) {
            String[] split = dorisOptions.getTableIdentifier().split("\\.");
            Preconditions.checkState(split.length == 2, "tableIdentifier input error, the format is database.table");
            this.database = split[0];
            this.table = split[1];
        }
        LOG.info("labelPrefix " + dorisExecutionOptions.getLabelPrefix());
        this.labelPrefix = dorisExecutionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
        this.labelGenerator = new LabelGenerator(this.labelPrefix, false);
        this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ExecutorThreadFactory("stream-load-flush-interval"));
        this.serializer = dorisRecordSerializer;
        this.dorisOptions = dorisOptions;
        this.dorisReadOptions = dorisReadOptions;
        this.executionOptions = dorisExecutionOptions;
        this.flushIntervalMs = dorisExecutionOptions.getBufferFlushIntervalMs();
    }

    public void initializeLoad() throws IOException {
        this.batchStreamLoad = new DorisBatchStreamLoad(this.dorisOptions, this.dorisReadOptions, this.executionOptions, this.labelGenerator);
        this.scheduledExecutorService.scheduleWithFixedDelay(this::intervalFlush, this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void intervalFlush() {
        try {
            LOG.info("interval flush triggered.");
            this.batchStreamLoad.flush(null, false);
        } catch (InterruptedException e) {
            this.flushException = e;
        }
    }

    public void write(IN in, SinkWriter.Context context) throws IOException, InterruptedException {
        checkFlushException();
        String str = this.database;
        String str2 = this.table;
        DorisRecord serialize = this.serializer.serialize(in);
        if (serialize == null || serialize.getRow() == null) {
            return;
        }
        if (serialize.getTableIdentifier() != null) {
            str = serialize.getDatabase();
            str2 = serialize.getTable();
        }
        this.batchStreamLoad.writeRecord(str, str2, serialize.getRow());
    }

    public void flush(boolean z) throws IOException, InterruptedException {
        checkFlushException();
        LOG.info("checkpoint flush triggered.");
        this.batchStreamLoad.flush(null, true);
    }

    public void close() throws Exception {
        LOG.info("DorisBatchWriter Close");
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdownNow();
        }
        this.batchStreamLoad.close();
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to streamload failed.", this.flushException);
        }
    }
}
