/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import org.apache.seatunnel.connectors.seatunnel.redis.source.KeyedRecordReader;
import org.apache.seatunnel.connectors.seatunnel.redis.source.RedisRecordReader;
import org.apache.seatunnel.connectors.seatunnel.redis.source.UnKeyedRecordReader;
import org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMergerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

public class RedisSourceReader
extends AbstractSingleSplitReader<SeaTunnelRow> {
    private static final Logger log = LoggerFactory.getLogger(RedisSourceReader.class);
    private final RedisParameters redisParameters;
    private final SingleSplitReaderContext context;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private RedisClient redisClient;

    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.redisParameters = redisParameters;
        this.context = context;
        this.deserializationSchema = deserializationSchema;
    }

    public void open() throws Exception {
        this.redisClient = this.redisParameters.buildRedisClient();
    }

    public void close() throws IOException {
        if (Objects.nonNull(this.redisClient)) {
            this.redisClient.close();
        }
    }

    @Override
    public void internalPollNext(Collector<SeaTunnelRow> output) throws Exception {
        RedisDataType redisDataType = this.resolveScanType(this.redisParameters.getRedisDataType());
        String cursor = ScanParams.SCAN_POINTER_START;
        String keysPattern = this.redisParameters.getKeysPattern();
        int batchSize = this.redisParameters.getBatchSize();
        do {
            ScanResult<String> scanResult = this.redisClient.scanKeys(cursor, batchSize, keysPattern, redisDataType);
            cursor = scanResult.getCursor();
            List<String> keys = scanResult.getResult();
            this.pollNext(keys, redisDataType, output);
        } while (!ScanParams.SCAN_POINTER_START.equals(cursor));
        this.context.signalNoMoreElement();
    }

    private void pollNext(List<String> keys, RedisDataType dataType, Collector<SeaTunnelRow> output) throws IOException {
        RedisRecordReader redisRecordReader = Boolean.TRUE.equals(this.redisParameters.getReadKeyEnabled()) ? new KeyedRecordReader(this.redisParameters, this.deserializationSchema, this.redisClient, KeyValueMergerFactory.createMerger(this.deserializationSchema, this.redisParameters)) : new UnKeyedRecordReader(this.redisParameters, this.deserializationSchema, this.redisClient);
        if (CollectionUtils.isEmpty(keys)) {
            return;
        }
        if (RedisDataType.HASH.equals((Object)dataType)) {
            redisRecordReader.pollHashMapToNext(keys, output);
            return;
        }
        if (RedisDataType.STRING.equals((Object)dataType) || RedisDataType.KEY.equals((Object)dataType)) {
            redisRecordReader.pollStringToNext(keys, output);
            return;
        }
        if (RedisDataType.LIST.equals((Object)dataType)) {
            redisRecordReader.pollListToNext(keys, output);
            return;
        }
        if (RedisDataType.SET.equals((Object)dataType)) {
            redisRecordReader.pollSetToNext(keys, output);
            return;
        }
        if (RedisDataType.ZSET.equals((Object)dataType)) {
            redisRecordReader.pollZsetToNext(keys, output);
            return;
        }
        throw new RedisConnectorException((SeaTunnelErrorCode)CommonErrorCode.UNSUPPORTED_DATA_TYPE, "UnSupport redisDataType,only support string,list,hash,set,zset");
    }

    private RedisDataType resolveScanType(RedisDataType dataType) {
        if (RedisDataType.KEY.equals((Object)dataType)) {
            return RedisDataType.STRING;
        }
        return dataType;
    }
}

