/*
 * Decompiled with CFR 0.152.
 */
package net.dreamlu.iot.mqtt.core.server.http.websocket;

import java.nio.ByteBuffer;
import net.dreamlu.iot.mqtt.codec.MqttMessage;
import net.dreamlu.iot.mqtt.core.server.MqttMessageInterceptors;
import net.dreamlu.iot.mqtt.core.server.MqttServerCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Tio;
import org.tio.core.TioConfig;
import org.tio.core.intf.Packet;
import org.tio.core.intf.TioHandler;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.utils.buffer.ByteBufferUtil;
import org.tio.utils.hutool.FastByteBuffer;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * WebSocket message handler that feeds binary WebSocket frames into the MQTT
 * {@link TioHandler} and wraps outgoing {@link MqttMessage}s into binary
 * WebSocket responses.
 *
 * <p>Bytes that do not yet form a complete MQTT packet are accumulated in a
 * per-connection {@link FastByteBuffer} stored on the {@link ChannelContext}
 * under {@link #MQTT_WS_MSG_BODY_KEY}.
 */
public class MqttWsMsgHandler implements IWsMsgHandler {
    private static final Logger logger = LoggerFactory.getLogger(MqttWsMsgHandler.class);
    /** Context attribute key of the per-connection accumulation buffer. */
    private static final String MQTT_WS_MSG_BODY_KEY = "MQTT_WS_MSG_BODY_KEY";
    private final MqttServerCreator serverCreator;
    private final String[] supportedSubProtocols;
    private final TioHandler mqttServerAioHandler;
    private final MqttMessageInterceptors messageInterceptors;

    /**
     * Creates a handler that advertises the sub-protocols {@code mqtt},
     * {@code mqttv3.1} and {@code mqttv3.1.1}.
     *
     * @param serverCreator server configuration; also supplies the message interceptors
     * @param aioHandler    MQTT handler used to decode, handle and encode packets
     */
    public MqttWsMsgHandler(MqttServerCreator serverCreator, TioHandler aioHandler) {
        this(serverCreator, new String[]{"mqtt", "mqttv3.1", "mqttv3.1.1"}, aioHandler);
    }

    /**
     * Creates a handler with an explicit list of sub-protocols.
     *
     * @param serverCreator         server configuration; also supplies the message interceptors
     * @param supportedSubProtocols sub-protocol names returned by {@link #getSupportedSubProtocols()}
     * @param aioHandler            MQTT handler used to decode, handle and encode packets
     */
    public MqttWsMsgHandler(MqttServerCreator serverCreator, String[] supportedSubProtocols, TioHandler aioHandler) {
        this.serverCreator = serverCreator;
        this.supportedSubProtocols = supportedSubProtocols;
        this.mqttServerAioHandler = aioHandler;
        this.messageInterceptors = serverCreator.getMessageInterceptors();
    }

    /**
     * @return the sub-protocol names this handler was constructed with
     */
    public String[] getSupportedSubProtocols() {
        return this.supportedSubProtocols;
    }

    /**
     * Decides whether the handshake proceeds.
     *
     * @return {@code httpResponse} when websocket is enabled on the server
     *         creator, otherwise {@code null} (presumably the framework treats
     *         {@code null} as a rejected handshake - confirm against the
     *         {@code IWsMsgHandler} contract)
     */
    public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) {
        if (this.serverCreator.isWebsocketEnable()) {
            return httpResponse;
        }
        return null;
    }

    /**
     * Ensures the connection has an accumulation buffer once the handshake is done.
     */
    public void onAfterHandshaked(HttpRequest request, HttpResponse response, ChannelContext context) {
        context.computeIfAbsent(MQTT_WS_MSG_BODY_KEY, key -> new FastByteBuffer());
    }

    /**
     * Appends a binary frame to the connection buffer and, once at least one
     * complete MQTT packet is available, decodes and handles every packet in it.
     *
     * <p>If the decoder reports an incomplete packet (returns {@code null}),
     * the undecoded bytes - starting at the beginning of that packet - are put
     * back into the connection buffer to be completed by a later frame.
     *
     * @param wsRequest the websocket request (not used here)
     * @param bytes     payload of the binary frame
     * @param context   channel the frame arrived on
     * @return always {@code null}
     * @throws Exception whatever the MQTT handler's decode/handler methods throw
     */
    public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext context) throws Exception {
        FastByteBuffer wsBody = (FastByteBuffer) context.get(MQTT_WS_MSG_BODY_KEY);
        ByteBuffer buffer = MqttWsMsgHandler.getMqttBody(wsBody, bytes);
        if (buffer == null) {
            return null;
        }
        while (buffer.hasRemaining()) {
            // Remember where this packet starts so a failed (partial) decode can be undone.
            int packetStart = buffer.position();
            int readableLength = buffer.remaining();
            Packet packet = this.mqttServerAioHandler.decode(buffer, 0, 0, readableLength, context);
            if (packet == null) {
                // The decoder may have consumed header bytes before noticing the packet is
                // incomplete; rewind so the whole partial packet is kept, not just its tail.
                buffer.position(packetStart);
                MqttWsMsgHandler.stashRemaining(wsBody, buffer);
                return null;
            }
            try {
                this.messageInterceptors.onAfterDecoded(context, (MqttMessage) packet, readableLength);
            }
            catch (Throwable e) {
                // Interceptor failures must not prevent the packet from being handled.
                logger.error(e.getMessage(), e);
            }
            this.mqttServerAioHandler.handler(packet, context);
            try {
                this.messageInterceptors.onAfterHandled(context, (MqttMessage) packet, readableLength);
            }
            catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
        }
        return null;
    }

    /**
     * Encodes an outgoing MQTT message as a binary websocket response.
     *
     * @return the websocket response, or {@code null} when {@code packet} is
     *         not an {@link MqttMessage}
     */
    public WsResponse encodeSubProtocol(Packet packet, TioConfig tioConfig, ChannelContext context) {
        if (packet instanceof MqttMessage) {
            ByteBuffer buffer = this.mqttServerAioHandler.encode(packet, null, context);
            // NOTE(review): array() returns the whole backing array; this assumes the encoder
            // allocates a heap buffer sized exactly to the message - confirm against the encoder.
            return WsResponse.fromBytes((byte[]) buffer.array());
        }
        return null;
    }

    /**
     * Removes the channel when the websocket peer closes the connection.
     *
     * @return always {@code null}
     */
    public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext context) {
        Tio.remove((ChannelContext) context, (String) "Mqtt websocket close.");
        return null;
    }

    /**
     * Text frames are ignored.
     *
     * @return always {@code null}
     */
    public Object onText(WsRequest wsRequest, String text, ChannelContext context) {
        return null;
    }

    /**
     * Appends {@code bytes} to the connection buffer and returns its content
     * once the first MQTT packet in it is complete.
     *
     * <p>Completeness is checked against the real fixed-header size (one type
     * byte plus however many Remaining Length bytes were read) plus the decoded
     * Remaining Length.
     *
     * @param wsBody per-connection accumulation buffer
     * @param bytes  newly received bytes
     * @return a buffer positioned at 0 holding everything accumulated so far
     *         (the accumulation buffer is reset), or {@code null} when the
     *         first packet is still incomplete
     */
    private static ByteBuffer getMqttBody(FastByteBuffer wsBody, byte[] bytes) {
        // Lock on the connection's own buffer rather than the class, so unrelated
        // connections do not serialize on each other.
        synchronized (wsBody) {
            wsBody.writeBytes(bytes);
            int length = wsBody.size();
            // Smallest possible MQTT packet: 1 type byte + 1 length byte.
            if (length < 2) {
                return null;
            }
            ByteBuffer buffer = wsBody.toBuffer();
            int headerStart = buffer.position();
            Integer mqttLength = MqttWsMsgHandler.getMqttLength(buffer);
            if (mqttLength == null) {
                return null;
            }
            // Bytes consumed by the fixed header: type byte + 1..4 Remaining Length bytes.
            int headerLength = buffer.position() - headerStart;
            if (length < headerLength + mqttLength) {
                return null;
            }
            wsBody.reset();
            buffer.rewind();
            return buffer;
        }
    }

    /**
     * Writes the unread bytes of {@code buffer} back into the connection
     * buffer; does nothing when there are none.
     */
    private static void stashRemaining(FastByteBuffer wsBody, ByteBuffer buffer) {
        int remaining = buffer.remaining();
        if (remaining <= 0) {
            return;
        }
        byte[] data = new byte[remaining];
        buffer.get(data);
        synchronized (wsBody) {
            wsBody.writeBytes(data);
        }
    }

    /**
     * Reads the MQTT fixed header's variable-length "Remaining Length" field.
     *
     * <p>Skips the first (packet type/flags) byte, then reads up to four
     * length bytes, 7 value bits each, least-significant group first. On
     * return the buffer is positioned just past the last length byte read.
     *
     * @param buffer buffer positioned at the start of an MQTT packet
     * @return the decoded Remaining Length, or {@code null} if the buffer ends
     *         before the field is complete
     */
    private static Integer getMqttLength(ByteBuffer buffer) {
        short digit;
        ByteBufferUtil.skipBytes((ByteBuffer) buffer, (int) 1);
        int remainingLength = 0;
        int multiplier = 1;
        int loops = 0;
        do {
            if (!buffer.hasRemaining()) {
                return null;
            }
            digit = ByteBufferUtil.readUnsignedByte((ByteBuffer) buffer);
            remainingLength += (digit & 0x7F) * multiplier;
            multiplier *= 128;
            // Bit 7 is the continuation flag; at most four length bytes are read.
        } while ((digit & 0x80) != 0 && ++loops < 4);
        return remainingLength;
    }
}

