/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionException;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.LeaderInformationWithComponentId;
import org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriver;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KubernetesMultipleComponentLeaderElectionDriver
implements MultipleComponentLeaderElectionDriver {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesMultipleComponentLeaderElectionDriver.class);
    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;
    private final MultipleComponentLeaderElectionDriver.Listener leaderElectionListener;
    private final KubernetesLeaderElector leaderElector;
    private final Map<String, String> configMapLabels;
    private final FatalErrorHandler fatalErrorHandler;
    private final KubernetesSharedWatcher.Watch kubernetesWatch;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public KubernetesMultipleComponentLeaderElectionDriver(KubernetesLeaderElectionConfiguration leaderElectionConfiguration, FlinkKubeClient kubeClient, MultipleComponentLeaderElectionDriver.Listener leaderElectionListener, KubernetesConfigMapSharedWatcher configMapSharedWatcher, Executor watchExecutor, FatalErrorHandler fatalErrorHandler) {
        Preconditions.checkNotNull((Object)leaderElectionConfiguration);
        this.kubeClient = (FlinkKubeClient)Preconditions.checkNotNull((Object)kubeClient);
        this.leaderElectionListener = (MultipleComponentLeaderElectionDriver.Listener)Preconditions.checkNotNull((Object)leaderElectionListener);
        this.fatalErrorHandler = (FatalErrorHandler)Preconditions.checkNotNull((Object)fatalErrorHandler);
        Preconditions.checkNotNull((Object)configMapSharedWatcher);
        Preconditions.checkNotNull((Object)watchExecutor);
        this.configMapName = leaderElectionConfiguration.getConfigMapName();
        this.lockIdentity = leaderElectionConfiguration.getLockIdentity();
        this.leaderElector = kubeClient.createLeaderElector(leaderElectionConfiguration, new LeaderCallbackHandlerImpl());
        this.configMapLabels = KubernetesUtils.getConfigMapLabels(leaderElectionConfiguration.getClusterId(), "high-availability");
        this.kubernetesWatch = configMapSharedWatcher.watch(this.configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);
        this.leaderElector.run();
        LOG.debug("Starting the {} for config map {}.", (Object)this.getClass().getSimpleName(), (Object)this.configMapName);
    }

    public void close() throws Exception {
        if (this.running.compareAndSet(true, false)) {
            LOG.info("Closing {}.", (Object)this);
            this.leaderElector.stop();
            this.kubernetesWatch.close();
        }
    }

    public boolean hasLeadership() {
        Preconditions.checkState((boolean)this.running.get());
        Optional<KubernetesConfigMap> optionalConfigMap = this.kubeClient.getConfigMap(this.configMapName);
        if (optionalConfigMap.isPresent()) {
            return KubernetesLeaderElector.hasLeadership(optionalConfigMap.get(), this.lockIdentity);
        }
        this.fatalErrorHandler.onFatalError((Throwable)((Object)new KubernetesException(String.format("ConfigMap %s does not exist. This indicates that somebody has interfered with Flink's operation.", this.configMapName))));
        return false;
    }

    public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) throws Exception {
        Preconditions.checkState((boolean)this.running.get());
        this.kubeClient.checkAndUpdateConfigMap(this.configMapName, this.updateConfigMapWithLeaderInformation(componentId, leaderInformation)).get();
        LOG.debug("Successfully wrote leader information {} for leader {} into the config map {}.", new Object[]{leaderInformation, componentId, this.configMapName});
    }

    public void deleteLeaderInformation(String componentId) throws Exception {
        this.publishLeaderInformation(componentId, LeaderInformation.empty());
    }

    private Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> updateConfigMapWithLeaderInformation(String leaderName, LeaderInformation leaderInformation) {
        String configMapDataKey = KubernetesUtils.createSingleLeaderKey(leaderName);
        return kubernetesConfigMap -> {
            if (KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity)) {
                Map<String, String> data = kubernetesConfigMap.getData();
                if (leaderInformation.isEmpty()) {
                    data.remove(configMapDataKey);
                } else {
                    data.put(configMapDataKey, KubernetesUtils.encodeLeaderInformation(leaderInformation));
                }
                kubernetesConfigMap.getLabels().putAll(this.configMapLabels);
                return Optional.of(kubernetesConfigMap);
            }
            return Optional.empty();
        };
    }

    private static Collection<LeaderInformationWithComponentId> extractLeaderInformation(KubernetesConfigMap configMap) {
        Map<String, String> data = configMap.getData();
        ArrayList<LeaderInformationWithComponentId> leaderInformationWithLeaderNames = new ArrayList<LeaderInformationWithComponentId>();
        for (Map.Entry<String, String> keyValuePair : data.entrySet()) {
            String key = keyValuePair.getKey();
            if (!KubernetesUtils.isSingleLeaderKey(key)) continue;
            String leaderName = KubernetesUtils.extractLeaderName(key);
            LeaderInformation leaderInformation = KubernetesUtils.parseLeaderInformationSafely(keyValuePair.getValue()).orElse(LeaderInformation.empty());
            leaderInformationWithLeaderNames.add(LeaderInformationWithComponentId.create((String)leaderName, (LeaderInformation)leaderInformation));
        }
        return leaderInformationWithLeaderNames;
    }

    private class ConfigMapCallbackHandlerImpl
    implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
        private ConfigMapCallbackHandlerImpl() {
        }

        @Override
        public void onAdded(List<KubernetesConfigMap> resources) {
        }

        @Override
        public void onModified(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, KubernetesMultipleComponentLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, KubernetesMultipleComponentLeaderElectionDriver.this.lockIdentity)) {
                Collection leaderInformationWithLeaderNames = KubernetesMultipleComponentLeaderElectionDriver.extractLeaderInformation(configMap);
                KubernetesMultipleComponentLeaderElectionDriver.this.leaderElectionListener.notifyAllKnownLeaderInformation(leaderInformationWithLeaderNames);
            }
        }

        @Override
        public void onDeleted(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, KubernetesMultipleComponentLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, KubernetesMultipleComponentLeaderElectionDriver.this.lockIdentity)) {
                KubernetesMultipleComponentLeaderElectionDriver.this.fatalErrorHandler.onFatalError((Throwable)new LeaderElectionException(String.format("ConfigMap %s has been deleted externally.", KubernetesMultipleComponentLeaderElectionDriver.this.configMapName)));
            }
        }

        @Override
        public void onError(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap = KubernetesUtils.checkConfigMaps(configMaps, KubernetesMultipleComponentLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(configMap, KubernetesMultipleComponentLeaderElectionDriver.this.lockIdentity)) {
                KubernetesMultipleComponentLeaderElectionDriver.this.fatalErrorHandler.onFatalError((Throwable)new LeaderElectionException(String.format("Error while watching the ConfigMap %s.", KubernetesMultipleComponentLeaderElectionDriver.this.configMapName)));
            }
        }

        @Override
        public void handleError(Throwable throwable) {
            KubernetesMultipleComponentLeaderElectionDriver.this.fatalErrorHandler.onFatalError((Throwable)new LeaderElectionException(String.format("Error while watching the ConfigMap %s.", KubernetesMultipleComponentLeaderElectionDriver.this.configMapName), throwable));
        }
    }

    private class LeaderCallbackHandlerImpl
    extends KubernetesLeaderElector.LeaderCallbackHandler {
        private LeaderCallbackHandlerImpl() {
        }

        @Override
        public void isLeader() {
            KubernetesMultipleComponentLeaderElectionDriver.this.leaderElectionListener.isLeader();
        }

        @Override
        public void notLeader() {
            KubernetesMultipleComponentLeaderElectionDriver.this.leaderElectionListener.notLeader();
            KubernetesMultipleComponentLeaderElectionDriver.this.leaderElector.run();
        }
    }
}

