/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.databend.source;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DatabendSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(DatabendSourceReader.class);
    private final DatabendSourceConfig sourceConfig;
    private final String sql;
    private SeaTunnelRowType rowType;
    private final SingleSplitReaderContext readerContext;
    private Connection connection;
    private PreparedStatement statement;
    private ResultSet resultSet;
    private boolean hasNext;
    private SeaTunnelRow firstRow = null;
    private boolean reachEnd;

    public DatabendSourceReader(SingleSplitReaderContext context, DatabendSourceConfig sourceConfig, String sql, SeaTunnelRowType rowType) {
        this.readerContext = context;
        this.sourceConfig = sourceConfig;
        this.sql = sql;
        this.rowType = rowType;
        log.info("DatabendSourceReader constructor - rowType: {}", (Object)rowType);
    }

    public void open() throws Exception {
        log.info("Starting to open DatabendSourceReader");
        try {
            log.info("Loading Databend JDBC driver");
            Class.forName("com.databend.jdbc.DatabendDriver");
            log.info("Connecting to Databend with URL: {}", (Object)this.sourceConfig.getUrl());
            Properties properties = this.sourceConfig.getProperties();
            this.connection = DriverManager.getConnection(this.sourceConfig.getUrl(), properties);
            log.info("Connection to Databend established successfully");
            log.info("Preparing SQL statement: {}", (Object)this.sql);
            this.statement = this.connection.prepareStatement(this.sql);
            Integer fetchSize = this.sourceConfig.getFetchSize();
            if (fetchSize != null && fetchSize > 0) {
                log.info("Setting fetch size to: {}", (Object)fetchSize);
                this.statement.setFetchSize(fetchSize);
                this.statement.setFetchDirection(1000);
            } else {
                log.info("Using default fetch size");
            }
            log.info("Executing query");
            this.resultSet = this.statement.executeQuery();
            log.info("Query executed successfully");
            if (this.rowType == null || this.rowType.getFieldNames().length == 0) {
                log.info("Row type is null or empty, inferring from ResultSet metadata");
                this.rowType = this.inferRowTypeFromResultSet(this.resultSet.getMetaData());
                log.info("Inferred row type: {}", (Object)this.rowType);
            } else {
                log.info("Using provided row type: {}", (Object)this.rowType);
            }
            this.hasNext = this.resultSet.next();
            log.info("Initial resultSet.next() returned: {}", (Object)this.hasNext);
            if (!this.hasNext) {
                log.info("No data found in result set");
                this.reachEnd = true;
            }
        }
        catch (Exception e) {
            log.error("Error while opening Databend source reader", e);
            throw new DatabendConnectorException(DatabendConnectorErrorCode.CONNECT_FAILED, "Failed to open Databend source reader: " + e.getMessage(), e);
        }
        log.info("DatabendSourceReader opened successfully");
    }

    public SeaTunnelRowType getRowType() {
        return this.rowType;
    }

    @Override
    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
        if (this.reachEnd) {
            return;
        }
        log.info("Starting to poll data from Databend");
        int rowCount = 0;
        try {
            while (this.hasNext) {
                SeaTunnelRow row = DatabendUtil.convertToSeaTunnelRow(this.resultSet, this.rowType);
                log.info("Converting ResultSet to SeaTunnelRow: {}", (Object)row);
                output.collect((Object)row);
                log.info("Collected row {}: {}", (Object)(++rowCount), (Object)row);
                this.hasNext = this.resultSet.next();
                if (this.hasNext) continue;
                log.info("Reached end of ResultSet after reading {} rows", (Object)rowCount);
                this.reachEnd = true;
                this.readerContext.signalNoMoreElement();
                break;
            }
        }
        catch (Exception e) {
            log.error("Error while polling data from Databend", e);
            throw e;
        }
        log.info("Finished polling data from Databend, total rows: {}", (Object)rowCount);
    }

    private SeaTunnelRowType inferRowTypeFromResultSet(ResultSetMetaData metaData) throws SQLException {
        int columnCount = metaData.getColumnCount();
        String[] fieldNames = new String[columnCount];
        SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columnCount];
        for (int i = 0; i < columnCount; ++i) {
            int columnIndex = i + 1;
            fieldNames[i] = metaData.getColumnLabel(columnIndex);
            fieldTypes[i] = this.convertDatabendTypeToSeaTunnelType(metaData.getColumnType(columnIndex), metaData.getColumnTypeName(columnIndex), metaData.getPrecision(columnIndex), metaData.getScale(columnIndex));
        }
        return new SeaTunnelRowType(fieldNames, fieldTypes);
    }

    private SeaTunnelDataType<?> convertDatabendTypeToSeaTunnelType(int sqlType, String typeName, int precision, int scale) {
        if (typeName != null) {
            if ((typeName = typeName.toUpperCase()).contains("VARCHAR") || typeName.contains("STRING") || typeName.contains("TEXT") || typeName.contains("CHAR")) {
                return BasicType.STRING_TYPE;
            }
            if (typeName.contains("BOOLEAN") || typeName.equals("BOOL")) {
                return BasicType.BOOLEAN_TYPE;
            }
            if (typeName.equals("TINYINT") || typeName.equals("UINT8") || typeName.equals("INT8")) {
                return BasicType.BYTE_TYPE;
            }
            if (typeName.equals("SMALLINT") || typeName.equals("UINT16") || typeName.equals("INT16")) {
                return BasicType.SHORT_TYPE;
            }
            if (typeName.equals("INT") || typeName.equals("INTEGER") || typeName.equals("UINT32") || typeName.equals("INT32")) {
                return BasicType.INT_TYPE;
            }
            if (typeName.equals("BIGINT") || typeName.equals("UINT64") || typeName.equals("INT64")) {
                return BasicType.LONG_TYPE;
            }
            if (typeName.equals("FLOAT") || typeName.contains("FLOAT32")) {
                return BasicType.FLOAT_TYPE;
            }
            if (typeName.equals("DOUBLE") || typeName.contains("FLOAT64")) {
                return BasicType.DOUBLE_TYPE;
            }
            if (typeName.contains("DECIMAL")) {
                return new DecimalType(precision, scale);
            }
            if (typeName.equals("DATE")) {
                return LocalTimeType.LOCAL_DATE_TYPE;
            }
            if (typeName.equals("TIMESTAMP") || typeName.equals("DATETIME")) {
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            }
            if (typeName.contains("BINARY") || typeName.contains("BLOB")) {
                return PrimitiveByteArrayType.INSTANCE;
            }
        }
        switch (sqlType) {
            case -16: 
            case -15: 
            case -9: 
            case -1: 
            case 1: 
            case 12: {
                return BasicType.STRING_TYPE;
            }
            case -6: {
                return BasicType.BYTE_TYPE;
            }
            case 5: {
                return BasicType.SHORT_TYPE;
            }
            case 4: {
                return BasicType.INT_TYPE;
            }
            case -5: {
                return BasicType.LONG_TYPE;
            }
            case 6: 
            case 7: {
                return BasicType.FLOAT_TYPE;
            }
            case 8: {
                return BasicType.DOUBLE_TYPE;
            }
            case -7: 
            case 16: {
                return BasicType.BOOLEAN_TYPE;
            }
            case 2: 
            case 3: {
                return new DecimalType(precision > 0 ? precision : 38, scale >= 0 ? scale : 18);
            }
            case 91: {
                return LocalTimeType.LOCAL_DATE_TYPE;
            }
            case 92: {
                return LocalTimeType.LOCAL_TIME_TYPE;
            }
            case 93: {
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            }
            case -4: 
            case -3: 
            case -2: 
            case 2004: {
                return PrimitiveByteArrayType.INSTANCE;
            }
        }
        log.warn("Unsupported SQL type: {}, type name: {}, using STRING_TYPE as fallback", (Object)sqlType, (Object)typeName);
        return BasicType.STRING_TYPE;
    }

    public void close() throws IOException {
        try {
            if (this.resultSet != null) {
                this.resultSet.close();
            }
            if (this.statement != null) {
                this.statement.close();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        }
        catch (SQLException e) {
            throw new IOException("Error while closing Databend source reader", e);
        }
    }
}

