package org.apache.ratis.grpc.util;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.IntSupplier;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.function.StringSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/ratis/grpc/util/StreamObserverWithTimeout.class */
/**
 * A {@link StreamObserver} decorator that wraps another observer and adds
 * two things on top of plain delegation:
 * <ul>
 *   <li>an optional bound on the number of outstanding requests, enforced with a
 *       {@link ResourceSemaphore} that is acquired in {@link #onNext(Object)} and released by
 *       the callback given to {@link ResponseNotifyClientInterceptor};</li>
 *   <li>a per-request timeout check: after each {@link #onNext(Object)} a task is registered
 *       with the shared {@link TimeoutExecutor}; if the response counter has not caught up
 *       with that request's sequence number when the task runs, the stream is failed with a
 *       {@link TimeoutIOException}.</li>
 * </ul>
 * Only the first of {@link #onError(Throwable)} / {@link #onCompleted()} is forwarded to the
 * wrapped observer; later calls are ignored.
 *
 * @param <T> the request type sent through the wrapped observer
 */
public final class StreamObserverWithTimeout<T> implements StreamObserver<T> {
    public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class);
    /** Display name, used by {@link #toString()} and in timeout messages. */
    private final String name;
    /** Renders a request for log/exception messages; evaluated lazily via {@link StringSupplier}. */
    private final Function<T, String> requestToStringFunction;
    /** Used both as the semaphore wait slice and as the per-request timeout. */
    private final TimeDuration timeout;
    /** The wrapped observer that actually receives the calls. */
    private final StreamObserver<T> observer;
    private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
    /** Set once by the first onError/onCompleted; also stops {@link #acquire} from waiting. */
    private final AtomicBoolean isClose = new AtomicBoolean();
    /** Number of requests forwarded so far; its value after increment is the request's id. */
    private final AtomicInteger requestCount = new AtomicInteger();
    /** Reads the counter incremented by the response-notification callback. */
    private final IntSupplier responseCount;
    /** Limits outstanding requests; {@code null} means no limit. */
    private final ResourceSemaphore semaphore;

    /**
     * Creates a new instance together with the interceptor that feeds its response counter.
     *
     * @param str name suffix appended to the simple class name to form the display name
     * @param function converts a request to a string for messages
     * @param timeDuration the timeout applied to each request and to each semaphore wait
     * @param i the maximum number of outstanding requests; a value {@code <= 0} disables the limit
     * @param function2 builds the underlying observer from the given interceptor; the interceptor
     *                  is a {@link ResponseNotifyClientInterceptor} whose callback increments the
     *                  response counter and releases one permit
     *                  (presumably invoked once per received response -- confirm against
     *                  {@code ResponseNotifyClientInterceptor})
     * @param <T> the request type
     * @return the wrapping observer
     */
    public static <T> StreamObserverWithTimeout<T> newInstance(String str, Function<T, String> function, TimeDuration timeDuration, int i, Function<ClientInterceptor, StreamObserver<T>> function2) {
        final AtomicInteger responses = new AtomicInteger();
        final ResourceSemaphore permits = i > 0 ? new ResourceSemaphore(i) : null;
        final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(obj -> {
            responses.getAndIncrement();
            if (permits != null) {
                permits.release();
            }
        });
        return new StreamObserverWithTimeout<>(str, function, timeDuration, responses::get, permits, function2.apply(interceptor));
    }

    private StreamObserverWithTimeout(String nameSuffix, Function<T, String> requestToString, TimeDuration timeout, IntSupplier responseCount, ResourceSemaphore semaphore, StreamObserver<T> observer) {
        this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + nameSuffix;
        this.requestToStringFunction = requestToString;
        this.timeout = timeout;
        this.responseCount = responseCount;
        this.semaphore = semaphore;
        this.observer = observer;
    }

    /**
     * Blocks until a permit is available, re-checking the closed flag after every
     * {@link #timeout}-long wait so that a closed stream does not wait forever.
     * Does nothing when no limit is configured.
     *
     * @param request lazily rendered request, used only in exception messages
     * @throws IllegalStateException if the thread is interrupted while waiting, or if the
     *                               stream is closed before a permit is obtained
     */
    private void acquire(StringSupplier request) {
        if (this.semaphore == null) {
            return;
        }
        boolean acquired = false;
        while (!acquired && !this.isClose.get()) {
            try {
                acquired = this.semaphore.tryAcquire(this.timeout.getDuration(), this.timeout.getUnit());
            } catch (InterruptedException e) {
                // Restore the interrupt status: catching InterruptedException clears it, and the
                // unchecked exception thrown below would otherwise hide the interruption from callers.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted onNext " + request, e);
            }
        }
        if (!acquired) {
            throw new IllegalStateException("Failed onNext " + request + ": already closed.");
        }
    }

    /**
     * Forwards the request to the wrapped observer (after obtaining a permit, if limited)
     * and registers a timeout check for it.
     *
     * @param t the request to send
     * @throws IllegalStateException if a permit cannot be obtained, see {@link #acquire}
     */
    @Override
    public void onNext(T t) {
        final StringSupplier requestString = StringSupplier.get(() -> this.requestToStringFunction.apply(t));
        acquire(requestString);
        this.observer.onNext(t);
        // The post-increment count is this request's sequence number, compared against the
        // response counter when the timeout task runs.
        final int id = this.requestCount.incrementAndGet();
        this.scheduler.onTimeout(this.timeout, () -> handleTimeout(id, requestString),
                LOG, () -> this.name + ": Timeout check failed for request: " + requestString);
    }

    /**
     * Fails the stream if fewer than {@code id} responses have been counted so far.
     *
     * @param id sequence number of the request being checked
     * @param request lazily rendered request, used in the exception message
     */
    private void handleTimeout(int id, StringSupplier request) {
        if (id > this.responseCount.getAsInt()) {
            onError(new TimeoutIOException(this.name + ": Timed out " + this.timeout + " for sending request " + request));
        }
    }

    /**
     * Marks the stream closed and forwards the error, unless the stream was already closed.
     *
     * @param th the error to forward
     */
    @Override
    public void onError(Throwable th) {
        if (this.isClose.compareAndSet(false, true)) {
            this.observer.onError(th);
        }
    }

    /** Marks the stream closed and forwards completion, unless the stream was already closed. */
    @Override
    public void onCompleted() {
        if (this.isClose.compareAndSet(false, true)) {
            this.observer.onCompleted();
        }
    }

    /** @return the display name of this observer */
    @Override
    public String toString() {
        return this.name;
    }
}
