/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.AmqpTransactionCoordinator;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.AmqpEndpointStateUtil;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSender;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TransactionCoordinator;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class ReactorSession
implements AmqpSession {
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private final ConcurrentMap<String, LinkSubscription<AmqpSendLink>> openSendLinks = new ConcurrentHashMap<String, LinkSubscription<AmqpSendLink>>();
    private final ConcurrentMap<String, LinkSubscription<AmqpReceiveLink>> openReceiveLinks = new ConcurrentHashMap<String, LinkSubscription<AmqpReceiveLink>>();
    private final Scheduler timeoutScheduler = Schedulers.parallel();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Object closeLock = new Object();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();
    private final ClientLogger logger = new ClientLogger(ReactorSession.class);
    private final Flux<AmqpEndpointState> endpointStates;
    private final AmqpConnection amqpConnection;
    private final Session session;
    private final SessionHandler sessionHandler;
    private final String sessionName;
    private final ReactorProvider provider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final String activeTimeoutMessage;
    private final AmqpRetryOptions retryOptions;
    private final ReactorHandlerProvider handlerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final Disposable.Composite connectionSubscriptions;
    private final AtomicReference<TransactionCoordinator> transactionCoordinator = new AtomicReference();
    private final Flux<AmqpShutdownSignal> shutdownSignals;

    public ReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler, String sessionName, ReactorProvider provider, ReactorHandlerProvider handlerProvider, Mono<ClaimsBasedSecurityNode> cbsNodeSupplier, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, AmqpRetryOptions retryOptions) {
        this.amqpConnection = amqpConnection;
        this.session = session;
        this.sessionHandler = sessionHandler;
        this.handlerProvider = handlerProvider;
        this.sessionName = sessionName;
        this.provider = provider;
        this.cbsNodeSupplier = cbsNodeSupplier;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.retryOptions = retryOptions;
        this.activeTimeoutMessage = String.format("ReactorSession connectionId[%s], session[%s]: Retries exhausted waiting for ACTIVE endpoint state.", sessionHandler.getConnectionId(), sessionName);
        this.endpointStates = sessionHandler.getEndpointStates().map(state -> {
            this.logger.verbose("connectionId[{}], sessionName[{}], state[{}]", new Object[]{sessionHandler.getConnectionId(), sessionName, state});
            return AmqpEndpointStateUtil.getConnectionState(state);
        }).doOnError(error -> this.handleError((Throwable)error)).doOnComplete(() -> this.handleClose()).cache(1);
        this.shutdownSignals = amqpConnection.getShutdownSignals();
        this.connectionSubscriptions = Disposables.composite((Disposable[])new Disposable[]{this.endpointStates.subscribe(), this.shutdownSignals.flatMap(signal -> this.closeAsync("Shutdown signal received", null, false)).subscribe()});
        session.open();
    }

    Session session() {
        return this.session;
    }

    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    public void dispose() {
        this.closeAsync().block(this.retryOptions.getTryTimeout());
    }

    @Override
    public String getSessionName() {
        return this.sessionName;
    }

    @Override
    public Duration getOperationTimeout() {
        return this.retryOptions.getTryTimeout();
    }

    @Override
    public Mono<AmqpTransaction> createTransaction() {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.declare());
    }

    @Override
    public Mono<Void> commitTransaction(AmqpTransaction transaction) {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.discharge(transaction, true));
    }

    @Override
    public Mono<Void> rollbackTransaction(AmqpTransaction transaction) {
        return this.getOrCreateTransactionCoordinator().flatMap(coordinator -> coordinator.discharge(transaction, false));
    }

    @Override
    public Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return this.createProducer(linkName, entityPath, timeout, retry, null).or(this.onClosedError(String.format("connectionId[%s] entityPath[%s] linkName[%s] Connection closed while waiting for new producer link.", this.sessionHandler.getConnectionId(), entityPath, linkName)));
    }

    @Override
    public Mono<AmqpLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry) {
        return this.createConsumer(linkName, entityPath, timeout, retry, null, null, null, SenderSettleMode.UNSETTLED, ReceiverSettleMode.SECOND).or(this.onClosedError(String.format("connectionId[%s] entityPath[%s] linkName[%s] Connection closed while waiting for new receive link.", this.sessionHandler.getConnectionId(), entityPath, linkName))).cast(AmqpLink.class);
    }

    @Override
    public boolean removeLink(String linkName) {
        return this.removeLink(this.openSendLinks, linkName) || this.removeLink(this.openReceiveLinks, linkName);
    }

    Mono<Void> isClosed() {
        return this.isClosedMono.asMono();
    }

    @Override
    public Mono<Void> closeAsync() {
        return this.closeAsync(null, null, true);
    }

    Mono<Void> closeAsync(String message, ErrorCondition errorCondition, boolean disposeLinks) {
        if (this.isDisposed.getAndSet(true)) {
            return this.isClosedMono.asMono();
        }
        String condition = errorCondition != null ? errorCondition.toString() : "n/a";
        this.logger.verbose("connectionId[{}], sessionName[{}], errorCondition[{}]. Setting error condition and disposing session. {}", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, condition, message != null ? message : ""});
        return Mono.fromRunnable(() -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> this.disposeWork(errorCondition, disposeLinks));
            }
            catch (IOException e) {
                this.logger.info("connectionId[{}] sessionName[{}] Error while scheduling work. Manually disposing.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, e});
                this.disposeWork(errorCondition, disposeLinks);
            }
            catch (RejectedExecutionException e) {
                this.logger.info("connectionId[{}] sessionName[{}] RejectedExecutionException when scheduling work.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName});
                this.disposeWork(errorCondition, disposeLinks);
            }
        }).then(this.isClosedMono.asMono());
    }

    @Override
    public Mono<? extends AmqpTransactionCoordinator> getOrCreateTransactionCoordinator() {
        if (this.isDisposed()) {
            return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)((Object)new AmqpException(true, String.format("connectionId[%s] sessionName[%s] Cannot create coordinator send link '%s' from a closed session.", this.sessionHandler.getConnectionId(), this.sessionName, TRANSACTION_LINK_NAME), this.sessionHandler.getErrorContext()))));
        }
        TransactionCoordinator existing = this.transactionCoordinator.get();
        if (existing != null) {
            this.logger.verbose("connectionId[{}] coordinator[{}]: Returning existing transaction coordinator.", new Object[]{this.sessionHandler.getConnectionId(), TRANSACTION_LINK_NAME});
            return Mono.just((Object)existing);
        }
        return this.createProducer(TRANSACTION_LINK_NAME, TRANSACTION_LINK_NAME, (org.apache.qpid.proton.amqp.transport.Target)new Coordinator(), this.retryOptions, null, false).map(link -> {
            TransactionCoordinator newCoordinator = new TransactionCoordinator((AmqpSendLink)link, this.messageSerializer);
            if (this.transactionCoordinator.compareAndSet(null, newCoordinator)) {
                return newCoordinator;
            }
            return this.transactionCoordinator.get();
        }).or(this.onClosedError(String.format("connectionId[%s] Connection closed while waiting for transaction coordinator creation.", this.sessionHandler.getConnectionId())));
    }

    protected Mono<AmqpReceiveLink> createConsumer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        if (this.isDisposed()) {
            return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)((Object)new AmqpException(true, String.format("connectionId[%s] sessionName[%s] entityPath[%s] linkName[%s] Cannot create receive link from a closed session.", this.sessionHandler.getConnectionId(), this.sessionName, entityPath, linkName), this.sessionHandler.getErrorContext()))));
        }
        LinkSubscription existingLink = (LinkSubscription)this.openReceiveLinks.get(linkName);
        if (existingLink != null) {
            this.logger.info("linkName[{}] entityPath[{}] Returning existing receive link.", new Object[]{linkName, entityPath});
            return Mono.just((Object)((AmqpReceiveLink)existingLink.getLink()));
        }
        TokenManager tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
        return Mono.when((Publisher[])new Publisher[]{this.onActiveEndpoint(), tokenManager.authorize()}).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    LinkSubscription computed = this.openReceiveLinks.compute(linkName, (linkNameKey, existing) -> {
                        if (existing != null) {
                            this.logger.info("linkName[{}]: Another receive link exists. Disposing of new one.", new Object[]{linkName});
                            tokenManager.close();
                            return existing;
                        }
                        this.logger.info("connectionId[{}] sessionId[{}] linkName[{}] Creating a new receiver link.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, linkName});
                        return this.getSubscription((String)linkNameKey, entityPath, sourceFilters, receiverProperties, receiverDesiredCapabilities, senderSettleMode, receiverSettleMode, tokenManager);
                    });
                    sink.success((Object)((AmqpReceiveLink)computed.getLink()));
                });
            }
            catch (IOException | RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        }));
    }

    protected ReactorReceiver createConsumer(String entityPath, Receiver receiver, ReceiveLinkHandler receiveLinkHandler, TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ReactorReceiver(this.amqpConnection, entityPath, receiver, receiveLinkHandler, tokenManager, reactorProvider.getReactorDispatcher(), this.retryOptions);
    }

    protected Mono<AmqpLink> createProducer(String linkName, String entityPath, Duration timeout, AmqpRetryPolicy retry, Map<Symbol, Object> linkProperties) {
        AmqpRetryOptions options;
        Target target = new Target();
        target.setAddress(entityPath);
        AmqpRetryOptions amqpRetryOptions = options = retry != null ? new AmqpRetryOptions(retry.getRetryOptions()) : new AmqpRetryOptions();
        if (timeout != null) {
            options.setTryTimeout(timeout);
        }
        return this.createProducer(linkName, entityPath, (org.apache.qpid.proton.amqp.transport.Target)target, options, linkProperties, true).cast(AmqpLink.class);
    }

    private Mono<AmqpSendLink> createProducer(String linkName, String entityPath, org.apache.qpid.proton.amqp.transport.Target target, AmqpRetryOptions options, Map<Symbol, Object> linkProperties, boolean requiresAuthorization) {
        Mono<Long> authorize;
        TokenManager tokenManager;
        if (this.isDisposed()) {
            return Mono.error((Throwable)this.logger.logExceptionAsError((RuntimeException)((Object)new AmqpException(true, String.format("connectionId[%s] sessionName[%s] entityPath[%s] linkName[%s] Cannot create send link from a closed session.", this.sessionHandler.getConnectionId(), this.sessionName, entityPath, linkName), this.sessionHandler.getErrorContext()))));
        }
        LinkSubscription existing = (LinkSubscription)this.openSendLinks.get(linkName);
        if (existing != null) {
            this.logger.verbose("linkName[{}]: Returning existing send link.", new Object[]{linkName});
            return Mono.just((Object)((AmqpSendLink)existing.getLink()));
        }
        if (requiresAuthorization) {
            tokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, entityPath);
            authorize = tokenManager.authorize();
        } else {
            tokenManager = null;
            authorize = Mono.empty();
        }
        return Mono.when((Publisher[])new Publisher[]{this.onActiveEndpoint(), authorize}).then(Mono.create(sink -> {
            try {
                this.provider.getReactorDispatcher().invoke(() -> {
                    LinkSubscription computed = this.openSendLinks.compute(linkName, (linkNameKey, existingLink) -> {
                        if (existingLink != null) {
                            this.logger.info("linkName[{}]: Another send link exists. Disposing of new one.", new Object[]{linkName});
                            if (tokenManager != null) {
                                tokenManager.close();
                            }
                            return existingLink;
                        }
                        this.logger.info("connectionId[{}] sessionId[{}] linkName[{}] Creating a new send link.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, linkName});
                        return this.getSubscription(linkName, entityPath, target, linkProperties, options, tokenManager);
                    });
                    sink.success((Object)((AmqpSendLink)computed.getLink()));
                });
            }
            catch (IOException | RejectedExecutionException e) {
                sink.error((Throwable)e);
            }
        }));
    }

    private LinkSubscription<AmqpSendLink> getSubscription(String linkName, String entityPath, org.apache.qpid.proton.amqp.transport.Target target, Map<Symbol, Object> linkProperties, AmqpRetryOptions options, TokenManager tokenManager) {
        Sender sender = this.session.sender(linkName);
        sender.setTarget(target);
        Source source = new Source();
        sender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
        if (linkProperties != null && linkProperties.size() > 0) {
            sender.setProperties(linkProperties);
        }
        SendLinkHandler sendLinkHandler = this.handlerProvider.createSendLinkHandler(this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), linkName, entityPath);
        BaseHandler.setHandler((Extendable)sender, (Handler)sendLinkHandler);
        sender.open();
        ReactorSender reactorSender = new ReactorSender(this.amqpConnection, entityPath, sender, sendLinkHandler, this.provider, tokenManager, this.messageSerializer, options, this.timeoutScheduler);
        Disposable subscription = reactorSender.getEndpointStates().subscribe(state -> {}, error -> {
            if (!this.isDisposed.get()) {
                this.removeLink(this.openSendLinks, linkName);
            }
        }, () -> {
            if (!this.isDisposed.get()) {
                this.logger.info("linkName[{}]: Complete. Removing and disposing send link.", new Object[]{linkName});
                this.removeLink(this.openSendLinks, linkName);
            }
        });
        return new LinkSubscription<AmqpSendLink>(reactorSender, subscription, String.format("connectionId[%s] session[%s]: Setting error on receive link.", this.sessionHandler.getConnectionId(), this.sessionName), null);
    }

    private LinkSubscription<AmqpReceiveLink> getSubscription(String linkName, String entityPath, Map<Symbol, Object> sourceFilters, Map<Symbol, Object> receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode, TokenManager tokenManager) {
        Receiver receiver = this.session.receiver(linkName);
        Source source = new Source();
        source.setAddress(entityPath);
        if (sourceFilters != null && sourceFilters.size() > 0) {
            source.setFilter(sourceFilters);
        }
        receiver.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
        Target target = new Target();
        receiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)target);
        receiver.setSenderSettleMode(senderSettleMode);
        receiver.setReceiverSettleMode(receiverSettleMode);
        if (receiverProperties != null && !receiverProperties.isEmpty()) {
            receiver.setProperties(receiverProperties);
        }
        if (receiverDesiredCapabilities != null && receiverDesiredCapabilities.length > 0) {
            receiver.setDesiredCapabilities(receiverDesiredCapabilities);
        }
        ReceiveLinkHandler receiveLinkHandler = this.handlerProvider.createReceiveLinkHandler(this.sessionHandler.getConnectionId(), this.sessionHandler.getHostname(), linkName, entityPath);
        BaseHandler.setHandler((Extendable)receiver, (Handler)receiveLinkHandler);
        receiver.open();
        ReactorReceiver reactorReceiver = this.createConsumer(entityPath, receiver, receiveLinkHandler, tokenManager, this.provider);
        Disposable subscription = reactorReceiver.getEndpointStates().subscribe(state -> {}, error -> {
            if (!this.isDisposed.get()) {
                this.removeLink(this.openReceiveLinks, linkName);
            }
        }, () -> {
            if (!this.isDisposed.get()) {
                this.logger.info("linkName[{}] entityPath[{}]: Complete. Removing receive link.", new Object[]{linkName, entityPath});
                this.removeLink(this.openReceiveLinks, linkName);
            }
        });
        return new LinkSubscription<AmqpReceiveLink>(reactorReceiver, subscription, String.format("connectionId[%s] sessionName[%s]: Setting error on receive link.", this.amqpConnection.getId(), this.sessionName), null);
    }

    private <T> Mono<T> onClosedError(String message) {
        return Mono.firstWithSignal((Mono[])new Mono[]{this.isClosedMono.asMono(), this.shutdownSignals.next()}).then(Mono.error((Throwable)((Object)new AmqpException(false, String.format("connectionId[%s] Connection closed. %s", this.sessionHandler.getConnectionId(), message), this.sessionHandler.getErrorContext()))));
    }

    private Mono<Void> onActiveEndpoint() {
        return RetryUtil.withRetry(this.getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), this.retryOptions, this.activeTimeoutMessage).then();
    }

    private void handleClose() {
        this.logger.verbose("connectionId[{}] sessionName[{}] Disposing of active send and receive links due to session close.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName});
        this.closeAsync().subscribe();
    }

    private void handleError(Throwable error) {
        ErrorCondition condition;
        this.logger.verbose("connectionId[{}] sessionName[{}]  Disposing of active links due to error.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, error});
        if (error instanceof AmqpException) {
            AmqpException exception = (AmqpException)((Object)error);
            String errorCondition = exception.getErrorCondition() != null ? exception.getErrorCondition().getErrorCondition() : "UNKNOWN";
            condition = new ErrorCondition(Symbol.getSymbol((String)errorCondition), exception.getMessage());
            this.closeAsync(exception.getMessage(), condition, true).subscribe();
        } else {
            condition = null;
        }
        this.closeAsync(error.getMessage(), condition, true).subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void disposeWork(ErrorCondition errorCondition, boolean disposeLinks) {
        if (this.session.getLocalState() != EndpointState.CLOSED) {
            this.session.close();
            if (errorCondition != null && this.session.getCondition() == null) {
                this.session.setCondition(errorCondition);
            }
        }
        ArrayList closingLinks = new ArrayList();
        if (disposeLinks) {
            Object object = this.closeLock;
            synchronized (object) {
                this.openReceiveLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }
                    closingLinks.add(link.closeAsync(errorCondition));
                });
                this.openSendLinks.values().forEach(link -> {
                    if (link == null) {
                        return;
                    }
                    closingLinks.add(link.closeAsync(errorCondition));
                });
            }
        }
        Mono closeLinksMono = Mono.when(closingLinks).timeout(this.retryOptions.getTryTimeout()).onErrorResume(error -> {
            this.logger.warning("connectionId[{}] sessionName[{}] Timed out waiting for all links to close.", new Object[]{this.sessionHandler.getConnectionId(), this.sessionName, error});
            return Mono.empty();
        }).then(Mono.fromRunnable(() -> {
            this.isClosedMono.emitEmpty((signalType, result) -> {
                this.logger.warning("connectionId[{}] signal[{}] result[{}] Unable to emit shutdown signal.", new Object[]{this.sessionHandler.getConnectionId(), signalType, result});
                return false;
            });
            this.sessionHandler.close();
            this.connectionSubscriptions.dispose();
        }));
        this.connectionSubscriptions.add(closeLinksMono.subscribe());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T extends AmqpLink> boolean removeLink(ConcurrentMap<String, LinkSubscription<T>> openLinks, String key) {
        if (key == null) {
            return false;
        }
        Object object = this.closeLock;
        synchronized (object) {
            LinkSubscription removed = (LinkSubscription)openLinks.remove(key);
            if (removed != null) {
                removed.closeAsync(null).subscribe();
            }
            return removed != null;
        }
    }

    private static final class LinkSubscription<T extends AmqpLink> {
        private final AtomicBoolean isDisposed = new AtomicBoolean();
        private final T link;
        private final Disposable subscription;
        private final String errorMessage;

        private LinkSubscription(T link, Disposable subscription, String errorMessage) {
            this.link = link;
            this.subscription = subscription;
            this.errorMessage = errorMessage;
        }

        public T getLink() {
            return this.link;
        }

        Mono<Void> closeAsync(ErrorCondition errorCondition) {
            if (this.isDisposed.getAndSet(true)) {
                return Mono.empty();
            }
            this.subscription.dispose();
            if (this.link instanceof ReactorReceiver) {
                return ((ReactorReceiver)this.link).closeAsync(this.errorMessage, errorCondition);
            }
            if (this.link instanceof ReactorSender) {
                return ((ReactorSender)this.link).closeAsync(this.errorMessage, errorCondition);
            }
            this.link.dispose();
            return Mono.empty();
        }

        /* synthetic */ LinkSubscription(AmqpLink x0, Disposable x1, String x2, 1 x3) {
            this(x0, x1, x2);
        }
    }
}

