package io.smallrye.reactive.messaging.rabbitmq.internals;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.rabbitmq.ClientHolder;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector;
import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.rabbitmq.i18n.RabbitMQLogging;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/rabbitmq/internals/OutgoingRabbitMQChannel.class */
public class OutgoingRabbitMQChannel {
    private final Flow.Subscriber<Message<?>> subscriber;
    private final RabbitMQConnectorOutgoingConfiguration config;
    private final ClientHolder holder;
    private volatile RabbitMQPublisher publisher;

    public OutgoingRabbitMQChannel(RabbitMQConnector rabbitMQConnector, RabbitMQConnectorOutgoingConfiguration rabbitMQConnectorOutgoingConfiguration) {
        this.config = rabbitMQConnectorOutgoingConfiguration;
        RabbitMQClient createClient = RabbitMQClientHelper.createClient(rabbitMQConnector, rabbitMQConnectorOutgoingConfiguration);
        createClient.getDelegate().addConnectionEstablishedCallback(promise -> {
            UniSubscribe subscribe = RabbitMQClientHelper.declareExchangeIfNeeded(createClient, rabbitMQConnectorOutgoingConfiguration, rabbitMQConnector.configMaps()).subscribe();
            Consumer consumer = str -> {
                promise.complete();
            };
            Objects.requireNonNull(promise);
            subscribe.with(consumer, promise::fail);
        });
        this.holder = new ClientHolder(createClient, rabbitMQConnectorOutgoingConfiguration, rabbitMQConnector.vertx(), null);
        this.subscriber = MultiUtils.via(new RabbitMQMessageSender(rabbitMQConnectorOutgoingConfiguration, this.holder.getOrEstablishConnection().onItem().transformToUni(rabbitMQClient -> {
            return Uni.createFrom().item(RabbitMQPublisher.create(rabbitMQConnector.vertx(), rabbitMQClient, new RabbitMQPublisherOptions().setReconnectAttempts(rabbitMQConnectorOutgoingConfiguration.getReconnectAttempts()).setReconnectInterval(Duration.ofSeconds(rabbitMQConnectorOutgoingConfiguration.getReconnectInterval().intValue()).toMillis()).setMaxInternalQueueSize(rabbitMQConnectorOutgoingConfiguration.getMaxOutgoingInternalQueueSize().orElse(Integer.MAX_VALUE).intValue())));
        }).onItem().call((v0) -> {
            return v0.start();
        }).invoke(rabbitMQPublisher -> {
            this.publisher = rabbitMQPublisher;
        }).onFailure().recoverWithNull().memoize().indefinitely()), multi -> {
            return multi.onFailure().invoke(th -> {
                RabbitMQLogging.log.error(rabbitMQConnectorOutgoingConfiguration.getChannel(), th);
            });
        });
    }

    public Flow.Subscriber<Message<?>> getSubscriber() {
        return this.subscriber;
    }

    public HealthReport.HealthReportBuilder isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        return !this.config.getHealthEnabled().booleanValue() ? healthReportBuilder : computeHealthReport(healthReportBuilder);
    }

    private HealthReport.HealthReportBuilder computeHealthReport(HealthReport.HealthReportBuilder healthReportBuilder) {
        RabbitMQClient client = this.holder.client();
        if (client == null) {
            return healthReportBuilder.add(new HealthReport.ChannelInfo(this.config.getChannel(), false));
        }
        boolean z = true;
        if (this.holder.hasBeenConnected()) {
            z = client.isConnected() && client.isOpenChannel();
        }
        return healthReportBuilder.add(new HealthReport.ChannelInfo(this.config.getChannel(), z));
    }

    public HealthReport.HealthReportBuilder isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        return (this.config.getHealthEnabled().booleanValue() && this.config.getHealthReadinessEnabled().booleanValue()) ? computeHealthReport(healthReportBuilder) : healthReportBuilder;
    }

    public void terminate() {
        if (this.publisher != null) {
            this.publisher.stopAndAwait();
        }
    }
}
