/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.split;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultipleTableFileSourceSplitEnumerator
implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
    private static final Logger log = LoggerFactory.getLogger(MultipleTableFileSourceSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<FileSourceSplit> context;
    private final Set<FileSourceSplit> allSplit;
    private final Set<FileSourceSplit> assignedSplit;
    private final Map<String, List<String>> filePathMap;
    private final AtomicInteger assignCount = new AtomicInteger(0);
    private final Object lock = new Object();

    public MultipleTableFileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig) {
        this.context = context;
        this.filePathMap = multipleTableFileSourceConfig.getFileSourceConfigs().stream().collect(Collectors.toMap(localFileSourceConfig -> localFileSourceConfig.getCatalogTable().getTableId().toTablePath().toString(), BaseFileSourceConfig::getFilePaths));
        this.assignedSplit = new HashSet<FileSourceSplit>();
        this.allSplit = new TreeSet<FileSourceSplit>(Comparator.comparing(FileSourceSplit::splitId));
    }

    public MultipleTableFileSourceSplitEnumerator(SourceSplitEnumerator.Context<FileSourceSplit> context, BaseMultipleTableFileSourceConfig multipleTableFileSourceConfig, FileSourceState fileSourceState) {
        this(context, multipleTableFileSourceConfig);
        this.assignedSplit.addAll(fileSourceState.getAssignedSplit());
    }

    public void open() {
        for (Map.Entry<String, List<String>> filePathEntry : this.filePathMap.entrySet()) {
            String tableId = filePathEntry.getKey();
            List<String> filePaths = filePathEntry.getValue();
            for (String filePath : filePaths) {
                this.allSplit.add(new FileSourceSplit(tableId, filePath));
            }
        }
    }

    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
        if (CollectionUtils.isEmpty(splits)) {
            return;
        }
        this.allSplit.addAll(splits);
        this.assignSplit(subtaskId);
    }

    public int currentUnassignedSplitSize() {
        return this.allSplit.size() - this.assignedSplit.size();
    }

    public void handleSplitRequest(int subtaskId) {
    }

    public void registerReader(int subtaskId) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FileSourceState snapshotState(long checkpointId) {
        Object object = this.lock;
        synchronized (object) {
            return new FileSourceState(this.assignedSplit);
        }
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    private void assignSplit(int taskId) {
        ArrayList<FileSourceSplit> currentTaskSplits = new ArrayList<FileSourceSplit>();
        if (this.context.currentParallelism() == 1) {
            currentTaskSplits.addAll(this.allSplit);
        } else {
            this.assignCount.set(0);
            for (FileSourceSplit fileSourceSplit : this.allSplit) {
                int splitOwner = MultipleTableFileSourceSplitEnumerator.getSplitOwner(this.assignCount.getAndIncrement(), this.context.currentParallelism());
                if (splitOwner != taskId) continue;
                currentTaskSplits.add(fileSourceSplit);
            }
        }
        this.context.assignSplit(taskId, currentTaskSplits);
        this.assignedSplit.addAll(currentTaskSplits);
        log.info("SubTask {} is assigned to [{}], size {}", new Object[]{taskId, currentTaskSplits.stream().map(FileSourceSplit::splitId).collect(Collectors.joining(",")), currentTaskSplits.size()});
        this.context.signalNoMoreSplits(taskId);
    }

    private static int getSplitOwner(int assignCount, int numReaders) {
        return assignCount % numReaders;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() throws Exception {
        for (int i = 0; i < this.context.currentParallelism(); ++i) {
            log.info("Assigned splits to reader [{}]", (Object)i);
            Object object = this.lock;
            synchronized (object) {
                this.assignSplit(i);
                continue;
            }
        }
    }

    public void close() throws IOException {
    }
}

