/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.r2dbc.postgresql.ExceptionFactory;
import io.r2dbc.postgresql.PostgresReplicationStream;
import io.r2dbc.postgresql.PostgresqlConnection;
import io.r2dbc.postgresql.api.PostgresqlConnectionMetadata;
import io.r2dbc.postgresql.api.PostgresqlReplicationConnection;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.message.backend.BackendMessage;
import io.r2dbc.postgresql.message.backend.EmptyQueryResponse;
import io.r2dbc.postgresql.message.backend.ErrorResponse;
import io.r2dbc.postgresql.message.backend.ReadyForQuery;
import io.r2dbc.postgresql.message.frontend.FrontendMessage;
import io.r2dbc.postgresql.message.frontend.Query;
import io.r2dbc.postgresql.replication.LogSequenceNumber;
import io.r2dbc.postgresql.replication.ReplicationRequest;
import io.r2dbc.postgresql.replication.ReplicationSlot;
import io.r2dbc.postgresql.replication.ReplicationSlotRequest;
import io.r2dbc.postgresql.replication.ReplicationStream;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.PredicateUtils;
import io.r2dbc.spi.Row;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class DefaultPostgresqlReplicationConnection
implements PostgresqlReplicationConnection {
    private static final Predicate<BackendMessage> WINDOW_UNTIL;
    private final PostgresqlConnection delegate;
    private final Client client;

    DefaultPostgresqlReplicationConnection(PostgresqlConnection delegate) {
        this.delegate = delegate;
        this.client = delegate.getClient();
    }

    @Override
    public Mono<Void> close() {
        return this.delegate.close();
    }

    @Override
    public Mono<ReplicationSlot> createSlot(ReplicationSlotRequest request) {
        Assert.requireNonNull(request, "request must not be null");
        return this.delegate.createStatement(request.asSQL()).execute().flatMap(it -> it.map((row, rowMetadata) -> DefaultPostgresqlReplicationConnection.getReplicationSlot(request, row))).last();
    }

    private static ReplicationSlot getReplicationSlot(ReplicationSlotRequest request, Row row) {
        return new ReplicationSlot(DefaultPostgresqlReplicationConnection.getString(row, "slot_name"), request.getReplicationType(), LogSequenceNumber.valueOf(DefaultPostgresqlReplicationConnection.getString(row, "consistent_point")), (String)row.get("snapshot_name", String.class), (String)row.get("output_plugin", String.class));
    }

    @Override
    public Mono<ReplicationStream> startReplication(ReplicationRequest request) {
        Assert.requireNonNull(request, "request must not be null");
        String sql = request.asSQL();
        ExceptionFactory exceptionFactory = ExceptionFactory.withSql(sql);
        EmitterProcessor requestProcessor = EmitterProcessor.create();
        return Mono.fromDirect((Publisher)this.client.exchange((Publisher<FrontendMessage>)requestProcessor.startWith((Object[])new FrontendMessage[]{new Query(sql)})).handle(exceptionFactory::handleErrorResponse).windowUntil(WINDOW_UNTIL).map(messages -> new PostgresReplicationStream(this.client.getByteBufAllocator(), request, (EmitterProcessor<FrontendMessage>)requestProcessor, (Flux<BackendMessage>)messages)));
    }

    @Override
    public PostgresqlConnectionMetadata getMetadata() {
        return this.delegate.getMetadata();
    }

    private static String getString(Row row, String column) {
        String value = (String)row.get(column, String.class);
        if (value == null) {
            throw new IllegalStateException(String.format("No value found for column %s", column));
        }
        return value;
    }

    static {
        Predicate[] predicateArray = new Predicate[3];
        predicateArray[0] = ReadyForQuery.class::isInstance;
        predicateArray[1] = EmptyQueryResponse.class::isInstance;
        predicateArray[2] = ErrorResponse.class::isInstance;
        WINDOW_UNTIL = PredicateUtils.or(predicateArray);
    }
}

