/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.internal;

import io.netty.buffer.ByteBuf;
import io.rsocket.DuplexConnection;
import io.rsocket.internal.UnboundedProcessor;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public abstract class BaseDuplexConnection
implements DuplexConnection {
    protected final Sinks.Empty<Void> onClose = Sinks.empty();
    protected final UnboundedProcessor sender = new UnboundedProcessor(() -> this.onClose.tryEmitEmpty());

    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        if (streamId == 0) {
            this.sender.tryEmitPrioritized(frame);
        } else {
            this.sender.tryEmitNormal(frame);
        }
    }

    protected abstract void doOnClose();

    @Override
    public Mono<Void> onClose() {
        return this.onClose.asMono();
    }

    public final void dispose() {
        this.doOnClose();
    }

    public final boolean isDisposed() {
        return (Boolean)this.onClose.scan(Scannable.Attr.TERMINATED) != false || (Boolean)this.onClose.scan(Scannable.Attr.CANCELLED) != false;
    }
}

