/*
 * Decompiled with CFR 0.152.
 */
package com.github.xingshuangs.iot.protocol.rtcp.service;

import com.github.xingshuangs.iot.exceptions.SocketRuntimeException;
import com.github.xingshuangs.iot.net.client.UdpClientBasic;
import com.github.xingshuangs.iot.protocol.rtcp.model.RtcpBasePackage;
import com.github.xingshuangs.iot.protocol.rtcp.model.RtcpPackageBuilder;
import com.github.xingshuangs.iot.protocol.rtcp.service.RtcpDataStatistics;
import com.github.xingshuangs.iot.protocol.rtp.model.RtpPackage;
import com.github.xingshuangs.iot.protocol.rtsp.service.IRtspDataStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * UDP client for the RTCP side of an RTSP session.
 *
 * <p>Incoming datagrams are read on a dedicated single-thread executor, parsed with
 * {@link RtcpPackageBuilder#fromBytes} and fed into a {@link RtcpDataStatistics} instance.
 * Outgoing data goes through {@link #sendData(byte[])}, which is also handed to the statistics
 * object as its send callback when RTP packages are processed.
 *
 * <p>Threading: {@link #close()} is expected to be called from a different thread than the one
 * running the receive loop, so the stop flag is {@code volatile}.
 */
public class RtcpUdpClient
extends UdpClientBasic
implements IRtspDataStream {
    private static final Logger log = LoggerFactory.getLogger(RtcpUdpClient.class);

    /**
     * Stop flag for the receive loop. Written by {@link #close()} (caller thread) and by the
     * receive loop itself; read by the receive loop (executor thread). Must be volatile so the
     * loop is guaranteed to observe a stop request issued from another thread.
     */
    private volatile boolean terminal = false;

    /** Accumulates RTCP/RTP statistics and builds the outgoing report payloads. */
    private final RtcpDataStatistics statistics = new RtcpDataStatistics();

    /** Optional observer invoked with every raw datagram sent or received; may be null. */
    private Consumer<byte[]> commCallback;

    /** Handle of the asynchronous receive task; null until {@link #triggerReceive()} is called. */
    private CompletableFuture<Void> future;

    /** Single worker thread that runs the blocking receive loop. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    /**
     * Registers a callback that is invoked with the raw bytes of every datagram this client
     * sends or receives.
     *
     * @param commCallback the observer, or {@code null} to disable notification
     */
    public void setCommCallback(Consumer<byte[]> commCallback) {
        this.commCallback = commCallback;
    }

    /** Creates a client using the defaults of {@link UdpClientBasic}. */
    public RtcpUdpClient() {
    }

    /**
     * Creates a client for the given remote endpoint.
     *
     * @param ip   remote address, passed to the parent constructor
     * @param port remote port, passed to the parent constructor
     */
    public RtcpUdpClient(String ip, int port) {
        super(ip, port);
    }

    /**
     * Returns the future of the asynchronous receive task.
     *
     * @return the future created by {@link #triggerReceive()}, or {@code null} if receiving has
     *         not been triggered yet
     */
    @Override
    public CompletableFuture<Void> getFuture() {
        return this.future;
    }

    /**
     * Stops receiving and closes the underlying client.
     *
     * <p>If the receive loop has not already terminated, a final payload obtained from
     * {@code statistics.createReceiverAndByteContent()} is sent first (presumably a receiver
     * report plus BYE - confirm against {@link RtcpDataStatistics}). The stop flag is set and the
     * parent {@code close()} is invoked even when sending that payload fails, so the socket and
     * the receive thread are never left dangling; the send failure still propagates to the
     * caller afterwards.
     */
    @Override
    public void close() {
        this.executorService.shutdown();
        try {
            if (!this.terminal) {
                byte[] receiverAndByteContent = this.statistics.createReceiverAndByteContent();
                this.sendData(receiverAndByteContent);
            }
        } finally {
            // Always request loop termination and release the parent's resources, even if the
            // final send (or the communication callback) threw.
            this.terminal = true;
            super.close();
        }
    }

    /**
     * Starts the receive loop on this client's executor and stores the resulting future, which
     * is then available through {@link #getFuture()}.
     */
    @Override
    public void triggerReceive() {
        this.future = CompletableFuture.runAsync(this::waitForReceiveData, this.executorService);
    }

    /**
     * Sends a datagram to the remote endpoint.
     *
     * <p>The communication callback, when set, is notified with the payload before it is
     * written.
     *
     * @param data raw bytes to send
     */
    @Override
    public void sendData(byte[] data) {
        if (this.commCallback != null) {
            this.commCallback.accept(data);
        }
        this.write(data);
    }

    /**
     * Receive loop: reads datagrams until the stop flag is set.
     *
     * <p>Each datagram is reported to the communication callback (when set), parsed into RTCP
     * packages and passed to the statistics object. A {@link SocketRuntimeException} ends the
     * loop; any other exception is logged (unless the client is already terminating) and the
     * loop carries on with the next datagram.
     */
    private void waitForReceiveData() {
        log.debug("[RTSP + UDP] RTCP enables asynchronous data receiving thread, remote IP[/{}:{}]", this.serverAddress.getAddress().getHostAddress(), this.serverAddress.getPort());
        while (!this.terminal) {
            try {
                byte[] data = this.read();
                if (this.commCallback != null) {
                    this.commCallback.accept(data);
                }
                List<RtcpBasePackage> basePackages = RtcpPackageBuilder.fromBytes(data);
                this.statistics.processRtcpPackage(basePackages);
            }
            catch (SocketRuntimeException e) {
                // Socket-level failure: stop receiving altogether.
                log.error(e.getMessage());
                this.terminal = true;
                break;
            }
            catch (Exception e) {
                // Failures raised while shutting down are expected noise; only report the rest.
                if (!this.terminal) {
                    log.error(e.getMessage(), e);
                }
            }
        }
        log.debug("[RTSP + UDP] RTCP closes asynchronous receiving thread, remote IP[/{}:{}]", this.serverAddress.getAddress().getHostAddress(), this.serverAddress.getPort());
    }

    /**
     * Feeds a received RTP package into the statistics object, supplying
     * {@link #sendData(byte[])} as the callback it may use to send data back.
     *
     * @param rtpPackage the RTP package to account for
     */
    public void processRtpPackage(RtpPackage rtpPackage) {
        this.statistics.processRtpPackage(rtpPackage, this::sendData);
    }
}

