/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.milvus.source;

import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.GetLoadStateResponse;
import io.milvus.grpc.LoadState;
import io.milvus.grpc.QueryResults;
import io.milvus.orm.iterator.QueryIterator;
import io.milvus.param.ConnectParam;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.AlterCollectionParam;
import io.milvus.param.collection.GetLoadStateParam;
import io.milvus.param.dml.QueryIteratorParam;
import io.milvus.param.dml.QueryParam;
import io.milvus.response.QueryResultsWrapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.CommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import org.apache.seatunnel.connectors.seatunnel.milvus.source.MilvusSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.milvus.utils.source.MilvusSourceConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MilvusSourceReader
implements SourceReader<SeaTunnelRow, MilvusSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(MilvusSourceReader.class);
    private final Deque<MilvusSourceSplit> pendingSplits = new ConcurrentLinkedDeque<MilvusSourceSplit>();
    private final ReadonlyConfig config;
    private final SourceReader.Context context;
    private final Map<TablePath, CatalogTable> sourceTables;
    private MilvusServiceClient client;
    private volatile boolean noMoreSplit;

    public MilvusSourceReader(SourceReader.Context readerContext, ReadonlyConfig config, Map<TablePath, CatalogTable> sourceTables) {
        this.context = readerContext;
        this.config = config;
        this.sourceTables = sourceTables;
    }

    public void open() throws Exception {
        this.client = new MilvusServiceClient(ConnectParam.newBuilder().withUri((String)this.config.get(MilvusSourceOptions.URL)).withToken((String)this.config.get(MilvusSourceOptions.TOKEN)).build());
        this.setRateLimit(((Integer)this.config.get(MilvusSourceOptions.RATE_LIMIT)).toString());
    }

    private void setRateLimit(String rateLimit) {
        log.info("Set rate limit: " + rateLimit);
        for (Map.Entry<TablePath, CatalogTable> entry : this.sourceTables.entrySet()) {
            TablePath tablePath = entry.getKey();
            String collectionName = tablePath.getTableName();
            AlterCollectionParam alterCollectionParam = AlterCollectionParam.newBuilder().withDatabaseName(tablePath.getDatabaseName()).withCollectionName(collectionName).withProperty("collection.queryRate.max.qps", rateLimit).build();
            R<RpcStatus> response = this.client.alterCollection(alterCollectionParam);
            if (response.getStatus().intValue() == R.Status.Success.getCode()) continue;
            throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException());
        }
        log.info("Set rate limit success");
    }

    public void close() throws IOException {
        log.info("Close milvus source reader");
        this.setRateLimit("-1");
        this.client.close();
        log.info("Close milvus source reader success");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
        Object object = output.getCheckpointLock();
        synchronized (object) {
            MilvusSourceSplit split = this.pendingSplits.poll();
            if (null != split) {
                try {
                    log.info("Begin to read data from split: " + split);
                    this.pollNextData(split, output);
                }
                catch (Exception e) {
                    log.error("Read data from split: " + split + " failed", (Throwable)e);
                    throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.READ_DATA_FAIL, e);
                }
            } else if (!this.noMoreSplit) {
                log.info("Milvus source wait split!");
            }
        }
        if (this.noMoreSplit && this.pendingSplits.isEmpty() && Boundedness.BOUNDED.equals((Object)this.context.getBoundedness())) {
            log.info("Closed the bounded milvus source");
            this.context.signalNoMoreElement();
        }
        Thread.sleep(1000L);
    }

    private void pollNextData(MilvusSourceSplit split, Collector<SeaTunnelRow> output) throws InterruptedException {
        R<QueryResults> queryResultsR;
        R<GetLoadStateResponse> loadStateResponse;
        TablePath tablePath = split.getTablePath();
        String partitionName = split.getPartitionName();
        TableSchema tableSchema = this.sourceTables.get(tablePath).getTableSchema();
        log.info("begin to read data from milvus, table schema: " + tableSchema);
        if (null == tableSchema) {
            throw new MilvusConnectorException(MilvusConnectionErrorCode.SOURCE_TABLE_SCHEMA_IS_NULL);
        }
        GetLoadStateParam.Builder loadStateParam = GetLoadStateParam.newBuilder().withDatabaseName(tablePath.getDatabaseName()).withCollectionName(tablePath.getTableName());
        if (StringUtils.isNotEmpty(partitionName)) {
            loadStateParam.withPartitionNames(Collections.singletonList(partitionName));
        }
        if ((loadStateResponse = this.client.getLoadState(loadStateParam.build())).getStatus().intValue() != R.Status.Success.getCode()) {
            throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, loadStateResponse.getException());
        }
        if (!LoadState.LoadStateLoaded.equals(loadStateResponse.getData().getState())) {
            throw new MilvusConnectorException(MilvusConnectionErrorCode.COLLECTION_NOT_LOADED);
        }
        QueryParam.Builder queryParam = QueryParam.newBuilder().withDatabaseName(tablePath.getDatabaseName()).withCollectionName(tablePath.getTableName()).withExpr("").withOutFields(Arrays.asList("count(*)"));
        if (StringUtils.isNotEmpty(partitionName)) {
            queryParam.withPartitionNames(Collections.singletonList(partitionName));
        }
        if ((queryResultsR = this.client.query(queryParam.build())).getStatus().intValue() != R.Status.Success.getCode()) {
            throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, loadStateResponse.getException());
        }
        QueryResultsWrapper wrapper = new QueryResultsWrapper(queryResultsR.getData());
        List<QueryResultsWrapper.RowRecord> records = wrapper.getRowRecords();
        log.info("Total records num: " + records.get(0).getFieldValues().get("count(*)"));
        long batchSize = ((Integer)this.config.get(MilvusSourceOptions.BATCH_SIZE)).intValue();
        this.queryIteratorData(tablePath, partitionName, tableSchema, output, batchSize);
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private void queryIteratorData(TablePath tablePath, String partitionName, TableSchema tableSchema, Collector<SeaTunnelRow> output, long batchSize) throws InterruptedException {
        try {
            R<QueryIterator> response;
            MilvusSourceConverter sourceConverter = new MilvusSourceConverter(tableSchema);
            QueryIteratorParam.Builder param = QueryIteratorParam.newBuilder().withDatabaseName(tablePath.getDatabaseName()).withCollectionName(tablePath.getTableName()).withOutFields(Arrays.asList("*")).withBatchSize(batchSize);
            if (StringUtils.isNotEmpty(partitionName)) {
                param.withPartitionNames(Collections.singletonList(partitionName));
            }
            if ((response = this.client.queryIterator(param.build())).getStatus().intValue() != R.Status.Success.getCode()) {
                throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.SERVER_RESPONSE_FAILED, response.getException());
            }
            int maxFailRetry = 3;
            QueryIterator iterator2 = response.getData();
            while (maxFailRetry > 0) {
                try {
                    List<QueryResultsWrapper.RowRecord> next = iterator2.next();
                    if (next == null || next.isEmpty()) return;
                    for (QueryResultsWrapper.RowRecord record : next) {
                        SeaTunnelRow seaTunnelRow = sourceConverter.convertToSeaTunnelRow(record, tableSchema, tablePath);
                        if (StringUtils.isNotEmpty(partitionName)) {
                            HashMap<String, String> options = new HashMap<String, String>();
                            options.put(CommonOptions.PARTITION.getName(), partitionName);
                            seaTunnelRow.setOptions(options);
                        }
                        output.collect((Object)seaTunnelRow);
                    }
                }
                catch (Exception e) {
                    if (!e.getMessage().contains("rate limit exceeded")) throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.READ_DATA_FAIL, e);
                    if (--maxFailRetry == 0) {
                        log.error("Iterate next data from milvus failed, batchSize = {}, throw exception", (Object)batchSize, (Object)e);
                        throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.READ_DATA_FAIL, e);
                    }
                    log.error("Iterate next data from milvus failed, batchSize = {}, will retry after 30 s, maxRetry: {}", new Object[]{batchSize, maxFailRetry, e});
                    Thread.sleep(30000L);
                    continue;
                    return;
                }
            }
        }
        catch (Exception e) {
            if (!e.getMessage().contains("rate limit exceeded") || batchSize <= 10L) throw new MilvusConnectorException((SeaTunnelErrorCode)MilvusConnectionErrorCode.READ_DATA_FAIL, e);
            log.error("Query Iterate data from milvus failed, retry from beginning with smaller batch size: {} after 30 s", (Object)(batchSize / 2L), (Object)e);
            Thread.sleep(30000L);
            this.queryIteratorData(tablePath, partitionName, tableSchema, output, batchSize / 2L);
            return;
        }
    }

    public List<MilvusSourceSplit> snapshotState(long checkpointId) throws Exception {
        return new ArrayList<MilvusSourceSplit>(this.pendingSplits);
    }

    public void addSplits(List<MilvusSourceSplit> splits) {
        log.info("Adding milvus splits to reader: " + splits);
        this.pendingSplits.addAll(splits);
    }

    public void handleNoMoreSplits() {
        log.info("receive no more splits message, this milvus reader will not add new split.");
        this.noMoreSplit = true;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

