/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.r2dbc.postgresql.client.Binding;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionContext;
import io.r2dbc.postgresql.client.Parameter;
import io.r2dbc.postgresql.client.PortalNameSupplier;
import io.r2dbc.postgresql.client.QueryLogger;
import io.r2dbc.postgresql.message.Format;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.CommandComplete;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ParseComplete;
import io.r2dbc.postgresql.message.backend.PortalSuspended;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.Bind;
import io.r2dbc.postgresql.message.frontend.Close;
import io.r2dbc.postgresql.message.frontend.CompositeFrontendMessage;
import io.r2dbc.postgresql.message.frontend.Describe;
import io.r2dbc.postgresql.message.frontend.Execute;
import io.r2dbc.postgresql.message.frontend.ExecutionType;
import io.r2dbc.postgresql.message.frontend.Flush;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Parse;
import io.r2dbc.postgresql.message.frontend.Sync;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.concurrent.Queues;

public final class ExtendedQueryMessageFlow {
    public static final Pattern PARAMETER_SYMBOL = Pattern.compile("\\$([\\d]+)", 32);
    private static final Predicate<BackendMessage> PARSE_TAKE_UNTIL = message -> message instanceof ParseComplete || message instanceof ReadyForQuery;

    private ExtendedQueryMessageFlow() {
    }

    @Deprecated
    public static Flux<BackendMessage> execute(Binding binding, Client client, PortalNameSupplier portalNameSupplier, String statementName, String query, boolean forceBinary, int fetchSize) {
        Assert.requireNonNull(binding, "binding must not be null");
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(portalNameSupplier, "portalNameSupplier must not be null");
        Assert.requireNonNull(statementName, "statementName must not be null");
        String portal = portalNameSupplier.get();
        return Flux.defer(() -> {
            Flux<FrontendMessage> bindFlow = ExtendedQueryMessageFlow.toBindFlow(client.getContext(), binding, portal, statementName, query, forceBinary);
            if (fetchSize == 0) {
                return ExtendedQueryMessageFlow.fetchAll(bindFlow, client, portal);
            }
            return ExtendedQueryMessageFlow.fetchCursored(bindFlow, client, portal, fetchSize);
        }).doOnDiscard(ReferenceCounted.class, ReferenceCountUtil::release);
    }

    private static Flux<BackendMessage> fetchAll(Flux<FrontendMessage> bindFlow, Client client, String portal) {
        return (Flux)client.exchange((Publisher<FrontendMessage>)bindFlow.concatWithValues((Object[])new FrontendMessage[]{new CompositeFrontendMessage(new Execute(portal, 0), new Close(portal, ExecutionType.PORTAL), Sync.INSTANCE)})).as(Operators::discardOnCancel);
    }

    private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
        UnicastProcessor requestsProcessor = UnicastProcessor.create((Queue)((Queue)Queues.small().get()));
        FluxSink requestsSink = requestsProcessor.sink();
        AtomicBoolean isCanceled = new AtomicBoolean(false);
        return (Flux)client.exchange((Publisher<FrontendMessage>)bindFlow.concatWithValues((Object[])new FrontendMessage[]{new CompositeFrontendMessage(new Execute(portal, fetchSize), Flush.INSTANCE)}).concatWith((Publisher)requestsProcessor)).handle((message, sink) -> {
            if (message instanceof CommandComplete) {
                requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof ErrorResponse) {
                requestsSink.next((Object)Sync.INSTANCE);
                requestsSink.complete();
                sink.next(message);
            } else if (message instanceof PortalSuspended) {
                if (isCanceled.get()) {
                    requestsSink.next((Object)new Close(portal, ExecutionType.PORTAL));
                    requestsSink.next((Object)Sync.INSTANCE);
                    requestsSink.complete();
                } else {
                    requestsSink.next((Object)new Execute(portal, fetchSize));
                    requestsSink.next((Object)Flush.INSTANCE);
                }
            } else {
                sink.next(message);
            }
        }).as(flux -> Operators.discardOnCancel(flux, () -> isCanceled.set(true)));
    }

    @Deprecated
    public static Flux<BackendMessage> parse(Client client, String name, String query, int[] types) {
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(name, "name must not be null");
        Assert.requireNonNull(query, "query must not be null");
        Assert.requireNonNull(types, "types must not be null");
        return client.exchange(PARSE_TAKE_UNTIL, (Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(new Parse(name, types, query), Flush.INSTANCE))).doOnNext(message -> {
            if (message instanceof ErrorResponse) {
                client.send(Sync.INSTANCE);
            }
        });
    }

    public static Flux<BackendMessage> closeStatement(Client client, String name) {
        Assert.requireNonNull(client, "client must not be null");
        Assert.requireNonNull(name, "name must not be null");
        return (Flux)client.exchange((Publisher<FrontendMessage>)Flux.just((Object)new CompositeFrontendMessage(new Close(name, ExecutionType.STATEMENT), Sync.INSTANCE))).as(Operators::discardOnCancel);
    }

    private static Flux<FrontendMessage> toBindFlow(ConnectionContext connectionContext, Binding binding, String portal, String statementName, String query, boolean forceBinary) {
        return Flux.fromIterable(binding.getParameterValues()).flatMap(f -> {
            if (f == Parameter.NULL_VALUE) {
                return Flux.just((Object)Bind.NULL_VALUE);
            }
            return Flux.from((Publisher)f).reduce((Object)Unpooled.compositeBuffer(), (c, b) -> c.addComponent(true, b));
        }).collectList().flatMapMany(values -> {
            Bind bind = new Bind(portal, binding.getParameterFormats(), (List<ByteBuf>)values, ExtendedQueryMessageFlow.resultFormat(forceBinary), statementName);
            return Flux.just((Object)new CompositeFrontendMessage(bind, new Describe(portal, ExecutionType.PORTAL)));
        }).doOnSubscribe(ignore -> QueryLogger.logQuery(connectionContext, query));
    }

    public static Collection<Format> resultFormat(boolean forceBinary) {
        if (forceBinary) {
            return Format.binary();
        }
        return Collections.emptyList();
    }
}

