package com.github.xingshuangs.iot.protocol.rtsp.service;

import com.github.xingshuangs.iot.common.buff.ByteReadBuff;
import com.github.xingshuangs.iot.exceptions.RtspCommException;
import com.github.xingshuangs.iot.exceptions.SocketRuntimeException;
import com.github.xingshuangs.iot.net.client.TcpClientBasic;
import com.github.xingshuangs.iot.protocol.rtcp.model.RtcpPackageBuilder;
import com.github.xingshuangs.iot.protocol.rtcp.service.RtcpDataStatistics;
import com.github.xingshuangs.iot.protocol.rtp.model.RtpPackage;
import com.github.xingshuangs.iot.protocol.rtp.service.IPayloadParser;
import com.github.xingshuangs.iot.protocol.rtsp.model.interleaved.RtspInterleaved;
import com.github.xingshuangs.iot.utils.HexUtil;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/xingshuangs/iot/protocol/rtsp/service/RtspInterleavedClient.class */
public class RtspInterleavedClient implements IRtspDataStream {
    private static final Logger log = LoggerFactory.getLogger(RtspInterleavedClient.class);
    public static final Integer BUFFER_SIZE = 4096;
    private Consumer<byte[]> commCallback;
    private final IPayloadParser iPayloadParser;
    private final TcpClientBasic rtspClient;
    private CompletableFuture<Void> future;
    private boolean terminal = false;
    private final RtcpDataStatistics statistics = new RtcpDataStatistics();
    private int rtpVideoChannelNumber = 0;
    private int rtcpVideoChannelNumber = 1;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void setCommCallback(Consumer<byte[]> consumer) {
        this.commCallback = consumer;
    }

    public int getRtpVideoChannelNumber() {
        return this.rtpVideoChannelNumber;
    }

    public int getRtcpVideoChannelNumber() {
        return this.rtcpVideoChannelNumber;
    }

    public void setRtpVideoChannelNumber(int i) {
        this.rtpVideoChannelNumber = i;
    }

    public void setRtcpVideoChannelNumber(int i) {
        this.rtcpVideoChannelNumber = i;
    }

    public RtspInterleavedClient(IPayloadParser iPayloadParser, TcpClientBasic tcpClientBasic) {
        this.iPayloadParser = iPayloadParser;
        this.rtspClient = tcpClientBasic;
    }

    @Override // com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream
    public CompletableFuture<Void> getFuture() {
        return this.future;
    }

    @Override // com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream
    public void close() {
        this.executorService.shutdown();
        if (this.terminal) {
            return;
        }
        sendData(this.statistics.createReceiverAndByteContent());
        this.terminal = true;
    }

    @Override // com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream
    public void triggerReceive() {
        this.future = CompletableFuture.runAsync(this::waitForReceiveData, this.executorService);
    }

    @Override // com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream
    public void sendData(byte[] bArr) {
        if (this.commCallback != null) {
            this.commCallback.accept(bArr);
        }
        this.rtspClient.write(bArr);
    }

    private void waitForReceiveData() {
        InetSocketAddress socketAddress = this.rtspClient.getSocketAddress();
        log.debug("[RTSP + TCP] Interleaved 开启异步接收数据线程，远程的IP[/{}:{}]", socketAddress.getAddress().getHostAddress(), Integer.valueOf(socketAddress.getPort()));
        while (!this.terminal) {
            try {
            } catch (SocketRuntimeException e) {
                log.error(e.getMessage());
                this.terminal = true;
            } catch (Exception e2) {
                log.error(e2.getMessage());
            }
            if (!this.rtspClient.checkConnected()) {
                this.terminal = true;
                break;
            }
            byte[] readFromServer = readFromServer();
            if (this.commCallback != null) {
                this.commCallback.accept(readFromServer);
            }
            RtspInterleaved fromBytes = RtspInterleaved.fromBytes(readFromServer);
            if (fromBytes.getChannelId() == this.rtpVideoChannelNumber) {
                rtpVideoHandle(fromBytes);
            } else if (fromBytes.getChannelId() == this.rtcpVideoChannelNumber) {
                rtcpVideoHandle(fromBytes);
            }
        }
        log.debug("[RTSP + TCP] Interleaved 关闭异步接收数据线程，远程的IP[/{}:{}]", socketAddress.getAddress().getHostAddress(), Integer.valueOf(socketAddress.getPort()));
    }

    private byte[] readFromServer() {
        byte[] bArr = new byte[4];
        while (bArr[0] != 36) {
            this.rtspClient.read(bArr, 0, 1, 1024, 0, true);
        }
        if (this.rtspClient.read(bArr, 1, 3, 1024, 0, true) != 3) {
            throw new RtspCommException("头读取长度有误");
        }
        int uInt16 = ByteReadBuff.newInstance(bArr, 2).getUInt16();
        byte[] bArr2 = new byte[uInt16 + 4];
        System.arraycopy(bArr, 0, bArr2, 0, bArr.length);
        int read = this.rtspClient.read(bArr2, 4, uInt16, 1024, 0, true);
        if (4 + read == bArr2.length) {
            return bArr2;
        }
        log.error(HexUtil.toHexString(bArr2));
        throw new RtspCommException("数据体读取长度有误，原来长度[" + bArr2.length + "], 现在长度[" + (4 + read) + "]");
    }

    private void rtcpVideoHandle(RtspInterleaved rtspInterleaved) {
        this.statistics.processRtcpPackage(RtcpPackageBuilder.fromBytes(rtspInterleaved.getPayload()));
    }

    private void rtpVideoHandle(RtspInterleaved rtspInterleaved) {
        RtpPackage fromBytes = RtpPackage.fromBytes(rtspInterleaved.getPayload());
        this.iPayloadParser.processPackage(fromBytes);
        this.statistics.processRtpPackage(fromBytes, this::sendData);
    }
}
