/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint<FileSourceSplit>> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileSourceSplit> context;
    private final FileSplitAssigner splitAssigner;
    private final FileEnumerator enumerator;
    private final HashSet<Path> pathsAlreadyProcessed;
    private final LinkedHashMap<Integer, String> readersAwaitingSplit;
    private final Path[] paths;
    private final long discoveryInterval;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileSourceSplit> context, FileEnumerator enumerator, FileSplitAssigner splitAssigner, Path[] paths, Collection<Path> alreadyDiscoveredPaths, long discoveryInterval) {
        Preconditions.checkArgument((discoveryInterval > 0L ? 1 : 0) != 0);
        this.context = (SplitEnumeratorContext)Preconditions.checkNotNull(context);
        this.enumerator = (FileEnumerator)Preconditions.checkNotNull((Object)enumerator);
        this.splitAssigner = (FileSplitAssigner)Preconditions.checkNotNull((Object)splitAssigner);
        this.paths = paths;
        this.discoveryInterval = discoveryInterval;
        this.pathsAlreadyProcessed = new HashSet<Path>(alreadyDiscoveredPaths);
        this.readersAwaitingSplit = new LinkedHashMap();
    }

    public void start() {
        this.context.callAsync(() -> this.enumerator.enumerateSplits(this.paths, 1), this::processDiscoveredSplits, this.discoveryInterval, this.discoveryInterval);
    }

    public void close() throws IOException {
    }

    public void addReader(int subtaskId) {
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        this.readersAwaitingSplit.put(subtaskId, requesterHostname);
        this.assignSplits();
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
    }

    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        this.splitAssigner.addSplits(splits);
    }

    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws Exception {
        PendingSplitsCheckpoint<FileSourceSplit> checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot(this.splitAssigner.remainingSplits(), this.pathsAlreadyProcessed);
        LOG.debug("Source Checkpoint is {}", checkpoint);
        return checkpoint;
    }

    private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
        if (error != null) {
            LOG.error("Failed to enumerate files", error);
            return;
        }
        Collection newSplits = splits.stream().filter(split -> this.pathsAlreadyProcessed.add(split.path())).collect(Collectors.toList());
        this.splitAssigner.addSplits(newSplits);
        this.assignSplits();
    }

    private void assignSplits() {
        Iterator<Map.Entry<Integer, String>> awaitingReader = this.readersAwaitingSplit.entrySet().iterator();
        while (awaitingReader.hasNext()) {
            Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
            if (!this.context.registeredReaders().containsKey(nextAwaiting.getKey())) {
                awaitingReader.remove();
                continue;
            }
            String hostname = nextAwaiting.getValue();
            int awaitingSubtask = nextAwaiting.getKey();
            Optional<FileSourceSplit> nextSplit = this.splitAssigner.getNext(hostname);
            if (!nextSplit.isPresent()) break;
            this.context.assignSplit((SourceSplit)nextSplit.get(), awaitingSubtask);
            awaitingReader.remove();
        }
    }
}

