/*
 * Decompiled with CFR 0.152.
 */
package org.redisson.spring.data.connection;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.redisson.api.RFuture;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.reactive.CommandReactiveExecutor;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.RedisClusterNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Shared helpers for the reactive Redisson connection classes in this package.
 *
 * <p>Holds the {@link CommandReactiveExecutor} used to issue commands and offers
 * small utilities for turning its asynchronous results into Reactor publishers.
 */
abstract class RedissonBaseReactive {
    /** Executor through which every command in this class is dispatched. */
    final CommandReactiveExecutor executorService;

    /**
     * @param executorService executor used to run commands reactively
     */
    RedissonBaseReactive(CommandReactiveExecutor executorService) {
        this.executorService = executorService;
    }

    /**
     * Copies the remaining bytes of {@code buffer} into a new array.
     *
     * <p>The buffer's position is restored afterwards, so the caller can still
     * read the same bytes from it.
     *
     * @param buffer source buffer; must not be {@code null}
     * @return a new array holding the bytes between the buffer's position and limit
     */
    public static byte[] toByteArray(ByteBuffer buffer) {
        byte[] dst = new byte[buffer.remaining()];
        int pos = buffer.position();
        buffer.get(dst);
        // The bulk get advanced the position; rewind to where the caller left it.
        buffer.position(pos);
        return dst;
    }

    /**
     * Adapts a {@code Void} future into one that yields the string {@code "OK"}
     * once the source future completes normally.
     *
     * @param f future whose result value is ignored
     * @return a future wrapping the mapped completion stage
     */
    RFuture<String> toStringFuture(RFuture<Void> f) {
        CompletionStage<String> mapped = f.thenApply(r -> "OK");
        return new CompletableFutureWrapper<>(mapped);
    }

    /**
     * Issues {@code command} as a write against the entry resolved for {@code node},
     * using {@link StringCodec#INSTANCE}.
     *
     * <p>Unlike {@link #write} and {@link #read}, errors are not remapped here.
     *
     * @param node cluster node whose host and port select the target entry
     * @param command command to send
     * @param params command arguments
     * @param <T> result type of the command
     * @return publisher produced by the executor for this command
     */
    <T> Mono<T> execute(RedisClusterNode node, RedisCommand<T> command, Object... params) {
        MasterSlaveEntry entry = this.getEntry(node);
        return this.executorService.reactive(
                () -> this.executorService.writeAsync(entry, StringCodec.INSTANCE, command, params));
    }

    /**
     * Looks up the connection-manager entry registered for the node's host and port.
     *
     * @param node cluster node to resolve
     * @return whatever the connection manager returns for that address
     */
    MasterSlaveEntry getEntry(RedisClusterNode node) {
        // The (int) cast unboxes the port, exactly as the original code did.
        InetSocketAddress address = new InetSocketAddress(node.getHost(), (int) node.getPort());
        return this.executorService.getConnectionManager().getEntry(address);
    }

    /**
     * Applies {@code mapper} to each element of {@code commands} using
     * {@code Flux.concatMap}.
     *
     * @param commands source publisher of command descriptors
     * @param mapper function turning one command into a publisher of results
     * @param <V> command type
     * @param <T> result type
     * @return flux obtained by concat-mapping the source with {@code mapper}
     */
    <V, T> Flux<T> execute(Publisher<V> commands, Function<V, Publisher<T>> mapper) {
        Flux<V> source = Flux.from(commands);
        return source.concatMap(mapper);
    }

    /**
     * Issues {@code command} through the executor's write path for {@code key}.
     *
     * <p>Any error is mapped to a {@link RedisSystemException} that keeps the
     * original message and cause.
     *
     * @param key key the command operates on
     * @param codec codec passed to the executor
     * @param command command to send
     * @param params command arguments
     * @param <T> result type expected by the caller
     * @return publisher of the command result
     */
    <T> Mono<T> write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
        // Typed as Mono<T> (not raw) so the onErrorMap lambda receives a Throwable.
        Mono<T> result = this.executorService.reactive(
                () -> this.executorService.writeAsync(key, codec, command, params));
        return result.onErrorMap(e -> new RedisSystemException(e.getMessage(), e));
    }

    /**
     * Issues {@code command} through the executor's read path for {@code key}.
     *
     * <p>Any error is mapped to a {@link RedisSystemException} that keeps the
     * original message and cause.
     *
     * @param key key the command operates on
     * @param codec codec passed to the executor
     * @param command command to send
     * @param params command arguments
     * @param <T> result type expected by the caller
     * @return publisher of the command result
     */
    <T> Mono<T> read(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
        // Typed as Mono<T> (not raw) so the onErrorMap lambda receives a Throwable.
        Mono<T> result = this.executorService.reactive(
                () -> this.executorService.readAsync(key, codec, command, params));
        return result.onErrorMap(e -> new RedisSystemException(e.getMessage(), e));
    }
}

