/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.milvus.source;

import com.google.protobuf.ProtocolStringList;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DescribeCollectionResponse;
import io.milvus.grpc.FieldSchema;
import io.milvus.grpc.ShowPartitionsResponse;
import io.milvus.param.ConnectParam;
import io.milvus.param.R;
import io.milvus.param.collection.DescribeCollectionParam;
import io.milvus.param.partition.ShowPartitionsParam;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.config.MilvusSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectionErrorCode;
import org.apache.seatunnel.connectors.seatunnel.milvus.exception.MilvusConnectorException;
import org.apache.seatunnel.connectors.seatunnel.milvus.source.MilvusSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.milvus.source.MilvusSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split enumerator for the Milvus source.
 *
 * <p>Each pending table (Milvus collection) is turned into splits: one split per partition when
 * the collection has no partition-key field, otherwise a single split for the whole collection.
 * Splits are bucketed per reader by hashing the split id and handed to the readers that are
 * registered with the enumerator context.
 *
 * <p>{@link #pendingSplits} is a plain {@link HashMap}; every method that mutates it, and
 * {@link #snapshotState(long)}, synchronizes on {@link #stateLock}.
 */
public class MilvusSourceSplitEnumerator
        implements SourceSplitEnumerator<MilvusSourceSplit, MilvusSourceState> {
    private static final Logger log = LoggerFactory.getLogger(MilvusSourceSplitEnumerator.class);

    /** All source tables, keyed by path. */
    private final Map<TablePath, CatalogTable> tables;
    private final SourceSplitEnumerator.Context<MilvusSourceSplit> context;
    /** Tables that have not been split yet. */
    private final ConcurrentLinkedQueue<TablePath> pendingTables;
    /** Generated but not yet assigned splits, keyed by owning reader id. Guarded by stateLock. */
    private final Map<Integer, List<MilvusSourceSplit>> pendingSplits;
    private final Object stateLock = new Object();
    /** Created in {@link #open()}, released in {@link #close()}. */
    private MilvusClient client = null;
    private final ReadonlyConfig config;

    /**
     * Creates the enumerator, either fresh or restored from a previous state.
     *
     * @param context enumerator context used to query readers and assign splits
     * @param config connector configuration (read in {@link #open()})
     * @param sourceTables tables to read, keyed by path
     * @param sourceState previously snapshotted state, or {@code null} to start with every table
     *     in {@code sourceTables} pending and no pending splits
     */
    public MilvusSourceSplitEnumerator(SourceSplitEnumerator.Context<MilvusSourceSplit> context, ReadonlyConfig config, Map<TablePath, CatalogTable> sourceTables, MilvusSourceState sourceState) {
        this.context = context;
        this.tables = sourceTables;
        this.config = config;
        if (sourceState == null) {
            this.pendingTables = new ConcurrentLinkedQueue<>(this.tables.keySet());
            this.pendingSplits = new HashMap<>();
        } else {
            this.pendingTables = new ConcurrentLinkedQueue<>(sourceState.getPendingTables());
            this.pendingSplits = new HashMap<>(sourceState.getPendingSplits());
        }
    }

    /** Builds the Milvus client from the configured URL and token. */
    public void open() {
        ConnectParam connectParam = ConnectParam.newBuilder()
                .withUri((String) this.config.get(MilvusSourceOptions.URL))
                .withToken((String) this.config.get(MilvusSourceOptions.TOKEN))
                .build();
        this.client = new MilvusServiceClient(connectParam);
    }

    /**
     * Drains the pending-table queue: splits each table, buckets the splits per reader, assigns
     * them to the readers registered when this method started, and finally signals those readers
     * that no more splits will follow.
     *
     * @throws Exception declared by the enumerator contract
     */
    public void run() throws Exception {
        log.info("Starting milvus split enumerator.");
        // Typed (not raw) so the reader ids can be passed to the int-based context API below.
        Set<Integer> readers = this.context.registeredReaders();
        while (!this.pendingTables.isEmpty()) {
            synchronized (this.stateLock) {
                TablePath tablePath = this.pendingTables.poll();
                log.info("begin to split table path: {}", tablePath);
                Collection<MilvusSourceSplit> splits = this.generateSplits(this.tables.get(tablePath));
                log.info("end to split table {} into {} splits.", tablePath, splits.size());
                this.addPendingSplit(splits);
            }
            // The lock is released and re-acquired between generating and assigning splits.
            synchronized (this.stateLock) {
                this.assignSplit(readers);
            }
        }
        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", readers);
        readers.forEach(reader -> this.context.signalNoMoreSplits(reader));
    }

    /**
     * Generates the splits for one table.
     *
     * @param table table (Milvus collection) to split
     * @return one split per partition when no field is a partition key, otherwise a single split
     *     with id suffix {@code "0"} and no partition name
     * @throws MilvusConnectorException if listing partitions does not return a success status
     */
    private Collection<MilvusSourceSplit> generateSplits(CatalogTable table) {
        TablePath tablePath = table.getTablePath();
        log.info("Start splitting table {} into chunks by partition...", tablePath);
        String database = tablePath.getDatabaseName();
        String collection = tablePath.getTableName();

        R<DescribeCollectionResponse> describeCollectionResponseR = this.client.describeCollection(
                DescribeCollectionParam.newBuilder()
                        .withDatabaseName(database)
                        .withCollectionName(collection)
                        .build());
        // NOTE(review): unlike the showPartitions call below, the status of this response is not
        // checked before getData() is dereferenced -- confirm how a failed describe should be
        // reported (which error code) and add the check.
        boolean hasPartitionKey = describeCollectionResponseR.getData().getSchema().getFieldsList().stream()
                .anyMatch(FieldSchema::getIsPartitionKey);

        List<MilvusSourceSplit> milvusSourceSplits = new ArrayList<>();
        if (hasPartitionKey) {
            MilvusSourceSplit milvusSourceSplit = MilvusSourceSplit.builder()
                    .tablePath(tablePath)
                    .splitId(this.createSplitId(tablePath, "0"))
                    .build();
            log.info("Generated split: {}", milvusSourceSplit);
            milvusSourceSplits.add(milvusSourceSplit);
            return milvusSourceSplits;
        }

        ShowPartitionsParam showPartitionsParam = ShowPartitionsParam.newBuilder()
                .withDatabaseName(database)
                .withCollectionName(collection)
                .build();
        R<ShowPartitionsResponse> showPartitionsResponseR = this.client.showPartitions(showPartitionsParam);
        if (showPartitionsResponseR.getStatus().intValue() != R.Status.Success.getCode()) {
            throw new MilvusConnectorException((SeaTunnelErrorCode) MilvusConnectionErrorCode.LIST_PARTITIONS_FAILED, "Failed to show partitions: " + showPartitionsResponseR.getMessage());
        }
        ProtocolStringList partitionList = showPartitionsResponseR.getData().getPartitionNamesList();
        for (String partitionName : partitionList) {
            MilvusSourceSplit milvusSourceSplit = MilvusSourceSplit.builder()
                    .tablePath(tablePath)
                    .splitId(this.createSplitId(tablePath, partitionName))
                    .partitionName(partitionName)
                    .build();
            log.info("Generated split: {}", milvusSourceSplit);
            milvusSourceSplits.add(milvusSourceSplit);
        }
        return milvusSourceSplits;
    }

    /**
     * Builds a split id of the form {@code "<tablePath>-<index>"}.
     *
     * @param tablePath table the split belongs to
     * @param index partition name, or another discriminator within the table
     * @return the formatted split id
     */
    protected String createSplitId(TablePath tablePath, String index) {
        return String.format("%s-%s", tablePath, index);
    }

    /**
     * Buckets the given splits per reader, choosing the owner from the split id hash and the
     * current parallelism. Callers must hold {@link #stateLock}.
     */
    private void addPendingSplit(Collection<MilvusSourceSplit> splits) {
        int readerCount = this.context.currentParallelism();
        for (MilvusSourceSplit split : splits) {
            int ownerReader = MilvusSourceSplitEnumerator.getSplitOwner(split.splitId(), readerCount);
            log.info("Assigning {} to {} reader.", split, ownerReader);
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
        }
    }

    /**
     * Maps a split id to a reader index in {@code [0, numReaders)}.
     *
     * @param tp split id
     * @param numReaders number of readers; must be positive
     * @return the owning reader index
     */
    private static int getSplitOwner(String tp, int numReaders) {
        // Masking the sign bit keeps the dividend non-negative, so the remainder is too.
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    /**
     * Hands every pending split bucket of the given readers to the context and removes it from
     * {@link #pendingSplits}. Callers must hold {@link #stateLock}.
     */
    private void assignSplit(Collection<Integer> readers) {
        log.info("Assign pendingSplits to readers {}", readers);
        for (int reader : readers) {
            List<MilvusSourceSplit> assignmentForReader = this.pendingSplits.remove(reader);
            if (assignmentForReader == null || assignmentForReader.isEmpty()) {
                continue;
            }
            log.debug("Assign splits {} to reader {}", assignmentForReader, reader);
            this.context.assignSplit(reader, assignmentForReader);
        }
    }

    /**
     * Closes the Milvus client if {@link #open()} created one.
     *
     * @throws IOException declared by the enumerator contract
     */
    public void close() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Re-queues splits for the given reader and, if that reader is currently registered, assigns
     * them right away.
     *
     * @param splits splits to put back into the pending set
     * @param subtaskId reader the splits are queued for
     */
    public void addSplitsBack(List<MilvusSourceSplit> splits, int subtaskId) {
        if (!splits.isEmpty()) {
            // Same lock as run()/snapshotState(): pendingSplits is a plain HashMap.
            synchronized (this.stateLock) {
                this.addPendingSplit(splits, subtaskId);
                if (this.context.registeredReaders().contains(subtaskId)) {
                    this.assignSplit(Collections.singletonList(subtaskId));
                } else {
                    log.warn("Reader {} is not registered. Pending splits {} are not assigned.", subtaskId, splits);
                }
            }
        }
        log.info("Add back splits {} to MilvusSourceSplitEnumerator.", splits.size());
    }

    /** Appends the splits to the bucket of an explicit reader. Callers must hold stateLock. */
    private void addPendingSplit(Collection<MilvusSourceSplit> splits, int ownerReader) {
        this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList<>()).addAll(splits);
    }

    /**
     * Reports whether any work is still unassigned.
     *
     * @return {@code 0} when both the pending-table queue and the pending-split map are empty,
     *     otherwise {@code 1} (a flag, not an actual count)
     */
    public int currentUnassignedSplitSize() {
        return this.pendingTables.isEmpty() && this.pendingSplits.isEmpty() ? 0 : 1;
    }

    /**
     * Not supported by this enumerator.
     *
     * @throws MilvusConnectorException always
     */
    public void handleSplitRequest(int subtaskId) {
        throw new MilvusConnectorException((SeaTunnelErrorCode) CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId));
    }

    /**
     * Assigns any splits already pending for a newly registered reader.
     *
     * @param subtaskId id of the reader being registered
     */
    public void registerReader(int subtaskId) {
        log.info("Register reader {} to MilvusSourceSplitEnumerator.", subtaskId);
        // Same lock as run()/snapshotState(): assignSplit removes entries from pendingSplits.
        synchronized (this.stateLock) {
            if (!this.pendingSplits.isEmpty()) {
                this.assignSplit(Collections.singletonList(subtaskId));
            }
        }
    }

    /**
     * Captures the pending tables and pending splits under {@link #stateLock}.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return a state object holding copies of the pending tables and pending splits
     * @throws Exception declared by the enumerator contract
     */
    public MilvusSourceState snapshotState(long checkpointId) throws Exception {
        synchronized (this.stateLock) {
            // Copy the per-reader lists as well: addPendingSplit appends to them in place, which
            // would otherwise alter a snapshot after it has been returned.
            HashMap<Integer, List<MilvusSourceSplit>> splitsCopy = new HashMap<Integer, List<MilvusSourceSplit>>();
            for (Map.Entry<Integer, List<MilvusSourceSplit>> entry : this.pendingSplits.entrySet()) {
                splitsCopy.put(entry.getKey(), new ArrayList<MilvusSourceSplit>(entry.getValue()));
            }
            return new MilvusSourceState(new ArrayList<TablePath>(this.pendingTables), splitsCopy);
        }
    }

    /** No action is taken when a checkpoint completes. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

