/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.redis.source;

import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.connectors.seatunnel.redis.client.RedisClient;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.source.RedisRecordReader;
import org.apache.seatunnel.connectors.seatunnel.redis.util.KeyValueMerger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyedRecordReader
extends RedisRecordReader {
    private static final Logger log = LoggerFactory.getLogger(KeyedRecordReader.class);
    private final KeyValueMerger keyValueMerger;

    public KeyedRecordReader(RedisParameters redisParameters, DeserializationSchema<SeaTunnelRow> deserializationSchema, RedisClient redisClient, KeyValueMerger keyValueMerger) {
        super(redisParameters, deserializationSchema, redisClient);
        this.keyValueMerger = keyValueMerger;
    }

    @Override
    public void pollZsetToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<List<String>> zSetList = this.redisClient.batchGetZset(keys);
        for (int i = 0; i < zSetList.size(); ++i) {
            for (String value : zSetList.get(i)) {
                this.pollValueToNext(keys.get(i), value, output);
            }
        }
    }

    @Override
    public void pollSetToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<Set<String>> setList = this.redisClient.batchGetSet(keys);
        for (int i = 0; i < setList.size(); ++i) {
            for (String value : setList.get(i)) {
                this.pollValueToNext(keys.get(i), value, output);
            }
        }
    }

    @Override
    public void pollListToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<List<String>> valueList = this.redisClient.batchGetList(keys);
        for (int i = 0; i < valueList.size(); ++i) {
            for (String value : valueList.get(i)) {
                this.pollValueToNext(keys.get(i), value, output);
            }
        }
    }

    @Override
    public void pollStringToNext(List<String> keys, Collector<SeaTunnelRow> output) throws IOException {
        List<String> values = this.redisClient.batchGetString(keys);
        for (int i = 0; i < values.size(); ++i) {
            this.pollValueToNext(keys.get(i), values.get(i), output);
        }
    }

    private void pollValueToNext(String key, String value, Collector<SeaTunnelRow> output) throws IOException {
        if (this.deserializationSchema == null) {
            throw CommonError.illegalArgument((String)"deserializationSchema is null", (String)("Redis source requires a deserialization schema to parse the record with key: " + key));
        }
        String parsed = this.keyValueMerger.parseWithKey(key, value);
        this.deserializationSchema.deserialize(parsed.getBytes(), output);
    }
}

