/*
 * Decompiled with CFR 0.152.
 */
package com.github.netty.protocol.mqtt.interception;

import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.core.util.ThreadPoolX;
import com.github.netty.protocol.mqtt.interception.InterceptAcknowledgedMessage;
import com.github.netty.protocol.mqtt.interception.InterceptConnectMessage;
import com.github.netty.protocol.mqtt.interception.InterceptConnectionLostMessage;
import com.github.netty.protocol.mqtt.interception.InterceptDisconnectMessage;
import com.github.netty.protocol.mqtt.interception.InterceptHandler;
import com.github.netty.protocol.mqtt.interception.InterceptPublishMessage;
import com.github.netty.protocol.mqtt.interception.InterceptSubscribeMessage;
import com.github.netty.protocol.mqtt.interception.InterceptUnsubscribeMessage;
import com.github.netty.protocol.mqtt.interception.Interceptor;
import com.github.netty.protocol.mqtt.subscriptions.Subscription;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * {@link Interceptor} implementation that forwards broker events to the registered
 * {@link InterceptHandler}s through an {@link ExecutorService}.
 *
 * <p>Handlers are grouped by the message type they intercept. One list is created per entry of
 * {@link InterceptHandler#ALL_MESSAGE_TYPES} in the constructor; the map itself is never
 * structurally modified afterwards, only the per-type {@link CopyOnWriteArrayList}s are.
 */
public final class BrokerInterceptor implements Interceptor {
    private static final LoggerX LOG = LoggerFactoryX.getLogger(BrokerInterceptor.class);
    /** Registered handlers keyed by intercepted message class; populated once in the constructor. */
    private final Map<Class<?>, List<InterceptHandler>> handlers = new HashMap<>();
    /** Executor on which every handler callback is run. */
    private final ExecutorService executor;

    /**
     * Creates an interceptor with an initial set of handlers.
     *
     * @param poolSize size passed to the {@code ThreadPoolX} that runs the handler callbacks
     * @param handlers handlers to register immediately; may be {@code null} for none
     */
    public BrokerInterceptor(int poolSize, List<InterceptHandler> handlers) {
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<>());
        }
        if (handlers != null) {
            for (InterceptHandler handler : handlers) {
                this.addInterceptHandler(handler);
            }
        }
        this.executor = new ThreadPoolX("MQTT", poolSize);
    }

    /**
     * Creates an interceptor without any initial handlers.
     *
     * @param poolSize size passed to the {@code ThreadPoolX} that runs the handler callbacks
     */
    public BrokerInterceptor(int poolSize) {
        this(poolSize, null);
    }

    /**
     * Shuts the executor down, waiting up to 10 seconds for submitted tasks to finish and forcing
     * shutdown if it has not terminated by then. If the calling thread is interrupted while
     * waiting, the shutdown is still completed and the thread's interrupt status is restored
     * before returning.
     */
    public void stop() {
        LOG.info("Shutting down interceptor thread pool...");
        this.executor.shutdown();
        boolean interrupted = false;
        try {
            LOG.info("Waiting for thread pool tasks to terminate...");
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not lose the interrupt: finish the shutdown below, then re-assert the flag.
            interrupted = true;
        }
        if (!this.executor.isTerminated()) {
            LOG.warn("Forcing shutdown of interceptor thread pool...");
            this.executor.shutdownNow();
        }
        LOG.info("interceptors stopped");
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits one task per connect handler, each invoking {@code onConnect} with a new
     * {@link InterceptConnectMessage} wrapping {@code msg}.
     *
     * @param msg the CONNECT message received from the client
     */
    @Override
    public void notifyClientConnected(MqttConnectMessage msg) {
        for (InterceptHandler handler : this.handlers.get(InterceptConnectMessage.class)) {
            LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", (Object)msg.payload().clientIdentifier(), (Object)handler.getID());
            this.executor.execute(() -> handler.onConnect(new InterceptConnectMessage(msg)));
        }
    }

    /**
     * Submits one task per disconnect handler, each invoking {@code onDisconnect}.
     *
     * @param clientID identifier of the disconnected client
     * @param username username associated with the client
     */
    @Override
    public void notifyClientDisconnected(String clientID, String username) {
        for (InterceptHandler handler : this.handlers.get(InterceptDisconnectMessage.class)) {
            LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}", clientID, username, handler.getID());
            this.executor.execute(() -> handler.onDisconnect(new InterceptDisconnectMessage(clientID, username)));
        }
    }

    /**
     * Submits one task per connection-lost handler, each invoking {@code onConnectionLost}.
     *
     * @param clientID identifier of the client whose connection was lost
     * @param username username associated with the client
     */
    @Override
    public void notifyClientConnectionLost(String clientID, String username) {
        for (InterceptHandler handler : this.handlers.get(InterceptConnectionLostMessage.class)) {
            LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, interceptorId={}", clientID, username, handler.getID());
            this.executor.execute(() -> handler.onConnectionLost(new InterceptConnectionLostMessage(clientID, username)));
        }
    }

    /**
     * Submits a single task that invokes {@code onPublish} on every publish handler in turn.
     *
     * <p>The message is retained before the hand-off and released when the task finishes, whether
     * or not a handler throws. If the executor refuses the task, the extra reference is released
     * here and the rejection is rethrown to the caller.
     *
     * @param msg the PUBLISH message
     * @param clientID identifier of the publishing client
     * @param username username associated with the client
     * @throws RejectedExecutionException if the executor does not accept the task
     */
    @Override
    public void notifyTopicPublished(MqttPublishMessage msg, String clientID, String username) {
        msg.retain();
        try {
            this.executor.execute(() -> {
                try {
                    int messageId = msg.variableHeader().messageId();
                    String topic = msg.variableHeader().topicName();
                    for (InterceptHandler handler : this.handlers.get(InterceptPublishMessage.class)) {
                        LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, interceptorId={}", clientID, messageId, topic, handler.getID());
                        handler.onPublish(new InterceptPublishMessage(msg, clientID, username));
                    }
                } finally {
                    msg.release();
                }
            });
        } catch (RejectedExecutionException e) {
            // The task will never run, so its finally block cannot balance the retain() above.
            msg.release();
            throw e;
        }
    }

    /**
     * Submits one task per subscribe handler, each invoking {@code onSubscribe}.
     *
     * @param sub the subscription that was made
     * @param username username associated with the subscribing client
     */
    @Override
    public void notifyTopicSubscribed(Subscription sub, String username) {
        for (InterceptHandler handler : this.handlers.get(InterceptSubscribeMessage.class)) {
            LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}", sub.getClientId(), sub.getTopicFilter(), handler.getID());
            this.executor.execute(() -> handler.onSubscribe(new InterceptSubscribeMessage(sub, username)));
        }
    }

    /**
     * Submits one task per unsubscribe handler, each invoking {@code onUnsubscribe}.
     *
     * @param topic the topic that was unsubscribed
     * @param clientID identifier of the unsubscribing client
     * @param username username associated with the client
     */
    @Override
    public void notifyTopicUnsubscribed(String topic, String clientID, String username) {
        for (InterceptHandler handler : this.handlers.get(InterceptUnsubscribeMessage.class)) {
            LOG.debug("Notifying MQTT UNSUBSCRIBE message to interceptor. CId={}, topic={}, interceptorId={}", clientID, topic, handler.getID());
            this.executor.execute(() -> handler.onUnsubscribe(new InterceptUnsubscribeMessage(topic, clientID, username)));
        }
    }

    /**
     * Submits one task per acknowledge handler, each invoking {@code onMessageAcknowledged}
     * with the given message.
     *
     * @param msg the acknowledged-message event to forward
     */
    @Override
    public void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg) {
        for (InterceptHandler handler : this.handlers.get(InterceptAcknowledgedMessage.class)) {
            // NOTE(review): the "CId" placeholder is fed msg.getMsg(), not a client id accessor —
            // confirm this renders what the log line intends.
            LOG.debug("Notifying MQTT ACK message to interceptor. CId={}, messageId={}, topic={}, interceptorId={}", msg.getMsg(), msg.getPacketID(), msg.getTopic(), handler.getID());
            this.executor.execute(() -> handler.onMessageAcknowledged(msg));
        }
    }

    /**
     * Registers the handler for every message type it declares, or for all message types when it
     * declares none. Declared types that have no handler list are skipped with a warning instead
     * of failing part-way through the registration.
     *
     * @param interceptHandler the handler to register
     */
    @Override
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        Class<?>[] interceptedMessageTypes = BrokerInterceptor.getInterceptedMessageTypes(interceptHandler);
        LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}", (Object)interceptHandler.getID(), (Object)interceptedMessageTypes);
        for (Class<?> interceptMessageType : interceptedMessageTypes) {
            List<InterceptHandler> registered = this.handlers.get(interceptMessageType);
            if (registered == null) {
                LOG.warn("Ignoring unsupported intercepted message type " + interceptMessageType + " declared by interceptor " + interceptHandler.getID());
                continue;
            }
            registered.add(interceptHandler);
        }
    }

    /**
     * Removes the handler from the list of every message type it declares, or of all message
     * types when it declares none. Declared types that have no handler list are skipped.
     *
     * @param interceptHandler the handler to unregister
     */
    @Override
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        Class<?>[] interceptedMessageTypes = BrokerInterceptor.getInterceptedMessageTypes(interceptHandler);
        LOG.info("Removing MQTT message interceptor. InterceptorId={}, handledMessageTypes={}", (Object)interceptHandler.getID(), (Object)interceptedMessageTypes);
        for (Class<?> interceptMessageType : interceptedMessageTypes) {
            List<InterceptHandler> registered = this.handlers.get(interceptMessageType);
            if (registered != null) {
                registered.remove(interceptHandler);
            }
        }
    }

    /**
     * Returns the message types the handler declares, falling back to
     * {@link InterceptHandler#ALL_MESSAGE_TYPES} when the handler returns {@code null}.
     */
    private static Class<?>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
        Class<?>[] interceptedMessageTypes = interceptHandler.getInterceptedMessageTypes();
        if (interceptedMessageTypes == null) {
            return InterceptHandler.ALL_MESSAGE_TYPES;
        }
        return interceptedMessageTypes;
    }
}

