/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroHttpData;
import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.PostConstruct;
import org.javatuples.Pair;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Service;

@DependsOn(value={"ProtocolManager"})
@Service(value="distroConsistencyService")
public class DistroConsistencyServiceImpl
implements EphemeralConsistencyService,
DistroDataProcessor {
    /** Marker value stored in {@link #syncChecksumTasks} while a checksum sync for a server is being processed. */
    private static final String ON_RECEIVE_CHECKSUMS_PROCESSING_TAG = "1";
    /** Consulted for key responsibility ({@code responsible}) and key-to-server mapping ({@code mapSrv}). */
    private final DistroMapper distroMapper;
    /** Local store of datums, keyed by datum key. */
    private final DataStore dataStore;
    /** Used to deserialize byte payloads into a map of datums. */
    private final Serializer serializer;
    /** Source of the default-ephemeral switch and the overridden server status. */
    private final SwitchDomain switchDomain;
    /** Source of the data warm-up flag read by {@link #isInitialized()}. */
    private final GlobalConfig globalConfig;
    /** Protocol used to sync changes, to query remote data and to report initialization state. */
    private final DistroProtocol distroProtocol;
    /** Task that delivers change/delete notifications to listeners; submitted for execution in {@link #init()}. */
    private volatile Notifier notifier = new Notifier();
    /** Registered listeners, grouped by the key they listen to. */
    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<String, ConcurrentLinkedQueue<RecordListener>>();
    /** Servers for which a checksum sync is currently in progress (server -> processing tag). */
    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<String, String>(16);

    /**
     * Creates the service and stores the collaborators it delegates to.
     *
     * @param distroMapper   used to decide key responsibility and key-to-server mapping
     * @param dataStore      local datum store
     * @param serializer     deserializer for datum payloads
     * @param switchDomain   runtime switches (default-ephemeral flag, overridden server status)
     * @param globalConfig   global configuration (data warm-up flag)
     * @param distroProtocol protocol used for syncing and for querying remote data
     */
    public DistroConsistencyServiceImpl(
            DistroMapper distroMapper,
            DataStore dataStore,
            Serializer serializer,
            SwitchDomain switchDomain,
            GlobalConfig globalConfig,
            DistroProtocol distroProtocol) {
        // Protocol-level collaborators.
        this.distroProtocol = distroProtocol;
        this.distroMapper = distroMapper;
        // Storage and serialization.
        this.dataStore = dataStore;
        this.serializer = serializer;
        // Configuration sources.
        this.globalConfig = globalConfig;
        this.switchDomain = switchDomain;
    }

    /**
     * Hands the notifier to {@link GlobalExecutor} once the bean has been constructed.
     */
    @PostConstruct
    public void init() {
        final Notifier notifyTask = this.notifier;
        GlobalExecutor.submitDistroNotifyTask(notifyTask);
    }

    /**
     * Stores the record locally and, unless gRPC features are in use, schedules a sync of the key
     * through the distro protocol with the configured sync delay.
     *
     * @param key   datum key
     * @param value record to store
     * @throws NacosException declared by the interface; not thrown directly by this method
     */
    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        UpgradeJudgement upgradeJudgement = (UpgradeJudgement) ApplicationUtils.getBean(UpgradeJudgement.class);
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            // Only reached when gRPC features are off; the change is then pushed via the distro protocol.
            distroProtocol.sync(
                    new DistroKey(key, "com.alibaba.nacos.naming.iplist."),
                    DataOperation.CHANGE,
                    DistroConfig.getInstance().getSyncDelayMillis());
        }
    }

    /**
     * Removes the datum stored under {@code key} (queuing a delete notification if the key has
     * listeners) and then drops every listener registered for that key.
     *
     * @param key datum key
     * @throws NacosException declared by the interface; not thrown directly by this method
     */
    @Override
    public void remove(String key) throws NacosException {
        // Order matters: onRemove only queues a notification while the key still has listeners.
        onRemove(key);
        listeners.remove(key);
    }

    /**
     * Looks up the datum stored under {@code key} in the local data store.
     *
     * @param key datum key
     * @return whatever the data store returns for the key
     * @throws NacosException declared by the interface; not thrown directly by this method
     */
    @Override
    public Datum get(String key) throws NacosException {
        final Datum found = dataStore.get(key);
        return found;
    }

    /**
     * Applies a put locally: for ephemeral instance-list keys a new datum is written to the data
     * store, and if the key has listeners a CHANGE notification task is queued.
     *
     * @param key   datum key
     * @param value record to store; cast to {@link Instances} for ephemeral instance-list keys
     */
    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> stored = new Datum<>();
            stored.key = key;
            stored.value = (Instances) value;
            stored.timestamp.incrementAndGet();
            dataStore.put(key, stored);
        }

        // Nobody to notify when no listener is registered for this key.
        if (listeners.containsKey(key)) {
            notifier.addTask(key, DataOperation.CHANGE);
        }
    }

    /**
     * Applies a removal locally: deletes the datum from the data store and, if the key has
     * listeners, queues a DELETE notification task.
     *
     * @param key datum key
     */
    public void onRemove(String key) {
        dataStore.remove(key);

        // Nobody to notify when no listener is registered for this key.
        if (listeners.containsKey(key)) {
            notifier.addTask(key, DataOperation.DELETE);
        }
    }

    /**
     * Reconciles the local data store with the checksums reported by {@code server}.
     *
     * <p>Only one reconciliation per server runs at a time; a concurrent call for the same server
     * logs a warning and returns. The procedure is aborted as soon as the checksum map contains a
     * key this node is responsible for. Otherwise:
     * <ul>
     *   <li>keys whose local datum is missing, has no value, or has a different checksum are
     *       queried from the remote server and applied;</li>
     *   <li>local keys mapped to {@code server} that are absent from the checksum map are removed.</li>
     * </ul>
     * A failure while fetching or applying remote data is logged and not propagated.
     *
     * @param checksumMap datum key to checksum, as reported by the remote server
     * @param server      address of the server that sent the checksums
     */
    public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
        if (this.syncChecksumTasks.putIfAbsent(server, ON_RECEIVE_CHECKSUMS_PROCESSING_TAG) != null) {
            Loggers.DISTRO.warn("sync checksum task already in process with {}", (Object) server);
            return;
        }
        try {
            ArrayList<String> toUpdateKeys = new ArrayList<String>();
            ArrayList<String> toRemoveKeys = new ArrayList<String>();
            for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
                String remoteKey = entry.getKey();
                if (this.distroMapper.responsible(KeyBuilder.getServiceName(remoteKey))) {
                    // A peer must not report checksums for keys owned by this node: abort.
                    Loggers.DISTRO.error("receive responsible key timestamp of {} from {}", remoteKey, server);
                    return;
                }
                if (!this.matchesLocalChecksum(remoteKey, entry.getValue())) {
                    toUpdateKeys.add(remoteKey);
                }
            }
            for (String localKey : this.dataStore.keys()) {
                boolean ownedBySender = server.equals(this.distroMapper.mapSrv(KeyBuilder.getServiceName(localKey)));
                if (ownedBySender && !checksumMap.containsKey(localKey)) {
                    toRemoveKeys.add(localKey);
                }
            }
            Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", new Object[]{toRemoveKeys, toUpdateKeys, server});
            for (String staleKey : toRemoveKeys) {
                this.onRemove(staleKey);
            }
            if (toUpdateKeys.isEmpty()) {
                return;
            }
            try {
                DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey("com.alibaba.nacos.naming.iplist.", server);
                distroKey.getActualResourceTypes().addAll(toUpdateKeys);
                DistroData remoteData = this.distroProtocol.queryFromRemote((DistroKey) distroKey);
                if (null != remoteData) {
                    this.processData(remoteData.getContent());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("get data from {} failed!", server, e);
            }
        } finally {
            // Always release the per-server marker so later checksum syncs are not blocked.
            this.syncChecksumTasks.remove(server);
        }
    }

    /**
     * Tells whether the local datum for {@code key} exists, has a value, and carries the given checksum.
     *
     * <p>The datum is read from the store exactly once, so a concurrent removal between an
     * existence check and the read cannot cause a {@link NullPointerException}.
     */
    private boolean matchesLocalChecksum(String key, String remoteChecksum) {
        Datum<?> local = this.dataStore.get(key);
        return local != null
                && local.value != null
                && local.value.getChecksum().equals(remoteChecksum);
    }

    /**
     * Deserializes a payload of datums and applies it locally.
     *
     * <p>First pass: every datum is written to the data store. For a key without listeners, when
     * instances are ephemeral by default, a service object is built and announced through the
     * first listener registered for the service-meta key prefix. Second pass: the listeners of
     * each key are notified of the new value, and the datum is stored again once all of them
     * succeeded.
     *
     * @param data serialized datum map; must not be {@code null}. An empty array is a no-op.
     * @return {@code false} if no service-meta listener is available when one is needed,
     *         {@code true} otherwise
     * @throws Exception if deserialization or the service-meta listener fails
     */
    private boolean processData(byte[] data) throws Exception {
        if (data.length == 0) {
            return true;
        }
        Map<String, Datum<Instances>> datumMap = this.serializer.deserializeMap(data, Instances.class);
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            String key = entry.getKey();
            this.dataStore.put(key, entry.getValue());
            if (this.listeners.containsKey(key) || !this.switchDomain.isDefaultInstanceEphemeral()) {
                continue;
            }
            Loggers.DISTRO.info("creating service {}", (Object) key);
            com.alibaba.nacos.naming.core.Service service = new com.alibaba.nacos.naming.core.Service();
            String serviceName = KeyBuilder.getServiceName(key);
            String namespaceId = KeyBuilder.getNamespace(key);
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName("DEFAULT_GROUP");
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            // The queue itself may be absent (nothing registered yet), not just empty.
            ConcurrentLinkedQueue<RecordListener> metaListeners = this.listeners.get("com.alibaba.nacos.naming.domains.meta.");
            RecordListener metaListener = metaListeners == null ? null : metaListeners.peek();
            if (Objects.isNull(metaListener)) {
                return false;
            }
            metaListener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
        }
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            String key = entry.getKey();
            // Single lookup: the key may be removed from the map concurrently.
            ConcurrentLinkedQueue<RecordListener> keyListeners = this.listeners.get(key);
            if (keyListeners == null) {
                Loggers.DISTRO.warn("listener of {} not found.", (Object) key);
                continue;
            }
            try {
                for (RecordListener listener : keyListeners) {
                    listener.onChange(key, entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", (Object) key, (Object) e);
                continue;
            }
            // Store again only after every listener accepted the change.
            this.dataStore.put(key, entry.getValue());
        }
        return true;
    }

    /**
     * Applies a single datum carried by the given distro data through {@link #onPut(String, Record)}.
     *
     * @param distroData data to apply; cast to {@link DistroHttpData}, whose deserialized content
     *                   is cast to a {@link Datum}
     * @return always {@code true}
     */
    public boolean processData(DistroData distroData) {
        Object content = ((DistroHttpData) distroData).getDeserializedContent();
        Datum<?> received = (Datum<?>) content;
        onPut(received.key, (Record) received.value);
        return true;
    }

    /**
     * Returns the type identifier of this processor.
     *
     * @return the literal {@code "com.alibaba.nacos.naming.iplist."}
     */
    public String processType() {
        final String handledType = "com.alibaba.nacos.naming.iplist.";
        return handledType;
    }

    /**
     * Handles verification data from another server by reconciling the checksums it carries,
     * unless gRPC features are in use, in which case nothing is done.
     *
     * @param distroData    data to verify; cast to {@link DistroHttpData}, whose deserialized
     *                      content is cast to a checksum map
     * @param sourceAddress address of the server that sent the data
     * @return always {@code true}
     */
    public boolean processVerifyData(DistroData distroData, String sourceAddress) {
        UpgradeJudgement upgradeJudgement = (UpgradeJudgement) ApplicationUtils.getBean(UpgradeJudgement.class);
        if (!upgradeJudgement.isUseGrpcFeatures()) {
            @SuppressWarnings("unchecked")
            Map<String, String> checksums = (Map<String, String>) ((DistroHttpData) distroData).getDeserializedContent();
            onReceiveChecksums(checksums, sourceAddress);
        }
        return true;
    }

    /**
     * Applies a snapshot payload through {@code processData(byte[])}.
     *
     * <p>Any exception raised while reading or applying the snapshot is logged and reported as a
     * failed result rather than propagated.
     *
     * @param distroData snapshot whose content holds the serialized datums
     * @return the result of processing the content, or {@code false} if processing threw
     */
    public boolean processSnapshot(DistroData distroData) {
        try {
            return this.processData(distroData.getContent());
        } catch (Exception e) {
            // Keep the failure visible: the caller only sees a boolean.
            Loggers.DISTRO.error("[NACOS-DISTRO] error while processing snapshot data", e);
            return false;
        }
    }

    /**
     * Registers {@code listener} for {@code key} unless it is already registered.
     *
     * <p>The per-key queue is created with an atomic {@code computeIfAbsent}, so concurrent
     * registrations for a new key share one queue and a concurrent {@link #remove(String)} cannot
     * make the lookup return {@code null}. The duplicate check followed by the add is still not
     * atomic as a pair.
     *
     * @param key      datum key to listen to
     * @param listener listener to register
     * @throws NacosException declared by the interface; not thrown directly by this method
     */
    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        ConcurrentLinkedQueue<RecordListener> keyListeners =
                this.listeners.computeIfAbsent(key, ignored -> new ConcurrentLinkedQueue<>());
        if (!keyListeners.contains(listener)) {
            keyListeners.add(listener);
        }
    }

    /**
     * Unregisters {@code listener} from {@code key}; does nothing if the key has no listeners or
     * the listener is not registered. At most one matching entry is removed.
     *
     * <p>The per-key queue is looked up once, so a concurrent {@link #remove(String)} cannot cause
     * a {@link NullPointerException} here.
     *
     * @param key      datum key the listener was registered for
     * @param listener listener to unregister
     * @throws NacosException declared by the interface; not thrown directly by this method
     */
    @Override
    public void unListen(String key, RecordListener listener) throws NacosException {
        ConcurrentLinkedQueue<RecordListener> keyListeners = this.listeners.get(key);
        if (keyListeners == null) {
            return;
        }
        // Removes the first element equal to the listener, if any.
        keyListeners.remove(listener);
    }

    /**
     * Reports whether this service can be used: either it is initialized, or the overridden
     * server status equals {@code UP}.
     *
     * @return {@code true} if available
     */
    @Override
    public boolean isAvailable() {
        if (isInitialized()) {
            return true;
        }
        // Not initialized: still available when the status has been overridden to UP.
        return ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
    }

    /**
     * Describes why the service is unavailable.
     *
     * @return an empty optional when the service is initialized or the overridden server status
     *         equals {@code UP}; otherwise a message saying the distro protocol is not initialized
     */
    @Override
    public Optional<String> getErrorMsg() {
        boolean usable = isInitialized()
                || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
        if (usable) {
            return Optional.empty();
        }
        return Optional.of("Distro protocol is not initialized");
    }

    /**
     * Reports whether the service counts as initialized: the distro protocol is initialized, or
     * data warm-up is disabled in the global configuration.
     *
     * @return {@code true} if initialized
     */
    public boolean isInitialized() {
        if (distroProtocol.isInitialized()) {
            return true;
        }
        // Protocol not ready yet: that only matters when data warm-up is enabled.
        return !globalConfig.isDataWarmup();
    }

    public class Notifier
    implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap(10240);
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<Pair<String, DataOperation>>(0x100000);

        public void addTask(String datumKey, DataOperation action) {
            if (this.services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                this.services.put(datumKey, "");
            }
            this.tasks.offer((Pair<String, DataOperation>)Pair.with((Object)datumKey, (Object)action));
        }

        public int getTaskSize() {
            return this.tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            while (true) {
                try {
                    while (true) {
                        Pair<String, DataOperation> pair = this.tasks.take();
                        this.handle(pair);
                    }
                }
                catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                    continue;
                }
                break;
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = (String)pair.getValue0();
                DataOperation action = (DataOperation)pair.getValue1();
                this.services.remove(datumKey);
                int count = 0;
                if (!DistroConsistencyServiceImpl.this.listeners.containsKey(datumKey)) {
                    return;
                }
                for (RecordListener listener : (ConcurrentLinkedQueue)DistroConsistencyServiceImpl.this.listeners.get(datumKey)) {
                    ++count;
                    try {
                        if (action == DataOperation.CHANGE) {
                            listener.onChange(datumKey, ((DistroConsistencyServiceImpl)DistroConsistencyServiceImpl.this).dataStore.get((String)datumKey).value);
                            continue;
                        }
                        if (action != DataOperation.DELETE) continue;
                        listener.onDelete(datumKey);
                    }
                    catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", (Object)datumKey, (Object)e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", new Object[]{datumKey, count, action.name()});
                }
            }
            catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}

