/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.api.config.remote.request.cluster.ConfigChangeClusterSyncRequest;
import com.alibaba.nacos.api.config.remote.response.cluster.ConfigChangeClusterSyncResponse;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.utils.NetUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.config.server.model.event.ConfigDataChangeEvent;
import com.alibaba.nacos.config.server.monitor.MetricsMonitor;
import com.alibaba.nacos.config.server.remote.ConfigClusterRpcClientProxy;
import com.alibaba.nacos.config.server.service.dump.DumpService;
import com.alibaba.nacos.config.server.service.notify.NotifyTask;
import com.alibaba.nacos.config.server.service.trace.ConfigTraceService;
import com.alibaba.nacos.config.server.utils.ConfigExecutor;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.sys.utils.InetUtils;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class AsyncNotifyService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class);
    private static final int MIN_RETRY_INTERVAL = 500;
    private static final int INCREASE_STEPS = 1000;
    private static final int MAX_COUNT = 6;
    @Autowired
    private DumpService dumpService;
    @Autowired
    private ConfigClusterRpcClientProxy configClusterRpcClientProxy;
    private ServerMemberManager memberManager;

    @Autowired
    public AsyncNotifyService(final ServerMemberManager memberManager) {
        this.memberManager = memberManager;
        NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, (int)NotifyCenter.ringBufferSize);
        NotifyCenter.registerSubscriber((Subscriber)new Subscriber(){

            public void onEvent(Event event) {
                if (event instanceof ConfigDataChangeEvent) {
                    ConfigDataChangeEvent evt = (ConfigDataChangeEvent)event;
                    long dumpTs = evt.lastModifiedTs;
                    String dataId = evt.dataId;
                    String group = evt.group;
                    String tenant = evt.tenant;
                    String tag = evt.tag;
                    MetricsMonitor.incrementConfigChangeCount(tenant, group, dataId);
                    Collection ipList = memberManager.allMembers();
                    LinkedList<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>();
                    for (Member member : ipList) {
                        rpcQueue.add(new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
                    }
                    if (!rpcQueue.isEmpty()) {
                        ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
                    }
                }
            }

            public Class<? extends Event> subscribeType() {
                return ConfigDataChangeEvent.class;
            }
        });
    }

    private void asyncTaskExecute(NotifySingleRpcTask task) {
        int delay = AsyncNotifyService.getDelayTime(task);
        LinkedList<NotifySingleRpcTask> queue = new LinkedList<NotifySingleRpcTask>();
        queue.add(task);
        AsyncRpcTask asyncTask = new AsyncRpcTask(queue);
        ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);
    }

    private static int getDelayTime(NotifyTask task) {
        int failCount = task.getFailCount();
        int delay = 500 + failCount * failCount * 1000;
        if (failCount <= 6) {
            task.setFailCount(failCount + 1);
        }
        return delay;
    }

    class AsyncRpcNotifyCallBack
    implements RequestCallBack<ConfigChangeClusterSyncResponse> {
        private NotifySingleRpcTask task;

        public AsyncRpcNotifyCallBack(NotifySingleRpcTask task) {
            this.task = task;
        }

        public Executor getExecutor() {
            return ConfigExecutor.getConfigSubServiceExecutor();
        }

        public long getTimeout() {
            return 1000L;
        }

        public void onResponse(ConfigChangeClusterSyncResponse response) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            if (response.isSuccess()) {
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "ok", delayed, this.task.member.getAddress());
            } else {
                LOGGER.error("[notify-error] target:{} dataId:{} group:{} ts:{} code:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), response.getErrorCode()});
                ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "error", delayed, this.task.member.getAddress());
                AsyncNotifyService.this.asyncTaskExecute(this.task);
                LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
                MetricsMonitor.getConfigNotifyException().increment();
            }
        }

        public void onException(Throwable ex) {
            long delayed = System.currentTimeMillis() - this.task.getLastModified();
            LOGGER.error("[notify-exception] target:{} dataId:{} group:{} ts:{} ex:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified(), ex});
            ConfigTraceService.logNotifyEvent(this.task.getDataId(), this.task.getGroup(), this.task.getTenant(), null, this.task.getLastModified(), InetUtils.getSelfIP(), "exception", delayed, this.task.member.getAddress());
            AsyncNotifyService.this.asyncTaskExecute(this.task);
            LogUtil.NOTIFY_LOG.error("[notify-retry] target:{} dataId:{} group:{} ts:{}", new Object[]{this.task.member.getAddress(), this.task.getDataId(), this.task.getGroup(), this.task.getLastModified()});
            MetricsMonitor.getConfigNotifyException().increment();
        }
    }

    static class NotifySingleRpcTask
    extends NotifyTask {
        private Member member;
        private boolean isBeta;
        private String tag;

        public NotifySingleRpcTask(String dataId, String group, String tenant, String tag, long lastModified, boolean isBeta, Member member) {
            super(dataId, group, tenant, lastModified);
            this.member = member;
            this.isBeta = isBeta;
            this.tag = tag;
        }
    }

    class AsyncRpcTask
    implements Runnable {
        private Queue<NotifySingleRpcTask> queue;

        public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (!this.queue.isEmpty()) {
                NotifySingleRpcTask task = this.queue.poll();
                ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
                syncRequest.setDataId(task.getDataId());
                syncRequest.setGroup(task.getGroup());
                syncRequest.setBeta(task.isBeta);
                syncRequest.setLastModified(task.getLastModified());
                syncRequest.setTag(task.tag);
                syncRequest.setTenant(task.getTenant());
                Member member = task.member;
                if (AsyncNotifyService.this.memberManager.getSelf().equals((Object)member)) {
                    if (syncRequest.isBeta()) {
                        AsyncNotifyService.this.dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true);
                        continue;
                    }
                    AsyncNotifyService.this.dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
                    continue;
                }
                if (!AsyncNotifyService.this.memberManager.hasMember(member.getAddress())) continue;
                boolean unHealthNeedDelay = AsyncNotifyService.this.memberManager.isUnHealth(member.getAddress());
                if (unHealthNeedDelay) {
                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), "unhealth", 0L, member.getAddress());
                    AsyncNotifyService.this.asyncTaskExecute(task);
                    continue;
                }
                try {
                    AsyncNotifyService.this.configClusterRpcClientProxy.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
                }
                catch (Exception e) {
                    MetricsMonitor.getConfigNotifyException().increment();
                    AsyncNotifyService.this.asyncTaskExecute(task);
                }
            }
        }
    }
}

