/*
 * Decompiled with CFR 0.152.
 */
package com.github.netty.protocol.mqtt;

import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.IQueueRepository;
import com.github.netty.protocol.mqtt.MqttClientDescriptor;
import com.github.netty.protocol.mqtt.MqttConnection;
import com.github.netty.protocol.mqtt.MqttSession;
import com.github.netty.protocol.mqtt.exception.MqttSessionCorruptedException;
import com.github.netty.protocol.mqtt.subscriptions.ISubscriptionsDirectory;
import com.github.netty.protocol.mqtt.subscriptions.Subscription;
import com.github.netty.protocol.mqtt.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
 * Registry of {@link MqttSession}s keyed by client id.
 *
 * <p>Sessions and per-client queues are held in {@link ConcurrentMap}s. Updates to the session
 * map go through {@code putIfAbsent}/{@code replace}, and the state transitions performed while
 * rebinding a stored session are verified; a {@link MqttSessionCorruptedException} is thrown
 * when one of those checks fails.
 */
public class MqttSessionRegistry {
    private static final LoggerX LOG = LoggerFactoryX.getLogger(MqttSessionRegistry.class);

    /** Sessions indexed by client id. */
    private final ConcurrentMap<String, MqttSession> pool = new ConcurrentHashMap<>();
    /** Directory from which a session's subscriptions are removed on a clean reconnect. */
    private final ISubscriptionsDirectory subscriptionsDirectory;
    /** Factory used to create the message queue of a client the first time it is needed. */
    private final IQueueRepository queueRepository;
    /** Message queues indexed by client id. */
    private final ConcurrentMap<String, Queue<EnqueuedMessage>> queues = new ConcurrentHashMap<>();

    /**
     * Creates an empty registry.
     *
     * @param subscriptionsDirectory directory used to remove subscriptions of recycled sessions
     * @param queueRepository        factory for per-client message queues
     */
    public MqttSessionRegistry(ISubscriptionsDirectory subscriptionsDirectory, IQueueRepository queueRepository) {
        this.subscriptionsDirectory = subscriptionsDirectory;
        this.queueRepository = queueRepository;
    }

    /**
     * Binds a connection to the session of {@code clientId}.
     *
     * <p>A new session is always built first. If no session is stored for the client id it is
     * registered as is; otherwise {@link #bindToExistingSession} decides how to reuse or replace
     * the stored one. Afterwards {@code sendConnAck} is called on the connection with a flag that
     * is {@code true} only when the CONNECT did not ask for a clean session and a session was
     * already stored. Finally, if requested by the rebind, the messages queued while the client
     * was offline are sent.
     *
     * @param mqttConnection the connection that issued the CONNECT
     * @param msg            the CONNECT message
     * @param clientId       the client identifier
     * @throws MqttSessionCorruptedException if the stored session changes state or is removed
     *                                       while it is being rebound
     */
    void bindToSession(MqttConnection mqttConnection, MqttConnectMessage msg, String clientId) {
        boolean isSessionAlreadyStored = false;
        PostConnectAction postConnectAction = PostConnectAction.NONE;

        MqttSession newSession = this.createNewSession(mqttConnection, msg, clientId);
        // putIfAbsent is the single atomic "is there already a session?" test; a separate
        // containsKey check beforehand would only open a window between check and action.
        MqttSession previous = this.pool.putIfAbsent(clientId, newSession);
        if (previous == null) {
            LOG.trace("case 1, not existing session with CId {}", clientId);
        } else {
            postConnectAction = this.bindToExistingSession(mqttConnection, msg, clientId, newSession);
            isSessionAlreadyStored = true;
        }

        boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession();
        boolean isSessionAlreadyPresent = !msgCleanSessionFlag && isSessionAlreadyStored;
        mqttConnection.sendConnAck(isSessionAlreadyPresent);

        if (postConnectAction == PostConnectAction.SEND_STORED_MESSAGES) {
            MqttSession session = this.pool.get(clientId);
            if (session == null) {
                // Same handling as disconnect(): the session may have been removed meanwhile.
                LOG.debug("Some other thread already removed the session CId={}", clientId);
                return;
            }
            session.sendQueuedMessagesWhileOffline();
        }
    }

    /**
     * Lists a descriptor for every session that reports itself as connected and exposes a
     * remote address.
     *
     * @return descriptors (client id, host string, port) of the connected clients
     */
    Collection<MqttClientDescriptor> listConnectedClients() {
        return this.pool.values().stream()
                .filter(MqttSession::connected)
                .map(this::createClientDescriptor)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    /**
     * Builds the descriptor of a session from its client id and remote address.
     *
     * @param s the session to describe
     * @return the descriptor, or empty when the session has no remote address
     */
    private Optional<MqttClientDescriptor> createClientDescriptor(MqttSession s) {
        String clientID = s.getClientID();
        Optional<InetSocketAddress> remoteAddressOpt = s.remoteAddress();
        return remoteAddressOpt.map(r -> new MqttClientDescriptor(clientID, r.getHostString(), r.getPort()));
    }

    /**
     * Binds the connection when a session for {@code clientId} is already stored.
     *
     * <ul>
     *   <li>stored session disconnected, clean session requested: the client's queue entry is
     *       dropped, its subscriptions are removed, the stored session is reconfigured and
     *       rebound to the connection;</li>
     *   <li>stored session disconnected, no clean session: the stored session is rebound and
     *       {@link PostConnectAction#SEND_STORED_MESSAGES} is returned;</li>
     *   <li>stored session connected: it is closed immediately and replaced by
     *       {@code newSession}.</li>
     * </ul>
     * If the stored session reports neither {@code disconnected()} nor {@code connected()},
     * nothing is changed and {@link PostConnectAction#NONE} is returned.
     *
     * @param mqttConnection the connection that issued the CONNECT
     * @param msg            the CONNECT message
     * @param clientId       the client identifier
     * @param newSession     the freshly created session, used only to replace a connected one
     * @return what the caller has to do after acknowledging the connection
     * @throws MqttSessionCorruptedException if the stored session is missing, fails a state
     *                                       transition, or is no longer the registered one
     */
    private PostConnectAction bindToExistingSession(MqttConnection mqttConnection, MqttConnectMessage msg, String clientId, MqttSession newSession) {
        PostConnectAction postConnectAction = PostConnectAction.NONE;
        boolean newIsClean = msg.variableHeader().isCleanSession();
        MqttSession oldSession = this.pool.get(clientId);
        if (oldSession == null) {
            // remove() may run between the caller's lookup and this one; report it with the
            // exception type used for every other "session vanished" condition, not an NPE.
            throw new MqttSessionCorruptedException("old session was already removed");
        }

        if (oldSession.disconnected()) {
            if (newIsClean) {
                this.dropQueuesForClient(clientId);
                this.unsubscribe(oldSession);
                if (!oldSession.assignState(MqttSession.SessionStatus.DISCONNECTED, MqttSession.SessionStatus.CONNECTING)) {
                    throw new MqttSessionCorruptedException("old session was already changed state");
                }
                this.copySessionConfig(msg, oldSession);
                oldSession.bind(mqttConnection);
                if (!oldSession.assignState(MqttSession.SessionStatus.CONNECTING, MqttSession.SessionStatus.CONNECTED)) {
                    throw new MqttSessionCorruptedException("old session moved in connected state by other thread");
                }
                this.replaceOrFail(clientId, oldSession, oldSession);
                LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
            } else {
                this.reactivateSubscriptions(oldSession);
                if (!oldSession.assignState(MqttSession.SessionStatus.DISCONNECTED, MqttSession.SessionStatus.CONNECTING)) {
                    throw new MqttSessionCorruptedException("old session moved in connected state by other thread");
                }
                oldSession.bind(mqttConnection);
                if (!oldSession.assignState(MqttSession.SessionStatus.CONNECTING, MqttSession.SessionStatus.CONNECTED)) {
                    throw new MqttSessionCorruptedException("old session moved in other state state by other thread");
                }
                this.replaceOrFail(clientId, oldSession, oldSession);
                postConnectAction = PostConnectAction.SEND_STORED_MESSAGES;
                LOG.trace("case 3, oldSession with same CId {} disconnected", clientId);
            }
        } else if (oldSession.connected()) {
            LOG.trace("case 4, oldSession with same CId {} still connected, force to close", clientId);
            oldSession.closeImmediately();
            this.replaceOrFail(clientId, oldSession, newSession);
        }
        return postConnectAction;
    }

    /**
     * Replaces the mapping of {@code clientId} only if it still points to {@code expected}.
     * Passing the same instance as {@code replacement} merely verifies that {@code expected}
     * is still the registered session.
     *
     * @throws MqttSessionCorruptedException if the mapping no longer points to {@code expected}
     */
    private void replaceOrFail(String clientId, MqttSession expected, MqttSession replacement) {
        if (!this.pool.replace(clientId, expected, replacement)) {
            throw new MqttSessionCorruptedException("old session was already removed");
        }
    }

    /**
     * Iterates the subscriptions of the session without acting on them.
     * NOTE(review): presumably a placeholder for re-validating or re-registering the
     * subscriptions of a resumed session — confirm before relying on it.
     */
    private void reactivateSubscriptions(MqttSession session) {
        for (Subscription subscription : session.getSubscriptions()) {
            // intentionally empty
        }
    }

    /** Removes every subscription of the session from the subscriptions directory. */
    private void unsubscribe(MqttSession session) {
        for (Subscription existingSub : session.getSubscriptions()) {
            this.subscriptionsDirectory.removeSubscription(existingSub.getTopicFilter(), session.getClientID());
        }
    }

    /**
     * Creates a session for the CONNECT message, marks it connected and binds it to the
     * connection. The client's queue is looked up in the queue map and created through the
     * queue repository when absent.
     *
     * @param mqttConnection the connection to bind
     * @param msg            the CONNECT message (clean-session flag and optional will)
     * @param clientId       the client identifier
     * @return the new, connected and bound session
     */
    private MqttSession createNewSession(MqttConnection mqttConnection, MqttConnectMessage msg, String clientId) {
        boolean clean = msg.variableHeader().isCleanSession();
        Queue<EnqueuedMessage> sessionQueue =
                this.queues.computeIfAbsent(clientId, cli -> this.queueRepository.createQueue(cli, clean));
        MqttSession newSession;
        if (msg.variableHeader().isWillFlag()) {
            MqttSession.Will will = this.createWill(msg);
            newSession = new MqttSession(clientId, clean, will, sessionQueue);
        } else {
            // Argument order differs from the constructor above; kept exactly as invoked before.
            newSession = new MqttSession(clean, clientId, sessionQueue);
        }
        newSession.markConnected();
        newSession.bind(mqttConnection);
        return newSession;
    }

    /**
     * Updates a session with the clean flag and the will (or {@code null} when the will flag is
     * not set) of the CONNECT message.
     */
    private void copySessionConfig(MqttConnectMessage msg, MqttSession session) {
        boolean clean = msg.variableHeader().isCleanSession();
        MqttSession.Will will = msg.variableHeader().isWillFlag() ? this.createWill(msg) : null;
        session.update(clean, will);
    }

    /**
     * Builds the will described by the CONNECT message; the will payload bytes are copied into
     * a new buffer.
     */
    private MqttSession.Will createWill(MqttConnectMessage msg) {
        ByteBuf willPayload = Unpooled.copiedBuffer(msg.payload().willMessageInBytes());
        String willTopic = msg.payload().willTopic();
        boolean retained = msg.variableHeader().isWillRetain();
        MqttQoS qos = MqttQoS.valueOf(msg.variableHeader().willQos());
        return new MqttSession.Will(willTopic, willPayload, qos, retained);
    }

    /**
     * Returns the session registered for the client id.
     *
     * @param clientID the client identifier
     * @return the session, or {@code null} when none is registered
     */
    MqttSession retrieve(String clientID) {
        return this.pool.get(clientID);
    }

    /**
     * Removes the session registered for the client id, if any. The client's entry in the queue
     * map is left untouched.
     *
     * @param clientID the client identifier
     */
    public void remove(String clientID) {
        this.pool.remove(clientID);
    }

    /**
     * Calls {@code disconnect()} on the session registered for the client id; logs and returns
     * when no session is registered.
     *
     * @param clientID the client identifier
     */
    public void disconnect(String clientID) {
        MqttSession session = this.retrieve(clientID);
        if (session == null) {
            LOG.debug("Some other thread already removed the session CId={}", clientID);
            return;
        }
        session.disconnect();
    }

    /** Removes the client's entry from the queue map. */
    private void dropQueuesForClient(String clientId) {
        this.queues.remove(clientId);
    }

    /** Follow-up work {@link #bindToSession} performs after acknowledging the connection. */
    private enum PostConnectAction {
        NONE,
        SEND_STORED_MESSAGES
    }

    /** Queue entry carrying no data of its own. */
    static final class PubRelMarker
    extends EnqueuedMessage {
        PubRelMarker() {
        }
    }

    /** Queue entry holding a message's topic, QoS and payload. */
    static class PublishedMessage
    extends EnqueuedMessage {
        final Topic topic;
        final MqttQoS publishingQos;
        final ByteBuf payload;

        PublishedMessage(Topic topic, MqttQoS publishingQos, ByteBuf payload) {
            this.topic = topic;
            this.publishingQos = publishingQos;
            this.payload = payload;
        }
    }

    /** Base type of the entries stored in the per-client queues. */
    public abstract static class EnqueuedMessage {
    }
}

