package io.smallrye.reactive.messaging.mqtt;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.AsyncResultUni;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.mqtt.Clients;
import io.smallrye.reactive.messaging.mqtt.i18n.MqttLogging;
import io.smallrye.reactive.messaging.mqtt.internal.MqttHelpers;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.buffer.Buffer;
import jakarta.enterprise.inject.Instance;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/mqtt/MqttSink.class */
public class MqttSink {
    private final String channel;
    private final String topic;
    private final int qos;
    private final boolean healthEnabled;
    private final Flow.Subscriber<? extends Message<?>> sink;
    private final AtomicBoolean started = new AtomicBoolean();
    private final AtomicBoolean alive = new AtomicBoolean();
    private final AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();

    public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration mqttConnectorOutgoingConfiguration, Instance<MqttClientSessionOptions> instance) {
        MqttClientSessionOptions createClientOptions = MqttHelpers.createClientOptions(mqttConnectorOutgoingConfiguration, instance);
        this.channel = mqttConnectorOutgoingConfiguration.getChannel();
        this.topic = mqttConnectorOutgoingConfiguration.getTopic().orElse(this.channel);
        this.qos = mqttConnectorOutgoingConfiguration.getQos().intValue();
        this.healthEnabled = mqttConnectorOutgoingConfiguration.getHealthEnabled().booleanValue();
        this.sink = MultiUtils.via(multi -> {
            return multi.onSubscription().call(() -> {
                if (this.reference.get() == null) {
                    this.reference.set(Clients.getHolder(vertx, createClientOptions));
                }
                return AsyncResultUni.toUni(handler -> {
                    this.reference.get().start().onComplete(handler);
                }).onItem().invoke(() -> {
                    this.started.set(true);
                    this.alive.set(true);
                });
            }).onItem().transformToUniAndConcatenate(this::send).onCompletion().invoke(() -> {
                Clients.ClientHolder andSet = this.reference.getAndSet(null);
                if (andSet != null) {
                    andSet.close();
                }
                this.alive.set(false);
            }).onFailure().invoke(th -> {
                this.alive.set(false);
                MqttLogging.log.errorWhileSendingMessageToBroker(th);
            });
        });
    }

    private Uni<? extends Message<?>> send(Message<?> message) {
        String str;
        boolean z;
        MqttQoS valueOf;
        MqttClientSession client = this.reference.get().getClient();
        Optional metadata = message.getMetadata(SendingMqttMessageMetadata.class);
        if (metadata.isPresent()) {
            SendingMqttMessageMetadata sendingMqttMessageMetadata = (SendingMqttMessageMetadata) metadata.get();
            str = sendingMqttMessageMetadata.getTopic() == null ? this.topic : sendingMqttMessageMetadata.getTopic();
            valueOf = sendingMqttMessageMetadata.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : sendingMqttMessageMetadata.getQosLevel();
            z = sendingMqttMessageMetadata.isRetain();
        } else {
            str = this.topic;
            z = false;
            valueOf = MqttQoS.valueOf(this.qos);
        }
        if (str == null) {
            MqttLogging.log.ignoringNoTopicSet();
            return Uni.createFrom().item(message);
        }
        String str2 = str;
        MqttQoS mqttQoS = valueOf;
        boolean z2 = z;
        return AsyncResultUni.toUni(handler -> {
            client.publish(str2, convert(message.getPayload()).getDelegate(), mqttQoS, false, z2).onComplete(handler);
        }).onItemOrFailure().transformToUni((num, th) -> {
            if (th != null) {
                return Uni.createFrom().completionStage(message.nack(th).thenApply(r3 -> {
                    return message;
                }));
            }
            OutgoingMessageMetadata.setResultOnMessage(message, num);
            return Uni.createFrom().completionStage(message.ack().thenApply(r32 -> {
                return message;
            }));
        });
    }

    private Buffer convert(Object obj) {
        return obj == null ? Buffer.buffer() : obj instanceof JsonObject ? new Buffer(((JsonObject) obj).toBuffer()) : obj instanceof JsonArray ? new Buffer(((JsonArray) obj).toBuffer()) : ((obj instanceof String) || obj.getClass().isPrimitive()) ? new Buffer(io.vertx.core.buffer.Buffer.buffer(obj.toString())) : obj instanceof byte[] ? new Buffer(io.vertx.core.buffer.Buffer.buffer((byte[]) obj)) : obj instanceof Buffer ? (Buffer) obj : obj instanceof io.vertx.core.buffer.Buffer ? new Buffer((io.vertx.core.buffer.Buffer) obj) : new Buffer(Json.encodeToBuffer(obj));
    }

    public Flow.Subscriber<? extends Message<?>> getSink() {
        return this.sink;
    }

    private boolean isConnected() {
        return this.reference.get() != null && this.reference.get().getClient().isConnected();
    }

    public void isStarted(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.started.get());
        }
    }

    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, isConnected());
        }
    }

    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.alive.get());
        }
    }
}
