package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.exception.RedisConnectorException;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.class */
public class RedisSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
    private final RedisParameters redisParameters;
    private final SingleSplitReaderContext context;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private RedisClient redisClient;

    public RedisSourceReader(RedisParameters redisParameters, SingleSplitReaderContext singleSplitReaderContext, DeserializationSchema<SeaTunnelRow> deserializationSchema) {
        this.redisParameters = redisParameters;
        this.context = singleSplitReaderContext;
        this.deserializationSchema = deserializationSchema;
    }

    public void open() throws Exception {
        this.redisClient = this.redisParameters.buildRedisClient();
    }

    public void close() throws IOException {
        if (Objects.nonNull(this.redisClient)) {
            this.redisClient.close();
        }
    }

    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader
    public void internalPollNext(Collector<SeaTunnelRow> collector) throws Exception {
        RedisDataType resolveScanType = resolveScanType(this.redisParameters.getRedisDataType());
        String str = ScanParams.SCAN_POINTER_START;
        String keysPattern = this.redisParameters.getKeysPattern();
        int batchSize = this.redisParameters.getBatchSize();
        do {
            ScanResult<String> scanKeys = this.redisClient.scanKeys(str, batchSize, keysPattern, resolveScanType);
            str = scanKeys.getCursor();
            pollNext(scanKeys.getResult(), resolveScanType, collector);
        } while (!ScanParams.SCAN_POINTER_START.equals(str));
        this.context.signalNoMoreElement();
    }

    private void pollNext(List<String> list, RedisDataType redisDataType, Collector<SeaTunnelRow> collector) throws IOException {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        if (RedisDataType.HASH.equals(redisDataType)) {
            pollHashMapToNext(list, collector);
            return;
        }
        if (RedisDataType.STRING.equals(redisDataType) || RedisDataType.KEY.equals(redisDataType)) {
            pollStringToNext(list, collector);
            return;
        }
        if (RedisDataType.LIST.equals(redisDataType)) {
            pollListToNext(list, collector);
        } else if (RedisDataType.SET.equals(redisDataType)) {
            pollSetToNext(list, collector);
        } else {
            if (!RedisDataType.ZSET.equals(redisDataType)) {
                throw new RedisConnectorException((SeaTunnelErrorCode) CommonErrorCode.UNSUPPORTED_DATA_TYPE, "UnSupport redisDataType,only support string,list,hash,set,zset");
            }
            pollZsetToNext(list, collector);
        }
    }

    private void pollZsetToNext(List<String> list, Collector<SeaTunnelRow> collector) throws IOException {
        Iterator<List<String>> it = this.redisClient.batchGetZset(list).iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = it.next().iterator();
            while (it2.hasNext()) {
                pollValueToNext(it2.next(), collector);
            }
        }
    }

    private void pollSetToNext(List<String> list, Collector<SeaTunnelRow> collector) throws IOException {
        Iterator<Set<String>> it = this.redisClient.batchGetSet(list).iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = it.next().iterator();
            while (it2.hasNext()) {
                pollValueToNext(it2.next(), collector);
            }
        }
    }

    private void pollListToNext(List<String> list, Collector<SeaTunnelRow> collector) throws IOException {
        Iterator<List<String>> it = this.redisClient.batchGetList(list).iterator();
        while (it.hasNext()) {
            Iterator<String> it2 = it.next().iterator();
            while (it2.hasNext()) {
                pollValueToNext(it2.next(), collector);
            }
        }
    }

    private void pollStringToNext(List<String> list, Collector<SeaTunnelRow> collector) throws IOException {
        Iterator<String> it = this.redisClient.batchGetString(list).iterator();
        while (it.hasNext()) {
            pollValueToNext(it.next(), collector);
        }
    }

    private void pollValueToNext(String str, Collector<SeaTunnelRow> collector) throws IOException {
        if (this.deserializationSchema == null) {
            collector.collect(new SeaTunnelRow(new Object[]{str}));
        } else {
            this.deserializationSchema.deserialize(str.getBytes(), collector);
        }
    }

    private void pollHashMapToNext(List<String> list, Collector<SeaTunnelRow> collector) throws IOException {
        List<Map<String, String>> batchGetHash = this.redisClient.batchGetHash(list);
        if (this.deserializationSchema == null) {
            Iterator<Map<String, String>> it = batchGetHash.iterator();
            while (it.hasNext()) {
                collector.collect(new SeaTunnelRow(new Object[]{JsonUtils.toJsonString(it.next())}));
            }
        } else {
            for (Map<String, String> map : batchGetHash) {
                if (this.redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV) {
                    this.deserializationSchema.deserialize(JsonUtils.toJsonString(map).getBytes(), collector);
                } else {
                    collector.collect(new SeaTunnelRow(new Object[]{JsonUtils.toJsonString(map)}));
                }
            }
        }
    }

    private RedisDataType resolveScanType(RedisDataType redisDataType) {
        return RedisDataType.KEY.equals(redisDataType) ? RedisDataType.STRING : redisDataType;
    }
}
