/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.DefaultSerializer;
import org.apache.seatunnel.connectors.seatunnel.influxdb.serialize.Serializer;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink writer that buffers serialized {@link Point}s in memory and writes them to InfluxDB in
 * batches.
 *
 * <p>Rows are converted to points by a {@link DefaultSerializer} and appended to an in-memory
 * buffer. The buffer is flushed when it reaches the configured batch size (if positive), on
 * {@link #prepareCommit()}, and on {@link #close()}.
 *
 * <p>The buffer is a plain {@link ArrayList} and no synchronization is performed in this class.
 */
public class InfluxDBSinkWriter
extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter {
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSinkWriter.class);
    private final Serializer serializer;
    private InfluxDB influxdb;
    private final SinkConfig sinkConfig;
    /** Points accumulated since the last successful flush. */
    private final List<Point> batchList;
    /**
     * Failure to re-throw on the next write/flush. Nothing in this class assigns it; it is only
     * read by {@link #checkFlushException()}.
     */
    private volatile Exception flushException;

    /**
     * Creates the writer, builds the row serializer from the sink configuration and opens the
     * InfluxDB connection.
     *
     * @param sinkConfig sink configuration (database, batch size, retry settings, ...)
     * @param seaTunnelRowType row type handed to the serializer
     * @throws ConnectException declared by {@link #connect()}
     */
    public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException {
        this.sinkConfig = sinkConfig;
        // NOTE(review): this logs the whole config as JSON; if SinkConfig carries credentials
        // they will end up in the log - confirm and mask if necessary.
        log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig));
        this.serializer = new DefaultSerializer(seaTunnelRowType, sinkConfig.getPrecision().getTimeUnit(), sinkConfig.getKeyTags(), sinkConfig.getKeyTime(), sinkConfig.getMeasurement());
        this.batchList = new ArrayList<>();
        this.connect();
    }

    /**
     * Serializes a row into a {@link Point} and buffers it.
     *
     * @param element the row to write
     * @throws IOException declared by {@link #write(Point)}
     */
    @Override
    public void write(SeaTunnelRow element) throws IOException {
        Point record = this.serializer.serialize(element);
        this.write(record);
    }

    /**
     * Flushes all buffered points and then delegates to the parent implementation.
     *
     * @return whatever the parent {@code prepareCommit()} returns
     * @throws InfluxdbConnectorException if the flush fails
     */
    @Override
    public Optional<Void> prepareCommit() {
        try {
            this.flush();
            return super.prepareCommit();
        }
        catch (IOException e) {
            // flush() declares IOException but this override does not; surface it unchecked
            // with the same error code used for every other flush failure.
            throw new InfluxdbConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Writing records to InfluxDB failed.", e);
        }
    }

    /**
     * Flushes any remaining points and closes the InfluxDB client.
     *
     * <p>The client is closed even when the final flush fails, so the connection is not leaked.
     *
     * @throws IOException declared by {@link #flush()}
     */
    @Override
    public void close() throws IOException {
        try {
            this.flush();
        }
        finally {
            if (this.influxdb != null) {
                this.influxdb.close();
                this.influxdb = null;
            }
        }
    }

    /**
     * Buffers a single point and flushes when the configured batch size is reached.
     *
     * <p>A non-positive batch size disables size-triggered flushing.
     *
     * @param record the point to buffer
     * @throws IOException declared by {@link #flush()}
     */
    public void write(Point record) throws IOException {
        this.checkFlushException();
        this.batchList.add(record);
        int batchSize = this.sinkConfig.getBatchSize();
        if (batchSize > 0 && this.batchList.size() >= batchSize) {
            this.flush();
        }
    }

    /**
     * Writes all buffered points to InfluxDB, retrying on failure, and clears the buffer once
     * the write has succeeded.
     *
     * <p>Up to {@code maxRetries + 1} attempts are made. Before the retry following failed
     * attempt {@code i} (zero-based) the thread sleeps
     * {@code min(retryBackoffMultiplierMs * i, maxRetryBackoffMs)} milliseconds, so the first
     * retry is immediate.
     *
     * @throws IOException never thrown by this implementation; kept for interface compatibility
     * @throws InfluxdbConnectorException if every attempt fails or the thread is interrupted
     *     while backing off
     */
    public void flush() throws IOException {
        this.checkFlushException();
        if (this.batchList.isEmpty()) {
            return;
        }
        // Guard against a negative setting, which would otherwise skip the loop entirely and
        // drop the buffered points without ever writing them.
        int maxRetries = Math.max(0, this.sinkConfig.getMaxRetries());
        for (int attempt = 0; attempt <= maxRetries; ++attempt) {
            try {
                // Build a fresh batch per attempt: re-adding the buffer to a shared builder
                // would duplicate the points on every retry.
                BatchPoints batch = BatchPoints.database(this.sinkConfig.getDatabase()).points(this.batchList).build();
                this.influxdb.write(batch);
                // Success: stop here instead of re-sending the same batch on each iteration.
                break;
            }
            catch (Exception e) {
                log.error("Writing records to influxdb failed, retry times = {}", attempt, e);
                if (attempt >= maxRetries) {
                    throw new InfluxdbConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Writing records to InfluxDB failed.", e);
                }
                try {
                    long backoff = Math.min(this.sinkConfig.getRetryBackoffMultiplierMs() * attempt, this.sinkConfig.getMaxRetryBackoffMs());
                    Thread.sleep(backoff);
                }
                catch (InterruptedException ex) {
                    // Restore the interrupt flag and keep the write failure as the cause; the
                    // interruption itself is attached so it is not lost.
                    Thread.currentThread().interrupt();
                    e.addSuppressed(ex);
                    throw new InfluxdbConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Unable to flush; interrupted while doing another attempt.", e);
                }
            }
        }
        this.batchList.clear();
    }

    /**
     * Re-throws a previously recorded flush failure, if any.
     *
     * @throws InfluxdbConnectorException wrapping {@code flushException} when it is set
     */
    private void checkFlushException() {
        if (this.flushException != null) {
            throw new InfluxdbConnectorException(CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Writing records to InfluxDB failed.", this.flushException);
        }
    }

    /**
     * Opens the InfluxDB write client if it is not already open and verifies it with a ping.
     *
     * <p>The client is only stored once the check has passed; if the check fails the freshly
     * created client is closed so it is not leaked.
     *
     * @throws ConnectException declared for the client factory call
     * @throws InfluxdbConnectorException if the ping response is not good
     */
    public void connect() throws ConnectException {
        if (this.influxdb != null) {
            return;
        }
        InfluxDB client = InfluxDBClient.getWriteClient(this.sinkConfig);
        try {
            String version = client.version();
            if (!client.ping().isGood()) {
                throw new InfluxdbConnectorException(InfluxdbConnectorErrorCode.CONNECT_FAILED, String.format("connect influxdb failed, due to influxdb version info is unknown, the url is: {%s}", this.sinkConfig.getUrl()));
            }
            log.info("connect influxdb successful. sever version :{}.", version);
        }
        catch (RuntimeException e) {
            try {
                client.close();
            }
            catch (RuntimeException closeFailure) {
                // Do not let a close failure mask the original connection error.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.influxdb = client;
    }
}

