/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksFlushTuple;
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.StarRocksStreamLoadVisitor;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.shade.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarRocksSinkManager {
    private static final Logger log = LoggerFactory.getLogger(StarRocksSinkManager.class);
    private final SinkConfig sinkConfig;
    private final List<byte[]> batchList;
    private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
    private volatile boolean initialize;
    private volatile Exception flushException;
    private int batchRowCount = 0;
    private long batchBytesSize = 0L;

    public StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema) {
        this(sinkConfig, tableSchema, new StarRocksStreamLoadVisitor(sinkConfig, tableSchema));
    }

    StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema, StarRocksStreamLoadVisitor streamLoadVisitor) {
        this.sinkConfig = sinkConfig;
        this.batchList = new ArrayList<byte[]>();
        this.starrocksStreamLoadVisitor = streamLoadVisitor;
    }

    private void tryInit() throws IOException {
        if (this.initialize) {
            return;
        }
        this.initialize = true;
    }

    public synchronized void write(String record) throws IOException {
        this.tryInit();
        this.checkFlushException();
        byte[] bts = record.getBytes(StandardCharsets.UTF_8);
        this.batchList.add(bts);
        ++this.batchRowCount;
        this.batchBytesSize += (long)bts.length;
        if (this.batchRowCount >= this.sinkConfig.getBatchMaxSize() || this.batchBytesSize >= this.sinkConfig.getBatchMaxBytes()) {
            this.flush();
        }
    }

    public synchronized void close() throws IOException {
        this.flush();
    }

    public synchronized void flush() throws IOException {
        this.checkFlushException();
        if (this.batchList.isEmpty()) {
            return;
        }
        String label = this.createBatchLabel();
        StarRocksFlushTuple tuple = new StarRocksFlushTuple(label, this.batchBytesSize, new ArrayList<byte[]>(this.batchList));
        for (int i = 0; i <= this.sinkConfig.getMaxRetries(); ++i) {
            try {
                Boolean successFlag = this.starrocksStreamLoadVisitor.doStreamLoad(tuple);
                if (!successFlag.booleanValue()) continue;
                break;
            }
            catch (Exception e) {
                log.warn("Writing records to StarRocks failed, retry times = {}", (Object)i, (Object)e);
                String labelAlreadyMessage = String.format("Label [%s] has already been used", label);
                if (ExceptionUtils.getMessage((Throwable)e).contains(labelAlreadyMessage)) {
                    log.warn("Label [{}] has already been used, Skipping this batch", (Object)label);
                    break;
                }
                if (i >= this.sinkConfig.getMaxRetries()) {
                    throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.WRITE_RECORDS_FAILED, "The number of retries was exceeded, writing records to StarRocks failed.", e);
                }
                if (e instanceof StarRocksConnectorException && ((StarRocksConnectorException)((Object)e)).needReCreateLabel()) {
                    String newLabel = this.createBatchLabel();
                    log.warn(String.format("Batch label changed from [%s] to [%s]", tuple.getLabel(), newLabel));
                    tuple.setLabel(newLabel);
                }
                try {
                    long backoff = Math.min(this.sinkConfig.getRetryBackoffMultiplierMs() * i, this.sinkConfig.getMaxRetryBackoffMs());
                    Thread.sleep(backoff);
                    continue;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
                }
            }
        }
        this.batchList.clear();
        this.batchRowCount = 0;
        this.batchBytesSize = 0L;
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            throw new StarRocksConnectorException((SeaTunnelErrorCode)StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, this.flushException);
        }
    }

    public String createBatchLabel() {
        StringBuilder sb = new StringBuilder();
        if (!Strings.isNullOrEmpty((String)this.sinkConfig.getLabelPrefix())) {
            sb.append(this.sinkConfig.getLabelPrefix());
        }
        return sb.append(UUID.randomUUID()).toString();
    }
}

