/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.utils.async;

import java.util.Arrays;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;

@SdkProtectedApi
public final class SimplePublisher<T>
implements Publisher<T> {
    private static final Logger log = Logger.loggerFor(SimplePublisher.class);
    private final AtomicLong outstandingDemand = new AtomicLong();
    private final Queue<QueueEntry<T>> eventQueue = new ConcurrentLinkedQueue<QueueEntry<T>>();
    private final Set<QueueEntry.Type> entryTypesToFail = new CopyOnWriteArraySet<QueueEntry.Type>();
    private final AtomicBoolean processingQueue = new AtomicBoolean(false);
    private final AtomicReference<Supplier<RuntimeException>> rejectException = new AtomicReference();
    private Subscriber<? super T> subscriber;

    public CompletableFuture<Void> send(T value) {
        log.trace(() -> "Received send() with " + value);
        OnNextQueueEntry entry = new OnNextQueueEntry(value);
        try {
            Validate.notNull(value, "Null cannot be written.", new Object[0]);
            this.validateRejectState();
            this.eventQueue.add(entry);
            this.processEventQueue();
        }
        catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    public CompletableFuture<Void> complete() {
        log.trace(() -> "Received complete()");
        OnCompleteQueueEntry entry = new OnCompleteQueueEntry();
        try {
            this.validateRejectState();
            this.setRejectExceptionOrThrow(() -> new IllegalStateException("complete() has been invoked"));
            this.eventQueue.add(entry);
            this.processEventQueue();
        }
        catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    public CompletableFuture<Void> error(Throwable error) {
        log.trace(() -> "Received error() with " + error, error);
        OnErrorQueueEntry entry = new OnErrorQueueEntry(error);
        try {
            this.validateRejectState();
            this.setRejectExceptionOrThrow(() -> new IllegalStateException("error() has been invoked"));
            this.eventQueue.add(entry);
            this.processEventQueue();
        }
        catch (RuntimeException t) {
            entry.resultFuture.completeExceptionally(t);
        }
        return entry.resultFuture;
    }

    public void subscribe(Subscriber<? super T> s) {
        if (this.subscriber != null) {
            s.onSubscribe((Subscription)new NoOpSubscription());
            s.onError((Throwable)new IllegalStateException("Only one subscription may be active at a time."));
        }
        this.subscriber = s;
        s.onSubscribe((Subscription)new SubscriptionImpl());
        this.processEventQueue();
    }

    private void processEventQueue() {
        do {
            if (!this.processingQueue.compareAndSet(false, true)) {
                return;
            }
            try {
                this.doProcessQueue();
            }
            catch (Throwable e) {
                this.panicAndDie(e);
                break;
            }
            finally {
                this.processingQueue.set(false);
            }
        } while (this.shouldProcessQueueEntry(this.eventQueue.peek()));
    }

    private void doProcessQueue() {
        QueueEntry<T> entry;
        while (this.shouldProcessQueueEntry(entry = this.eventQueue.peek())) {
            if (this.entryTypesToFail.contains((Object)entry.type())) {
                entry.resultFuture.completeExceptionally(this.rejectException.get().get());
            } else {
                switch (entry.type()) {
                    case ON_NEXT: {
                        OnNextQueueEntry onNextEntry = (OnNextQueueEntry)entry;
                        log.trace(() -> "Calling onNext() with " + onNextEntry.value);
                        this.subscriber.onNext(onNextEntry.value);
                        long newDemand = this.outstandingDemand.decrementAndGet();
                        log.trace(() -> "Decreased demand to " + newDemand);
                        break;
                    }
                    case ON_COMPLETE: {
                        this.entryTypesToFail.addAll(Arrays.asList(QueueEntry.Type.ON_NEXT, QueueEntry.Type.ON_COMPLETE, QueueEntry.Type.ON_ERROR));
                        log.trace(() -> "Calling onComplete()");
                        this.subscriber.onComplete();
                        break;
                    }
                    case ON_ERROR: {
                        OnErrorQueueEntry onErrorEntry = (OnErrorQueueEntry)entry;
                        this.entryTypesToFail.addAll(Arrays.asList(QueueEntry.Type.ON_NEXT, QueueEntry.Type.ON_COMPLETE, QueueEntry.Type.ON_ERROR));
                        log.trace(() -> "Calling onError() with " + onErrorEntry.failure, onErrorEntry.failure);
                        this.subscriber.onError(onErrorEntry.failure);
                        break;
                    }
                    case CANCEL: {
                        this.subscriber = null;
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unknown entry type: " + (Object)((Object)entry.type()));
                    }
                }
                entry.resultFuture.complete(null);
            }
            this.eventQueue.remove();
        }
        return;
    }

    private boolean shouldProcessQueueEntry(QueueEntry<T> entry) {
        if (this.subscriber == null) {
            return false;
        }
        if (entry == null) {
            return false;
        }
        if (entry.type() != QueueEntry.Type.ON_NEXT) {
            return true;
        }
        if (this.entryTypesToFail.contains((Object)QueueEntry.Type.ON_NEXT)) {
            return true;
        }
        return this.outstandingDemand.get() > 0L;
    }

    private void panicAndDie(Throwable cause) {
        try {
            QueueEntry<T> entry;
            IllegalStateException failure = new IllegalStateException("Encountered fatal error in publisher", cause);
            this.rejectException.compareAndSet(null, () -> failure);
            this.entryTypesToFail.addAll(Arrays.asList(QueueEntry.Type.values()));
            this.subscriber.onError(cause instanceof Error ? cause : failure);
            while ((entry = this.eventQueue.poll()) != null) {
                entry.resultFuture.completeExceptionally(failure);
            }
        }
        catch (Throwable t) {
            t.addSuppressed(cause);
            log.error(() -> "Failed while processing a failure. This could result in stuck futures.", t);
        }
    }

    private void validateRejectState() {
        if (this.rejectException.get() != null) {
            throw this.rejectException.get().get();
        }
    }

    private void setRejectExceptionOrThrow(Supplier<RuntimeException> rejectedException) {
        if (!this.rejectException.compareAndSet(null, rejectedException)) {
            throw this.rejectException.get().get();
        }
    }

    private static final class NoOpSubscription
    implements Subscription {
        private NoOpSubscription() {
        }

        public void request(long n) {
        }

        public void cancel() {
        }
    }

    private static final class CancelQueueEntry<T>
    extends QueueEntry<T> {
        private CancelQueueEntry() {
        }

        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.CANCEL;
        }
    }

    private static final class OnErrorQueueEntry<T>
    extends QueueEntry<T> {
        private final Throwable failure;

        private OnErrorQueueEntry(Throwable failure) {
            this.failure = failure;
        }

        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_ERROR;
        }
    }

    private static final class OnCompleteQueueEntry<T>
    extends QueueEntry<T> {
        private OnCompleteQueueEntry() {
        }

        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_COMPLETE;
        }
    }

    private static final class OnNextQueueEntry<T>
    extends QueueEntry<T> {
        private final T value;

        private OnNextQueueEntry(T value) {
            this.value = value;
        }

        @Override
        protected QueueEntry.Type type() {
            return QueueEntry.Type.ON_NEXT;
        }
    }

    static abstract class QueueEntry<T> {
        protected final CompletableFuture<Void> resultFuture = new CompletableFuture();

        QueueEntry() {
        }

        protected abstract Type type();

        protected static enum Type {
            ON_NEXT,
            ON_COMPLETE,
            ON_ERROR,
            CANCEL;

        }
    }

    private class SubscriptionImpl
    implements Subscription {
        private SubscriptionImpl() {
        }

        public void request(long n) {
            log.trace(() -> "Received request() with " + n);
            if (n <= 0L) {
                IllegalArgumentException failure = new IllegalArgumentException("A downstream publisher requested an invalid amount of data: " + n);
                SimplePublisher.this.rejectException.compareAndSet(null, () -> failure);
                SimplePublisher.this.eventQueue.add(new OnErrorQueueEntry(failure));
                SimplePublisher.this.entryTypesToFail.addAll(Arrays.asList(QueueEntry.Type.ON_NEXT, QueueEntry.Type.ON_COMPLETE));
                SimplePublisher.this.processEventQueue();
            } else {
                long newDemand = SimplePublisher.this.outstandingDemand.updateAndGet(current -> {
                    if (Long.MAX_VALUE - current < n) {
                        return Long.MAX_VALUE;
                    }
                    return current + n;
                });
                log.trace(() -> "Increased demand to " + newDemand);
                SimplePublisher.this.processEventQueue();
            }
        }

        public void cancel() {
            log.trace(() -> "Received cancel()");
            IllegalStateException failure = new IllegalStateException("A downstream publisher has cancelled the subscription.");
            SimplePublisher.this.rejectException.compareAndSet(null, () -> failure);
            SimplePublisher.this.eventQueue.add(new CancelQueueEntry());
            SimplePublisher.this.entryTypesToFail.addAll(Arrays.asList(QueueEntry.Type.ON_NEXT, QueueEntry.Type.ON_COMPLETE, QueueEntry.Type.ON_ERROR));
            SimplePublisher.this.processEventQueue();
        }
    }
}

