/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.logic.engine.service.impl;

import com.jxdinfo.hussar.logic.engine.service.LogicEngineMessageService;
import java.time.Instant;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.annotation.Resource;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * Redis pub/sub based implementation of {@link LogicEngineMessageService.RedisExt}.
 *
 * <p>Outgoing messages are published on a single Redis channel and tagged with an
 * identity computed once in {@link #afterPropertiesSet()}. Incoming messages that
 * carry this instance's own identity are discarded; all others are handed to the
 * listeners registered for the message's event name.
 */
public class LogicEngineMessageServiceRedisImpl
implements LogicEngineMessageService.RedisExt,
InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(LogicEngineMessageServiceRedisImpl.class);
    /** Redis key of the counter incremented once per client start-up. */
    private static final String KEY_CLIENT_SERIAL = "hussar-logic:engine:client-serial";
    /** Redis channel on which broadcast messages are published and received. */
    private static final String KEY_MESSAGE_CHANNEL = "hussar-logic:engine:message-channel";

    // Identity layout (63 bits used, sign bit stays clear):
    //   bits 20..62 : low 43 bits of the start-up epoch millis
    //   bits  4..19 : low 16 bits of the Redis client serial
    //   bits  0..3  : 4 random bits
    private static final long TIMESTAMP_MASK = 0x7FFFFFFFFFFL;
    private static final int TIMESTAMP_SHIFT = 20;
    private static final long SERIAL_MASK = 0xFFFFL;
    private static final int SERIAL_SHIFT = 4;
    private static final long RANDOM_MASK = 0xFL;
    private static final long RANDOM_BOUND = 16L;

    @Resource
    private StringRedisTemplate stringRedisTemplate;
    private volatile boolean initialized = false;
    private volatile long identity = 0L;
    /** Listeners per event name, in registration order. */
    private final ConcurrentMap<String, Deque<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    /**
     * Computes the client identity and then marks this service as initialized,
     * which is the precondition checked by {@link #broadcast(String, Object)}.
     */
    @Override
    public void afterPropertiesSet() {
        this.setupClientIdentity();
        this.initialized = true;
    }

    /**
     * Builds {@link #identity} from the current time, a serial obtained by
     * incrementing a Redis counter, and a few random bits.
     *
     * <p>If the serial cannot be retrieved (Redis error or {@code null} result) the
     * serial part falls back to {@code 0}; the failure is logged, not propagated.
     */
    private void setupClientIdentity() {
        long timestamp = Instant.now().toEpochMilli();
        long random = ThreadLocalRandom.current().nextLong(RANDOM_BOUND);
        long serial;
        try {
            Long serialValue = this.stringRedisTemplate.opsForValue().increment(KEY_CLIENT_SERIAL);
            serial = serialValue != null ? serialValue : 0L;
        }
        catch (Exception ex) {
            logger.error("logic engine failed to retrieve broadcast client serial from redis", ex);
            serial = 0L;
        }
        this.identity = (timestamp & TIMESTAMP_MASK) << TIMESTAMP_SHIFT
                | (serial & SERIAL_MASK) << SERIAL_SHIFT
                | random & RANDOM_MASK;
    }

    /**
     * Creates the topic/listener pair for the broadcast channel.
     *
     * <p>The listener is a {@link MessageListenerAdapter} that delegates to
     * {@link #dispatch(BroadcastMessage)} and uses the serializers of the injected
     * {@link StringRedisTemplate}.
     *
     * @return the channel topic paired with the listener that feeds {@code dispatch}
     */
    @Override
    public Pair<Topic, MessageListener> getRedisListenerDefinition() {
        MessageListenerAdapter adapter = new MessageListenerAdapter(this, "dispatch");
        adapter.setSerializer(this.stringRedisTemplate.getValueSerializer());
        adapter.setStringSerializer(this.stringRedisTemplate.getStringSerializer());
        adapter.afterPropertiesSet();
        Topic topic = ChannelTopic.of(KEY_MESSAGE_CHANNEL);
        MessageListener messageListener = adapter;
        return Pair.of(topic, messageListener);
    }

    /**
     * Handles one message received from the broadcast channel.
     *
     * <p>Messages that are {@code null} or lack a sender or event are logged and
     * dropped, as are messages sent by this instance itself. Every listener
     * registered for the event is then invoked with the message data; an exception
     * thrown by one listener is logged and does not stop the remaining listeners.
     *
     * @param message the received message, may be {@code null}
     */
    public void dispatch(BroadcastMessage message) {
        if (message == null || message.getFrom() == null || message.getEvent() == null) {
            logger.warn("dispatch(): illegal message {}", message);
            return;
        }
        long self = this.identity;
        if (message.getFrom().longValue() == self) {
            logger.debug("dispatch(): discard message {} from self ({})", message, self);
            return;
        }
        logger.debug("dispatch(): accept message {}", message);
        // Read-only lookup: using computeIfAbsent here would insert an empty deque
        // for every event name received without a local subscriber.
        Deque<Consumer<Object>> registered = this.listeners.get(message.getEvent());
        if (registered == null) {
            return;
        }
        for (Consumer<Object> listener : registered) {
            try {
                listener.accept(message.getData());
            }
            catch (Exception ex) {
                logger.error("one of logic engine listener failed to handle broadcast message", ex);
            }
        }
    }

    /**
     * Publishes an event on the broadcast channel, tagged with this client's identity.
     *
     * <p>Does nothing except log an error when called before
     * {@link #afterPropertiesSet()} has completed. Failures while sending are
     * logged and not propagated.
     *
     * @param event the event name
     * @param data  the payload delivered to listeners
     */
    @Override
    public void broadcast(String event, Object data) {
        logger.debug("broadcast({}, {})", event, data);
        if (!this.initialized) {
            logger.error("logic engine broadcast client not initialized");
            return;
        }
        BroadcastMessage message = BroadcastMessage.of(this.identity, event, data);
        try {
            // NOTE(review): relies on the template's value serializer being able to
            // serialize BroadcastMessage - confirm against the bean configuration.
            this.stringRedisTemplate.convertAndSend(KEY_MESSAGE_CHANNEL, message);
        }
        catch (Exception ex) {
            logger.error("logic engine broadcast failed", ex);
        }
    }

    /**
     * Registers a listener for an event; listeners run in registration order.
     *
     * @param event    the event name, must not be {@code null}
     * @param listener the callback receiving the message data, must not be {@code null}
     * @throws NullPointerException if {@code event} or {@code listener} is {@code null}
     */
    @Override
    public void listen(String event, Consumer<Object> listener) {
        // Validate up front so a null listener does not leave an empty deque behind
        // and the failure carries a descriptive message.
        if (event == null) {
            throw new NullPointerException("event must not be null");
        }
        if (listener == null) {
            throw new NullPointerException("listener must not be null");
        }
        this.getListeners(event).addLast(listener);
    }

    /**
     * Returns the listener deque for an event, creating it if absent.
     *
     * @param event the event name
     * @return the deque holding the event's listeners
     */
    private Deque<Consumer<Object>> getListeners(String event) {
        return this.listeners.computeIfAbsent(event, key -> new ConcurrentLinkedDeque<>());
    }

    /**
     * Message exchanged over the broadcast channel: sender identity, event name
     * and payload. Kept as a mutable bean with accessors for every field.
     */
    public static class BroadcastMessage {
        /** Identity of the sending client. */
        private Long from;
        /** Event name used to select listeners. */
        private String event;
        /** Payload handed to listeners. */
        private Object data;

        /**
         * Creates a message with all fields populated.
         *
         * @param from  identity of the sender
         * @param event event name
         * @param data  payload
         * @return the new message
         */
        public static BroadcastMessage of(long from, String event, Object data) {
            BroadcastMessage message = new BroadcastMessage();
            message.setFrom(from);
            message.setEvent(event);
            message.setData(data);
            return message;
        }

        public Long getFrom() {
            return this.from;
        }

        public void setFrom(Long from) {
            this.from = from;
        }

        public String getEvent() {
            return this.event;
        }

        public void setEvent(String event) {
            this.event = event;
        }

        public Object getData() {
            return this.data;
        }

        public void setData(Object data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "BroadcastMessage{from=" + this.from + ", event='" + this.event + '\'' + ", data=" + this.data + '}';
        }
    }
}

