/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.influxdb.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxDBSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.influxdb.source.InfluxdbSourceReader;
import org.apache.seatunnel.connectors.seatunnel.influxdb.state.InfluxDBSourceState;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bounded SeaTunnel source that reads rows from InfluxDB.
 *
 * <p>When a reader is created, the configured SQL is executed once with a
 * {@code limit 1} clause so that the position of every catalog field inside the
 * query result can be resolved up front and handed to the reader.
 */
public class InfluxDBSource
implements SeaTunnelSource<SeaTunnelRow, InfluxDBSourceSplit, InfluxDBSourceState>,
SupportParallelism,
SupportColumnProjection {
    private static final Logger log = LoggerFactory.getLogger(InfluxDBSource.class);

    /** Clause added to the configured SQL so the column probe fetches a single row. */
    private static final String QUERY_LIMIT = " limit 1";

    /**
     * Matches a {@code tz(...)} clause. Compiled once, and case-insensitive so the
     * match offset refers to the original (not lower-cased) SQL text.
     */
    private static final Pattern TZ_FUNCTION_PATTERN = Pattern.compile("tz\\(.*\\)", Pattern.CASE_INSENSITIVE);

    private final CatalogTable catalogTable;
    private final SourceConfig sourceConfig;

    /**
     * @param catalogTable table definition whose row type is produced by this source
     * @param sourceConfig connection and query settings
     */
    public InfluxDBSource(CatalogTable catalogTable, SourceConfig sourceConfig) {
        this.catalogTable = catalogTable;
        this.sourceConfig = sourceConfig;
    }

    /** @return the plugin identifier, {@code "InfluxDB"} */
    public String getPluginName() {
        return "InfluxDB";
    }

    /** @return {@link Boundedness#BOUNDED}; this source always reports itself as bounded */
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Creates a reader after resolving, via a one-row probe query, where each catalog
     * field sits in the query result.
     *
     * @param readerContext context passed through to the reader
     * @return a new {@link InfluxdbSourceReader}
     * @throws InfluxdbConnectorException if the column positions cannot be resolved
     * @throws Exception if the probe connection cannot be obtained
     */
    public SourceReader createReader(SourceReader.Context readerContext) throws Exception {
        InfluxDB influxdb = InfluxDBClient.getInfluxDB(this.sourceConfig);
        List<Integer> columnsIndexList;
        try {
            columnsIndexList = this.initColumnsIndex(influxdb);
        } finally {
            // This connection is used only for the probe and is not passed to the
            // reader, so release it here instead of leaking it.
            influxdb.close();
        }
        return new InfluxdbSourceReader(this.sourceConfig, readerContext, this.catalogTable.getSeaTunnelRowType(), columnsIndexList);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * @param enumeratorContext context passed through to the enumerator
     * @return a new {@link InfluxDBSourceSplitEnumerator}
     */
    public SourceSplitEnumerator createEnumerator(SourceSplitEnumerator.Context enumeratorContext) throws Exception {
        return new InfluxDBSourceSplitEnumerator((SourceSplitEnumerator.Context<InfluxDBSourceSplit>)enumeratorContext, this.sourceConfig);
    }

    /**
     * Creates a split enumerator from a previously checkpointed state.
     *
     * @param enumeratorContext context passed through to the enumerator
     * @param checkpointState state to restore from
     * @return a new {@link InfluxDBSourceSplitEnumerator} built with the given state
     */
    public SourceSplitEnumerator<InfluxDBSourceSplit, InfluxDBSourceState> restoreEnumerator(SourceSplitEnumerator.Context<InfluxDBSourceSplit> enumeratorContext, InfluxDBSourceState checkpointState) throws Exception {
        return new InfluxDBSourceSplitEnumerator(enumeratorContext, checkpointState, this.sourceConfig);
    }

    /** @return a single-element list holding the catalog table given at construction */
    public List<CatalogTable> getProducedCatalogTables() {
        return Collections.singletonList(this.catalogTable);
    }

    /**
     * Runs the probe query and maps every catalog field name to its position in the
     * result's column list.
     *
     * @param influxdb open client used to execute the probe query
     * @return one index per catalog field, in catalog order; a field that is absent
     *     from the result columns maps to {@code -1}
     * @throws InfluxdbConnectorException if the query fails or returns no series
     */
    private List<Integer> initColumnsIndex(InfluxDB influxdb) {
        String query = this.buildProbeQuery(this.sourceConfig.getSql());
        try {
            QueryResult queryResult = influxdb.query(new Query(query, this.sourceConfig.getDatabase()));
            List<QueryResult.Series> seriesList = queryResult.getResults().get(0).getSeries();
            if (seriesList == null || seriesList.isEmpty()) {
                // Without at least one series there is no column list to inspect; fail
                // with a descriptive cause rather than an anonymous NullPointerException.
                throw new IllegalStateException("Probe query returned no series: " + query);
            }
            List<String> resultColumns = seriesList.get(0).getColumns();
            return Arrays.stream(this.catalogTable.getSeaTunnelRowType().getFieldNames()).map(resultColumns::indexOf).collect(Collectors.toList());
        }
        catch (Exception e) {
            throw new InfluxdbConnectorException(InfluxdbConnectorErrorCode.GET_COLUMN_INDEX_FAILED, "Get column index of query result exception", e);
        }
    }

    /**
     * Builds the one-row probe query from the configured SQL.
     *
     * <p>If the SQL contains a {@code tz(...)} clause, the limit is inserted in front
     * of it; otherwise the limit is appended at the end.
     * NOTE(review): the limit is added unconditionally, so SQL that already carries
     * its own limit clause presumably yields an invalid probe query - confirm.
     */
    private String buildProbeQuery(String sql) {
        int tzStart = InfluxDBSource.containTzFunction(sql);
        if (tzStart > 0) {
            // Insert one character before the match so the limit lands ahead of the
            // separator that precedes "tz(".
            return new StringBuilder(sql).insert(tzStart - 1, QUERY_LIMIT).append(" ").toString();
        }
        return sql + QUERY_LIMIT;
    }

    /**
     * Locates a {@code tz(...)} clause in the given SQL, ignoring case.
     *
     * @param sql SQL text to search
     * @return start offset of the first match within {@code sql}, or {@code -1} if none
     */
    private static int containTzFunction(String sql) {
        Matcher matcher = TZ_FUNCTION_PATTERN.matcher(sql);
        return matcher.find() ? matcher.start() : -1;
    }
}

