package com.tencent.polaris.discovery.client.flow;

import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceChangeEvent;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.InstancesResponse;
import com.tencent.polaris.api.rpc.WatchServiceResponse;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.logging.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:com/tencent/polaris/discovery/client/flow/WatchFlow.class */
public class WatchFlow {
    /** General-purpose logger for this flow (previously bound to {@code SyncFlow} by mistake). */
    private static final Logger LOG = LoggerFactory.getLogger(WatchFlow.class);
    /** Named logger used for the per-instance add/modify/delete records written on every update. */
    private static final Logger UPDATE_EVENT_LOG = LoggerFactory.getLogger("polaris-update-event-async");
    /**
     * Listeners registered per service. The map is static, so it is shared by every
     * {@code WatchFlow} instance loaded by the same class loader.
     */
    private static final Map<ServiceKey, Set<ServiceListener>> watchers = new ConcurrentHashMap<>();
    /** Flipped to {@code true} by the first {@code initFlow()} call so set-up runs only once. */
    private final AtomicBoolean initialize = new AtomicBoolean(false);
    /** Plugin container; supplies the local registry and the configuration. */
    private Extensions extensions;
    /** Flow used to fetch the current instance list when a watch is registered. */
    private SyncFlow syncFlow;
    /** Executor group on which listener callbacks are run. */
    private DispatchExecutor executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/tencent/polaris/discovery/client/flow/WatchFlow$DispatchExecutor.class */
    /**
     * Spreads callback tasks over a fixed array of single-threaded executors.
     * <p>
     * The executor is chosen from the hash code of the {@link ServiceKey}, so tasks submitted for
     * keys with the same hash code always go to the same single worker thread and therefore run
     * one after another in submission order.
     * <p>
     * NOTE(review): the executors created here are never shut down by this class.
     */
    public static class DispatchExecutor {
        /** One single-threaded pool per slot; the array length is at least 1. */
        private final Executor[] executors;

        /**
         * Creates the executor group.
         *
         * @param i requested number of single-threaded executors; any value below 1 is treated as 1
         */
        public DispatchExecutor(int i) {
            int poolCount = Math.max(i, 1);
            this.executors = new Executor[poolCount];
            for (int index = 0; index < poolCount; index++) {
                this.executors[index] = Executors.newFixedThreadPool(1, new NamedThreadFactory("service-watch-dispatch" + index));
            }
        }

        /**
         * Submits {@code runnable} to the executor selected by {@code serviceKey}'s hash code.
         *
         * @param serviceKey key whose hash code picks the executor slot
         * @param runnable   task to run
         */
        public void execute(ServiceKey serviceKey, Runnable runnable) {
            // Math.abs(Integer.MIN_VALUE) is still negative, which would produce a negative array
            // index; floorMod always returns a value in [0, length).
            int slot = Math.floorMod(serviceKey.hashCode(), this.executors.length);
            this.executors[slot].execute(runnable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/tencent/polaris/discovery/client/flow/WatchFlow$InstanceChangeListener.class */
    /**
     * Registry listener that turns instance-cache updates into {@link ServiceChangeEvent}s and
     * schedules their delivery to every {@link ServiceListener} registered for the service.
     */
    public class InstanceChangeListener extends AbstractResourceEventListener {
        /** Schedules delivery of one event to one listener on the enclosing flow's executor. */
        private final BiConsumer<ServiceChangeEvent, ServiceListener> consumer;

        private InstanceChangeListener() {
            this.consumer = (event, listener) ->
                    WatchFlow.this.executor.execute(event.getServiceKey(), () -> listener.onEvent(event));
        }

        /**
         * Handles a cache update. Only updates where both values are
         * {@link ServiceInstancesByProto} and the new value's event type is {@code INSTANCE} are
         * processed; everything else is ignored.
         *
         * @param serviceEventKey     key of the updated resource
         * @param registryCacheValue  cache value before the update
         * @param registryCacheValue2 cache value after the update
         */
        public void onResourceUpdated(ServiceEventKey serviceEventKey, RegistryCacheValue registryCacheValue, RegistryCacheValue registryCacheValue2) {
            // Type checks come first: instanceof is false for null, so a null value is skipped
            // instead of failing on getEventType().
            if (!(registryCacheValue instanceof ServiceInstancesByProto) || !(registryCacheValue2 instanceof ServiceInstancesByProto)) {
                return;
            }
            if (registryCacheValue2.getEventType() != ServiceEventKey.EventType.INSTANCE) {
                return;
            }
            WatchFlow.LOG.debug("receive service={} change event", serviceEventKey);
            ServiceInstancesByProto oldInstances = (ServiceInstancesByProto) registryCacheValue;
            ServiceInstancesByProto newInstances = (ServiceInstancesByProto) registryCacheValue2;
            ServiceChangeEvent event = ServiceChangeEvent.builder()
                    .serviceKey(serviceEventKey.getServiceKey())
                    .addInstances(Utils.checkAddInstances(oldInstances, newInstances))
                    .updateInstances(Utils.checkUpdateInstances(oldInstances, newInstances))
                    .deleteInstances(Utils.checkDeleteInstances(oldInstances, newInstances))
                    .allInstances(newInstances.getInstances())
                    .build();
            logChangeInstances(serviceEventKey, event, oldInstances, newInstances);
            // Typed lookup (no raw cast) so the lambda parameter is a ServiceListener.
            Set<ServiceListener> listeners = WatchFlow.watchers.getOrDefault(serviceEventKey.getServiceKey(), Collections.emptySet());
            listeners.forEach(listener -> this.consumer.accept(event, listener));
        }

        /**
         * Writes a summary line plus one line per added, modified and deleted instance to the
         * update-event logger.
         */
        private void logChangeInstances(ServiceEventKey serviceEventKey, ServiceChangeEvent event, ServiceInstancesByProto oldInstances, ServiceInstancesByProto newInstances) {
            WatchFlow.UPDATE_EVENT_LOG.info("service instances of {} change, oldRevision {}, newRevision {}, oldCount {}, newCount {}.",
                    serviceEventKey, oldInstances.getRevision(), newInstances.getRevision(),
                    oldInstances.getInstances().size(), newInstances.getInstances().size());
            for (Instance added : event.getAddInstances()) {
                WatchFlow.UPDATE_EVENT_LOG.info("add instance of {}: [{}:{}, status: {}].",
                        serviceEventKey, added.getHost(), added.getPort(), totalInstanceInfo(added));
            }
            for (ServiceChangeEvent.OneInstanceUpdate update : event.getUpdateInstances()) {
                Instance before = update.getBefore();
                Instance after = update.getAfter();
                WatchFlow.UPDATE_EVENT_LOG.info("modify instance of {} from [{}:{}, status: {}] to [{}:{}, status: {}].",
                        serviceEventKey, before.getHost(), before.getPort(), totalInstanceInfo(before),
                        after.getHost(), after.getPort(), totalInstanceInfo(after));
            }
            for (Instance removed : event.getDeleteInstances()) {
                WatchFlow.UPDATE_EVENT_LOG.info("delete instance of {}: [{}:{}, status: {}].",
                        serviceEventKey, removed.getHost(), removed.getPort(), totalInstanceInfo(removed));
            }
        }

        /** Formats the health flag, isolation flag and weight of {@code instance} for the log lines. */
        private String totalInstanceInfo(Instance instance) {
            return String.format("healthy:%s;isolate:%s;weight:%s", instance.isHealthy(), instance.isIsolated(), instance.getWeight());
        }
    }

    /**
     * Stores the collaborators this flow works with and then runs the one-time set-up
     * ({@code initFlow()}).
     *
     * @param extensions plugin container, kept for registry and configuration access
     * @param syncFlow   synchronous discovery flow, kept for instance look-ups
     */
    public void init(Extensions extensions, SyncFlow syncFlow) {
        // Both references must be in place before initFlow() reads this.extensions.
        this.syncFlow = syncFlow;
        this.extensions = extensions;
        initFlow();
    }

    /**
     * Registers the request's listeners for a service and sends each newly seen listener an
     * initial event containing the service's current instances.
     * <p>
     * Steps: ask the local registry to watch the resource, fetch the current instances through
     * the sync flow, dispatch the initial event to listeners that are not yet registered, then
     * add all requested listeners to the shared listener set.
     *
     * @param commonWatchServiceRequest request carrying the service event key, the instance query
     *                                  and the listeners to register
     * @return response holding the fetched instances and the result of adding the listeners to
     *         the set ({@code true} when at least one listener was not registered before)
     * @throws PolarisException declared by this method; presumably raised by the registry or the
     *                          instance look-up — confirm against those APIs
     */
    public WatchServiceResponse commonWatchService(CommonWatchServiceRequest commonWatchServiceRequest) throws PolarisException {
        this.extensions.getLocalRegistry().watchResource(commonWatchServiceRequest.getSvcEventKey());
        ServiceKey serviceKey = commonWatchServiceRequest.getSvcEventKey().getServiceKey();
        InstancesResponse instancesResponse = this.syncFlow.commonSyncGetAllInstances(commonWatchServiceRequest.getAllRequest());
        // Use the set returned by computeIfAbsent directly: a separate watchers.get() could see
        // null if a concurrent "remove all" unwatch drops the entry in between.
        Set<ServiceListener> registered = watchers.computeIfAbsent(serviceKey,
                key -> Collections.synchronizedSet(new HashSet<ServiceListener>()));
        List<ServiceListener> listeners = commonWatchServiceRequest.getWatchServiceRequest().getListeners();
        List<ServiceListener> newListeners = listeners.stream()
                .filter(listener -> !registered.contains(listener))
                .collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(newListeners)) {
            // The first event reports every current instance as both "added" and "all".
            List<Instance> currentInstances = Arrays.asList(instancesResponse.getInstances());
            ServiceChangeEvent initialEvent = ServiceChangeEvent.builder()
                    .serviceKey(serviceKey)
                    .addInstances(currentInstances)
                    .allInstances(currentInstances)
                    .build();
            for (ServiceListener listener : newListeners) {
                this.executor.execute(initialEvent.getServiceKey(), () -> listener.onEvent(initialEvent));
            }
        }
        return new WatchServiceResponse(instancesResponse, registered.addAll(listeners));
    }

    /**
     * Removes listeners for a service and stops watching the resource once none are left.
     * <p>
     * With "remove all" set, the service's whole entry is dropped from the listener map and the
     * resource is unwatched. Otherwise only the listeners named in the request are removed, and
     * the resource is unwatched when the remaining set is null or empty.
     *
     * @param commonUnWatchServiceRequest request carrying the service event key and the unwatch options
     * @return response with no instances; its flag is {@code true} for "remove all" or when no
     *         listeners were registered, otherwise the result of {@code Set.removeAll}
     * @throws PolarisException declared by this method; presumably raised by the registry — confirm
     */
    public WatchServiceResponse commonUnWatchService(CommonUnWatchServiceRequest commonUnWatchServiceRequest) throws PolarisException {
        Set<ServiceListener> registered = watchers.get(commonUnWatchServiceRequest.getSvcEventKey().getServiceKey());

        // Remove-all: drop the whole entry and stop watching unconditionally.
        if (commonUnWatchServiceRequest.getRequest().isRemoveAll()) {
            watchers.remove(commonUnWatchServiceRequest.getSvcEventKey().getServiceKey());
            this.extensions.getLocalRegistry().unwatchResource(commonUnWatchServiceRequest.getSvcEventKey());
            return new WatchServiceResponse((InstancesResponse) null, true);
        }

        // Selective removal: the flag stays true when there was nothing registered to remove from.
        boolean result = true;
        if (CollectionUtils.isNotEmpty(registered)) {
            result = registered.removeAll(commonUnWatchServiceRequest.getRequest().getListeners());
        }
        if (CollectionUtils.isEmpty(registered)) {
            this.extensions.getLocalRegistry().unwatchResource(commonUnWatchServiceRequest.getSvcEventKey());
        }
        return new WatchServiceResponse((InstancesResponse) null, result);
    }

    /**
     * One-time set-up: creates the callback executor group and registers the instance-change
     * listener with the local registry. Calls after the first one are no-ops.
     */
    private void initFlow() {
        if (!this.initialize.compareAndSet(false, true)) {
            return;
        }
        int callbackConcurrency = this.extensions.getConfiguration().getConsumer().getSubscribe().getCallbackConcurrency();
        // Create the executor BEFORE the listener becomes reachable from the registry: the
        // listener's callback dereferences this.executor, so registering first would leave a
        // window in which an incoming update hits a null executor.
        this.executor = new DispatchExecutor(callbackConcurrency);
        this.extensions.getLocalRegistry().registerResourceListener(new InstanceChangeListener());
    }
}
