package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQMessageConverter;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQExceptions;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/rabbitmq/internals/RabbitMQMessageSender.class */
public class RabbitMQMessageSender implements Flow.Processor<Message<?>, Message<?>>, Flow.Subscription {
    private final Uni<RabbitMQPublisher> retrieveSender;
    private final RabbitMQConnectorOutgoingConfiguration configuration;
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();
    private final AtomicReference<Flow.Subscriber<? super Message<?>>> downstream = new AtomicReference<>();
    private final String configuredExchange;
    private final boolean isTracingEnabled;
    private final long inflights;
    private final Optional<Long> defaultTtl;
    private final boolean publishConfirms;
    private final RabbitMQOpenTelemetryInstrumenter instrumenter;

    public RabbitMQMessageSender(RabbitMQConnectorOutgoingConfiguration rabbitMQConnectorOutgoingConfiguration, Uni<RabbitMQPublisher> uni) {
        this.retrieveSender = uni;
        this.configuration = rabbitMQConnectorOutgoingConfiguration;
        this.configuredExchange = RabbitMQClientHelper.getExchangeName(rabbitMQConnectorOutgoingConfiguration);
        this.isTracingEnabled = rabbitMQConnectorOutgoingConfiguration.getTracingEnabled().booleanValue();
        this.inflights = rabbitMQConnectorOutgoingConfiguration.getMaxInflightMessages().longValue();
        this.defaultTtl = rabbitMQConnectorOutgoingConfiguration.getDefaultTtl();
        this.publishConfirms = rabbitMQConnectorOutgoingConfiguration.getPublishConfirms().booleanValue();
        if (this.inflights <= 0) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidMaxInflightMessages();
        }
        if (this.defaultTtl.isPresent() && this.defaultTtl.get().longValue() < 0) {
            throw RabbitMQExceptions.ex.illegalArgumentInvalidDefaultTtl();
        }
        if (rabbitMQConnectorOutgoingConfiguration.getTracingEnabled().booleanValue()) {
            this.instrumenter = RabbitMQOpenTelemetryInstrumenter.createForSender();
        } else {
            this.instrumenter = null;
        }
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, RabbitMQExceptions.ex.illegalStateOnlyOneSubscriberAllowed());
        } else if (this.upstream.get() != null) {
            subscriber.onSubscribe(this);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        if (this.upstream.compareAndSet(null, subscription)) {
            Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe(this);
                return;
            }
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber2 = this.downstream.get();
        if (subscriber2 != null) {
            subscriber2.onSubscribe(Subscriptions.CANCELLED);
        }
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(Message<?> message) {
        if (isCancelled()) {
            return;
        }
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        UniSubscribe subscribe = this.retrieveSender.onItem().transformToUni(rabbitMQPublisher -> {
            try {
                return send(rabbitMQPublisher, message, this.configuredExchange, this.configuration).onItem().transform(message2 -> {
                    return Tuple2.of(rabbitMQPublisher, message2);
                });
            } catch (Exception e) {
                RabbitMQLogging.log.serializationFailure(this.configuration.getChannel(), e);
                return Uni.createFrom().completionStage(message.nack(e)).map(r2 -> {
                    return null;
                });
            }
        }).subscribe();
        Consumer consumer = tuple2 -> {
            if (tuple2 != null) {
                subscriber.onNext(tuple2.getItem2());
                if (this.inflights != Long.MAX_VALUE) {
                    this.upstream.get().request(1L);
                }
            }
        };
        Objects.requireNonNull(subscriber);
        subscribe.with(consumer, subscriber::onError);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        Subscriptions.EmptySubscription emptySubscription = (Flow.Subscription) this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (emptySubscription == null || emptySubscription == Subscriptions.CANCELLED || subscriber == null) {
            return;
        }
        subscriber.onError(th);
    }

    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        Subscriptions.EmptySubscription emptySubscription = (Flow.Subscription) this.upstream.getAndSet(Subscriptions.CANCELLED);
        Flow.Subscriber<? super Message<?>> subscriber = this.downstream.get();
        if (emptySubscription == null || emptySubscription == Subscriptions.CANCELLED || subscriber == null) {
            return;
        }
        subscriber.onComplete();
    }

    @Override // java.util.concurrent.Flow.Subscription
    public void request(long j) {
        if (j != Long.MAX_VALUE) {
            throw RabbitMQExceptions.ex.illegalStateConsumeWithoutBackPressure();
        }
        this.upstream.get().request(this.inflights);
    }

    @Override // java.util.concurrent.Flow.Subscription
    public void cancel() {
        Subscriptions.EmptySubscription emptySubscription = (Flow.Subscription) this.upstream.getAndSet(Subscriptions.CANCELLED);
        if (emptySubscription == null || emptySubscription == Subscriptions.CANCELLED) {
            return;
        }
        emptySubscription.cancel();
    }

    private Uni<Message<?>> send(RabbitMQPublisher rabbitMQPublisher, Message<?> message, String str, RabbitMQConnectorOutgoingConfiguration rabbitMQConnectorOutgoingConfiguration) {
        int intValue = rabbitMQConnectorOutgoingConfiguration.getReconnectAttempts().intValue();
        int intValue2 = rabbitMQConnectorOutgoingConfiguration.getReconnectInterval().intValue();
        RabbitMQMessageConverter.OutgoingRabbitMQMessage convert = RabbitMQMessageConverter.convert(this.instrumenter, message, str, rabbitMQConnectorOutgoingConfiguration.getDefaultRoutingKey(), this.defaultTtl, this.isTracingEnabled);
        RabbitMQLogging.log.sendingMessageToExchange(str, convert.getRoutingKey());
        return (this.publishConfirms ? rabbitMQPublisher.publishConfirm(str, convert.getRoutingKey(), convert.getProperties(), convert.getBody()).onItem().invoke(l -> {
            OutgoingMessageMetadata.setResultOnMessage(message, l);
        }).replaceWithVoid() : rabbitMQPublisher.publish(str, convert.getRoutingKey(), convert.getProperties(), convert.getBody())).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(intValue2)).atMost(intValue).onItemOrFailure().transformToUni((r5, th) -> {
            return th != null ? Uni.createFrom().completionStage(message.nack(th)) : Uni.createFrom().completionStage(message.ack());
        }).onItem().transform(r3 -> {
            return message;
        });
    }

    private boolean isCancelled() {
        Subscriptions.EmptySubscription emptySubscription = (Flow.Subscription) this.upstream.get();
        return emptySubscription == Subscriptions.CANCELLED || emptySubscription == null;
    }
}
