/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhousePart;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.ClickhouseSourceTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.ClickhouseSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.source.split.Splitter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartStrategySplitter
implements Splitter,
AutoCloseable,
Serializable {
    private static final Logger log = LoggerFactory.getLogger(PartStrategySplitter.class);
    private static final long serialVersionUID = 1284356772463422708L;

    @Override
    public List<ClickhouseSourceSplit> generateSplits(ClickhouseSourceTable clickhouseSourceTable, List<Shard> clusterShardList) {
        log.info("start part strategy splitter generate splits. table: {}", (Object)clickhouseSourceTable.getTablePath());
        ClickhouseTable clickhouseTable = clickhouseSourceTable.getClickhouseTable();
        HashMap<Shard, List<ClickhousePart>> shardToParts = new HashMap<Shard, List<ClickhousePart>>();
        clusterShardList.forEach(shard -> {
            try (ClickhouseProxy proxy = new ClickhouseProxy(shard.getNode());){
                List<ClickhousePart> partList = proxy.getPartList(clickhouseTable.getLocalDatabase(), clickhouseTable.getLocalTableName(), (Shard)shard, clickhouseSourceTable.getPartitionList());
                shardToParts.put((Shard)shard, partList);
            }
        });
        return this.partMapToSplits(clickhouseSourceTable, shardToParts);
    }

    @Override
    public String createSplitId(TablePath tablePath, Shard shard, int index) {
        return String.format("%s-%s-%s", tablePath, shard.hashCode(), index);
    }

    public List<ClickhouseSourceSplit> partMapToSplits(ClickhouseSourceTable clickhouseSourceTable, Map<Shard, List<ClickhousePart>> shardToParts) {
        int partSplitSize = this.partCountLimitForOneSplit(clickhouseSourceTable);
        ArrayList<ClickhouseSourceSplit> splits = new ArrayList<ClickhouseSourceSplit>();
        ClickhouseTable clickhouseTable = clickhouseSourceTable.getClickhouseTable();
        for (Map.Entry<Shard, List<ClickhousePart>> shardPartsEntry : shardToParts.entrySet()) {
            HashSet partSet = new HashSet(shardPartsEntry.getValue());
            shardPartsEntry.getValue().clear();
            shardPartsEntry.getValue().addAll(partSet);
            for (int fromIndex = 0; fromIndex < shardPartsEntry.getValue().size(); fromIndex += partSplitSize) {
                HashSet<ClickhousePart> partSplit = new HashSet<ClickhousePart>(shardPartsEntry.getValue().subList(fromIndex, Math.min(fromIndex + partSplitSize, shardPartsEntry.getValue().size())));
                String splitId = String.valueOf(this.createSplitId(clickhouseSourceTable.getTablePath(), shardPartsEntry.getKey(), splits.size()));
                ClickhouseSourceSplit clickhouseSourceSplit = new ClickhouseSourceSplit(TablePath.of((String)clickhouseTable.getLocalDatabase(), (String)clickhouseTable.getLocalTableName()), TablePath.of((String)clickhouseTable.getDatabase(), (String)clickhouseTable.getTableName()), new ArrayList<ClickhousePart>(partSplit), shardPartsEntry.getKey(), clickhouseSourceTable.getOriginQuery(), 0, splitId);
                splits.add(clickhouseSourceSplit);
            }
        }
        for (ClickhouseSourceSplit split : splits) {
            List partNameList = split.getParts().stream().map(ClickhousePart::getName).collect(Collectors.toList());
            log.debug("generate shard {} to parts {}", (Object)split.getShard().getNode(), partNameList);
        }
        log.info("generate splits size: {}", (Object)splits.size());
        return splits;
    }

    public int partCountLimitForOneSplit(ClickhouseSourceTable clickhouseSourceTable) {
        int partSize = Integer.MAX_VALUE;
        if (clickhouseSourceTable.getSplitSize() != null) {
            partSize = clickhouseSourceTable.getSplitSize();
        }
        if (partSize < 1) {
            log.warn("part size {} is less than {}, set to default value {}", new Object[]{partSize, 1, Integer.MAX_VALUE});
            partSize = 1;
        }
        log.debug("part size is set to {}", (Object)partSize);
        return partSize;
    }

    @Override
    public void close() {
    }
}

