package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.PingMessage;
import io.asyncer.r2dbc.mysql.message.client.TextQueryMessage;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import java.util.Iterator;
import java.util.Queue;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.concurrent.Queues;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: QueryFlow.java */
/* loaded from: input_file:io/asyncer/r2dbc/mysql/TransactionMultiExchangeable.class */
public final class TransactionMultiExchangeable extends FluxExchangeable<Void> {
    private final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer((Queue) Queues.one().get());
    private final AbstractTransactionState state;
    private final Iterator<String> statements;

    /* JADX INFO: Access modifiers changed from: package-private */
    public TransactionMultiExchangeable(AbstractTransactionState abstractTransactionState) {
        this.state = abstractTransactionState;
        this.statements = abstractTransactionState.statements();
    }

    @Override // java.util.function.BiConsumer
    public void accept(ServerMessage serverMessage, SynchronousSink<Void> synchronousSink) {
        if (this.state.accept(serverMessage, synchronousSink)) {
            String next = this.statements.next();
            QueryLogger.log(next);
            this.state.setSql(next);
            Sinks.EmitResult tryEmitNext = this.requests.tryEmitNext(new TextQueryMessage(next));
            if (tryEmitNext != Sinks.EmitResult.OK) {
                QueryFlow.logger.error("Fail to emit a transaction message due to {}", new Object[]{tryEmitNext});
                synchronousSink.complete();
            }
        }
    }

    public void dispose() {
        this.requests.tryEmitComplete();
    }

    public void subscribe(CoreSubscriber<? super ClientMessage> coreSubscriber) {
        if (this.state.cancelTasks()) {
            coreSubscriber.onSubscribe(Operators.scalarSubscription(coreSubscriber, PingMessage.INSTANCE));
            return;
        }
        String next = this.statements.next();
        QueryLogger.log(next);
        this.state.setSql(next);
        this.requests.asFlux().subscribe(coreSubscriber);
        Sinks.EmitResult tryEmitNext = this.requests.tryEmitNext(new TextQueryMessage(next));
        if (tryEmitNext != Sinks.EmitResult.OK) {
            QueryFlow.logger.error("Fail to emit a transaction message due to {}", new Object[]{tryEmitNext});
        }
    }
}
