/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.server;

import java.io.IOException;
import org.dromara.mica.mqtt.codec.message.MqttMessage;
import org.dromara.mica.mqtt.core.server.MqttMessageInterceptors;
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
import org.dromara.mica.mqtt.core.server.dispatcher.IMqttMessageDispatcher;
import org.dromara.mica.mqtt.core.server.event.IMqttConnectStatusListener;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.server.store.IMqttMessageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.server.DefaultTioServerListener;
import org.tio.utils.hutool.StrUtil;

/**
 * Server-side connection listener for the MQTT server.
 *
 * <p>Most callbacks simply forward to the configured {@link MqttMessageInterceptors}.
 * {@link #onBeforeClose} additionally performs per-client cleanup: it publishes the
 * stored will message for connections that did not close normally, removes the
 * client's session and fires the offline notification.
 */
public class MqttServerAioListener
extends DefaultTioServerListener {
    private static final Logger logger = LoggerFactory.getLogger(MqttServerAioListener.class);
    private final IMqttMessageStore messageStore;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageDispatcher messageDispatcher;
    private final IMqttConnectStatusListener connectStatusListener;
    private final MqttMessageInterceptors messageInterceptors;

    /**
     * Creates the listener, taking all collaborators from the given server creator.
     *
     * @param serverCreator source of the message store, session manager, dispatcher,
     *                      connect status listener and message interceptors
     */
    public MqttServerAioListener(MqttServerCreator serverCreator) {
        this.messageStore = serverCreator.getMessageStore();
        this.sessionManager = serverCreator.getSessionManager();
        this.messageDispatcher = serverCreator.getMessageDispatcher();
        this.connectStatusListener = serverCreator.getConnectStatusListener();
        this.messageInterceptors = serverCreator.getMessageInterceptors();
    }

    /**
     * Logs a heartbeat timeout for the channel.
     *
     * @param context               channel whose heartbeat timed out
     * @param interval              heartbeat interval reported by the framework
     * @param heartbeatTimeoutCount number of timeouts reported by the framework
     * @return always {@code false}; NOTE(review): the meaning of the return value is
     *         defined by the t-io listener contract, which is not visible here - confirm
     */
    public boolean onHeartbeatTimeout(ChannelContext context, long interval, int heartbeatTimeoutCount) {
        String clientId = context.getBsId();
        logger.info("Mqtt HeartbeatTimeout clientId:{} interval:{} count:{}", clientId, interval, heartbeatTimeoutCount);
        return false;
    }

    /**
     * Forwards the connected event to the message interceptors.
     *
     * @param context     channel that was connected
     * @param isConnected connection flag passed through from the framework
     * @param isReconnect reconnect flag passed through from the framework
     * @throws Exception if an interceptor fails
     */
    public void onAfterConnected(ChannelContext context, boolean isConnected, boolean isReconnect) throws Exception {
        this.messageInterceptors.onAfterConnected(context, isConnected, isReconnect);
    }

    /**
     * Handles a channel that is about to close: logs the close, then (when the channel
     * carries a client id) publishes the will message for abnormal closes, removes the
     * session and notifies the connect status listener.
     *
     * @param context   channel being closed
     * @param throwable cause of the close, may be {@code null}
     * @param remark    close remark supplied by the framework
     * @param isRemove  remove flag supplied by the framework (only logged here)
     */
    public void onBeforeClose(ChannelContext context, Throwable throwable, String remark, boolean isRemove) {
        context.setAccepted(false);
        String clientId = context.getBsId();
        // The biz status flag is read as "closed normally"; it is captured first and then reset.
        boolean isNotNormalDisconnect = !context.isBizStatus();
        context.setBizStatus(false);
        if (isNotNormalDisconnect || throwable != null) {
            if (throwable instanceof IOException) {
                // For I/O failures only the message is logged, not the stack trace.
                logger.error("Mqtt server close clientId:{}, remark:{} isRemove:{} error:{}", clientId, remark, isRemove, throwable.getMessage());
            } else {
                // The trailing throwable has no placeholder, so SLF4J logs it as the exception.
                logger.error("Mqtt server close clientId:{}, remark:{} isRemove:{}", clientId, remark, isRemove, throwable);
            }
        } else {
            logger.info("Mqtt server close clientId:{} remark:{} isRemove:{}", clientId, remark, isRemove);
        }
        // Without a client id there is no per-client state to clean up.
        if (StrUtil.isBlank((CharSequence) clientId)) {
            return;
        }
        if (isNotNormalDisconnect) {
            this.sendWillMessage(clientId);
        }
        this.cleanSession(clientId);
        this.notify(context, clientId, remark);
    }

    /**
     * Publishes the stored will message of the client, if any, and removes it from the
     * store. Failures are logged and never propagated.
     *
     * @param clientId id of the client whose will message should be published
     */
    private void sendWillMessage(String clientId) {
        try {
            Message willMessage = this.messageStore.getWillMessage(clientId);
            if (willMessage == null) {
                return;
            }
            try {
                boolean result = this.messageDispatcher.send(willMessage);
                logger.debug("Mqtt server clientId:{} send willMessage result:{}.", clientId, result);
            } finally {
                // Clear the will even when dispatch throws; otherwise a stale will message
                // would stay in the store and could be published on a later disconnect.
                this.messageStore.clearWillMessage(clientId);
            }
        } catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} send willMessage error.", clientId, throwable);
        }
    }

    /**
     * Removes the client's session from the session manager. Failures are logged and
     * never propagated.
     *
     * @param clientId id of the client whose session is removed
     */
    private void cleanSession(String clientId) {
        try {
            this.sessionManager.remove(clientId);
        } catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} session clean error.", clientId, throwable);
        }
    }

    /**
     * Fires the offline notification on the connect status listener. Failures are logged
     * and never propagated.
     *
     * @param context  channel that went offline
     * @param clientId id of the client
     * @param remark   close remark passed on to the listener
     */
    private void notify(ChannelContext context, String clientId, String remark) {
        String username = context.getUserId();
        try {
            this.connectStatusListener.offline(context, clientId, username, remark);
        } catch (Throwable throwable) {
            logger.error("Mqtt server clientId:{} offline notify error.", clientId, throwable);
        }
    }

    /**
     * Forwards the sent event to the message interceptors; packets that are not
     * {@link MqttMessage}s are ignored.
     *
     * @param context       channel the packet was sent on
     * @param packet        packet that was sent
     * @param isSentSuccess send result flag passed through from the framework
     * @throws Exception if an interceptor fails
     */
    public void onAfterSent(ChannelContext context, Packet packet, boolean isSentSuccess) throws Exception {
        if (packet instanceof MqttMessage) {
            this.messageInterceptors.onAfterSent(context, (MqttMessage) packet, isSentSuccess);
        }
    }

    /**
     * Forwards the received-bytes event to the message interceptors.
     *
     * @param context       channel the bytes were received on
     * @param receivedBytes byte count reported by the framework
     * @throws Exception if an interceptor fails
     */
    public void onAfterReceivedBytes(ChannelContext context, int receivedBytes) throws Exception {
        this.messageInterceptors.onAfterReceivedBytes(context, receivedBytes);
    }

    /**
     * Forwards the decoded event to the message interceptors; packets that are not
     * {@link MqttMessage}s are ignored.
     *
     * @param context    channel the packet was decoded on
     * @param packet     decoded packet
     * @param packetSize packet size reported by the framework
     * @throws Exception if an interceptor fails
     */
    public void onAfterDecoded(ChannelContext context, Packet packet, int packetSize) throws Exception {
        if (packet instanceof MqttMessage) {
            this.messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, packetSize);
        }
    }

    /**
     * Forwards the handled event to the message interceptors; packets that are not
     * {@link MqttMessage}s are ignored.
     *
     * @param context channel the packet was handled on
     * @param packet  packet that was handled
     * @param cost    handling cost reported by the framework
     * @throws Exception if an interceptor fails
     */
    public void onAfterHandled(ChannelContext context, Packet packet, long cost) throws Exception {
        if (packet instanceof MqttMessage) {
            this.messageInterceptors.onAfterHandled(context, (MqttMessage) packet, cost);
        }
    }
}

