/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.source.reader.split;

import com.mongodb.MongoException;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCursor;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.common.utils.MongoUtils;
import org.apache.flink.connector.mongodb.source.config.MongoReadOptions;
import org.apache.flink.connector.mongodb.source.reader.MongoSourceReaderContext;
import org.apache.flink.connector.mongodb.source.reader.split.MongoSourceSplitReader;
import org.apache.flink.connector.mongodb.source.split.MongoScanSourceSplit;
import org.apache.flink.connector.mongodb.source.split.MongoSourceSplit;
import org.apache.flink.util.CollectionUtil;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MongoScanSourceSplitReader
implements MongoSourceSplitReader<MongoSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoScanSourceSplitReader.class);
    private final MongoConnectionOptions connectionOptions;
    private final MongoReadOptions readOptions;
    private final MongoSourceReaderContext readerContext;
    @Nullable
    private final List<String> projectedFields;
    private boolean closed = false;
    private boolean finished = false;
    private MongoClient mongoClient;
    private MongoCursor<BsonDocument> currentCursor;
    private MongoScanSourceSplit currentSplit;

    public MongoScanSourceSplitReader(MongoConnectionOptions connectionOptions, MongoReadOptions readOptions, @Nullable List<String> projectedFields, MongoSourceReaderContext readerContext) {
        this.connectionOptions = connectionOptions;
        this.readOptions = readOptions;
        this.projectedFields = projectedFields;
        this.readerContext = readerContext;
    }

    public RecordsWithSplitIds<BsonDocument> fetch() throws IOException {
        if (this.closed) {
            throw new IllegalStateException("Cannot fetch records from a closed split reader");
        }
        RecordsBySplits.Builder builder = new RecordsBySplits.Builder();
        if (this.currentSplit == null) {
            return builder.build();
        }
        if (this.readerContext.isOverLimit()) {
            builder.addFinishedSplit(this.currentSplit.splitId());
            this.currentSplit = null;
            this.finished = true;
            return builder.build();
        }
        this.currentCursor = this.getOrCreateCursor();
        int fetchSize = this.readOptions.getFetchSize();
        try {
            for (int recordNum = 0; recordNum < fetchSize; ++recordNum) {
                if (this.currentCursor.hasNext()) {
                    builder.add((SourceSplit)this.currentSplit, this.currentCursor.next());
                    this.readerContext.getReadCount().incrementAndGet();
                    if (!this.readerContext.isOverLimit()) continue;
                    builder.addFinishedSplit(this.currentSplit.splitId());
                    this.finished = true;
                    break;
                }
                builder.addFinishedSplit(this.currentSplit.splitId());
                this.finished = true;
                break;
            }
            RecordsBySplits recordNum = builder.build();
            return recordNum;
        }
        catch (MongoException e) {
            throw new IOException("Scan records form MongoDB failed", e);
        }
        finally {
            if (this.finished) {
                this.currentSplit = null;
                this.closeCursor();
            }
        }
    }

    public void handleSplitsChanges(SplitsChange<MongoSourceSplit> splitsChanges) {
        LOG.debug("Handle split changes {}", splitsChanges);
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        MongoSourceSplit sourceSplit = (MongoSourceSplit)splitsChanges.splits().get(0);
        if (!(sourceSplit instanceof MongoScanSourceSplit)) {
            throw new UnsupportedOperationException(String.format("The SourceSplit type of %s is not supported.", sourceSplit.getClass()));
        }
        this.currentSplit = (MongoScanSourceSplit)sourceSplit;
        this.finished = false;
    }

    public void wakeUp() {
        this.closeCursor();
    }

    public void close() {
        if (!this.closed) {
            this.closed = true;
            this.closeCursor();
        }
    }

    private MongoCursor<BsonDocument> getOrCreateCursor() {
        if (this.currentCursor == null) {
            LOG.debug("Opened cursor for partitionId: {}", (Object)this.currentSplit);
            this.mongoClient = MongoClients.create((String)this.connectionOptions.getUri());
            FindIterable findIterable = this.mongoClient.getDatabase(this.connectionOptions.getDatabase()).getCollection(this.connectionOptions.getCollection(), BsonDocument.class).find().min((Bson)this.currentSplit.getMin()).max((Bson)this.currentSplit.getMax()).hint((Bson)this.currentSplit.getHint()).noCursorTimeout(this.readOptions.isNoCursorTimeout());
            if (this.currentSplit.getOffset() > 0) {
                findIterable.skip(this.currentSplit.getOffset());
            }
            if (this.readerContext.isLimitPushedDown()) {
                findIterable.limit(this.readerContext.getLimit());
            }
            if (!CollectionUtil.isNullOrEmpty(this.projectedFields)) {
                findIterable.projection(MongoUtils.project(this.projectedFields));
            }
            this.currentCursor = findIterable.cursor();
        }
        return this.currentCursor;
    }

    private void closeCursor() {
        if (this.currentCursor != null) {
            LOG.debug("Closing cursor for split: {}", (Object)this.currentSplit);
            try {
                this.currentCursor.close();
            }
            finally {
                this.currentCursor = null;
                try {
                    this.mongoClient.close();
                }
                finally {
                    this.mongoClient = null;
                }
            }
        }
    }
}

