/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.streaming;

import java.io.IOException;
import java.net.CookieManager;
import java.net.CookiePolicy;
import java.net.CookieStore;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceEndpoint;
import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.StreamingApiConsumer;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.streaming.ReplayExtension;
import org.apache.camel.support.service.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.http.jetty.JettyHttpClientTransport;
import org.cometd.client.transport.ClientTransport;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.http.HttpCookie;
import org.eclipse.jetty.http.HttpCookieStore;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionHelper
extends ServiceSupport {
    /** Replay extension instance shared by every client built in {@code createClient}. */
    static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension();
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    /** Number of seconds {@code handshake()} waits for the client to reach the CONNECTED state. */
    private static final int HANDSHAKE_TIMEOUT_SEC = 120;
    // Keys looked up on incoming Bayeux messages by getFailure / getFailureReason.
    private static final String FAILURE_FIELD = "failure";
    private static final String EXCEPTION_FIELD = "exception";
    private static final String SFDC_FIELD = "sfdc";
    private static final String FAILURE_REASON_FIELD = "failureReason";
    /** Failure-reason prefix that isTemporaryError treats as retryable. */
    private static final String SERVER_TOO_BUSY_ERROR = "503::";
    /** Error text that makes the handshake/connect listeners attempt a fresh login. */
    private static final String AUTHENTICATION_INVALID = "401::Authentication invalid";
    /** Regex matched against a subscription error to detect a rejected replay id. */
    private static final String INVALID_REPLAY_ID_PATTERN = "400::The replayId \\{.*} you provided was invalid.*";
    /** Streaming client; created in doStart and reset to null in doStop. */
    BayeuxClient client;
    private final SalesforceComponent component;
    /** Session obtained from the component in doStart. */
    private SalesforceSession session;
    // Backoff settings read from the component configuration; used as milliseconds
    // (see the Thread.sleep call in subscriptionFailed).
    private final long maxBackoff;
    private final long backoffIncrement;
    // Last failure details recorded by the handshake/connect listeners and
    // reported by handshake() when the client does not connect in time.
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    /**
     * Accumulated backoff: grown by backoffIncrement in subscriptionFailed and
     * reset to zero by the subscription listener after a successful subscribe.
     */
    private final AtomicLong handshakeBackoff;
    /** Consumers registered per channel name. */
    private final Map<String, Set<StreamingApiConsumer>> channelToConsumers = new ConcurrentHashMap<String, Set<StreamingApiConsumer>>();
    /** Message listener created for each consumer in subscribe(). */
    private final Map<StreamingApiConsumer, ClientSessionChannel.MessageListener> consumerToListener = new ConcurrentHashMap<StreamingApiConsumer, ClientSessionChannel.MessageListener>();
    /** Channel names whose subscription has not been confirmed yet. */
    private final Set<String> channelsToSubscribe = ConcurrentHashMap.newKeySet();
    // Meta-channel listeners; registered on the client by initMessageListeners().
    private final ClientSessionChannel.MessageListener handshakeListener = this.createHandshakeListener();
    private final ClientSessionChannel.MessageListener subscriptionListener = this.createSubscriptionListener();
    private final ClientSessionChannel.MessageListener connectListener = this.createConnectionListener();

    public SubscriptionHelper(SalesforceComponent component) {
        this.component = component;
        this.handshakeBackoff = new AtomicLong();
        this.backoffIncrement = component.getConfig().getBackoffIncrement();
        this.maxBackoff = component.getConfig().getMaxBackoff();
    }

    /**
     * Builds the listener for handshake replies. The handling runs on the HTTP
     * client's worker pool.
     *
     * <p>On failure the error and exception are recorded (they are reported by
     * {@code handshake()}), a re-login is attempted when the failure is a
     * {@code 403::} error whose Salesforce failure reason is
     * {@code 401::Authentication invalid}, and the handshake is retried. On
     * success every known channel is queued for (re)subscription.
     *
     * @return the handshake listener
     */
    private ClientSessionChannel.MessageListener createHandshakeListener() {
        return (channel, message) -> this.component.getHttpClient().getWorkerPool().execute(() -> {
            LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", message);
            if (!message.isSuccessful()) {
                LOG.warn("Handshake failure: {}", message);
                final String error = (String) message.get("error");
                this.handshakeError = error;
                this.handshakeException = SubscriptionHelper.getFailure(message);
                // The failure reason is null when the message carries no ext/sfdc data, so
                // compare constant-first; an NPE here would skip the retry below.
                if (error != null && error.startsWith("403::")
                        && AUTHENTICATION_INVALID.equals(SubscriptionHelper.getFailureReason(message))) {
                    LOG.debug("attempting login due to handshake error: 403 -> 401::Authentication invalid");
                    this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
                }
                LOG.debug("Handshake failed, so try again.");
                this.client.handshake();
            } else if (!this.channelToConsumers.isEmpty()) {
                this.channelsToSubscribe.clear();
                this.channelsToSubscribe.addAll(this.channelToConsumers.keySet());
                LOG.info("Handshake successful. Channels to subscribe: {}", this.channelsToSubscribe);
            }
        });
    }

    /**
     * Builds the listener for connect replies. The handling runs on the HTTP
     * client's worker pool.
     *
     * <p>On failure the error and exception are recorded, a re-login is attempted
     * when the error is {@code 401::Authentication invalid}, and a new handshake
     * is started when the reply carries no advice or advises {@code reconnect=none}.
     * On success every channel still pending subscription is subscribed for all
     * of its consumers.
     *
     * @return the connect listener
     */
    private ClientSessionChannel.MessageListener createConnectionListener() {
        return (channel, message) -> this.component.getHttpClient().getWorkerPool().execute(() -> {
            LOG.debug("[CHANNEL:META_CONNECT]: {}", message);
            if (!message.isSuccessful()) {
                LOG.warn("Connect failure: {}", message);
                final String error = (String) message.get("error");
                this.connectError = error;
                this.connectException = SubscriptionHelper.getFailure(message);
                if (AUTHENTICATION_INVALID.equals(error)) {
                    LOG.debug("connectError: {}", error);
                    LOG.debug("Attempting login...");
                    this.session.attemptLoginUntilSuccessful(this.backoffIncrement, this.maxBackoff);
                }
                if (message.getAdvice() == null || "none".equals(message.getAdvice().get("reconnect"))) {
                    LOG.debug("Advice == none, so handshaking");
                    this.client.handshake();
                }
            } else if (!this.channelsToSubscribe.isEmpty()) {
                LOG.info("Subscribing to channels: {}", this.channelsToSubscribe);
                for (String channelName : this.channelsToSubscribe) {
                    // unsubscribe() may already have dropped the channel's consumer set while
                    // its name is still pending here; treat that as "nothing to subscribe".
                    final Set<StreamingApiConsumer> consumers =
                            this.channelToConsumers.getOrDefault(channelName, Collections.emptySet());
                    for (StreamingApiConsumer consumer : consumers) {
                        this.subscribe(consumer);
                    }
                }
            }
        });
    }

    /**
     * Builds the listener for subscribe replies. The handling runs on the HTTP
     * client's worker pool.
     *
     * <p>A successful reply removes the channel from the pending set and resets
     * the backoff counter; a failed reply is handed to {@code subscriptionFailed}
     * together with the first consumer registered for the channel, if any.
     *
     * @return the subscription listener
     */
    private ClientSessionChannel.MessageListener createSubscriptionListener() {
        return (channel, message) -> this.component.getHttpClient().getWorkerPool().execute(() -> {
            LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", message);
            final String subscribedChannel = message.getOrDefault("subscription", "").toString();

            if (message.isSuccessful()) {
                LOG.info("Subscribed to channel {}", subscribedChannel);
                this.channelsToSubscribe.remove(subscribedChannel);
                this.handshakeBackoff.set(0L);
                return;
            }

            LOG.warn("Subscription failure: {}", message);
            final Set<StreamingApiConsumer> registered =
                    this.channelToConsumers.getOrDefault(subscribedChannel, Collections.emptySet());
            // Only the first registered consumer (if there is one) is passed on.
            for (StreamingApiConsumer first : registered) {
                this.subscriptionFailed(first, message);
                break;
            }
        });
    }

    /**
     * Reacts to a failed subscribe reply.
     *
     * <ul>
     * <li>Temporary errors (see {@code isTemporaryError}) are retried for every
     * consumer of the channel after sleeping for the current backoff, until the
     * backoff exceeds {@code maxBackoff}.</li>
     * <li>An invalid replay id switches the channel to the fallback replay id
     * configured on the first consumer's endpoint and resubscribes.</li>
     * <li>Anything else, or an exhausted backoff, is reported to every consumer
     * through {@code handleException} as long as the client still exists.</li>
     * </ul>
     *
     * @param firstConsumer a consumer of the failed channel; supplies the topic
     *                      name for the message and the fallback replay id
     * @param message       the failed subscribe reply
     */
    private void subscriptionFailed(StreamingApiConsumer firstConsumer, Message message) {
        final String channelName = message.getOrDefault("subscription", "").toString();
        final Set<StreamingApiConsumer> consumers =
                this.channelToConsumers.getOrDefault(channelName, Collections.emptySet());
        String error = (String) message.get("error");
        if (error == null) {
            error = "Missing error message";
        }
        final Exception failure = SubscriptionHelper.getFailure(message);
        final String msg = String.format("Error subscribing to %s: %s", firstConsumer.getTopicName(),
                failure != null ? failure.getMessage() : error);
        boolean abort = true;
        LOG.warn(msg);
        if (SubscriptionHelper.isTemporaryError(message)) {
            final long backoff = this.handshakeBackoff.getAndAdd(this.backoffIncrement);
            if (backoff > this.maxBackoff) {
                LOG.error("Subscribe aborted after exceeding {} msecs backoff", this.maxBackoff);
            } else {
                abort = false;
                try {
                    LOG.debug("Pausing for {} msecs before subscribe attempt", backoff);
                    Thread.sleep(backoff);
                    for (StreamingApiConsumer consumer : consumers) {
                        this.subscribe(consumer);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Aborting subscribe on interrupt!", e);
                    // Restore the flag so the pool thread's owner can still observe the interrupt.
                    Thread.currentThread().interrupt();
                }
            }
        } else if (error.matches(INVALID_REPLAY_ID_PATTERN)) {
            abort = false;
            final long fallBackReplayId = firstConsumer.getEndpoint().getConfiguration().getFallBackReplayId();
            LOG.warn(error);
            LOG.warn("Falling back to replayId {} for channel {}", fallBackReplayId, channelName);
            REPLAY_EXTENSION.setReplayId(channelName, fallBackReplayId);
            for (StreamingApiConsumer consumer : consumers) {
                this.subscribe(consumer);
            }
        }
        if (abort && this.client != null) {
            for (StreamingApiConsumer consumer : consumers) {
                consumer.handleException(msg, new SalesforceException(msg, failure));
            }
        }
    }

    /**
     * Starts the helper: takes the session from the component, rejects lazy
     * login, creates the client, registers the meta-channel listeners and
     * performs the initial handshake.
     *
     * @throws Exception if lazy login is configured, or client creation or the
     *                   handshake fails
     */
    protected void doStart() throws Exception {
        session = component.getSession();

        final boolean lazyLogin = component.getLoginConfig().isLazyLogin();
        if (lazyLogin) {
            throw new CamelException("Lazy login is not supported by salesforce consumers.");
        }

        client = SubscriptionHelper.createClient(component, session);
        initMessageListeners();
        handshake();
    }

    /** Registers the handshake, subscribe and connect listeners on their meta channels. */
    private void initMessageListeners() {
        final ClientSessionChannel handshakeChannel = client.getChannel("/meta/handshake");
        handshakeChannel.addListener(handshakeListener);

        final ClientSessionChannel subscribeChannel = client.getChannel("/meta/subscribe");
        subscribeChannel.addListener(subscriptionListener);

        final ClientSessionChannel connectChannel = client.getChannel("/meta/connect");
        connectChannel.addListener(connectListener);
    }

    /**
     * Starts a handshake and waits up to {@code HANDSHAKE_TIMEOUT_SEC} seconds
     * for the client to reach the CONNECTED state.
     *
     * @throws CamelException if the client does not connect in time; the message
     *                        carries the most specific failure recorded by the
     *                        handshake/connect listeners, or a plain timeout
     */
    private void handshake() throws CamelException {
        client.handshake();

        final long timeoutMillis = TimeUnit.SECONDS.toMillis(HANDSHAKE_TIMEOUT_SEC);
        if (client.waitFor(timeoutMillis, BayeuxClient.State.CONNECTED)) {
            return;
        }

        // Not connected: report the first recorded failure, handshake before connect.
        if (handshakeException != null) {
            throw new CamelException(
                    String.format("Exception during HANDSHAKE: %s", handshakeException.getMessage()),
                    handshakeException);
        }
        if (handshakeError != null) {
            throw new CamelException(String.format("Error during HANDSHAKE: %s", handshakeError));
        }
        if (connectException != null) {
            throw new CamelException(
                    String.format("Exception during CONNECT: %s", connectException.getMessage()),
                    connectException);
        }
        if (connectError != null) {
            throw new CamelException(String.format("Error during CONNECT: %s", connectError));
        }
        throw new CamelException(
                String.format("Handshake request timeout after %s seconds", HANDSHAKE_TIMEOUT_SEC));
    }

    /**
     * Extracts the failure carried by a Bayeux message.
     *
     * <p>Lookup order: the message's own {@code exception} entry, then the
     * {@code exception} entry of its {@code failure} map, and only when neither
     * entry exists a {@link SalesforceException} built from the Salesforce
     * failure reason.
     *
     * @param message the message to inspect
     * @return the failure, or {@code null} if the message carries none
     */
    private static Exception getFailure(Message message) {
        final Object direct = message.get(EXCEPTION_FIELD);
        if (direct != null) {
            return SubscriptionHelper.asException(direct);
        }
        final Object failure = message.get(FAILURE_FIELD);
        if (failure != null) {
            // A failure entry that is not a map cannot hold an exception.
            return failure instanceof Map
                    ? SubscriptionHelper.asException(((Map<?, ?>) failure).get(EXCEPTION_FIELD))
                    : null;
        }
        final String failureReason = SubscriptionHelper.getFailureReason(message);
        return failureReason != null ? new SalesforceException(failureReason, null) : null;
    }

    /** Converts a message payload to an Exception without risking a ClassCastException. */
    private static Exception asException(Object candidate) {
        if (candidate == null) {
            return null;
        }
        if (candidate instanceof Exception) {
            return (Exception) candidate;
        }
        if (candidate instanceof Throwable) {
            // Errors and other non-Exception throwables are wrapped so the cause is kept.
            final Throwable cause = (Throwable) candidate;
            return new SalesforceException(cause.getMessage(), cause);
        }
        return new SalesforceException(String.valueOf(candidate), null);
    }

    /**
     * Removes every listener from the named channel and releases it. Does
     * nothing when there is no client.
     *
     * @param name the channel name
     */
    private void closeChannel(String name) {
        if (client != null) {
            final ClientSessionChannel target = client.getChannel(name);
            for (ClientSessionChannel.ClientSessionChannelListener registered : target.getListeners()) {
                target.removeListener(registered);
            }
            target.release();
        }
    }

    /**
     * Stops the helper: closes the meta channels, disconnects the client
     * (aborting it if it does not reach DISCONNECTED within 60 seconds), clears
     * the client reference and logs the session out. When there is no client,
     * nothing beyond closing the channels is done.
     *
     * @throws Exception if disconnecting or logging out fails
     */
    protected void doStop() throws Exception {
        Stream.of("/meta/connect", "/meta/subscribe", "/meta/handshake").forEach(this::closeChannel);

        if (client == null) {
            return;
        }

        client.disconnect();
        final long disconnectTimeoutMillis = TimeUnit.SECONDS.toMillis(60L);
        if (!client.waitFor(disconnectTimeoutMillis, BayeuxClient.State.DISCONNECTED)) {
            LOG.warn("Could not disconnect client connected to: {}", SubscriptionHelper.getEndpointUrl(component));
            client.abort();
        }
        client = null;

        if (session != null) {
            session.logout();
        }
        LOG.debug("Stopped the helper and destroyed the client");
    }

    /**
     * Creates a Bayeux client that talks to the component's streaming endpoint
     * through the component's shared HTTP client.
     *
     * <p>The transport adds an {@code Authorization: OAuth <token>} header to
     * every request (logging in first if the session has no token) and keeps
     * cookies in a store private to this client. The shared replay extension is
     * installed on the client.
     *
     * @param component the component supplying the HTTP client, transport
     *                  options and endpoint URL
     * @param session   the session supplying the access token
     * @return a new, not yet connected client
     * @throws SalesforceException if the initial login fails
     */
    static BayeuxClient createClient(SalesforceComponent component, final SalesforceSession session) throws SalesforceException {
        final SalesforceHttpClient httpClient = component.getConfig().getHttpClient();

        final Map<String, Object> options = new HashMap<String, Object>();
        options.put("maxNetworkDelay", 120000);
        if (component.getLongPollingTransportProperties() != null) {
            // Component-level transport properties may override the default above.
            options.putAll(component.getLongPollingTransportProperties());
        }

        if (session.getAccessToken() == null && !component.getLoginConfig().isLazyLogin()) {
            session.login(null);
        }

        final CookieStore cookieStore = new CookieManager().getCookieStore();
        final HttpCookieStore httpCookieStore = new HttpCookieStore.Default();

        final ClientTransport transport = new JettyHttpClientTransport(options, httpClient) {
            // Adds the OAuth header to each outgoing request.
            protected void customize(Request request) {
                super.customize(request);
                String accessToken = session.getAccessToken();
                if (accessToken == null) {
                    // No token yet (e.g. the session was reset): log in on demand.
                    try {
                        accessToken = session.login(null);
                    } catch (SalesforceException e) {
                        throw new RuntimeException(e);
                    }
                }
                final String finalAccessToken = accessToken;
                request.headers(h -> h.add(HttpHeader.AUTHORIZATION, "OAuth " + finalAccessToken));
            }

            // Parses response cookies with the JDK cookie manager and copies them into the Jetty store.
            protected void storeCookies(URI uri, Map<String, List<String>> cookies) {
                try {
                    final CookieManager cookieManager = new CookieManager(cookieStore, CookiePolicy.ACCEPT_ALL);
                    cookieManager.put(uri, cookies);
                    for (java.net.HttpCookie httpCookie : cookieManager.getCookieStore().getCookies()) {
                        httpCookieStore.add(uri, HttpCookie.from(httpCookie));
                    }
                } catch (IOException x) {
                    // Best effort: an unparsable cookie must not fail the request.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Could not parse cookies", x);
                    }
                }
            }

            protected HttpCookieStore getHttpCookieStore() {
                return httpCookieStore;
            }
        };

        final BayeuxClient client = new BayeuxClient(SubscriptionHelper.getEndpointUrl(component), transport);
        client.addExtension(REPLAY_EXTENSION);
        return client;
    }

    /**
     * Registers the consumer for its topic's channel and subscribes it on the
     * client. Runs under the helper's lock.
     *
     * <p>The channel is also added to the pending set, and an initial replay id
     * is set for it if none is set yet. One message listener is kept per
     * consumer and reused on later calls.
     *
     * @param consumer the consumer to subscribe
     */
    public void subscribe(StreamingApiConsumer consumer) {
        this.lock.lock();
        try {
            final String target = SubscriptionHelper.getChannelName(consumer.getTopicName());

            final Set<StreamingApiConsumer> registered =
                    this.channelToConsumers.computeIfAbsent(target, ignored -> ConcurrentHashMap.newKeySet());
            registered.add(consumer);
            this.channelsToSubscribe.add(target);

            this.setReplayIdIfAbsent(consumer.getEndpoint());
            LOG.info("Subscribing to channel {}...", target);

            final ClientSessionChannel.MessageListener forwarder =
                    this.consumerToListener.computeIfAbsent(consumer, ignored -> (channel, message) -> {
                        LOG.debug("Received Message: {}", message);
                        consumer.processMessage(channel, message);
                    });
            this.client.getChannel(target).subscribe(forwarder);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Tells whether the message's Salesforce failure reason starts with the
     * {@code 503::} server-too-busy prefix.
     *
     * @param message the failed message
     * @return {@code true} if a failure reason exists and has that prefix
     */
    private static boolean isTemporaryError(Message message) {
        final String reason = SubscriptionHelper.getFailureReason(message);
        if (reason == null) {
            return false;
        }
        return reason.startsWith(SERVER_TOO_BUSY_ERROR);
    }

    /**
     * Reads {@code failureReason} from the {@code sfdc} entry of the message's
     * ext data.
     *
     * @param message the message to inspect
     * @return the failure reason, or {@code null} if the ext data or its
     *         {@code sfdc} entry is missing
     */
    private static String getFailureReason(Message message) {
        if (message.getExt() == null) {
            return null;
        }
        final Map<?, ?> sfdcData = (Map<?, ?>) message.getExt().get(SFDC_FIELD);
        if (sfdcData == null) {
            return null;
        }
        return (String) sfdcData.get(FAILURE_REASON_FIELD);
    }

    /**
     * Hands the replay id determined for the endpoint's topic, if there is one,
     * to the shared replay extension via its {@code setReplayIdIfAbsent}.
     *
     * @param endpoint the endpoint whose topic and replay settings are used
     */
    private void setReplayIdIfAbsent(SalesforceEndpoint endpoint) {
        final String topic = endpoint.getTopicName();
        SubscriptionHelper.determineReplayIdFor(endpoint, topic)
                .ifPresent(id -> REPLAY_EXTENSION.setReplayIdIfAbsent(SubscriptionHelper.getChannelName(topic), id));
    }

    /**
     * Picks the replay id to use for a topic. The first non-null value wins, in
     * this order: the endpoint's replay id, the endpoint's initial replay id map,
     * the component's initial replay id map, the endpoint's default replay id,
     * the component's default replay id. Each map is queried by topic name first
     * and by channel name as a fallback.
     *
     * @param endpoint  the endpoint supplying endpoint- and component-level settings
     * @param topicName the topic name
     * @return the chosen replay id, or empty if none is configured
     */
    static Optional<Long> determineReplayIdFor(SalesforceEndpoint endpoint, String topicName) {
        final String channelName = SubscriptionHelper.getChannelName(topicName);

        final Long explicitId = endpoint.getReplayId();
        final SalesforceComponent owner = endpoint.getComponent();

        final SalesforceEndpointConfig endpointConfig = endpoint.getConfiguration();
        final Map<String, Long> endpointIds = endpointConfig.getInitialReplayIdMap();
        final Long endpointId = endpointIds.getOrDefault(topicName, endpointIds.get(channelName));
        final Long endpointDefault = endpointConfig.getDefaultReplayId();

        final SalesforceEndpointConfig componentConfig = owner.getConfig();
        final Map<String, Long> componentIds = componentConfig.getInitialReplayIdMap();
        final Long componentId = componentIds.getOrDefault(topicName, componentIds.get(channelName));
        final Long componentDefault = componentConfig.getDefaultReplayId();

        // Candidates in priority order; every value is resolved before the scan starts.
        final Long[] candidates = {explicitId, endpointId, componentId, endpointDefault, componentDefault};
        for (Long candidate : candidates) {
            if (candidate != null) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Converts a topic name into a channel name.
     *
     * <p>The result always starts with a single added or existing leading
     * slash. A name that already contains a path separator after its first
     * character (for example {@code event/MyEvent__e}) is used as is; a plain
     * name is placed under {@code /topic/}. A plain name that already starts
     * with a slash ({@code /MyTopic}) maps to {@code /topic/MyTopic}.
     *
     * @param topicName the topic name, with or without a leading slash
     * @return the channel name
     * @throws IllegalArgumentException if {@code topicName} is empty
     * @throws NullPointerException     if {@code topicName} is null
     */
    static String getChannelName(String topicName) {
        if (topicName.isEmpty()) {
            throw new IllegalArgumentException("topicName must not be empty");
        }
        // Work on the name without its leading slash so the slash is emitted exactly once.
        final String relativeName = topicName.charAt(0) == '/' ? topicName.substring(1) : topicName;
        final boolean hasChannelPrefix = topicName.indexOf('/', 1) > 0;

        final StringBuilder channelName = new StringBuilder("/");
        if (!hasChannelPrefix) {
            channelName.append("topic/");
        }
        return channelName.append(relativeName).toString();
    }

    /**
     * Removes the consumer from its topic's channel and unsubscribes its
     * message listener from the client. Runs under the helper's lock.
     *
     * <p>When the last consumer of a channel leaves, the channel is dropped from
     * both the consumer map and the pending-subscription set, so that later
     * connect replies do not keep trying to subscribe a channel nobody listens
     * to. If the client has already been discarded by {@code doStop}, only the
     * bookkeeping is updated.
     *
     * @param consumer the consumer to unsubscribe
     */
    public void unsubscribe(StreamingApiConsumer consumer) {
        this.lock.lock();
        try {
            final String channelName = SubscriptionHelper.getChannelName(consumer.getTopicName());
            final Set<StreamingApiConsumer> consumers = this.channelToConsumers.get(channelName);
            if (consumers != null) {
                consumers.remove(consumer);
                if (consumers.isEmpty()) {
                    this.channelToConsumers.remove(channelName);
                    // No consumer left: the channel must not stay queued for subscription.
                    this.channelsToSubscribe.remove(channelName);
                }
            }
            final ClientSessionChannel.MessageListener listener = this.consumerToListener.remove(consumer);
            if (listener == null) {
                return;
            }
            if (this.client == null) {
                LOG.debug("Client already stopped, nothing to unsubscribe for channel {}", channelName);
                return;
            }
            LOG.debug("Unsubscribing from channel {}...", channelName);
            final ClientSessionChannel clientChannel = this.client.getChannel(channelName);
            clientChannel.unsubscribe(listener);
            clientChannel.release();
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Builds the streaming endpoint URL from the session's instance URL and the
     * configured API version. For API version 36.0 with a default replay id or a
     * non-empty initial replay id map, the {@code /cometd/replay/} path is used;
     * otherwise {@code /cometd/}.
     *
     * @param component the component supplying configuration and session
     * @return the endpoint URL
     */
    static String getEndpointUrl(SalesforceComponent component) {
        final String apiVersion = component.getConfig().getApiVersion();

        String path = "/cometd/";
        if (Double.parseDouble(apiVersion) == 36.0) {
            final boolean replayConfigured = component.getConfig().getDefaultReplayId() != null
                    || !component.getConfig().getInitialReplayIdMap().isEmpty();
            if (replayConfigured) {
                path = "/cometd/replay/";
            }
        }
        return component.getSession().getInstanceUrl() + path + apiVersion;
    }
}

