/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.PromiseInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;

public class MessageConsumerImpl<T>
extends HandlerRegistration<T>
implements MessageConsumer<T> {
    private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);
    private final boolean localOnly;
    private Handler<Message<T>> handler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private final int maxBufferedMessages;
    private final InboundMessageQueue<Message<T>> pending;
    private Promise<Void> result;
    private boolean registered;
    private boolean full;

    MessageConsumerImpl(final ContextInternal context, EventBusImpl eventBus, String address, boolean localOnly, int maxBufferedMessages) {
        super(context, eventBus, address, false);
        this.localOnly = localOnly;
        this.result = context.promise();
        this.maxBufferedMessages = maxBufferedMessages;
        this.pending = new InboundMessageQueue<Message<T>>(context.executor(), context.executor(), maxBufferedMessages, maxBufferedMessages){

            @Override
            protected void handleResume() {
                MessageConsumerImpl.this.full = false;
            }

            @Override
            protected void handlePause() {
                MessageConsumerImpl.this.full = true;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            protected void handleMessage(Message<T> msg) {
                Handler handler;
                MessageConsumerImpl messageConsumerImpl = MessageConsumerImpl.this;
                synchronized (messageConsumerImpl) {
                    handler = MessageConsumerImpl.this.handler;
                }
                if (handler != null) {
                    MessageConsumerImpl.this.dispatch(handler, msg, context.duplicate());
                } else {
                    MessageConsumerImpl.this.handleDiscard(msg, false);
                }
            }

            @Override
            protected void handleDispose(Message<T> msg) {
                MessageConsumerImpl.this.handleDiscard(msg, false);
            }
        };
    }

    @Override
    public synchronized Future<Void> completion() {
        return this.result.future();
    }

    @Override
    public synchronized Future<Void> unregister() {
        this.handler = null;
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
        this.pending.close();
        Future<Void> fut = super.unregister();
        if (this.registered) {
            this.registered = false;
            Promise<Void> res = this.result;
            fut.onComplete(ar -> res.tryFail("Consumer unregistered before registration completed"));
            this.result = this.context.promise();
        }
        return fut;
    }

    private void handleDiscard(Message<T> message, boolean isFull) {
        if (this.discardHandler != null) {
            this.discardHandler.handle(message);
        } else if (isFull) {
            if (log.isWarnEnabled()) {
                log.warn((Object)("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer. address: " + this.address));
            }
        } else if (log.isWarnEnabled()) {
            log.warn((Object)("Discarding message since the consumer is not registered. address: " + this.address));
        }
        this.discardMessage(message);
    }

    @Override
    protected void doReceive(Message<T> message) {
        if (this.full) {
            this.handleDiscard(message, true);
        } else {
            this.pending.write(message);
        }
    }

    @Override
    protected void dispatch(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
        if (handler == null) {
            throw new NullPointerException();
        }
        context.dispatch(msg, handler);
    }

    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
        if (h != null) {
            MessageConsumerImpl messageConsumerImpl = this;
            synchronized (messageConsumerImpl) {
                this.handler = h;
                if (!this.registered) {
                    this.registered = true;
                    Promise<Void> p = this.result;
                    PromiseInternal<Void> registration = this.context.promise();
                    this.register(true, this.localOnly, registration);
                    registration.future().onComplete(ar -> {
                        if (ar.succeeded()) {
                            p.tryComplete();
                        } else {
                            p.tryFail(ar.cause());
                        }
                    });
                }
            }
        } else {
            this.unregister();
        }
        return this;
    }

    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream(this);
    }

    @Override
    public synchronized MessageConsumer<T> pause() {
        this.pending.pause();
        return this;
    }

    @Override
    public MessageConsumer<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    @Override
    public synchronized MessageConsumer<T> fetch(long amount) {
        this.pending.fetch(amount);
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler != null) {
            ContextInternal endCtx = this.context.owner().getOrCreateContext();
            this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        } else {
            this.endHandler = null;
        }
        return this;
    }

    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    public synchronized Handler<Message<T>> getHandler() {
        return this.handler;
    }
}

