/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.server.cluster;

import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.server.MqttServer;
import org.dromara.mica.mqtt.core.server.enums.MessageType;
import org.dromara.mica.mqtt.core.server.event.IMqttMessageListener;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.server.ServerChannelContext;

public class MqttClusterMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttClusterMessageListener.class);
    private final String nodeName;
    private final IMqttMessageListener messageListener;
    private final IMqttSessionManager sessionManager;
    private final MqttServer mqttServer;

    public MqttClusterMessageListener(MqttServer mqttServer) {
        this.nodeName = mqttServer.getServerCreator().getNodeName();
        this.messageListener = mqttServer.getServerCreator().getMessageListener();
        this.sessionManager = mqttServer.getServerCreator().getSessionManager();
        this.mqttServer = mqttServer;
    }

    public void onMessage(Message message) {
        String clientId;
        ChannelContext context;
        MessageType messageType = message.getMessageType();
        String topic = message.getTopic();
        if (MessageType.CONNECT == messageType) {
            String node = message.getNode();
            if (this.nodeName.equals(node)) {
                return;
            }
            String clientId2 = message.getClientId();
            ChannelContext context2 = Tio.getByBsId((TioConfig)this.mqttServer.getServerConfig(), (String)clientId2);
            if (context2 != null) {
                Tio.remove((ChannelContext)context2, (String)("clientId:[" + clientId2 + "] now bind on mqtt node:" + node));
            }
        } else if (MessageType.SUBSCRIBE == messageType) {
            String formClientId = message.getFromClientId();
            ChannelContext context3 = this.mqttServer.getChannelContext(formClientId);
            if (context3 != null) {
                this.sessionManager.addSubscribe(topic, formClientId, message.getQos());
            }
        } else if (MessageType.UNSUBSCRIBE == messageType) {
            String formClientId = message.getFromClientId();
            ChannelContext context4 = this.mqttServer.getChannelContext(formClientId);
            if (context4 != null) {
                this.sessionManager.removeSubscribe(topic, formClientId);
            }
        } else if (MessageType.UP_STREAM == messageType) {
            this.mqttServer.sendToClient(topic, message);
        } else if (MessageType.DOWN_STREAM == messageType) {
            this.mqttServer.sendToClient(topic, message);
        } else if (MessageType.HTTP_API == messageType) {
            MqttQoS mqttQoS = MqttQoS.valueOf((int)message.getQos());
            this.mqttServer.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
            try {
                this.onHttpApiMessage(topic, mqttQoS, message);
            }
            catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
        } else if (MessageType.DISCONNECT == messageType && (context = this.mqttServer.getChannelContext(clientId = message.getClientId())) != null) {
            Tio.remove((ChannelContext)context, (String)("Mqtt server delete clients:" + clientId));
        }
    }

    private void onHttpApiMessage(String topic, MqttQoS mqttQoS, Message message) {
        String clientId = message.getClientId();
        ServerChannelContext context = new ServerChannelContext((TioConfig)this.mqttServer.getServerConfig());
        context.setBsId(clientId);
        context.setUserId(MessageType.HTTP_API.name());
        MqttPublishMessage publishMessage = MqttPublishMessage.builder().topicName(topic).qos(mqttQoS).retained(message.isRetain()).payload(message.getPayload()).build();
        this.messageListener.onMessage((ChannelContext)context, clientId, topic, mqttQoS, publishMessage);
    }
}

