/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.server.support;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import net.dreamlu.iot.mqtt.codec.MqttConnAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttConnectMessage;
import net.dreamlu.iot.mqtt.codec.MqttConnectPayload;
import net.dreamlu.iot.mqtt.codec.MqttConnectReasonCode;
import net.dreamlu.iot.mqtt.codec.MqttConnectVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttFixedHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.codec.MqttMessageBuilders;
import net.dreamlu.iot.mqtt.codec.MqttMessageFactory;
import net.dreamlu.iot.mqtt.codec.MqttMessageIdVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttMessageType;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttPublishVariableHeader;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.codec.MqttSubAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttSubscribeMessage;
import net.dreamlu.iot.mqtt.codec.MqttTopicSubscription;
import net.dreamlu.iot.mqtt.codec.MqttUnsubAckMessage;
import net.dreamlu.iot.mqtt.codec.MqttUnsubscribeMessage;
import net.dreamlu.iot.mqtt.core.common.MqttPendingPublish;
import net.dreamlu.iot.mqtt.core.common.MqttPendingQos2Publish;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import net.dreamlu.iot.mqtt.core.server.MqttServerProcessor;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerAuthHandler;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerPublishPermission;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerSubscribeValidator;
import net.dreamlu.iot.mqtt.core.server.auth.IMqttServerUniqueIdService;
import net.dreamlu.iot.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import net.dreamlu.iot.mqtt.core.server.enums.MessageType;
import net.dreamlu.iot.mqtt.core.server.event.IMqttConnectStatusListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttMessageListener;
import net.dreamlu.iot.mqtt.core.server.event.IMqttSessionListener;
import net.dreamlu.iot.mqtt.core.server.model.Message;
import net.dreamlu.iot.mqtt.core.server.session.IMqttSessionManager;
import net.dreamlu.iot.mqtt.core.server.store.IMqttMessageStore;
import net.dreamlu.iot.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.timer.TimerTaskService;

public class DefaultMqttServerProcessor
implements MqttServerProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttServerProcessor.class);
    private static final long KEEP_ALIVE_UNIT = 2000L;
    private final MqttServerCreator serverCreator;
    private final long heartbeatTimeout;
    private final IMqttMessageStore messageStore;
    private final IMqttSessionManager sessionManager;
    private final IMqttServerAuthHandler authHandler;
    private final IMqttServerUniqueIdService uniqueIdService;
    private final IMqttServerSubscribeValidator subscribeValidator;
    private final IMqttServerPublishPermission publishPermission;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttConnectStatusListener connectStatusListener;
    private final IMqttSessionListener sessionListener;
    private final IMqttMessageListener messageListener;
    private final TimerTaskService taskService;
    private final ExecutorService executor;

    public DefaultMqttServerProcessor(MqttServerCreator serverCreator, TimerTaskService taskService, ExecutorService executor) {
        this.serverCreator = serverCreator;
        this.heartbeatTimeout = serverCreator.getHeartbeatTimeout() == null ? 120000L : serverCreator.getHeartbeatTimeout();
        this.messageStore = serverCreator.getMessageStore();
        this.sessionManager = serverCreator.getSessionManager();
        this.authHandler = serverCreator.getAuthHandler();
        this.uniqueIdService = serverCreator.getUniqueIdService();
        this.subscribeValidator = serverCreator.getSubscribeValidator();
        this.publishPermission = serverCreator.getPublishPermission();
        this.messageDispatcher = serverCreator.getMessageDispatcher();
        this.connectStatusListener = serverCreator.getConnectStatusListener();
        this.sessionListener = serverCreator.getSessionListener();
        this.messageListener = serverCreator.getMessageListener();
        this.taskService = taskService;
        this.executor = executor;
    }

    @Override
    public void processConnect(ChannelContext context, MqttConnectMessage mqttMessage) {
        boolean willFlag;
        String password;
        String userName;
        MqttConnectPayload payload = mqttMessage.payload();
        String clientId = payload.clientIdentifier();
        String uniqueId = this.uniqueIdService.getUniqueId(context, clientId, userName = payload.userName(), password = payload.password());
        if (StrUtil.isBlank((CharSequence)uniqueId)) {
            DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
            return;
        }
        if (this.authHandler != null && !this.authHandler.verifyAuthenticate(context, uniqueId, clientId, userName, password)) {
            DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }
        context.setAccepted(true);
        ChannelContext otherContext = Tio.getByBsId((TioConfig)context.getTioConfig(), (String)uniqueId);
        if (otherContext != null) {
            Tio.unbindBsId((ChannelContext)otherContext);
            String remark = String.format("uniqueId:[%s] clientId:[%s] now bind on new context id:[%s]", uniqueId, clientId, context.getId());
            Tio.remove((ChannelContext)otherContext, (String)remark);
            this.cleanSession(uniqueId);
        }
        this.sendConnected(context, uniqueId);
        Tio.bindBsId((ChannelContext)context, (String)uniqueId);
        if (StrUtil.isNotBlank((CharSequence)userName)) {
            Tio.bindUser((ChannelContext)context, (String)userName);
        }
        MqttConnectVariableHeader variableHeader = mqttMessage.variableHeader();
        int keepAliveSeconds = variableHeader.keepAliveTimeSeconds();
        long keepAliveTimeout = (long)keepAliveSeconds * 2000L;
        if (keepAliveSeconds > 0 && this.heartbeatTimeout != keepAliveTimeout) {
            context.setHeartbeatTimeout(Long.valueOf(keepAliveTimeout));
        }
        if (willFlag = variableHeader.isWillFlag()) {
            Message willMessage = new Message();
            willMessage.setMessageType(MessageType.DOWN_STREAM);
            willMessage.setFromClientId(uniqueId);
            willMessage.setFromUsername(userName);
            willMessage.setTopic(payload.willTopic());
            byte[] willMessageInBytes = payload.willMessageInBytes();
            if (willMessageInBytes != null) {
                willMessage.setPayload(willMessageInBytes);
            }
            willMessage.setQos(variableHeader.willQos());
            willMessage.setRetain(variableHeader.isWillRetain());
            willMessage.setTimestamp(System.currentTimeMillis());
            Node clientNode = context.getClientNode();
            willMessage.setPeerHost(clientNode.getPeerHost());
            willMessage.setNode(this.serverCreator.getNodeName());
            this.messageStore.addWillMessage(uniqueId, willMessage);
        }
        DefaultMqttServerProcessor.connAckByReturnCode(clientId, uniqueId, context, MqttConnectReasonCode.CONNECTION_ACCEPTED);
        this.executor.execute(() -> {
            try {
                this.connectStatusListener.online(context, uniqueId, userName);
            }
            catch (Throwable e) {
                logger.error("Mqtt server uniqueId:{} clientId:{} online notify error.", new Object[]{uniqueId, clientId, e});
            }
        });
    }

    private static void connAckByReturnCode(String clientId, String uniqueId, ChannelContext context, MqttConnectReasonCode returnCode) {
        MqttConnAckMessage message = MqttMessageBuilders.connAck().returnCode(returnCode).sessionPresent(false).build();
        Tio.send((ChannelContext)context, (Packet)message);
        if (MqttConnectReasonCode.CONNECTION_ACCEPTED == returnCode) {
            logger.info("Connect successful, clientId: {} uniqueId:{}", (Object)clientId, (Object)uniqueId);
        } else {
            logger.error("Connect error - clientId: {} uniqueId:{} returnCode:{}", new Object[]{clientId, uniqueId, returnCode});
        }
    }

    private void sendConnected(ChannelContext context, String uniqueId) {
        Message message = new Message();
        message.setClientId(uniqueId);
        message.setMessageType(MessageType.CONNECT);
        message.setNode(this.serverCreator.getNodeName());
        message.setTimestamp(System.currentTimeMillis());
        Node clientNode = context.getClientNode();
        message.setPeerHost(clientNode.getPeerHost());
        this.messageDispatcher.send(message);
    }

    private void cleanSession(String clientId) {
        try {
            this.sessionManager.remove(clientId);
        }
        catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} session clean error.", (Object)clientId, (Object)throwable);
        }
    }

    @Override
    public void processPublish(ChannelContext context, MqttPublishMessage message) {
        String clientId = context.getBsId();
        MqttFixedHeader fixedHeader = message.fixedHeader();
        MqttQoS mqttQoS = fixedHeader.qosLevel();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        String topicName = variableHeader.topicName();
        if (this.publishPermission != null && !this.publishPermission.verifyPermission(context, clientId, topicName, mqttQoS, fixedHeader.isRetain())) {
            logger.error("Mqtt clientId:{} topic:{} no publish permission.", (Object)clientId, (Object)topicName);
            return;
        }
        int packetId = variableHeader.packetId();
        logger.debug("Publish - clientId:{} topicName:{} mqttQoS:{} packetId:{}", new Object[]{clientId, topicName, mqttQoS, packetId});
        switch (mqttQoS) {
            case QOS0: {
                this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
                break;
            }
            case QOS1: {
                this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, message);
                if (packetId == -1) break;
                MqttMessage messageAck = MqttMessageBuilders.pubAck().packetId(packetId).build();
                boolean resultPubAck = Tio.send((ChannelContext)context, (Packet)messageAck);
                logger.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{clientId, topicName, mqttQoS, packetId, resultPubAck});
                break;
            }
            case QOS2: {
                if (packetId == -1) break;
                MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.QOS0, false, 0);
                MqttMessage pubRecMessage = new MqttMessage(pubRecFixedHeader, (Object)MqttMessageIdVariableHeader.from((int)packetId));
                boolean resultPubRec = Tio.send((ChannelContext)context, (Packet)pubRecMessage);
                logger.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{} result:{}", new Object[]{clientId, topicName, mqttQoS, packetId, resultPubRec});
                MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(message, pubRecMessage);
                this.sessionManager.addPendingQos2Publish(clientId, packetId, pendingQos2Publish);
                pendingQos2Publish.startPubRecRetransmitTimer(this.taskService, context);
                break;
            }
        }
    }

    @Override
    public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        int messageId = variableHeader.messageId();
        String clientId = context.getBsId();
        logger.debug("PubAck - clientId:{}, messageId:{}", (Object)clientId, (Object)messageId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, messageId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        this.sessionManager.removePendingPublish(clientId, messageId);
    }

    @Override
    public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        String clientId = context.getBsId();
        int messageId = variableHeader.messageId();
        logger.debug("PubRec - clientId:{}, messageId:{}", (Object)clientId, (Object)messageId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, messageId);
        if (pendingPublish == null) {
            return;
        }
        pendingPublish.onPubAckReceived();
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.QOS1, false, 0);
        MqttMessage pubRelMessage = new MqttMessage(fixedHeader, (Object)variableHeader);
        Tio.send((ChannelContext)context, (Packet)pubRelMessage);
        pendingPublish.setPubRelMessage(pubRelMessage);
        pendingPublish.startPubRelRetransmissionTimer(this.taskService, context);
    }

    @Override
    public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        String clientId = context.getBsId();
        int messageId = variableHeader.messageId();
        logger.debug("PubRel - clientId:{}, messageId:{}", (Object)clientId, (Object)messageId);
        MqttPendingQos2Publish pendingQos2Publish = this.sessionManager.getPendingQos2Publish(clientId, messageId);
        if (pendingQos2Publish != null) {
            MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
            String topicName = incomingPublish.variableHeader().topicName();
            MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
            MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
            this.invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish);
            pendingQos2Publish.onPubRelReceived();
            this.sessionManager.removePendingQos2Publish(clientId, messageId);
        }
        MqttMessage message = MqttMessageFactory.newMessage((MqttFixedHeader)new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0), (Object)MqttMessageIdVariableHeader.from((int)messageId), null);
        Tio.send((ChannelContext)context, (Packet)message);
    }

    @Override
    public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
        int messageId = variableHeader.messageId();
        String clientId = context.getBsId();
        logger.debug("PubComp - clientId:{}, messageId:{}", (Object)clientId, (Object)messageId);
        MqttPendingPublish pendingPublish = this.sessionManager.getPendingPublish(clientId, messageId);
        if (pendingPublish != null) {
            pendingPublish.onPubCompReceived();
            this.sessionManager.removePendingPublish(clientId, messageId);
        }
    }

    @Override
    public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
        String clientId = context.getBsId();
        int messageId = message.variableHeader().messageId();
        List topicSubscriptionList = message.payload().topicSubscriptions();
        ArrayList<MqttQoS> grantedQosList = new ArrayList<MqttQoS>();
        ArrayList<String> subscribedTopicList = new ArrayList<String>();
        boolean enableSubscribeValidator = this.subscribeValidator != null;
        for (MqttTopicSubscription subscription : topicSubscriptionList) {
            String topicFilter = subscription.topicName();
            TopicUtil.validateTopicFilter((String)topicFilter);
            MqttQoS mqttQoS = subscription.qualityOfService();
            if (enableSubscribeValidator && !this.subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) {
                grantedQosList.add(MqttQoS.FAILURE);
                logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", new Object[]{clientId, topicFilter, mqttQoS, messageId});
                continue;
            }
            grantedQosList.add(mqttQoS);
            subscribedTopicList.add(topicFilter);
            this.sessionManager.addSubscribe(topicFilter, clientId, mqttQoS.value());
            logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} messageId:{}", new Object[]{clientId, topicFilter, mqttQoS, messageId});
            this.publishSubscribedEvent(context, clientId, topicFilter, mqttQoS);
        }
        MqttSubAckMessage subAckMessage = MqttMessageBuilders.subAck().addGrantedQosList(grantedQosList).packetId(messageId).build();
        Tio.send((ChannelContext)context, (Packet)subAckMessage);
        for (String topic : subscribedTopicList) {
            this.executor.submit(() -> {
                List<Message> retainMessageList = this.messageStore.getRetainMessage(topic);
                if (retainMessageList != null && !retainMessageList.isEmpty()) {
                    for (Message retainMessage : retainMessageList) {
                        this.messageDispatcher.send(clientId, retainMessage);
                    }
                }
            });
        }
    }

    private void publishSubscribedEvent(ChannelContext context, String clientId, String topicFilter, MqttQoS mqttQoS) {
        if (this.sessionListener == null) {
            return;
        }
        this.executor.execute(() -> {
            try {
                this.sessionListener.onSubscribed(context, clientId, topicFilter, mqttQoS);
            }
            catch (Throwable e) {
                logger.error("Mqtt server clientId:{} topicFilter:{} onUnsubscribed error.", new Object[]{clientId, topicFilter, e});
            }
        });
    }

    @Override
    public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
        String clientId = context.getBsId();
        int messageId = message.variableHeader().messageId();
        List topicFilterList = message.payload().topics();
        for (String topicFilter : topicFilterList) {
            this.sessionManager.removeSubscribe(topicFilter, clientId);
            this.publishUnsubscribedEvent(context, clientId, topicFilter);
        }
        logger.info("UnSubscribe - clientId:{} Topic:{} messageId:{}", new Object[]{clientId, topicFilterList, messageId});
        MqttUnsubAckMessage unSubMessage = MqttMessageBuilders.unsubAck().packetId(messageId).build();
        Tio.send((ChannelContext)context, (Packet)unSubMessage);
    }

    private void publishUnsubscribedEvent(ChannelContext context, String clientId, String topicFilter) {
        if (this.sessionListener == null) {
            return;
        }
        this.executor.execute(() -> {
            try {
                this.sessionListener.onUnsubscribed(context, clientId, topicFilter);
            }
            catch (Throwable e) {
                logger.error("Mqtt server clientId:{} topicFilter:{} onUnsubscribed error.", new Object[]{clientId, topicFilter, e});
            }
        });
    }

    @Override
    public void processPingReq(ChannelContext context) {
        String clientId = context.getBsId();
        logger.debug("PingReq - clientId:{}", (Object)clientId);
        Tio.send((ChannelContext)context, (Packet)MqttMessage.PINGRESP);
    }

    @Override
    public void processDisConnect(ChannelContext context) {
        String clientId = context.getBsId();
        logger.info("DisConnect - clientId:{} contextId:{}", (Object)clientId, (Object)context.getId());
        context.setBizStatus(true);
        Tio.remove((ChannelContext)context, (String)"Mqtt DisConnect");
    }

    private void invokeListenerForPublish(ChannelContext context, String clientId, MqttQoS mqttQoS, String topicName, MqttPublishMessage publishMessage) {
        MqttFixedHeader fixedHeader = publishMessage.fixedHeader();
        boolean isRetain = fixedHeader.isRetain();
        byte[] payload = publishMessage.payload();
        if (isRetain) {
            if (MqttQoS.QOS0 == mqttQoS || payload == null || payload.length == 0) {
                this.messageStore.clearRetainMessage(topicName);
            } else {
                Message retainMessage = new Message();
                retainMessage.setTopic(topicName);
                retainMessage.setQos(mqttQoS.value());
                retainMessage.setPayload(payload);
                retainMessage.setFromClientId(clientId);
                retainMessage.setMessageType(MessageType.DOWN_STREAM);
                retainMessage.setRetain(true);
                retainMessage.setDup(fixedHeader.isDup());
                retainMessage.setTimestamp(System.currentTimeMillis());
                Node clientNode = context.getClientNode();
                retainMessage.setPeerHost(clientNode.getPeerHost());
                retainMessage.setNode(this.serverCreator.getNodeName());
                this.messageStore.addRetainMessage(topicName, retainMessage);
            }
        }
        MqttPublishVariableHeader variableHeader = publishMessage.variableHeader();
        int packetId = variableHeader.packetId();
        Message message = new Message();
        message.setId(packetId);
        message.setFromClientId(clientId);
        message.setTopic(topicName);
        message.setQos(mqttQoS.value());
        if (payload != null) {
            message.setPayload(payload);
        }
        message.setMessageType(MessageType.UP_STREAM);
        message.setRetain(isRetain);
        message.setDup(fixedHeader.isDup());
        message.setTimestamp(System.currentTimeMillis());
        Node clientNode = context.getClientNode();
        message.setPeerHost(clientNode.getPeerHost());
        message.setNode(this.serverCreator.getNodeName());
        if (this.messageListener != null) {
            this.executor.submit(() -> {
                try {
                    this.messageListener.onMessage(context, clientId, topicName, mqttQoS, publishMessage, message);
                }
                catch (Throwable e) {
                    logger.error(e.getMessage(), e);
                }
            });
        }
        this.executor.submit(() -> {
            try {
                this.messageDispatcher.send(message);
            }
            catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
        });
    }
}

