/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.core.v2.event.publisher;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.ShardedEventPublisher;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alipay.sofa.jraft.util.concurrent.ConcurrentHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

public class NamingEventPublisher
extends Thread
implements ShardedEventPublisher {
    private static final String THREAD_NAME = "naming.publisher-";
    private static final int DEFAULT_WAIT_TIME = 60;
    private final Map<Class<? extends Event>, Set<Subscriber<? extends Event>>> subscribes = new ConcurrentHashMap<Class<? extends Event>, Set<Subscriber<? extends Event>>>();
    private volatile boolean initialized = false;
    private volatile boolean shutdown = false;
    private int queueMaxSize = -1;
    private BlockingQueue<Event> queue;
    private String publisherName;

    public void init(Class<? extends Event> type, int bufferSize) {
        this.queueMaxSize = bufferSize;
        this.queue = new ArrayBlockingQueue<Event>(bufferSize);
        this.publisherName = type.getSimpleName();
        super.setName(THREAD_NAME + this.publisherName);
        super.setDaemon(true);
        super.start();
        this.initialized = true;
    }

    public long currentEventSize() {
        return this.queue.size();
    }

    public void addSubscriber(Subscriber subscriber) {
        this.addSubscriber(subscriber, subscriber.subscribeType());
    }

    public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
        this.subscribes.computeIfAbsent(subscribeType, inputType -> new ConcurrentHashSet());
        this.subscribes.get(subscribeType).add((Subscriber<? extends Event>)subscriber);
    }

    public void removeSubscriber(Subscriber subscriber) {
        this.removeSubscriber(subscriber, subscriber.subscribeType());
    }

    public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
        this.subscribes.computeIfPresent(subscribeType, (inputType, subscribers) -> {
            subscribers.remove(subscriber);
            return subscribers.isEmpty() ? null : subscribers;
        });
    }

    public boolean publish(Event event) {
        this.checkIsStart();
        boolean success = this.queue.offer(event);
        if (!success) {
            Loggers.EVT_LOG.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", (Object)event);
            this.handleEvent(event);
            return true;
        }
        return true;
    }

    public void notifySubscriber(Subscriber subscriber, Event event) {
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[NotifyCenter] the {} will received by {}", (Object)event, (Object)subscriber);
        }
        Runnable job = () -> subscriber.onEvent(event);
        Executor executor = subscriber.executor();
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
                job.run();
            }
            catch (Throwable e) {
                Loggers.EVT_LOG.error("Event callback exception: ", e);
            }
        }
    }

    public void shutdown() throws NacosException {
        this.shutdown = true;
        this.queue.clear();
    }

    @Override
    public void run() {
        try {
            this.waitSubscriberForInit();
            this.handleEvents();
        }
        catch (Exception e) {
            Loggers.EVT_LOG.error("Naming Event Publisher {}, stop to handle event due to unexpected exception: ", (Object)this.publisherName, (Object)e);
        }
    }

    private void waitSubscriberForInit() {
        for (int waitTimes = 60; waitTimes > 0 && !this.shutdown && this.subscribes.isEmpty(); --waitTimes) {
            ThreadUtils.sleep((long)1000L);
        }
    }

    private void handleEvents() {
        while (!this.shutdown) {
            try {
                Event event = this.queue.take();
                this.handleEvent(event);
            }
            catch (InterruptedException e) {
                Loggers.EVT_LOG.warn("Naming Event Publisher {} take event from queue failed:", (Object)this.publisherName, (Object)e);
                Thread.currentThread().interrupt();
            }
        }
    }

    private void handleEvent(Event event) {
        Class<?> eventType = event.getClass();
        Set<Subscriber<? extends Event>> subscribers = this.subscribes.get(eventType);
        if (null == subscribers) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[NotifyCenter] No subscribers for slow event {}", (Object)eventType.getName());
            }
            return;
        }
        for (Subscriber<? extends Event> subscriber : subscribers) {
            this.notifySubscriber(subscriber, event);
        }
    }

    void checkIsStart() {
        if (!this.initialized) {
            throw new IllegalStateException("Publisher does not start");
        }
    }

    public String getStatus() {
        return String.format("Publisher %-30s: shutdown=%5s, queue=%7d/%-7d", this.publisherName, this.shutdown, this.currentEventSize(), this.queueMaxSize);
    }
}

