package com.oceanbase.clogproxy.client.connection;

import com.google.protobuf.InvalidProtocolBufferException;
import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.util.ClientUtil;
import com.oceanbase.clogproxy.common.packet.CompressType;
import com.oceanbase.clogproxy.common.packet.HeaderType;
import com.oceanbase.clogproxy.common.packet.ProtocolVersion;
import com.oceanbase.clogproxy.common.packet.protocol.LogProxyProto;
import com.oceanbase.clogproxy.common.packet.protocol.V1Proto;
import com.oceanbase.clogproxy.common.util.NetworkUtil;
import com.oceanbase.oms.logmessage.LogMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import java.util.concurrent.BlockingQueue;
import net.jpountz.lz4.LZ4FastDecompressor;
import org.apache.commons.lang3.Conversion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/oceanbase/clogproxy/client/connection/ClientHandlerV01.class */
public class ClientHandlerV01 {
    private static final Logger logger = LoggerFactory.getLogger(ClientHandlerV01.class);
    private static final byte[] MAGIC_STRING = {120, 105, 53, 51, 103, 93, 113};
    private static final String CLIENT_IP = NetworkUtil.getLocalIp();
    private final ClientConf config;
    private ConnectionParams params;
    private final BlockingQueue<StreamContext.TransferPacket> recordQueue;
    private final LZ4FastDecompressor fastDecompressor;
    private HandshakeState state = HandshakeState.PROTOCOL_VERSION;
    private String logProxyIp;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.oceanbase.clogproxy.client.connection.ClientHandlerV01$1, reason: invalid class name */
    /* loaded from: input_file:com/oceanbase/clogproxy/client/connection/ClientHandlerV01$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$oceanbase$clogproxy$common$packet$ProtocolVersion = new int[ProtocolVersion.values().length];

        static {
            try {
                $SwitchMap$com$oceanbase$clogproxy$common$packet$ProtocolVersion[ProtocolVersion.V1.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$common$packet$ProtocolVersion[ProtocolVersion.V0.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState = new int[HandshakeState.values().length];
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.PROTOCOL_VERSION.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.HEADER_CODE.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.RESPONSE_CODE.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.MESSAGE.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.LOGPROXY_IP.ordinal()] = 5;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.LOGPROXY_VERSION.ordinal()] = 6;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$oceanbase$clogproxy$client$connection$ClientHandlerV01$HandshakeState[HandshakeState.STREAM.ordinal()] = 7;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oceanbase/clogproxy/client/connection/ClientHandlerV01$HandshakeState.class */
    public enum HandshakeState {
        PROTOCOL_VERSION,
        HEADER_CODE,
        RESPONSE_CODE,
        MESSAGE,
        LOGPROXY_IP,
        LOGPROXY_VERSION,
        STREAM
    }

    public ClientHandlerV01(ClientConf clientConf, ConnectionParams connectionParams, BlockingQueue<StreamContext.TransferPacket> blockingQueue, LZ4FastDecompressor lZ4FastDecompressor) {
        this.config = clientConf;
        this.params = connectionParams;
        this.recordQueue = blockingQueue;
        this.fastDecompressor = lZ4FastDecompressor;
    }

    public void setParams(ConnectionParams connectionParams) {
        this.params = connectionParams;
    }

    public boolean channelRead(boolean z, ByteBuf byteBuf, boolean z2) throws Exception {
        boolean z3 = z2;
        switch (this.state) {
            case PROTOCOL_VERSION:
                if (byteBuf.readableBytes() < 2) {
                    z3 = true;
                    break;
                } else {
                    short readShort = byteBuf.readShort();
                    if (ProtocolVersion.codeOf(readShort) != null) {
                        this.state = HandshakeState.HEADER_CODE;
                        break;
                    } else {
                        resetState();
                        logger.error("unsupport protocol version: {}", Integer.valueOf(readShort));
                        throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupport protocol version: " + ((int) readShort));
                    }
                }
            case HEADER_CODE:
                if (byteBuf.readableBytes() < 4) {
                    z3 = true;
                    break;
                } else {
                    int readInt = byteBuf.readInt();
                    if (readInt != HeaderType.HANDSHAKE_RESPONSE_CLIENT.code() && readInt != HeaderType.ERROR_RESPONSE.code()) {
                        resetState();
                        logger.error("unexpected Header Type, expected: {}({}), income: {}", new Object[]{Integer.valueOf(HeaderType.HANDSHAKE_RESPONSE_CLIENT.code()), HeaderType.HANDSHAKE_RESPONSE_CLIENT.name(), Integer.valueOf(readInt)});
                        throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + readInt);
                    }
                    this.state = HandshakeState.RESPONSE_CODE;
                    break;
                }
            case RESPONSE_CODE:
                if (byteBuf.readableBytes() < 4) {
                    z3 = true;
                    break;
                } else if (byteBuf.readInt() == 0) {
                    this.state = HandshakeState.LOGPROXY_IP;
                    break;
                } else {
                    this.state = HandshakeState.MESSAGE;
                    break;
                }
            case MESSAGE:
                String decodeStringInt = decodeStringInt(byteBuf);
                if (decodeStringInt == null) {
                    z3 = true;
                    break;
                } else {
                    resetState();
                    logger.error("LogProxy refused handshake request: {}", decodeStringInt);
                    throw new LogProxyClientException(ErrorCode.NO_AUTH, "LogProxy refused handshake request: " + decodeStringInt, true);
                }
            case LOGPROXY_IP:
                this.logProxyIp = decodeStringByte(byteBuf);
                if (this.logProxyIp == null) {
                    z3 = true;
                    break;
                } else {
                    this.state = HandshakeState.LOGPROXY_VERSION;
                    break;
                }
            case LOGPROXY_VERSION:
                String decodeStringByte = decodeStringByte(byteBuf);
                if (decodeStringByte == null) {
                    z3 = true;
                    break;
                } else {
                    logger.info("Connected to LogProxy: {}, {}", this.logProxyIp, decodeStringByte);
                    this.state = HandshakeState.STREAM;
                    break;
                }
            case STREAM:
                parseData(z, byteBuf);
                z3 = true;
                break;
        }
        return z3;
    }

    private static String decodeStringInt(ByteBuf byteBuf) {
        if (byteBuf.readableBytes() < 4) {
            return null;
        }
        byteBuf.markReaderIndex();
        int readInt = byteBuf.readInt();
        if (byteBuf.readableBytes() < readInt) {
            byteBuf.resetReaderIndex();
            return null;
        }
        byte[] bArr = new byte[readInt];
        byteBuf.readBytes(bArr);
        String str = new String(bArr);
        if (str.isEmpty()) {
            throw new RuntimeException("decode string is null or empty");
        }
        return str;
    }

    private static String decodeStringByte(ByteBuf byteBuf) {
        if (byteBuf.readableBytes() < 1) {
            return null;
        }
        byteBuf.markReaderIndex();
        int readByte = byteBuf.readByte();
        if (byteBuf.readableBytes() < readByte) {
            byteBuf.resetReaderIndex();
            return null;
        }
        byte[] bArr = new byte[readByte];
        byteBuf.readBytes(bArr);
        String str = new String(bArr);
        if (str.isEmpty()) {
            throw new RuntimeException("decode string is null or empty");
        }
        return str;
    }

    private void parseData(boolean z, ByteBuf byteBuf) throws LogProxyClientException {
        boolean parseDataV0;
        while (z && byteBuf.readableBytes() >= 2) {
            byteBuf.markReaderIndex();
            short readShort = byteBuf.readShort();
            ProtocolVersion codeOf = ProtocolVersion.codeOf(readShort);
            if (codeOf == null) {
                resetState();
                logger.error("unsupport protocol version: {}", Integer.valueOf(readShort));
                throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "unsupport protocol version: " + ((int) readShort));
            }
            switch (AnonymousClass1.$SwitchMap$com$oceanbase$clogproxy$common$packet$ProtocolVersion[codeOf.ordinal()]) {
                case 1:
                    parseDataV0 = parseDataV1(byteBuf);
                    break;
                case 2:
                default:
                    parseDataV0 = parseDataV0(byteBuf);
                    break;
            }
            if (!parseDataV0) {
                return;
            }
        }
    }

    private boolean parseDataV0(ByteBuf byteBuf) {
        if (byteBuf.readableBytes() < 8) {
            byteBuf.resetReaderIndex();
            return false;
        }
        int readInt = byteBuf.readInt();
        if (readInt != HeaderType.DATA_CLIENT.code()) {
            resetState();
            logger.error("unexpected Header Type, expected: {}({}), income: {}", new Object[]{Integer.valueOf(HeaderType.DATA_CLIENT.code()), HeaderType.DATA_CLIENT.name(), Integer.valueOf(readInt)});
            throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "unexpected Header Type: " + readInt);
        }
        if (byteBuf.readableBytes() < byteBuf.readInt()) {
            byteBuf.resetReaderIndex();
            return false;
        }
        byte readByte = byteBuf.readByte();
        if (CompressType.codeOf(readByte) == null) {
            throw new LogProxyClientException(ErrorCode.E_COMPRESS_TYPE, "unexpected Compress Type: " + ((int) readByte));
        }
        int readInt2 = byteBuf.readInt();
        int readInt3 = byteBuf.readInt();
        byte[] bArr = new byte[readInt3];
        byteBuf.readBytes(bArr);
        if (readByte != CompressType.LZ4.code()) {
            parseRecord(bArr);
            return true;
        }
        byte[] bArr2 = new byte[readInt2];
        int decompress = this.fastDecompressor.decompress(bArr, 0, bArr2, 0, readInt2);
        if (decompress != readInt3) {
            throw new LogProxyClientException(ErrorCode.E_LEN, "decompressed length [" + decompress + "] is not expected [" + readInt3 + "]");
        }
        parseRecord(bArr2);
        return true;
    }

    private void parseRecord(byte[] bArr) throws LogProxyClientException {
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= bArr.length) {
                return;
            }
            int swapInt = ByteBufUtil.swapInt(Conversion.byteArrayToInt(bArr, i2 + 4, 0, 0, 4));
            LogMessage logMessage = new LogMessage(false);
            byte[] bArr2 = new byte[swapInt];
            System.arraycopy(bArr, i2 + 8, bArr2, 0, bArr2.length);
            try {
                logMessage.parse(bArr2);
                if (logger.isTraceEnabled()) {
                    logger.trace("Log message: {}", logMessage);
                }
                while (true) {
                    try {
                        this.recordQueue.put(new StreamContext.TransferPacket(logMessage));
                        break;
                    } catch (InterruptedException e) {
                    }
                }
                i = i2 + 8 + swapInt;
            } catch (Exception e2) {
                if (!this.config.isIgnoreUnknownRecordType()) {
                    throw new LogProxyClientException(ErrorCode.E_PARSE, e2);
                }
                logger.debug("Unsupported record type: {}", logMessage);
                i = i2 + 8 + swapInt;
            }
        }
    }

    private boolean parseDataV1(ByteBuf byteBuf) {
        if (byteBuf.readableBytes() < 4) {
            byteBuf.resetReaderIndex();
            return false;
        }
        int readInt = byteBuf.readInt();
        if (byteBuf.readableBytes() < readInt) {
            byteBuf.resetReaderIndex();
            return false;
        }
        byte[] bArr = new byte[readInt];
        byteBuf.readBytes(bArr, 0, readInt);
        try {
            V1Proto.PbPacket parseFrom = V1Proto.PbPacket.parseFrom(bArr);
            if (parseFrom.getCompressType() != CompressType.NONE.code()) {
                throw new LogProxyClientException(ErrorCode.E_COMPRESS_TYPE, "Unsupport Compress Type: " + parseFrom.getCompressType());
            }
            if (parseFrom.getType() != HeaderType.STATUS.code()) {
                throw new LogProxyClientException(ErrorCode.E_HEADER_TYPE, "Unsupport Header Type: " + parseFrom.getType());
            }
            LogProxyProto.RuntimeStatus parseFrom2 = LogProxyProto.RuntimeStatus.parseFrom(parseFrom.getPayload());
            if (parseFrom2 == null) {
                throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet, empty Runtime Status");
            }
            while (true) {
                try {
                    this.recordQueue.put(new StreamContext.TransferPacket(parseFrom2));
                    return true;
                } catch (InterruptedException e) {
                }
            }
        } catch (InvalidProtocolBufferException e2) {
            throw new LogProxyClientException(ErrorCode.E_PARSE, "Failed to read PB packet", (Throwable) e2);
        }
    }

    public void resetState() {
        this.state = HandshakeState.PROTOCOL_VERSION;
    }

    public ByteBuf generateConnectRequest() {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(MAGIC_STRING.length);
        buffer.writeBytes(MAGIC_STRING);
        buffer.capacity(buffer.capacity() + 2 + 4 + 1);
        buffer.writeShort(ProtocolVersion.V0.code());
        buffer.writeInt(HeaderType.HANDSHAKE_REQUEST_CLIENT.code());
        buffer.writeByte(this.params.getLogType().code());
        int length = CLIENT_IP.length();
        buffer.capacity(buffer.capacity() + length + 4);
        buffer.writeInt(length);
        buffer.writeBytes(CLIENT_IP.getBytes());
        int length2 = this.params.getClientId().length();
        buffer.capacity(buffer.capacity() + length2 + 4);
        buffer.writeInt(length2);
        buffer.writeBytes(this.params.getClientId().getBytes());
        String clientVersion = ClientUtil.getClientVersion();
        int length3 = clientVersion.length();
        buffer.capacity(buffer.capacity() + length3 + 4);
        buffer.writeInt(length3);
        buffer.writeBytes(clientVersion.getBytes());
        int length4 = this.params.getConfigurationString().length();
        buffer.capacity(buffer.capacity() + length4 + 4);
        buffer.writeInt(length4);
        buffer.writeBytes(this.params.getConfigurationString().getBytes());
        return buffer;
    }
}
