package com.github.xingshuangs.iot.protocol.rtp.service;

import com.github.xingshuangs.iot.exceptions.SocketRuntimeException;
import com.github.xingshuangs.iot.net.client.UdpClientBasic;
import com.github.xingshuangs.iot.protocol.rtcp.service.RtcpUdpClient;
import com.github.xingshuangs.iot.protocol.rtp.model.RtpPackage;
import com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/xingshuangs/iot/protocol/rtp/service/RtpUdpClient.class */
/**
 * UDP client that receives datagrams, decodes each one with
 * {@link RtpPackage#fromBytes(byte[])} and forwards the result to an optional
 * {@link RtcpUdpClient} and to the configured {@link IPayloadParser}.
 *
 * <p>The receive loop runs asynchronously once {@link #triggerReceive()} has been
 * called and stops after {@link #close()} or after a {@link SocketRuntimeException}.
 */
public class RtpUdpClient extends UdpClientBasic implements IRtspDataStream {
    private static final Logger log = LoggerFactory.getLogger(RtpUdpClient.class);

    /**
     * Stop flag for the receive loop. It is written by {@link #close()} on the caller's
     * thread and polled by the asynchronous receive task, hence {@code volatile}.
     */
    private volatile boolean terminal;

    /** Optional observer that is handed every raw byte array sent or received. */
    private Consumer<byte[]> commCallback;

    /** Consumer of decoded packages; stays {@code null} when the (String, int) constructor is used. */
    private IPayloadParser iPayloadParser;

    /** Handle of the asynchronous receive task; {@code null} until {@link #triggerReceive()} runs. */
    private CompletableFuture<Void> future;

    /** Optional RTCP client that is notified of every decoded package. */
    private RtcpUdpClient rtcpUdpClient;

    /**
     * Registers the observer invoked with every raw byte array sent or received.
     *
     * @param consumer the observer, or {@code null} to disable the notification
     */
    public void setCommCallback(Consumer<byte[]> consumer) {
        this.commCallback = consumer;
    }

    /**
     * Registers the RTCP client that is notified of every decoded package.
     *
     * @param rtcpUdpClient the RTCP client, or {@code null} to disable the notification
     */
    public void setRtcpUdpClient(RtcpUdpClient rtcpUdpClient) {
        this.rtcpUdpClient = rtcpUdpClient;
    }

    /**
     * Creates a client using the superclass default construction.
     *
     * @param iPayloadParser parser that receives every decoded package
     */
    public RtpUdpClient(IPayloadParser iPayloadParser) {
        this.terminal = false;
        this.iPayloadParser = iPayloadParser;
    }

    /**
     * Creates a client without a payload parser; received packages are then only
     * reported to the callback and the RTCP client, if those are set.
     *
     * @param str forwarded to the superclass constructor (presumably the remote host - confirm
     *            against UdpClientBasic)
     * @param i   forwarded to the superclass constructor (presumably the remote port - confirm
     *            against UdpClientBasic)
     */
    public RtpUdpClient(String str, int i) {
        super(str, i);
        this.terminal = false;
    }

    /**
     * Returns the handle of the asynchronous receive task.
     *
     * @return the future created by {@link #triggerReceive()}, or {@code null} if it was never called
     */
    @Override
    public CompletableFuture<Void> getFuture() {
        return this.future;
    }

    /** Requests the receive loop to stop and then closes the underlying client. */
    @Override
    public void close() {
        // Raise the flag first so the loop treats errors caused by the close as expected.
        this.terminal = true;
        super.close();
    }

    /** Starts the receive loop as an asynchronous task and stores its future. */
    @Override
    public void triggerReceive() {
        this.future = CompletableFuture.runAsync(this::waitForReceiveData);
    }

    /**
     * Reports the bytes to the callback, if one is set, and then writes them out.
     *
     * @param bArr the bytes to send
     */
    @Override
    public void sendData(byte[] bArr) {
        notifyCallback(bArr);
        write(bArr);
    }

    /**
     * Receive loop: reads a datagram, decodes it and dispatches the package until the
     * stop flag is raised or the socket fails.
     */
    private void waitForReceiveData() {
        log.debug("[RTSP + UDP] RTP 开启异步接收数据线程，远程的IP[/{}:{}]", remoteHost(), remotePort());
        while (!this.terminal) {
            try {
                byte[] data = read();
                notifyCallback(data);

                RtpPackage rtp = RtpPackage.fromBytes(data);
                int unprocessed = data.length - rtp.byteArrayLength();
                if (unprocessed > 0) {
                    log.warn("rtp数据还有未处理部分，未处理字节个数[{}]", unprocessed);
                }

                // Read the field once so a concurrent setter cannot change it between check and call.
                RtcpUdpClient rtcp = this.rtcpUdpClient;
                if (rtcp != null) {
                    rtcp.processRtpPackage(rtp);
                }

                // The (String, int) constructor leaves the parser unset; skip it instead of
                // raising a NullPointerException for every received datagram.
                IPayloadParser parser = this.iPayloadParser;
                if (parser != null) {
                    parser.processPackage(rtp);
                }
            } catch (SocketRuntimeException e) {
                // A socket failure ends the loop; keep the throwable so the stack trace is logged.
                log.error(e.getMessage(), e);
                this.terminal = true;
            } catch (Exception e) {
                // Any other failure affects only the current datagram; stay quiet during shutdown.
                if (!this.terminal) {
                    log.error(e.getMessage(), e);
                }
            }
        }
        log.debug("[RTSP + UDP] RTP 关闭异步接收数据线程，远程的IP[/{}:{}]", remoteHost(), remotePort());
    }

    /** Hands the bytes to the communication callback when one is registered. */
    private void notifyCallback(byte[] data) {
        Consumer<byte[]> callback = this.commCallback;
        if (callback != null) {
            callback.accept(data);
        }
    }

    /** Returns the textual IP address of the configured server address. */
    private String remoteHost() {
        return this.serverAddress.getAddress().getHostAddress();
    }

    /** Returns the port of the configured server address. */
    private int remotePort() {
        return this.serverAddress.getPort();
    }
}
