/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.ws;

import com.taosdata.jdbc.AbstractResultSet;
import com.taosdata.jdbc.BlockData;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.enums.DataType;
import com.taosdata.jdbc.rs.RestfulResultSet;
import com.taosdata.jdbc.rs.RestfulResultSetMetaData;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Action;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.FetchBlockNewResp;
import com.taosdata.jdbc.ws.entity.FetchReq;
import com.taosdata.jdbc.ws.entity.QueryResp;
import com.taosdata.jdbc.ws.entity.Request;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for result sets whose rows are pulled block by block through a {@link Transport}.
 *
 * <p>The first {@value #START_BACKEND_FETCH_BLOCK_NUM} block fetches are performed synchronously
 * on the thread calling {@link #next()}. Every later fetch is served from a single background
 * thread that prefetches blocks into a bounded queue holding at most {@value #CACHE_SIZE} blocks.
 *
 * <p>Only {@code isClosed} is volatile; the cursor state ({@code rowIndex}, {@code numOfRows},
 * {@code result}) is not synchronized by this class.
 */
public abstract class AbstractWSResultSet
extends AbstractResultSet {
    private static final Logger log = LoggerFactory.getLogger(AbstractWSResultSet.class);
    /** Capacity of the prefetch queue filled by the background fetch thread. */
    private static final int CACHE_SIZE = 5;
    /** Number of block fetches done on the caller thread before the background fetcher is used. */
    private static final int START_BACKEND_FETCH_BLOCK_NUM = 3;

    protected final Statement statement;
    protected final Transport transport;
    protected final long queryId;
    protected final long reqId;
    protected volatile boolean isClosed;
    /** Set once an end-of-data marker has been consumed by {@link #next()}. */
    private boolean isCompleted = false;
    protected final ResultSetMetaData metaData;
    protected final List<RestfulResultSet.Field> fields = new ArrayList<RestfulResultSet.Field>();
    protected final List<String> columnNames;
    protected List<List<Object>> result = new ArrayList<List<Object>>();
    protected int numOfRows = 0;
    protected int rowIndex = 0;
    BlockingQueue<BlockData> blockingQueueOut = new LinkedBlockingQueue<BlockData>(CACHE_SIZE);
    ThreadPoolExecutor backFetchExecutor;
    ForkJoinPool dataHandleExecutor = this.getForkJoinPool();
    private int fetchBlockNum = 0;

    /**
     * Builds the column metadata from the query response; no rows are fetched here.
     *
     * @param statement the statement that produced this result set
     * @param transport the transport used for all subsequent fetch and free requests
     * @param response  the query response carrying ids, field descriptions and precision
     * @param database  the database name handed to the result set metadata
     * @throws SQLException declared for subclasses and for the helpers invoked here
     */
    protected AbstractWSResultSet(Statement statement, Transport transport, QueryResp response, String database) throws SQLException {
        this.statement = statement;
        this.transport = transport;
        this.queryId = response.getId();
        this.reqId = response.getReqId();
        this.columnNames = Arrays.asList(response.getFieldsNames());
        for (int i = 0; i < response.getFieldsCount(); ++i) {
            String colName = response.getFieldsNames()[i];
            int taosType = response.getFieldsTypes()[i];
            int jdbcType = DataType.convertTaosType2DataType(taosType).getJdbcTypeValue();
            int length = response.getFieldsLengths()[i];
            this.fields.add(new RestfulResultSet.Field(colName, jdbcType, length, "", taosType));
        }
        this.metaData = new RestfulResultSetMetaData(database, this.fields);
        this.timestampPrecision = response.getPrecision();
    }

    /**
     * Sends one FETCH_BLOCK_NEW request for this query and initialises the response.
     *
     * @return the initialised response; callers must still check its return code
     * @throws SQLException if the transport or the response initialisation fails
     */
    private FetchBlockNewResp fetchBlock() throws SQLException {
        // NOTE(review): the meaning of 7L and of the {1, 0} version bytes is defined by the
        // wire protocol and is not visible in this file - confirm against Transport.send.
        byte[] version = new byte[]{1, 0};
        FetchBlockNewResp resp = (FetchBlockNewResp)this.transport.send(Action.FETCH_BLOCK_NEW.getAction(), this.reqId, this.queryId, 7L, version);
        resp.init();
        return resp;
    }

    /**
     * Starts the single background thread that keeps fetching blocks into
     * {@code blockingQueueOut} until the data is complete, an error code is returned,
     * or this result set is closed.
     */
    private void startBackendFetch() {
        this.backFetchExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);
        this.backFetchExecutor.submit(() -> {
            try {
                while (!this.isClosed) {
                    BlockData blockData = BlockData.getEmptyBlockData(this.fields, this.timestampPrecision);
                    FetchBlockNewResp resp = this.fetchBlock();
                    if (Code.SUCCESS.getCode() != resp.getCode()) {
                        // Hand the failure code to the consumer; next() turns it into an exception.
                        blockData.setReturnCode(resp.getCode());
                        this.blockingQueueOut.put(blockData);
                        break;
                    }
                    if (resp.isCompleted() || this.isClosed) {
                        // End-of-data marker: tells next() that no more blocks will arrive.
                        blockData.setCompleted(true);
                        this.blockingQueueOut.put(blockData);
                        break;
                    }
                    blockData.setBuffer(resp.getBuffer());
                    // Queue first, decode afterwards: the consumer calls waitTillOK() on the block.
                    this.blockingQueueOut.put(blockData);
                    this.dataHandleExecutor.submit(blockData::handleData);
                }
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                log.error("fetch block error", (Throwable)e);
                // NOTE(review): this fallback block carries neither a return code nor a
                // completed flag; confirm that BlockData.waitTillOK() cannot wait on it forever.
                BlockData blockData = BlockData.getEmptyBlockData(this.fields, this.timestampPrecision);
                while (!this.isClosed) {
                    try {
                        if (this.blockingQueueOut.offer(blockData, 10L, TimeUnit.MILLISECONDS)) {
                            break;
                        }
                    }
                    catch (InterruptedException ignored) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
    }

    /**
     * Advances the cursor inside the current block.
     *
     * @return {@code true} if the cursor now points at a row of the current block,
     *         {@code false} if the block is exhausted
     */
    private boolean forward() {
        if (this.rowIndex > this.numOfRows) {
            return false;
        }
        return ++this.rowIndex < this.numOfRows;
    }

    /** Moves the cursor back to the first row of the current block. */
    public void reset() {
        this.rowIndex = 0;
    }

    /**
     * Moves to the next row, fetching a new block when the current one is exhausted.
     *
     * @return {@code true} if a row is available, {@code false} once all data has been consumed
     * @throws SQLException with error code 8965 if this result set is closed, or if fetching
     *                      the next block fails or is interrupted
     */
    @Override
    public boolean next() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8965);
        }
        if (this.isCompleted) {
            // The end-of-data marker was already consumed. Without this guard forward() would
            // walk the stale last block again and the following fetch could block forever
            // because the background fetcher has already exited.
            return false;
        }
        if (this.forward()) {
            return true;
        }
        ++this.fetchBlockNum;
        if (this.fetchBlockNum > START_BACKEND_FETCH_BLOCK_NUM) {
            return this.nextBlockFromBackend();
        }
        return this.nextBlockFromServer();
    }

    /**
     * Takes the next block from the prefetch queue, starting the background fetcher on first use.
     *
     * @return {@code true} if a data block was installed, {@code false} on end of data
     * @throws SQLException if the wait is interrupted or the block carries an error code
     */
    private boolean nextBlockFromBackend() throws SQLException {
        if (this.backFetchExecutor == null) {
            this.startBackendFetch();
        }
        BlockData blockData;
        try {
            blockData = this.blockingQueueOut.take();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw TSDBError.createSQLException(9040, "FETCH DATA INTERRUPTED");
        }
        if (blockData.getReturnCode() != Code.SUCCESS.getCode()) {
            throw TSDBError.createSQLException(blockData.getReturnCode(), "FETCH DATA ERROR");
        }
        this.reset();
        if (blockData.isCompleted()) {
            this.isCompleted = true;
            return false;
        }
        // The block is decoded on dataHandleExecutor; wait until that has finished.
        blockData.waitTillOK();
        this.result = blockData.getData();
        this.numOfRows = blockData.getNumOfRows();
        return true;
    }

    /**
     * Fetches and decodes the next block on the calling thread.
     *
     * @return {@code true} if a data block was installed, {@code false} on end of data
     *         or if the result set was closed while the fetch was in flight
     * @throws SQLException if the fetch fails or returns a non-success code
     */
    private boolean nextBlockFromServer() throws SQLException {
        FetchBlockNewResp resp = this.fetchBlock();
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw TSDBError.createSQLException(resp.getCode(), "FETCH DATA ERROR");
        }
        this.reset();
        if (resp.isCompleted()) {
            // Record completion here as well so that next() and close() behave the same
            // way as they do after the background path reaches end of data.
            this.isCompleted = true;
            return false;
        }
        if (this.isClosed) {
            return false;
        }
        BlockData blockData = BlockData.getEmptyBlockData(this.fields, this.timestampPrecision);
        blockData.setBuffer(resp.getBuffer());
        blockData.handleData();
        this.result = blockData.getData();
        this.numOfRows = blockData.getNumOfRows();
        return true;
    }

    /**
     * Closes this result set: stops the background fetcher (if it was started) and, unless
     * the data was already read to completion, asks the server to free the query result.
     * Calling it again after the first call has no effect.
     *
     * @throws SQLException declared by the overridden method
     */
    @Override
    public void close() throws SQLException {
        synchronized (this) {
            if (this.isClosed) {
                return;
            }
            // Setting the flag first makes the background loop stop after its current fetch.
            this.isClosed = true;
            this.stopBackendFetch();
            if (!this.isCompleted) {
                FetchReq closeReq = new FetchReq();
                // NOTE(review): the query id is used as the request id as well; kept as-is.
                closeReq.setReqId(this.queryId);
                closeReq.setId(this.queryId);
                this.transport.sendWithoutResponse(new Request(Action.FREE_RESULT.getAction(), closeReq));
            }
        }
    }

    /**
     * Shuts the background fetch executor down and waits until its task has finished.
     *
     * <p>The wait is uninterruptible so that the FREE_RESULT request in {@link #close()} is
     * only sent after the in-flight fetch has ended; a pending interrupt is restored afterwards.
     */
    private void stopBackendFetch() {
        ThreadPoolExecutor executor = this.backFetchExecutor;
        if (executor == null) {
            return;
        }
        // shutdown() does not interrupt the running task; it only lets the pool terminate
        // once that task returns.
        executor.shutdown();
        boolean interrupted = false;
        try {
            while (true) {
                if (this.blockingQueueOut.remainingCapacity() == 0) {
                    // The worker may be parked in put() on a full queue. Nobody will consume
                    // these blocks any more, so drop one to let it continue and see isClosed.
                    this.blockingQueueOut.poll();
                }
                try {
                    if (executor.awaitTermination(1L, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                }
                catch (InterruptedException e) {
                    // Remember the interrupt instead of re-setting the flag right away,
                    // which would turn this loop into a busy spin.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the column metadata built when this result set was created.
     *
     * @throws SQLException with error code 8965 if this result set is closed
     */
    @Override
    public ResultSetMetaData getMetaData() throws SQLException {
        if (this.isClosed()) {
            throw TSDBError.createSQLException(8965);
        }
        return this.metaData;
    }

    /** Reports whether {@link #close()} has been called on this result set. */
    @Override
    public boolean isClosed() throws SQLException {
        return this.isClosed;
    }
}

