/*
 * Decompiled with CFR 0.152.
 */
package com.github.netty.protocol.mqtt;

import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.MqttInflightResenderChannelHandler;
import com.github.netty.protocol.mqtt.MqttPostOffice;
import com.github.netty.protocol.mqtt.MqttSession;
import com.github.netty.protocol.mqtt.MqttSessionRegistry;
import com.github.netty.protocol.mqtt.MqttUtil;
import com.github.netty.protocol.mqtt.config.BrokerConfiguration;
import com.github.netty.protocol.mqtt.exception.MqttSessionCorruptedException;
import com.github.netty.protocol.mqtt.interception.BrokerInterceptor;
import com.github.netty.protocol.mqtt.security.IAuthenticator;
import com.github.netty.protocol.mqtt.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Broker-side handler for a single MQTT client connection bound to a Netty {@link Channel}.
 *
 * <p>Incoming packets are dispatched by {@link #handleMessage(MqttMessage)}; outgoing
 * acknowledgements and publishes are written through the {@code send*} methods. Session state is
 * looked up in the {@link MqttSessionRegistry} using the client id stored on the channel via
 * {@link MqttUtil}.
 *
 * <p>NOTE(review): the {@code connected} and {@code authFlushed} flags are plain fields; this class
 * appears to assume it is driven from a single thread (presumably the channel's event loop) —
 * confirm against callers.
 */
public final class MqttConnection {
    private static final LoggerX LOG = LoggerFactoryX.getLogger(MqttConnection.class);

    /** Charset used to turn the CONNECT password string into bytes for the authenticator. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Largest packet identifier that fits the 16-bit field of the MQTT wire format. */
    private static final int MAX_PACKET_ID = 0xFFFF;

    /** Period handed to the in-flight resender handler, in milliseconds. */
    private static final long INFLIGHT_RESEND_PERIOD_MILLIS = 5000L;

    private static final String IDLE_STATE_HANDLER = "idleStateHandler";
    private static final String INFLIGHT_RESENDER_HANDLER = "inflightResender";

    final Channel channel;
    private final BrokerConfiguration brokerConfig;
    private final IAuthenticator authenticator;
    private final MqttSessionRegistry sessionRegistry;
    private final MqttPostOffice postOffice;
    private final BrokerInterceptor interceptor;
    private final AtomicInteger lastPacketId = new AtomicInteger(0);
    private boolean connected;
    private boolean authFlushed;

    /**
     * Creates a connection in the "not connected" state; it becomes connected once
     * {@link #sendConnAck(boolean)} is invoked.
     *
     * @param interceptor     receives connect / disconnect / connection-lost notifications
     * @param channel         the Netty channel this connection reads from and writes to
     * @param brokerConfig    broker settings consulted during CONNECT processing
     * @param authenticator   validates client credentials
     * @param sessionRegistry registry used to bind and look up sessions by client id
     * @param postOffice      handles subscriptions and routing of published messages
     */
    public MqttConnection(BrokerInterceptor interceptor, Channel channel, BrokerConfiguration brokerConfig, IAuthenticator authenticator, MqttSessionRegistry sessionRegistry, MqttPostOffice postOffice) {
        this.interceptor = interceptor;
        this.channel = channel;
        this.brokerConfig = brokerConfig;
        this.authenticator = authenticator;
        this.sessionRegistry = sessionRegistry;
        this.postOffice = postOffice;
        this.connected = false;
        this.authFlushed = false;
    }

    /**
     * @return {@code true} when outgoing messages are only written, not flushed, by
     *         {@link #sendIfWritableElseDrop(MqttMessage)}
     */
    public boolean isAuthFlushed() {
        return this.authFlushed;
    }

    /**
     * @param authFlushed {@code true} to make {@link #sendIfWritableElseDrop(MqttMessage)} write
     *                    without flushing; {@code false} to write and flush each message
     */
    public void setAuthFlushed(boolean authFlushed) {
        this.authFlushed = authFlushed;
    }

    /**
     * Dispatches an incoming MQTT packet to the matching {@code process*} method.
     * PINGREQ is answered inline with a PINGRESP; unknown types are only logged.
     *
     * @param msg the decoded MQTT message
     */
    public void handleMessage(MqttMessage msg) {
        MqttMessageType messageType = msg.fixedHeader().messageType();
        LOG.debug("Received MQTT message, type: {}, channel: {}", messageType, this.channel);
        switch (messageType) {
            case CONNECT:
                this.processConnect((MqttConnectMessage) msg);
                break;
            case SUBSCRIBE:
                this.processSubscribe((MqttSubscribeMessage) msg);
                break;
            case UNSUBSCRIBE:
                this.processUnsubscribe((MqttUnsubscribeMessage) msg);
                break;
            case PUBLISH:
                this.processPublish((MqttPublishMessage) msg);
                break;
            case PUBREC:
                this.processPubRec(msg);
                break;
            case PUBCOMP:
                this.processPubComp(msg);
                break;
            case PUBREL:
                this.processPubRel(msg);
                break;
            case DISCONNECT:
                this.processDisconnect(msg);
                break;
            case PUBACK:
                this.processPubAck(msg);
                break;
            case PINGREQ: {
                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
                this.channel.writeAndFlush(new MqttMessage(pingHeader)).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                break;
            }
            default:
                LOG.error("Unknown MessageType: {}, channel: {}", messageType, this.channel);
        }
    }

    /**
     * Looks up the session registered for this channel's client id.
     *
     * @return the session, or {@code null} when no client id is stored on the channel yet or the
     *         registry has no session for it
     */
    private MqttSession lookupSession() {
        String clientId = this.getClientId();
        if (clientId == null) {
            // No CONNECT has stored a client id on the channel; avoid querying the registry with null.
            return null;
        }
        return this.sessionRegistry.retrieve(clientId);
    }

    /**
     * Looks up the session for an incoming packet that only makes sense on a bound session.
     * When none is found the connection is dropped instead of failing with a
     * {@link NullPointerException}.
     *
     * @param packetType type of the packet being processed, used for logging only
     * @return the session, or {@code null} if the connection has been dropped
     */
    private MqttSession sessionFor(MqttMessageType packetType) {
        MqttSession session = this.lookupSession();
        if (session == null) {
            LOG.warn("{} received without a bound session, dropping connection. channel: {}", packetType, this.channel);
            this.dropConnection();
        }
        return session;
    }

    /** Extracts the packet identifier from a message whose variable header carries only an id. */
    private static int messageIdOf(MqttMessage msg) {
        return ((MqttMessageIdVariableHeader) msg.variableHeader()).messageId();
    }

    /** Forwards a PUBCOMP to the session. */
    private void processPubComp(MqttMessage msg) {
        int messageID = messageIdOf(msg);
        MqttSession session = this.sessionFor(MqttMessageType.PUBCOMP);
        if (session != null) {
            session.processPubComp(messageID);
        }
    }

    /** Forwards a PUBREC to the session. */
    private void processPubRec(MqttMessage msg) {
        int messageID = messageIdOf(msg);
        MqttSession session = this.sessionFor(MqttMessageType.PUBREC);
        if (session != null) {
            session.processPubRec(messageID);
        }
    }

    /**
     * Builds a PUBREL message for the given packet identifier.
     *
     * @param messageID packet identifier to release
     * @return a PUBREL with QoS bits set to AT_LEAST_ONCE, as the fixed header is constructed here
     */
    static MqttMessage pubrel(int messageID) {
        MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        return new MqttMessage(pubRelHeader, MqttMessageIdVariableHeader.from(messageID));
    }

    /** Forwards a PUBACK to the session. */
    private void processPubAck(MqttMessage msg) {
        int messageID = messageIdOf(msg);
        MqttSession session = this.sessionFor(MqttMessageType.PUBACK);
        if (session != null) {
            session.pubAckReceived(messageID);
        }
    }

    /**
     * Handles a CONNECT packet: validates protocol version and client id, authenticates, binds the
     * connection to a session and configures keep-alive and in-flight resending on the pipeline.
     * Every rejection sends a CONNACK with the matching return code and closes the channel.
     *
     * @param msg the CONNECT message
     */
    void processConnect(MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        String clientId = payload.clientIdentifier();
        String username = payload.userName();
        LOG.trace("Processing CONNECT message. CId={} username: {} channel: {}", clientId, username, this.channel);

        // Only MQTT 3.1 and 3.1.1 are accepted.
        if (this.isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && this.isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
            LOG.warn("MQTT protocol version is not valid. CId={} channel: {}", clientId, this.channel);
            this.abortConnection(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
            return;
        }

        boolean cleanSession = msg.variableHeader().isCleanSession();
        if (clientId == null || clientId.length() == 0) {
            if (!this.brokerConfig.isAllowZeroByteClientId()) {
                LOG.warn("Broker doesn't permit MQTT empty client ID. Username: {}, channel: {}", username, this.channel);
                this.abortConnection(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                return;
            }
            if (!cleanSession) {
                LOG.warn("MQTT client ID cannot be empty for persistent session. Username: {}, channel: {}", username, this.channel);
                this.abortConnection(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                return;
            }
            // Empty id with a clean session: generate a broker-side identifier.
            clientId = UUID.randomUUID().toString().replace("-", "");
            LOG.debug("Client has connected with integration generated id: {}, username: {}, channel: {}", clientId, username, this.channel);
        }

        if (!this.login(msg, clientId)) {
            // abortConnection already closes the channel, so no additional close() is needed here.
            this.abortConnection(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            return;
        }

        try {
            LOG.trace("Binding MQTTConnection (channel: {}) to session", this.channel);
            this.sessionRegistry.bindToSession(this, msg, clientId);
            this.initializeKeepAliveTimeout(this.channel, msg, clientId);
            this.setupInflightResender(this.channel);
            MqttUtil.clientID(this.channel, clientId);
            LOG.trace("CONNACK sent, channel: {}", this.channel);
            this.interceptor.notifyClientConnected(msg);
        } catch (MqttSessionCorruptedException scex) {
            LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, this.channel);
            this.abortConnection(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }
    }

    /** Installs the in-flight resender at the head of the pipeline. */
    private void setupInflightResender(Channel channel) {
        channel.pipeline().addFirst(INFLIGHT_RESENDER_HANDLER, new MqttInflightResenderChannelHandler(INFLIGHT_RESEND_PERIOD_MILLIS, TimeUnit.MILLISECONDS));
    }

    /**
     * Stores keep-alive, clean-session flag and client id on the channel and installs an idle
     * handler whose reader-idle time is 1.5 times the client's keep-alive.
     */
    private void initializeKeepAliveTimeout(Channel channel, MqttConnectMessage msg, String clientId) {
        int keepAlive = msg.variableHeader().keepAliveTimeSeconds();
        boolean cleanSession = msg.variableHeader().isCleanSession();
        MqttUtil.keepAlive(channel, keepAlive);
        MqttUtil.cleanSession(channel, cleanSession);
        MqttUtil.clientID(channel, clientId);
        int idleTime = Math.round((float) keepAlive * 1.5f);
        this.setIdleTime(channel.pipeline(), idleTime);
        LOG.debug("Connection has been configured CId={}, keepAlive={}, removeTemporaryQoS2={}, idleTime={}", clientId, keepAlive, cleanSession, idleTime);
    }

    /** Replaces any existing idle-state handler with one using {@code idleTime} as reader-idle seconds. */
    private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
        if (pipeline.names().contains(IDLE_STATE_HANDLER)) {
            pipeline.remove(IDLE_STATE_HANDLER);
        }
        pipeline.addFirst(IDLE_STATE_HANDLER, new IdleStateHandler(idleTime, 0, 0));
    }

    /** @return {@code true} when the CONNECT's protocol level differs from {@code version}. */
    private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
        return msg.variableHeader().version() != version.protocolLevel();
    }

    /** Sends a refusing CONNACK with {@code returnCode} and then closes the channel. */
    private void abortConnection(MqttConnectReturnCode returnCode) {
        MqttConnAckMessage refusal = this.connAck(returnCode, false);
        this.channel.writeAndFlush(refusal).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        this.channel.close().addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    /** Builds a CONNACK message with the given return code and session-present flag. */
    private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean sessionPresent) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent);
        return new MqttConnAckMessage(fixedHeader, variableHeader);
    }

    /**
     * Authenticates a CONNECT request.
     *
     * <p>With a user name present, the credentials are checked by the authenticator (the password
     * may be {@code null} when absent and anonymous mode is enabled) and the user name is stored on
     * the channel. Without a user name, the connection is accepted only in anonymous mode.
     *
     * @return {@code true} if the client may proceed
     */
    private boolean login(MqttConnectMessage msg, String clientId) {
        if (!msg.variableHeader().hasUserName()) {
            if (!this.brokerConfig.isAllowAnonymous()) {
                LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
                return false;
            }
            return true;
        }

        byte[] pwd = null;
        if (msg.variableHeader().hasPassword()) {
            pwd = msg.payload().password().getBytes(UTF_8);
        } else if (!this.brokerConfig.isAllowAnonymous()) {
            LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
            return false;
        }
        String login = msg.payload().userName();
        if (!this.authenticator.checkValid(clientId, login, pwd)) {
            LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
            return false;
        }
        MqttUtil.userName(this.channel, login);
        return true;
    }

    /**
     * Handles an unexpected loss of the underlying connection: fires the session's will (if any),
     * removes a clean session or marks a persistent one as disconnected, and notifies the
     * interceptor. Does nothing when no client id is stored on the channel.
     */
    public void handleConnectionLost() {
        String clientID = MqttUtil.clientID(this.channel);
        String userName = MqttUtil.userName(this.channel);
        if (clientID == null || clientID.isEmpty()) {
            return;
        }
        LOG.info("Notifying connection lost event. CId: {}, channel: {}", clientID, this.channel);
        MqttSession session = this.sessionRegistry.retrieve(clientID);
        if (session == null) {
            // Defensive: the session may no longer be registered; there is no will or state to clean up.
            LOG.info("No session found on connection lost. CId: {}, channel: {}", clientID, this.channel);
        } else {
            if (session.hasWill()) {
                this.postOffice.fireWill(session.getWill());
            }
            if (session.isClean()) {
                this.sessionRegistry.remove(clientID);
            } else {
                this.sessionRegistry.disconnect(clientID);
            }
        }
        this.connected = false;
        this.interceptor.notifyClientConnectionLost(clientID, userName);
    }

    /**
     * Marks the connection as connected and sends an accepting CONNACK.
     *
     * @param isSessionAlreadyPresent value of the CONNACK session-present flag
     */
    void sendConnAck(boolean isSessionAlreadyPresent) {
        this.connected = true;
        MqttConnAckMessage ackMessage = this.connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, isSessionAlreadyPresent);
        this.channel.writeAndFlush(ackMessage).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    /** @return {@code true} between a successful CONNACK and a disconnect / connection loss. */
    boolean isConnected() {
        return this.connected;
    }

    /** Closes the channel without sending anything to the client. */
    void dropConnection() {
        this.channel.close().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    /**
     * Handles a DISCONNECT packet: marks the session disconnected, closes the channel and notifies
     * the interceptor. Ignored (with a log entry) if the connection is not in the connected state.
     */
    void processDisconnect(MqttMessage msg) {
        String clientID = MqttUtil.clientID(this.channel);
        String userName = MqttUtil.userName(this.channel);
        LOG.trace("Start DISCONNECT CId={}, channel: {}", clientID, this.channel);
        if (!this.connected) {
            LOG.info("DISCONNECT received on already closed connection, CId={}, channel: {}", clientID, this.channel);
            return;
        }
        this.sessionRegistry.disconnect(clientID);
        this.connected = false;
        this.channel.close().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        LOG.trace("Processed DISCONNECT CId={}, channel: {}", clientID, this.channel);
        this.interceptor.notifyClientDisconnected(clientID, userName);
    }

    /**
     * Handles a SUBSCRIBE packet by delegating to the post office; drops the connection if the
     * client is not connected.
     */
    void processSubscribe(MqttSubscribeMessage msg) {
        String clientID = MqttUtil.clientID(this.channel);
        if (!this.connected) {
            LOG.warn("SUBSCRIBE received on already closed connection, CId={}, channel: {}", clientID, this.channel);
            this.dropConnection();
            return;
        }
        this.postOffice.subscribeClientToTopics(msg, clientID, MqttUtil.userName(this.channel), this);
    }

    /**
     * Writes and flushes a SUBACK.
     *
     * @param messageID  packet identifier, used for logging only
     * @param ackMessage the SUBACK to send
     */
    void sendSubAckMessage(int messageID, MqttSubAckMessage ackMessage) {
        String clientId = MqttUtil.clientID(this.channel);
        LOG.trace("Sending SUBACK response CId={}, messageId: {}", clientId, messageID);
        this.channel.writeAndFlush(ackMessage).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
    }

    /** Handles an UNSUBSCRIBE packet by delegating to the post office. */
    private void processUnsubscribe(MqttUnsubscribeMessage msg) {
        List<String> topics = msg.payload().topics();
        String clientID = MqttUtil.clientID(this.channel);
        LOG.trace("Processing UNSUBSCRIBE message. CId={}, topics: {}", clientID, topics);
        this.postOffice.unsubscribe(topics, this, msg.variableHeader().messageId());
    }

    /**
     * Writes and flushes an UNSUBACK for the given packet identifier.
     *
     * @param topics    topics that were unsubscribed, used for logging only
     * @param clientID  client identifier, used for logging only
     * @param messageID packet identifier being acknowledged
     */
    void sendUnsubAckMessage(List<String> topics, String clientID, int messageID) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttUnsubAckMessage ackMessage = new MqttUnsubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageID));
        LOG.trace("Sending UNSUBACK message. CId={}, messageId: {}, topics: {}", clientID, messageID, topics);
        this.channel.writeAndFlush(ackMessage).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        LOG.trace("Client <{}> unsubscribed from topics <{}>", clientID, topics);
    }

    /**
     * Handles a PUBLISH packet, routing it according to its QoS level. A publish with an invalid
     * topic drops the connection and is not processed any further.
     */
    void processPublish(MqttPublishMessage msg) {
        MqttQoS qos = msg.fixedHeader().qosLevel();
        String username = MqttUtil.userName(this.channel);
        String topicName = msg.variableHeader().topicName();
        String clientId = this.getClientId();
        LOG.trace("Processing PUBLISH message. CId={}, topic: {}, messageId: {}, qos: {}", clientId, topicName, msg.variableHeader().packetId(), qos);
        ByteBuf payload = msg.payload();
        boolean retain = msg.fixedHeader().isRetain();
        Topic topic = new Topic(topicName);
        if (!topic.isValid()) {
            LOG.debug("Drop connection because of invalid topic format");
            this.dropConnection();
            // Stop here: a message with an invalid topic must not be routed or stored.
            return;
        }
        switch (qos) {
            case AT_MOST_ONCE: {
                this.postOffice.receivedPublishQos0(topic, username, clientId, payload, retain, msg);
                break;
            }
            case AT_LEAST_ONCE: {
                int messageID = msg.variableHeader().packetId();
                this.postOffice.receivedPublishQos1(this, topic, username, payload, messageID, retain, msg);
                break;
            }
            case EXACTLY_ONCE: {
                int messageID = msg.variableHeader().packetId();
                MqttSession session = this.sessionFor(MqttMessageType.PUBLISH);
                if (session == null) {
                    // Connection already dropped by sessionFor; nothing can track the QoS2 handshake.
                    break;
                }
                session.receivedPublishQos2(messageID, msg);
                this.postOffice.receivedPublishQos2(this, msg, username);
                break;
            }
            default: {
                LOG.error("Unknown QoS-Type:{}", qos);
            }
        }
    }

    /**
     * Sends a PUBREC for the given packet identifier if the channel is writable.
     *
     * @param messageID packet identifier being acknowledged
     */
    void sendPublishReceived(int messageID) {
        LOG.trace("sendPubRec invoked on channel: {}", this.channel);
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttPubAckMessage pubRecMessage = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageID));
        this.sendIfWritableElseDrop(pubRecMessage);
    }

    /** Handles a PUBREL: informs the session and replies with PUBCOMP. */
    private void processPubRel(MqttMessage msg) {
        MqttSession session = this.sessionFor(MqttMessageType.PUBREL);
        if (session == null) {
            return;
        }
        int messageID = messageIdOf(msg);
        session.receivedPubRelQos2(messageID);
        this.sendPubCompMessage(messageID);
    }

    /**
     * Logs and sends a PUBLISH to the client if the channel is writable.
     *
     * @param publishMsg the message to deliver
     */
    void sendPublish(MqttPublishMessage publishMsg) {
        int packetId = publishMsg.variableHeader().packetId();
        String topicName = publishMsg.variableHeader().topicName();
        String clientId = this.getClientId();
        MqttQoS qos = publishMsg.fixedHeader().qosLevel();
        if (LOG.isTraceEnabled()) {
            // Payload rendering is only done when trace logging is actually enabled.
            LOG.trace("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}, payload={}", qos, packetId, clientId, topicName, MqttUtil.payload2Str(publishMsg.payload()));
        } else {
            LOG.debug("Sending PUBLISH({}) message. MessageId={}, CId={}, topic={}", qos, packetId, clientId, topicName);
        }
        this.sendIfWritableElseDrop(publishMsg);
    }

    /**
     * Writes {@code msg} to the channel only if the channel is currently writable; otherwise the
     * message is silently dropped. When {@code authFlushed} is set the message is written without
     * an explicit flush (presumably something else in the pipeline flushes — TODO confirm).
     *
     * @param msg the message to send
     */
    void sendIfWritableElseDrop(MqttMessage msg) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("OUT {} on channel {}", msg.fixedHeader().messageType(), this.channel);
        }
        if (!this.channel.isWritable()) {
            return;
        }
        if (this.authFlushed) {
            this.channel.write(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        } else {
            this.channel.writeAndFlush(msg).addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        }
    }

    /**
     * Notifies the session that the channel became writable again. Does nothing while the channel
     * is still not writable or when no session is bound.
     */
    public void writabilityChanged() {
        if (!this.channel.isWritable()) {
            return;
        }
        LOG.debug("Channel {} is again writable", this.channel);
        MqttSession session = this.lookupSession();
        if (session != null) {
            session.writabilityChanged();
        }
    }

    /**
     * Sends a PUBACK for the given packet identifier if the channel is writable.
     *
     * @param messageID packet identifier being acknowledged
     */
    void sendPubAck(int messageID) {
        LOG.trace("sendPubAck invoked");
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageID));
        this.sendIfWritableElseDrop(pubAckMessage);
    }

    /** Sends a PUBCOMP for the given packet identifier if the channel is writable. */
    private void sendPubCompMessage(int messageID) {
        LOG.trace("Sending PUBCOMP message on channel: {}, messageId: {}", this.channel, messageID);
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage pubCompMessage = new MqttMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageID));
        this.sendIfWritableElseDrop(pubCompMessage);
    }

    /** @return the client id stored on the channel, as returned by {@link MqttUtil#clientID(Channel)} */
    String getClientId() {
        return MqttUtil.clientID(this.channel);
    }

    /**
     * Sends a retained PUBLISH with packet identifier 0.
     *
     * @param topic   destination topic
     * @param qos     QoS to put in the fixed header
     * @param payload message payload
     */
    public void sendPublishRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
        this.sendPublish(retainedPublish(topic.toString(), qos, payload));
    }

    /**
     * Sends a retained PUBLISH using a freshly allocated packet identifier.
     *
     * @param topic   destination topic
     * @param qos     QoS to put in the fixed header
     * @param payload message payload
     */
    public void sendPublishRetainedWithPacketId(Topic topic, MqttQoS qos, ByteBuf payload) {
        int packetId = this.nextPacketId();
        this.sendPublish(retainedPublishWithMessageId(topic.toString(), qos, payload, packetId));
    }

    /** Builds a retained PUBLISH with packet identifier 0. */
    private static MqttPublishMessage retainedPublish(String topic, MqttQoS qos, ByteBuf message) {
        return retainedPublishWithMessageId(topic, qos, message, 0);
    }

    /** Builds a PUBLISH with the retain flag set. */
    private static MqttPublishMessage retainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, true, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
        return new MqttPublishMessage(fixedHeader, varHeader, message);
    }

    /**
     * Sends a non-retained PUBLISH with packet identifier 0.
     *
     * @param topic   destination topic
     * @param qos     QoS to put in the fixed header
     * @param payload message payload
     */
    void sendPublishNotRetainedQos0(Topic topic, MqttQoS qos, ByteBuf payload) {
        this.sendPublish(notRetainedPublish(topic.toString(), qos, payload));
    }

    /** Builds a non-retained PUBLISH with packet identifier 0. */
    static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
        return notRetainedPublishWithMessageId(topic, qos, message, 0);
    }

    /** Builds a PUBLISH with the retain flag cleared. */
    static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
        return new MqttPublishMessage(fixedHeader, varHeader, message);
    }

    /** Asks the bound session, if any, to resend in-flight messages that were not acknowledged. */
    public void resendNotAckedPublishes() {
        MqttSession session = this.lookupSession();
        if (session != null) {
            session.resendInflightNotAcked();
        }
    }

    /**
     * Allocates the next packet identifier for this connection.
     *
     * <p>Identifiers cycle through 1..65535: MQTT packet identifiers are 16-bit and must be
     * non-zero, so the counter wraps back to 1 instead of growing without bound.
     *
     * @return a packet identifier in the range 1..65535
     */
    int nextPacketId() {
        while (true) {
            int current = this.lastPacketId.get();
            int next = current >= MAX_PACKET_ID ? 1 : current + 1;
            if (this.lastPacketId.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** @return the channel's remote address cast to {@link InetSocketAddress} */
    InetSocketAddress remoteAddress() {
        return (InetSocketAddress) this.channel.remoteAddress();
    }

    @Override
    public String toString() {
        return "MQTTConnection{channel=" + this.channel + ", connected=" + this.connected + '}';
    }
}

