/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.server.support;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.dromara.mica.mqtt.codec.MqttMessageFactory;
import org.dromara.mica.mqtt.codec.MqttMessageType;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.codes.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.message.MqttConnAckMessage;
import org.dromara.mica.mqtt.codec.message.MqttConnectMessage;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.codec.message.MqttPubAckMessage;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.message.MqttSubAckMessage;
import org.dromara.mica.mqtt.codec.message.MqttSubscribeMessage;
import org.dromara.mica.mqtt.codec.message.MqttUnSubAckMessage;
import org.dromara.mica.mqtt.codec.message.MqttUnSubscribeMessage;
import org.dromara.mica.mqtt.codec.message.builder.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.message.header.MqttConnectVariableHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttFixedHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttMessageIdVariableHeader;
import org.dromara.mica.mqtt.codec.message.header.MqttPublishVariableHeader;
import org.dromara.mica.mqtt.codec.message.payload.MqttConnectPayload;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
import org.dromara.mica.mqtt.core.server.MqttServerProcessor;
import org.dromara.mica.mqtt.core.server.auth.IMqttServerAuthHandler;
import org.dromara.mica.mqtt.core.server.auth.IMqttServerPublishPermission;
import org.dromara.mica.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import org.dromara.mica.mqtt.core.server.auth.IMqttServerUniqueIdService;
import org.dromara.mica.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import org.dromara.mica.mqtt.core.server.enums.MessageType;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
import org.dromara.mica.mqtt.core.server.event.IMqttMessageListener;
import org.dromara.mica.mqtt.core.server.event.IMqttSessionListener;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.server.store.IMqttMessageStore;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.mica.Pair;
import org.tio.utils.timer.TimerTaskService;

public class DefaultMqttServerProcessor
implements MqttServerProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
    private static final long KEEP_ALIVE_UNIT = 2000L;
    private final MqttServerCreator serverCreator;
    private final long heartbeatTimeout;
    private final IMqttMessageStore messageStore;
    private final IMqttSessionManager sessionManager;
    private final IMqttServerAuthHandler authHandler;
    private final IMqttServerUniqueIdService uniqueIdService;
    private final IMqttServerSubscribeValidator subscribeValidator;
    private final IMqttServerPublishPermission publishPermission;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttConnectStatusListener connectStatusListener;
    private final IMqttSessionListener sessionListener;
    private final IMqttMessageListener messageListener;
    private final TimerTaskService taskService;
    private final ExecutorService executor;

    public DefaultMqttServerProcessor(MqttServerCreator serverCreator, TimerTaskService taskService, ExecutorService executor) {
        this.serverCreator = serverCreator;
        this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? 120000L : serverCreator.getHeartbeatTimeout();
        this.messageStore = serverCreator.getMessageStore();
        this.sessionManager = serverCreator.getSessionManager();
        this.authHandler = serverCreator.getAuthHandler();
        this.uniqueIdService = serverCreator.getUniqueIdService();
        this.subscribeValidator = serverCreator.getSubscribeValidator();
        this.publishPermission = serverCreator.getPublishPermission();
        this.messageDispatcher = serverCreator.getMessageDispatcher();
        this.connectStatusListener = serverCreator.getConnectStatusListener();
        this.sessionListener = serverCreator.getSessionListener();
        this.messageListener = serverCreator.getMessageListener();
        this.taskService = taskService;
        this.executor = executor;
    }

    @Override
    public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
        boolean willFlag;
        String password;
        String userName;
        MqttConnectPayload payload = mqttMessage.payload();
        String clientId = payload.clientIdentifier();
        String uniqueId = this.uniqueIdService.getUniqueId(context, clientId, userName = payload.username(), password = payload.password());
        if (StrUtil.isBlank((CharSequence)uniqueId)) {
            DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }
        if (this.authHandler != null && !this.authHandler.verifyAuthenticate(context, uniqueId, clientId, userName, password)) {
            DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }
        context.setAccepted(true);
        ChannelContext otherContext = Tio.getByBsId((TioConfig)context.getTioConfig(), (String)uniqueId);
        if (otherContext != null) {
            Tio.unbindBsId((ChannelContext)otherContext);
            String remark = String.format("uniqueId:[%s] clientId:[%s] \u88ab\u8e22\u51fa\uff0c\u8bf7\u68c0\u67e5\u662f\u5426\u6709\u76f8\u540c clientId \u4e92\u8e22\uff0c\u65b0 contextId:[%s]", uniqueId, clientId, context.getId());
            Tio.remove((ChannelContext)otherContext, (String)remark);
            this.cleanSession(uniqueId);
        }
        this.sendConnected(context, uniqueId);
        Tio.bindBsId((ChannelContext)context, (String)uniqueId);
        if (StrUtil.isNotBlank((CharSequence)userName)) {
            Tio.bindUser((ChannelContext)context, (String)userName);
        }
        MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
        int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
        long keepAliveTimeout = (long)keepAliveSeconds * 2000L;
        if (keepAliveSeconds > 0 && this.heartbeatTimeout != keepAliveTimeout) {
            context.setHeartbeatTimeout(Long.valueOf(keepAliveTimeout));
        }
        if (willFlag = variableHeader.isWillFlag()) {
            Message willMessage = new Message();
            willMessage.setMessageType(MessageType.DOWN_STREAM);
            willMessage.setFromClientId(uniqueId);
            willMessage.setFromUsername(userName);
            willMessage.setTopic(payload.willTopic());
            byte[] willMessageInBytes = payload.willMessageInBytes();
            if (willMessageInBytes != null) {
                willMessage.setPayload(willMessageInBytes);
            }
            willMessage.setQos(variableHeader.willQos());
            willMessage.setRetain(variableHeader.isWillRetain());
            willMessage.setTimestamp(System.currentTimeMillis());
            Node clientNode = context.getClientNode();
            willMessage.setPeerHost(clientNode.getPeerHost());
            willMessage.setNode(this.serverCreator.getNodeName());
            this.messageStore.addWillMessage(uniqueId, willMessage);
        }
        DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED);
        this.executor.execute(() -> {
            try {
                this.connectStatusListener.online(context, uniqueId, userName);
            }
            catch (Throwable e) {
                logger.error("Mqtt server uniqueId:{} clientId:{} online notify error.", new Object[]{uniqueId, clientId, e});
            }
        });
    }

    private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) {
        MqttConnAckMessage message = MqttConnAckMessage.builder().returnCode(returnCode).sessionPresent(false).build();
        boolean result = Tio.send((ChannelContext)context, (Packet)message);
        if (returnCode.isAccepted()) {
            logger.info("Connect successful, clientId: {} uniqueId:{} result:{}", new Object[]{clientId, uniqueId, result});
        } else {
            logger.error("Connect error - clientId: {} uniqueId:{} returnCode:{} result:{}", new Object[]{clientId, uniqueId, returnCode, result});
        }
    }

    private void sendConnected(ChannelContext context, String uniqueId) {
        Message message = new Message();
        message.setClientId(uniqueId);
        message.setMessageType(MessageType.CONNECT);
        message.setNode(this.serverCreator.getNodeName());
        message.setTimestamp(System.currentTimeMillis());
        Node clientNode = context.getClientNode();
        message.setPeerHost(clientNode.getPeerHost());
        this.messageDispatcher.send(message);
    }

    private void cleanSession(String clientId) {
        try {
            this.sessionManager.remove(clientId);
        }
        catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} session clean error.", (Object)clientId, (Object)throwable);
        }
    }

    @Override
    public void processPublish(ChannelContext context, MqttPublishMessage message) {
        String clientId = context.getBsId();
        MqttFixedHeader fixedHeader = message.fixedHeader();
        MqttQoS mqttQoS = fixedHeader.qosLevel();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        String topicName = variableHeader.topicName();
        if (this.publishPermission != null && !this.publishPermission.verifyPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
            logger.error("Mqtt clientId:{} username:{} topic:{} \u6ca1\u6709\u53d1\u5e03\u6743\u9650\u3002", new Object[]{clientId, context.getUserId(), topicName});
            return;
        }
        int packetId = variableHeader.packetId();
        logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", new Object[]{clientId, topicName, mqttQoS, packetId});
        switch (mqttQoS) {
            case QOS0: {
                this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
                break;
            }
            case QOS1: {
                this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
                if (packetId == -1) break;
                MqttMessage messageAck = MqttPubAckMessage.builder().packetId(packetId).build();
                boolean resultPubAck = Tio.send((ChannelContext)context, (Packet)messageAck);
                logger.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{clientId, topicName, mqttQoS, packetId, resultPubAck});
                break;
            }
            case QOS2: {
                if (packetId == -1) break;
                MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.QOS0, false, 0);
                MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, (Object)MqttMessageIdVariableHeader.from((int)packetId));
                MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
                this.sessionManager.addPendingQos2Publish(clientId, packetId, pendingQos2Publish);
                pendingQos2Publish.startPubRecRetransmitTimer(this.taskService, context);
                boolean resultPubRec = Tio.send((ChannelContext)context, (Packet)pubRecMessage);
                logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{clientId, topicName, mqttQoS, packetId, resultPubRec});
                break;
            }
        }
    }

    @Override
    public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        int packetId = variableHeader.messageId();
        String clientId = context.getBsId();
        logger.debug("PubAck - clientId:{}, packetId:{}", (Object)clientId, (Object)packetId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, packetId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        this.sessionManager.removePendingPublish(clientId, packetId);
    }

    @Override
    public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        String clientId = context.getBsId();
        int packetId = variableHeader.messageId();
        logger.debug("PubRec - clientId:{}, packetId:{}", (Object)clientId, (Object)packetId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, packetId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.QOS1, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(fixedHeader, (Object)variableHeader);
        pendingPublish.setPubRelMessage(pubRelMessage);
        pendingPublish.startPubRelRetransmissionTimer(this.taskService, context);
        boolean result = Tio.send((ChannelContext)context, (Packet)pubRelMessage);
        logger.debug("Publish - PubRel send clientId:{} packetId:{} result:{}", new Object[]{clientId, packetId, result});
    }

    @Override
    public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        String clientId = context.getBsId();
        int packetId = variableHeader.messageId();
        logger.debug("PubRel - clientId:{}, packetId:{}", (Object)clientId, (Object)packetId);
        MqttPendingQos2Publish pendingQos2Publish = this.sessionManager.getPendingQos2Publish(clientId, packetId);
        if (pendingQos2Publish != null) {
            MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
            String topicName = incomingPublish.variableHeader().topicName();
            MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
            MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
            this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish);
            pendingQos2Publish.onPubRelReceived();
            this.sessionManager.removePendingQos2Publish(clientId, packetId);
        }
        MqttMessage message = MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0), (Object)MqttMessageIdVariableHeader.from((int)packetId), null);
        boolean result = Tio.send((ChannelContext)context, (Packet)message);
        logger.debug("Publish - PubComp send clientId:{} packetId:{} result:{}", new Object[]{clientId, packetId, result});
    }

    @Override
    public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        int packetId = variableHeader.messageId();
        String clientId = context.getBsId();
        logger.debug("PubComp - clientId:{}, packetId:{}", (Object)clientId, (Object)packetId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, packetId);
        if (pendingPublish != null) {
            pendingPublish.onPubCompReceived();
            this.sessionManager.removePendingPublish(clientId, packetId);
        }
    }

    @Override
    public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
        String clientId = context.getBsId();
        int packetId = message.variableHeader().messageId();
        List topicSubscriptionList = message.payload().topicSubscriptions();
        ArrayList<MqttQoS> grantedQosList = new ArrayList<MqttQoS>();
        ArrayList<String> subscribedTopicList = new ArrayList<String>();
        boolean enableSubscribeValidator = this.subscribeValidator != null;
        for (MqttTopicSubscription subscription : topicSubscriptionList) {
            String topicFilter = subscription.topicFilter();
            TopicUtil.validateTopicFilter((String)topicFilter);
            MqttQoS mqttQoS = subscription.qualityOfService();
            if (enableSubscribeValidator && !this.subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) {
                grantedQosList.add(MqttQoS.FAILURE);
                logger.error("Subscribe - clientId:{} username:{} topicFilter:{} mqttQoS:{} \u6ca1\u6709\u8ba2\u9605\u6743\u9650 packetId:{}", new Object[]{clientId, context.getUserId(), topicFilter, mqttQoS, packetId});
                continue;
            }
            grantedQosList.add(mqttQoS);
            subscribedTopicList.add(topicFilter);
            this.sessionManager.addSubscribe(topicFilter, clientId, (int)mqttQoS.value());
            logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} packetId:{}", new Object[]{clientId, topicFilter, mqttQoS, packetId});
            this.publishSubscribedEvent(context, clientId, topicFilter, mqttQoS);
        }
        MqttSubAckMessage subAckMessage = MqttSubAckMessage.builder().addGrantedQosList(grantedQosList).packetId(packetId).build();
        boolean result = Tio.send((ChannelContext)context, (Packet)subAckMessage);
        logger.info("Subscribe - SubAck send clientId:{} subscribedTopicList:{} packetId:{} result:{}", new Object[]{clientId, subscribedTopicList, packetId, result});
        for (String topic : subscribedTopicList) {
            this.executor.submit(() -> {
                List<Message> retainMessageList = this.messageStore.getRetainMessage(topic);
                if (retainMessageList != null && !retainMessageList.isEmpty()) {
                    for (Message retainMessage : retainMessageList) {
                        this.messageDispatcher.sendRetainMessage(context, clientId, retainMessage);
                    }
                }
            });
        }
    }

    private void publishSubscribedEvent(ChannelContext context, String clientId, String topicFilter, MqttQoS mqttQoS) {
        if (this.sessionListener == null) {
            return;
        }
        this.executor.execute(() -> {
            try {
                this.sessionListener.onSubscribed(context, clientId, topicFilter, mqttQoS);
            }
            catch (Throwable e) {
                logger.error("Mqtt server clientId:{} topicFilter:{} onUnsubscribed error.", new Object[]{clientId, topicFilter, e});
            }
        });
    }

    @Override
    public void processUnSubscribe(ChannelContext context, MqttUnSubscribeMessage message) {
        String clientId = context.getBsId();
        int packetId = message.variableHeader().messageId();
        List topicFilterList = message.payload().topics();
        for (String topicFilter : topicFilterList) {
            this.sessionManager.removeSubscribe(topicFilter, clientId);
            this.publishUnsubscribedEvent(context, clientId, topicFilter);
        }
        logger.info("UnSubscribe - clientId:{} Topic:{} packetId:{}", new Object[]{clientId, topicFilterList, packetId});
        MqttUnSubAckMessage unSubMessage = MqttUnSubAckMessage.builder().packetId(packetId).build();
        boolean result = Tio.send((ChannelContext)context, (Packet)unSubMessage);
        logger.debug("UnSubscribe - UnSubAck send clientId:{} result:{}", (Object)clientId, (Object)result);
    }

    private void publishUnsubscribedEvent(ChannelContext context, String clientId, String topicFilter) {
        if (this.sessionListener == null) {
            return;
        }
        this.executor.execute(() -> {
            try {
                this.sessionListener.onUnsubscribed(context, clientId, topicFilter);
            }
            catch (Throwable e) {
                logger.error("Mqtt server clientId:{} topicFilter:{} onUnsubscribed error.", new Object[]{clientId, topicFilter, e});
            }
        });
    }

    @Override
    public void processPingReq(ChannelContext context) {
        String clientId = context.getBsId();
        boolean result = Tio.send((ChannelContext)context, (Packet)MqttMessage.PINGRESP);
        logger.debug("PingReq - PingResp send clientId:{} result:{}", (Object)clientId, (Object)result);
    }

    @Override
    public void processDisConnect(ChannelContext context) {
        String clientId = context.getBsId();
        logger.info("DisConnect - clientId:{} contextId:{}", (Object)clientId, (Object)context.getId());
        context.setBizStatus(true);
        Tio.remove((ChannelContext)context, (String)"Mqtt DisConnect");
    }

    private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS, String topicName, MqttPublishMessage publishMessage) {
        MqttFixedHeader fixedHeader = publishMessage.fixedHeader();
        Node clientNode = context.getClientNode();
        boolean isRetain = fixedHeader.isRetain();
        byte[] payload = publishMessage.payload();
        if (isRetain) {
            Pair retainPair = TopicUtil.retainTopicName((String)topicName);
            int timeOut = (Integer)retainPair.getRight();
            if (timeOut < 0) {
                logger.error("MqttPublishMessage topic {} \u4e0d\u7b26\u5408 $retain/${ttl}/topic \u89c4\u5219.", (Object)topicName);
                return;
            }
            topicName = (String)retainPair.getLeft();
            if (MqttQoS.QOS0 == mqttQoS || payload == null || payload.length == 0) {
                this.messageStore.clearRetainMessage(topicName);
            } else {
                Message retainMessage = new Message();
                retainMessage.setTopic(topicName);
                retainMessage.setQos(mqttQoS.value());
                retainMessage.setPayload(payload);
                retainMessage.setFromClientId(clientId);
                retainMessage.setMessageType(MessageType.DOWN_STREAM);
                retainMessage.setRetain(true);
                retainMessage.setDup(fixedHeader.isDup());
                retainMessage.setTimestamp(System.currentTimeMillis());
                retainMessage.setPeerHost(clientNode.getPeerHost());
                retainMessage.setNode(this.serverCreator.getNodeName());
                this.messageStore.addRetainMessage(topicName, timeOut, retainMessage);
            }
        }
        String topic = topicName;
        if (this.messageListener != null) {
            this.executor.submit(() -> {
                try {
                    this.messageListener.onMessage(context, clientId, topic, mqttQoS, publishMessage);
                }
                catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            });
        }
        MqttPublishVariableHeader variableHeader = publishMessage.variableHeader();
        Message message = new Message();
        message.setId(variableHeader.packetId());
        message.setFromClientId(clientId);
        message.setTopic(topic);
        message.setQos(mqttQoS.value());
        if (payload != null) {
            message.setPayload(payload);
        }
        message.setMessageType(MessageType.UP_STREAM);
        message.setRetain(false);
        message.setDup(fixedHeader.isDup());
        message.setTimestamp(System.currentTimeMillis());
        message.setPeerHost(clientNode.getPeerHost());
        message.setNode(this.serverCreator.getNodeName());
        this.executor.submit(() -> {
            try {
                this.messageDispatcher.send(message);
            }
            catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
        });
    }
}

