/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.tdengine.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.tdengine.config.TDengineSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.tdengine.exception.TDengineConnectorException;
import org.apache.seatunnel.connectors.seatunnel.tdengine.source.StableMetadata;
import org.apache.seatunnel.connectors.seatunnel.tdengine.source.TDengineSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.tdengine.state.TDengineSourceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TDengineSourceSplitEnumerator
implements SourceSplitEnumerator<TDengineSourceSplit, TDengineSourceState> {
    private static final Logger log = LoggerFactory.getLogger(TDengineSourceSplitEnumerator.class);
    private final SourceSplitEnumerator.Context<TDengineSourceSplit> context;
    private final TDengineSourceConfig config;
    private final StableMetadata stableMetadata;
    private volatile boolean shouldEnumerate;
    private final Object stateLock = new Object();
    private final Map<Integer, List<TDengineSourceSplit>> pendingSplits = new ConcurrentHashMap<Integer, List<TDengineSourceSplit>>();

    public TDengineSourceSplitEnumerator(StableMetadata stableMetadata, TDengineSourceConfig config, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
        this(stableMetadata, config, null, context);
    }

    public TDengineSourceSplitEnumerator(StableMetadata stableMetadata, TDengineSourceConfig config, TDengineSourceState sourceState, SourceSplitEnumerator.Context<TDengineSourceSplit> context) {
        this.config = config;
        this.context = context;
        this.stableMetadata = stableMetadata;
        boolean bl = this.shouldEnumerate = sourceState == null;
        if (sourceState != null) {
            this.shouldEnumerate = sourceState.isShouldEnumerate();
            this.pendingSplits.putAll(sourceState.getPendingSplits());
        }
    }

    private static int getSplitOwner(String tp, int numReaders) {
        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
    }

    public void open() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        Set readers = this.context.registeredReaders();
        if (this.shouldEnumerate) {
            List<TDengineSourceSplit> newSplits = this.discoverySplits();
            Object object = this.stateLock;
            synchronized (object) {
                this.addPendingSplit(newSplits);
                this.shouldEnumerate = false;
            }
            this.assignSplit(readers);
        }
        log.info("No more splits to assign. Sending NoMoreSplitsEvent to reader {}.", (Object)readers);
        readers.forEach(arg_0 -> this.context.signalNoMoreSplits(arg_0));
    }

    private void addPendingSplit(List<TDengineSourceSplit> newSplits) {
        int readerCount = this.context.currentParallelism();
        for (TDengineSourceSplit split : newSplits) {
            int ownerReader = TDengineSourceSplitEnumerator.getSplitOwner(split.splitId(), readerCount);
            this.pendingSplits.computeIfAbsent(ownerReader, r -> new ArrayList()).add(split);
        }
    }

    private List<TDengineSourceSplit> discoverySplits() {
        String timestampFieldName = this.stableMetadata.getTimestampFieldName();
        ArrayList<TDengineSourceSplit> splits = new ArrayList<TDengineSourceSplit>();
        for (String subTableName : this.stableMetadata.getSubTableNames()) {
            TDengineSourceSplit splitBySubTable = this.createSplitBySubTable(subTableName, timestampFieldName);
            splits.add(splitBySubTable);
        }
        return splits;
    }

    private TDengineSourceSplit createSplitBySubTable(String subTableName, String timestampFieldName) {
        String selectFields = Arrays.stream(this.stableMetadata.getRowType().getFieldNames()).skip(1L).map(name -> String.format("`%s`", name)).collect(Collectors.joining(","));
        String subTableSQL = String.format("select %s from %s.`%s`", selectFields, this.config.getDatabase(), subTableName);
        String start = this.config.getLowerBound();
        String end = this.config.getUpperBound();
        if (start != null || end != null) {
            String startCondition = null;
            String endCondition = null;
            if (start != null) {
                startCondition = timestampFieldName + " >= '" + start + "'";
            }
            if (end != null) {
                endCondition = timestampFieldName + " < '" + end + "'";
            }
            String query = String.join((CharSequence)" and ", startCondition, endCondition);
            subTableSQL = subTableSQL + " where " + query;
        }
        return new TDengineSourceSplit(subTableName, subTableSQL);
    }

    public void addSplitsBack(List<TDengineSourceSplit> splits, int subtaskId) {
        log.info("Add back splits {} to TDengineSourceSplitEnumerator.", splits);
        if (!splits.isEmpty()) {
            this.addPendingSplit(splits);
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    public int currentUnassignedSplitSize() {
        return this.pendingSplits.size();
    }

    public void registerReader(int subtaskId) {
        log.info("Register reader {} to TDengineSourceSplitEnumerator.", (Object)subtaskId);
        if (!this.pendingSplits.isEmpty()) {
            this.assignSplit(Collections.singletonList(subtaskId));
        }
    }

    private void assignSplit(Collection<Integer> readers) {
        log.info("Assign pendingSplits to readers {}", readers);
        for (int reader : readers) {
            List<TDengineSourceSplit> assignmentForReader = this.pendingSplits.remove(reader);
            if (assignmentForReader == null || assignmentForReader.isEmpty()) continue;
            log.info("Assign splits {} to reader {}", assignmentForReader, (Object)reader);
            try {
                this.context.assignSplit(reader, assignmentForReader);
            }
            catch (Exception e) {
                log.error("Failed to assign splits {} to reader {}", new Object[]{assignmentForReader, reader, e});
                this.pendingSplits.put(reader, assignmentForReader);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public TDengineSourceState snapshotState(long checkpointId) {
        Object object = this.stateLock;
        synchronized (object) {
            return new TDengineSourceState(this.shouldEnumerate, this.pendingSplits);
        }
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void close() {
    }

    public void handleSplitRequest(int subtaskId) {
        throw new TDengineConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, String.format("Unsupported handleSplitRequest: %d", subtaskId));
    }
}

