/*
 * Decompiled with CFR 0.152.
 */
package org.dromara.mica.mqtt.core.server;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.message.MqttPublishMessage;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.serializer.MqttSerializer;
import org.dromara.mica.mqtt.core.server.MqttServerCreator;
import org.dromara.mica.mqtt.core.server.enums.MessageType;
import org.dromara.mica.mqtt.core.server.listener.MqttProtocolListeners;
import org.dromara.mica.mqtt.core.server.model.ClientInfo;
import org.dromara.mica.mqtt.core.server.model.Message;
import org.dromara.mica.mqtt.core.server.model.Subscribe;
import org.dromara.mica.mqtt.core.server.session.IMqttSessionManager;
import org.dromara.mica.mqtt.core.server.store.IMqttMessageStore;
import org.dromara.mica.mqtt.core.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.stat.vo.StatVo;
import org.tio.server.TioServerConfig;
import org.tio.server.task.ServerHeartbeatTask;
import org.tio.utils.hutool.StrUtil;
import org.tio.utils.mica.Pair;
import org.tio.utils.page.Page;
import org.tio.utils.page.PageUtils;
import org.tio.utils.timer.SystemTimer;
import org.tio.utils.timer.Timer;
import org.tio.utils.timer.TimerTask;
import org.tio.utils.timer.TimerTaskService;

/**
 * Server-side entry point for publishing MQTT messages to connected clients,
 * inspecting connected clients, scheduling timer tasks, and starting/stopping
 * the underlying listeners.
 *
 * <p>Instances are built through {@link #create()} / {@link MqttServerCreator};
 * the constructor is package-private.
 */
public final class MqttServer {
    private static final Logger logger = LoggerFactory.getLogger(MqttServer.class);
    private final MqttServerCreator serverCreator;
    private final TioServerConfig serverConfig;
    private final TimerTaskService taskService;
    private final MqttProtocolListeners listeners;
    private final IMqttSessionManager sessionManager;
    private final IMqttMessageStore messageStore;
    private final MqttSerializer mqttSerializer;

    /**
     * Wires the server to its configuration and pulls the collaborating services
     * (timer service, session manager, message store, serializer) out of them.
     *
     * @param serverCreator creator holding the session manager, message store and serializer
     * @param serverConfig  t-io server configuration; also supplies the timer task service
     * @param listeners     protocol listeners started and stopped together with this server
     */
    MqttServer(MqttServerCreator serverCreator, TioServerConfig serverConfig, MqttProtocolListeners listeners) {
        this.serverCreator = serverCreator;
        this.serverConfig = serverConfig;
        this.taskService = serverConfig.getTaskService();
        this.listeners = listeners;
        this.sessionManager = serverCreator.getSessionManager();
        this.messageStore = serverCreator.getMessageStore();
        this.mqttSerializer = serverCreator.getMqttSerializer();
    }

    /**
     * Creates a fresh {@link MqttServerCreator}.
     *
     * @return a new creator instance
     */
    public static MqttServerCreator create() {
        return new MqttServerCreator();
    }

    /**
     * @return the t-io server configuration this server was built with
     */
    public TioServerConfig getServerConfig() {
        return this.serverConfig;
    }

    /**
     * @return the creator this server was built from
     */
    public MqttServerCreator getServerCreator() {
        return this.serverCreator;
    }

    /**
     * Publishes to one client with QoS 0 and without the retain flag.
     *
     * @param clientId target client id
     * @param topic    topic name
     * @param payload  message payload
     * @return the result of {@link #publish(String, String, Object, MqttQoS, boolean)}
     */
    public boolean publish(String clientId, String topic, Object payload) {
        return this.publish(clientId, topic, payload, MqttQoS.QOS0);
    }

    /**
     * Publishes to one client with the given QoS and without the retain flag.
     *
     * @param clientId target client id
     * @param topic    topic name
     * @param payload  message payload
     * @param qos      requested QoS
     * @return the result of {@link #publish(String, String, Object, MqttQoS, boolean)}
     */
    public boolean publish(String clientId, String topic, Object payload, MqttQoS qos) {
        return this.publish(clientId, topic, payload, qos, false);
    }

    /**
     * Publishes to one client with QoS 0 and the given retain flag.
     *
     * @param clientId target client id
     * @param topic    topic name
     * @param payload  message payload
     * @param retain   whether the message should be stored as a retained message
     * @return the result of {@link #publish(String, String, Object, MqttQoS, boolean)}
     */
    public boolean publish(String clientId, String topic, Object payload, boolean retain) {
        return this.publish(clientId, topic, payload, MqttQoS.QOS0, retain);
    }

    /**
     * Publishes a message to a single client looked up by its client id.
     *
     * <p>When {@code retain} is set, the topic is first resolved through
     * {@code TopicUtil.retainTopicName}; a negative timeout from that call is treated
     * as an invalid retain topic and aborts the publish. Otherwise the retained
     * message is stored before delivery is attempted, so it is stored even when the
     * client turns out to be offline or not subscribed.
     *
     * <p>The effective QoS is capped at the QoS the client subscribed with.
     *
     * @param clientId target client id
     * @param topic    topic name (validated by {@code TopicUtil.validateTopicName})
     * @param payload  message payload; {@code byte[]} is sent as is, anything else is serialized
     * @param qos      requested QoS
     * @param retain   whether the message should be stored as a retained message
     * @return {@code false} when the retain topic is invalid, the client has no open
     *         channel, or the client is not subscribed to the topic; otherwise the send result
     */
    public boolean publish(String clientId, String topic, Object payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        if (retain) {
            Pair<?, ?> retainPair = TopicUtil.retainTopicName(topic);
            int timeOut = (Integer) retainPair.getRight();
            if (timeOut < 0) {
                // Log text says: topic does not conform to the $retain/${ttl}/topic rule.
                logger.error("MqttPublishMessage topic {} \u4e0d\u7b26\u5408 $retain/${ttl}/topic \u89c4\u5219.", topic);
                return false;
            }
            topic = (String) retainPair.getLeft();
            this.saveRetainMessage(topic, timeOut, qos, payload);
        }
        ChannelContext context = Tio.getByBsId(this.getServerConfig(), clientId);
        if (context == null || context.isClosed()) {
            logger.warn("Mqtt Topic:{} publish to clientId:{} ChannelContext is null may be disconnected.", topic, clientId);
            return false;
        }
        Byte subMqttQoS = this.sessionManager.searchSubscribe(topic, clientId);
        if (subMqttQoS == null) {
            logger.warn("Mqtt Topic:{} publish but clientId:{} not subscribed.", topic, clientId);
            return false;
        }
        return this.publish(context, clientId, topic, payload, capQos(qos, subMqttQoS), retain);
    }

    /**
     * Builds the publish packet and sends it on the given channel.
     *
     * <p>For QoS 1 and QoS 2 a packet id is taken from the session manager, and the
     * message is registered as a pending publish with its retransmission timer started
     * before the packet is sent. For QoS 0 the message id is {@code -1}.
     *
     * @param context  channel of the target client
     * @param clientId target client id
     * @param topic    topic name
     * @param payload  message payload; {@code byte[]} is sent as is, anything else is serialized
     * @param qos      QoS to send with
     * @param retain   value of the retained flag on the outgoing packet
     * @return the result of {@code Tio.send}
     */
    public boolean publish(ChannelContext context, String clientId, String topic, Object payload, MqttQoS qos, boolean retain) {
        boolean isHighLevelQoS = MqttQoS.QOS1 == qos || MqttQoS.QOS2 == qos;
        int messageId = isHighLevelQoS ? this.sessionManager.getPacketId(clientId) : -1;
        byte[] newPayload = this.toBytes(payload);
        MqttPublishMessage message = MqttPublishMessage.builder()
            .topicName(topic)
            .payload(newPayload)
            .qos(qos)
            .retained(retain)
            .messageId(messageId)
            .build();
        if (isHighLevelQoS) {
            MqttPendingPublish pendingPublish = new MqttPendingPublish(message, qos);
            this.sessionManager.addPendingPublish(clientId, messageId, pendingPublish);
            pendingPublish.startPublishRetransmissionTimer(this.taskService, context);
        }
        boolean result = Tio.send(context, message);
        logger.debug("MQTT Topic:{} qos:{} retain:{} publish clientId:{} result:{}", topic, qos, retain, clientId, result);
        return result;
    }

    /**
     * Publishes to every subscriber of the topic with QoS 0 and without the retain flag.
     *
     * @param topic   topic name
     * @param payload message payload
     * @return the result of {@link #publishAll(String, Object, MqttQoS, boolean)}
     */
    public boolean publishAll(String topic, Object payload) {
        return this.publishAll(topic, payload, MqttQoS.QOS0);
    }

    /**
     * Publishes to every subscriber of the topic with the given QoS and without the retain flag.
     *
     * @param topic   topic name
     * @param payload message payload
     * @param qos     requested QoS
     * @return the result of {@link #publishAll(String, Object, MqttQoS, boolean)}
     */
    public boolean publishAll(String topic, Object payload, MqttQoS qos) {
        return this.publishAll(topic, payload, qos, false);
    }

    /**
     * Publishes to every subscriber of the topic with QoS 0 and the given retain flag.
     *
     * @param topic   topic name
     * @param payload message payload
     * @param retain  whether the message should be stored as a retained message
     * @return the result of {@link #publishAll(String, Object, MqttQoS, boolean)}
     */
    public boolean publishAll(String topic, Object payload, boolean retain) {
        return this.publishAll(topic, payload, MqttQoS.QOS0, retain);
    }

    /**
     * Publishes a message to every client subscribed to the topic.
     *
     * <p>Retain handling matches {@link #publish(String, String, Object, MqttQoS, boolean)}:
     * the retained message is stored before subscribers are looked up. Subscribers whose
     * channel is missing or closed are skipped with a warning. Each delivery is capped at
     * the subscriber's QoS and is sent with the retained flag cleared.
     *
     * @param topic   topic name (validated by {@code TopicUtil.validateTopicName})
     * @param payload message payload
     * @param qos     requested QoS
     * @param retain  whether the message should be stored as a retained message
     * @return {@code false} when the retain topic is invalid or nobody is subscribed;
     *         {@code true} otherwise, regardless of the individual send results
     */
    public boolean publishAll(String topic, Object payload, MqttQoS qos, boolean retain) {
        TopicUtil.validateTopicName(topic);
        if (retain) {
            Pair<?, ?> retainPair = TopicUtil.retainTopicName(topic);
            int timeOut = (Integer) retainPair.getRight();
            if (timeOut < 0) {
                // Log text says: topic does not conform to the $retain/${ttl}/topic rule.
                logger.error("MqttPublishMessage topic {} \u4e0d\u7b26\u5408 $retain/${ttl}/topic \u89c4\u5219.", topic);
                return false;
            }
            topic = (String) retainPair.getLeft();
            this.saveRetainMessage(topic, timeOut, qos, payload);
        }
        List<Subscribe> subscribeList = this.sessionManager.searchSubscribe(topic);
        if (subscribeList.isEmpty()) {
            logger.debug("Mqtt Topic:{} publishAll but subscribe client list is empty.", topic);
            return false;
        }
        for (Subscribe subscribe : subscribeList) {
            String clientId = subscribe.getClientId();
            ChannelContext context = Tio.getByBsId(this.getServerConfig(), clientId);
            if (context == null || context.isClosed()) {
                logger.warn("Mqtt Topic:{} publish to clientId:{} channel is null may be disconnected.", topic, clientId);
                continue;
            }
            int subMqttQoS = subscribe.getMqttQoS();
            this.publish(context, clientId, topic, payload, capQos(qos, subMqttQoS), false);
        }
        return true;
    }

    /**
     * Delivers a {@link Message}: to all subscribers when it carries no client id,
     * otherwise only to the client it names.
     *
     * @param topic   topic name
     * @param message message supplying client id, QoS, payload and retain flag
     * @return the result of the delegated {@code publishAll} or {@code publish} call
     */
    public boolean sendToClient(String topic, Message message) {
        String clientId = message.getClientId();
        MqttQoS mqttQoS = MqttQoS.valueOf(message.getQos());
        if (StrUtil.isBlank(clientId)) {
            return this.publishAll(topic, message.getPayload(), mqttQoS, message.isRetain());
        }
        return this.publish(clientId, topic, message.getPayload(), mqttQoS, message.isRetain());
    }

    /**
     * Builds a downstream {@link Message} for the payload and hands it to the message
     * store as the retained message of the topic.
     *
     * @param topic   resolved topic name
     * @param timeout timeout passed through to {@code addRetainMessage}
     * @param mqttQoS QoS recorded on the stored message
     * @param payload message payload; {@code byte[]} is stored as is, anything else is serialized
     */
    private void saveRetainMessage(String topic, int timeout, MqttQoS mqttQoS, Object payload) {
        Message retainMessage = new Message();
        retainMessage.setTopic(topic);
        retainMessage.setQos(mqttQoS.value());
        retainMessage.setPayload(this.toBytes(payload));
        retainMessage.setMessageType(MessageType.DOWN_STREAM);
        retainMessage.setRetain(false);
        retainMessage.setDup(false);
        retainMessage.setTimestamp(System.currentTimeMillis());
        retainMessage.setNode(this.serverCreator.getNodeName());
        this.messageStore.addRetainMessage(topic, timeout, retainMessage);
    }

    /** Returns the payload unchanged when it already is a byte array, otherwise serializes it. */
    private byte[] toBytes(Object payload) {
        return payload instanceof byte[] ? (byte[]) payload : this.mqttSerializer.serialize(payload);
    }

    /** Returns the requested QoS, lowered to the subscription QoS when the request is higher. */
    private static MqttQoS capQos(MqttQoS qos, int subscribedQos) {
        return qos.value() > subscribedQos ? MqttQoS.valueOf(subscribedQos) : qos;
    }

    /**
     * Looks up the connected client with the given id.
     *
     * @param clientId client id
     * @return the client info, or {@code null} when no channel is bound to that id
     */
    public ClientInfo getClientInfo(String clientId) {
        ChannelContext context = Tio.getByBsId(this.getServerConfig(), clientId);
        if (context == null) {
            return null;
        }
        return ClientInfo.form(this.serverCreator, context, ClientInfo::new);
    }

    /**
     * Builds the client info for an already resolved channel.
     *
     * @param context channel of the client
     * @return the client info produced by {@code ClientInfo.form}
     */
    public ClientInfo getClientInfo(ChannelContext context) {
        return ClientInfo.form(this.serverCreator, context, ClientInfo::new);
    }

    /**
     * @return client info for every channel known to this server's configuration
     */
    public List<ClientInfo> getClients() {
        return MqttServer.getClients(this.serverCreator, this.getServerConfig());
    }

    /**
     * Maps every channel returned by {@code Tio.getAll} to a {@link ClientInfo}.
     *
     * @param serverCreator creator passed to {@code ClientInfo.form}
     * @param tioConfig     configuration whose channels are listed
     * @return list of client infos
     */
    public static List<ClientInfo> getClients(MqttServerCreator serverCreator, TioConfig tioConfig) {
        return Tio.getAll(tioConfig)
            .stream()
            .map(context -> ClientInfo.form(serverCreator, context, ClientInfo::new))
            .collect(Collectors.toList());
    }

    /**
     * Returns one page of connected clients.
     *
     * @param pageIndex page index; must not be {@code null} (it is unboxed)
     * @param pageSize  page size; must not be {@code null} (it is unboxed)
     * @return the requested page
     */
    public Page<ClientInfo> getClients(Integer pageIndex, Integer pageSize) {
        return MqttServer.getClients(this.serverCreator, this.getServerConfig(), pageIndex, pageSize);
    }

    /**
     * Returns one page of the channels returned by {@code Tio.getAll}, mapped to {@link ClientInfo}.
     *
     * @param serverCreator creator passed to {@code ClientInfo.form}
     * @param tioConfig     configuration whose channels are listed
     * @param pageIndex     page index; must not be {@code null} (it is unboxed)
     * @param pageSize      page size; must not be {@code null} (it is unboxed)
     * @return the requested page
     */
    public static Page<ClientInfo> getClients(MqttServerCreator serverCreator, TioConfig tioConfig, Integer pageIndex, Integer pageSize) {
        Set<ChannelContext> contexts = Tio.getAll(tioConfig);
        return PageUtils.fromSet(contexts, (int) pageIndex, (int) pageSize,
            context -> ClientInfo.form(serverCreator, context, ClientInfo::new));
    }

    /**
     * @return the statistics object exposed by the server configuration
     */
    public StatVo getStat() {
        return this.serverConfig.getStat();
    }

    /**
     * @param clientId client id
     * @return the subscriptions the session manager reports for that client
     */
    public List<Subscribe> getSubscriptions(String clientId) {
        return this.serverCreator.getSessionManager().getSubscriptions(clientId);
    }

    /**
     * Schedules a repeating task that runs on the timer thread.
     *
     * @param command task to run
     * @param delay   delay handed to {@link TimerTask} (unit as defined by the timer — presumably milliseconds; confirm)
     * @return the registered timer task
     */
    public TimerTask schedule(Runnable command, long delay) {
        return this.schedule(command, delay, null);
    }

    /**
     * Schedules a repeating task. Every time the task fires it first re-adds itself to
     * the timer and then runs the command, either inline or on the given executor.
     * Exceptions thrown while doing so are logged and do not propagate.
     *
     * @param command  task to run
     * @param delay    delay handed to {@link TimerTask} (unit as defined by the timer — presumably milliseconds; confirm)
     * @param executor executor to dispatch the command to, or {@code null} to run it inline
     * @return the registered timer task
     */
    public TimerTask schedule(final Runnable command, long delay, final Executor executor) {
        return this.taskService.addTask(systemTimer -> new TimerTask(delay) {
            @Override
            public void run() {
                try {
                    // Re-register first so the next run stays scheduled even if the command fails.
                    systemTimer.add(this);
                    runOrDispatch(command, executor);
                } catch (Exception e) {
                    logger.error("Mqtt server schedule error", e);
                }
            }
        });
    }

    /**
     * Schedules a one-shot task that runs on the timer thread.
     *
     * @param command task to run
     * @param delay   delay handed to {@link TimerTask} (unit as defined by the timer — presumably milliseconds; confirm)
     * @return the registered timer task
     */
    public TimerTask scheduleOnce(Runnable command, long delay) {
        return this.scheduleOnce(command, delay, null);
    }

    /**
     * Schedules a one-shot task. When it fires the command runs inline or on the given
     * executor; exceptions thrown while doing so are logged and do not propagate.
     *
     * @param command  task to run
     * @param delay    delay handed to {@link TimerTask} (unit as defined by the timer — presumably milliseconds; confirm)
     * @param executor executor to dispatch the command to, or {@code null} to run it inline
     * @return the registered timer task
     */
    public TimerTask scheduleOnce(final Runnable command, long delay, final Executor executor) {
        return this.taskService.addTask(systemTimer -> new TimerTask(delay) {
            @Override
            public void run() {
                try {
                    runOrDispatch(command, executor);
                } catch (Exception e) {
                    logger.error("Mqtt server schedule once error", e);
                }
            }
        });
    }

    /** Runs the command on the calling thread when no executor is given, otherwise hands it to the executor. */
    private static void runOrDispatch(Runnable command, Executor executor) {
        if (executor == null) {
            command.run();
        } else {
            executor.execute(command);
        }
    }

    /**
     * @param clientId client id
     * @return the channel bound to that client id, as returned by {@code Tio.getByBsId}
     */
    public ChannelContext getChannelContext(String clientId) {
        return Tio.getByBsId(this.getServerConfig(), clientId);
    }

    /**
     * Removes the connection of the given client. Does nothing when no channel is
     * bound to the client id.
     *
     * @param clientId client id
     */
    public void close(String clientId) {
        ChannelContext context = this.getChannelContext(clientId);
        if (context == null) {
            // Unknown or already disconnected client: there is no channel to remove.
            return;
        }
        Tio.remove(context, "Mqtt server close this connects.");
    }

    /**
     * Starts the timer service, registers the server heartbeat task and starts the listeners.
     *
     * @return always {@code true}
     */
    public boolean start() {
        this.taskService.start();
        this.taskService.addTask(systemTimer -> new ServerHeartbeatTask(systemTimer, this.serverConfig));
        this.listeners.start();
        return true;
    }

    /**
     * Stops the listeners, shuts down the MQTT executor (waiting up to ten minutes for
     * it to terminate), then cleans the session manager and the message store. Failures
     * in any step are logged and the remaining steps still run.
     *
     * @return {@code true} when the listeners stopped and the executor terminated in time;
     *         if the wait is interrupted only the listener result is reported
     */
    public boolean stop() {
        boolean result = this.listeners.stop();
        ExecutorService mqttExecutor = this.serverCreator.getMqttExecutor();
        try {
            mqttExecutor.shutdown();
        } catch (Exception e1) {
            logger.error(e1.getMessage(), e1);
        }
        try {
            result &= mqttExecutor.awaitTermination(10L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            logger.error(e.getMessage(), e);
        }
        try {
            this.sessionManager.clean();
        } catch (Throwable e) {
            logger.error("MqttServer stop session clean error.", e);
        }
        try {
            this.messageStore.clean();
        } catch (Throwable e) {
            logger.error("MqttServer stop message store clean error.", e);
        }
        return result;
    }
}

