package com.jxdinfo.hussar.logic.engine.service.impl;

import com.jxdinfo.hussar.logic.engine.service.LogicEngineMessageService;
import java.time.Instant;
import java.util.Deque;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.annotation.Resource;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:com/jxdinfo/hussar/logic/engine/service/impl/LogicEngineMessageServiceRedisImpl.class */
/**
 * {@link LogicEngineMessageService} implementation that broadcasts events to other
 * logic engine clients through a Redis pub/sub channel.
 *
 * <p>Each instance computes a numeric identity at startup and stamps it on every
 * message it publishes; messages received with the instance's own identity are
 * discarded so that local listeners only see events published by other clients.
 */
public class LogicEngineMessageServiceRedisImpl implements LogicEngineMessageService.RedisExt, InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(LogicEngineMessageServiceRedisImpl.class);
    private static final String KEY_CLIENT_SERIAL = "hussar-logic:engine:client-serial";
    private static final String KEY_MESSAGE_CHANNEL = "hussar-logic:engine:message-channel";

    // Identity bit layout (most significant first):
    //   bits 20..62 : low 43 bits of the startup epoch-millisecond timestamp
    //   bits  4..19 : low 16 bits of the serial obtained from Redis
    //   bits  0..3  : 4 random bits
    private static final long TIMESTAMP_MASK = (1L << 43) - 1;
    private static final int TIMESTAMP_SHIFT = 20;
    private static final long SERIAL_MASK = 0xFFFFL;
    private static final int SERIAL_SHIFT = 4;
    private static final long NONCE_BOUND = 16L;
    private static final long NONCE_MASK = 0xFL;

    // Injected by field name; do not rename.
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    private volatile boolean initialized = false;
    private volatile long identity = 0;
    /** Registered listeners keyed by event name, in registration order. */
    private final ConcurrentMap<String, Deque<Consumer<Object>>> listeners = new ConcurrentHashMap<>();

    /**
     * Envelope published on the message channel: the sender identity, the event name
     * and an arbitrary payload. Kept as a mutable bean with a default constructor and
     * accessors so its shape is unchanged for whatever serializer the template uses.
     */
    public static class BroadcastMessage {
        private Long from;
        private String event;
        private Object data;

        /**
         * Creates a message populated with the given sender, event name and payload.
         *
         * @param from  identity of the publishing client
         * @param event event name
         * @param data  payload handed to listeners on the receiving side
         * @return the populated message
         */
        public static BroadcastMessage of(long from, String event, Object data) {
            BroadcastMessage message = new BroadcastMessage();
            message.setFrom(Long.valueOf(from));
            message.setEvent(event);
            message.setData(data);
            return message;
        }

        public Long getFrom() {
            return this.from;
        }

        public void setFrom(Long from) {
            this.from = from;
        }

        public String getEvent() {
            return this.event;
        }

        public void setEvent(String event) {
            this.event = event;
        }

        public Object getData() {
            return this.data;
        }

        public void setData(Object data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "BroadcastMessage{from=" + this.from + ", event='" + this.event + "', data=" + this.data + '}';
        }
    }

    /**
     * Computes the client identity and then marks the service as initialized, which
     * enables {@link #broadcast(String, Object)}.
     */
    @Override
    public void afterPropertiesSet() {
        setupClientIdentity();
        this.initialized = true;
    }

    /**
     * Builds {@link #identity} from the current time, a serial incremented in Redis and
     * a few random bits. If the Redis increment fails or returns {@code null}, the serial
     * part falls back to 0 and the identity is still assigned.
     */
    private void setupClientIdentity() {
        long timestamp = Instant.now().toEpochMilli();
        long nonce = ThreadLocalRandom.current().nextLong(NONCE_BOUND);
        long serial;
        try {
            Long next = this.stringRedisTemplate.opsForValue().increment(KEY_CLIENT_SERIAL);
            serial = next != null ? next.longValue() : 0L;
        } catch (Exception e) {
            // Best effort: the timestamp and random bits still contribute to the identity.
            logger.error("logic engine failed to retrieve broadcast client serial from redis", e);
            serial = 0L;
        }
        this.identity = ((timestamp & TIMESTAMP_MASK) << TIMESTAMP_SHIFT)
                | ((serial & SERIAL_MASK) << SERIAL_SHIFT)
                | (nonce & NONCE_MASK);
    }

    /**
     * Builds the Redis subscription for this service: the message channel topic paired
     * with a listener adapter that delegates to {@link #dispatch(BroadcastMessage)} and
     * uses the serializers of the injected template.
     *
     * @return the topic and the configured listener
     */
    @Override // com.jxdinfo.hussar.logic.engine.service.LogicEngineMessageService.RedisExt
    public Pair<Topic, MessageListener> getRedisListenerDefinition() {
        // The delegate method is looked up by name, so "dispatch" must match the method below.
        MessageListenerAdapter adapter = new MessageListenerAdapter(this, "dispatch");
        adapter.setSerializer(this.stringRedisTemplate.getValueSerializer());
        adapter.setStringSerializer(this.stringRedisTemplate.getStringSerializer());
        adapter.afterPropertiesSet();
        return Pair.of(ChannelTopic.of(KEY_MESSAGE_CHANNEL), adapter);
    }

    /**
     * Handles a message received from the channel. Messages that are {@code null}, lack a
     * sender or event, or carry this client's own identity are dropped; all others are
     * delivered to every listener registered for the event. A listener that throws is
     * logged and does not prevent delivery to the remaining listeners.
     *
     * @param broadcastMessage the received message, possibly {@code null}
     */
    public void dispatch(BroadcastMessage broadcastMessage) {
        if (broadcastMessage == null || broadcastMessage.getFrom() == null || broadcastMessage.getEvent() == null) {
            logger.warn("dispatch(): illegal message {}", broadcastMessage);
            return;
        }
        long self = this.identity;
        if (broadcastMessage.getFrom().longValue() == self) {
            logger.debug("dispatch(): discard message {} from self ({})", broadcastMessage, Long.valueOf(self));
            return;
        }
        logger.debug("dispatch(): accept message {}", broadcastMessage);
        // Plain lookup on purpose: creating a deque here would retain one map entry per
        // unknown event name received from the channel.
        Deque<Consumer<Object>> registered = this.listeners.get(broadcastMessage.getEvent());
        if (registered == null) {
            return;
        }
        for (Consumer<Object> listener : registered) {
            try {
                listener.accept(broadcastMessage.getData());
            } catch (Exception e) {
                logger.error("one of logic engine listener failed to handle broadcast message", e);
            }
        }
    }

    /**
     * Publishes an event on the message channel, stamped with this client's identity.
     * Does nothing except log an error when called before initialization; publishing
     * failures are logged and not propagated.
     *
     * @param event event name
     * @param data  payload to send
     */
    @Override // com.jxdinfo.hussar.logic.engine.service.LogicEngineMessageService
    public void broadcast(String event, Object data) {
        logger.debug("broadcast({}, {})", event, data);
        if (!this.initialized) {
            logger.error("logic engine broadcast client not initialized");
            return;
        }
        try {
            this.stringRedisTemplate.convertAndSend(KEY_MESSAGE_CHANNEL, BroadcastMessage.of(this.identity, event, data));
        } catch (Exception e) {
            logger.error("logic engine broadcast failed", e);
        }
    }

    /**
     * Registers a listener for the given event; listeners are invoked in registration order.
     *
     * @param event    event name to listen for
     * @param consumer callback receiving the payload of each dispatched message
     */
    @Override // com.jxdinfo.hussar.logic.engine.service.LogicEngineMessageService
    public void listen(String event, Consumer<Object> consumer) {
        getListeners(event).addLast(consumer);
    }

    /**
     * Returns the listener deque for an event, creating it on first use. Intended for
     * the registration path only.
     */
    private Deque<Consumer<Object>> getListeners(String event) {
        return this.listeners.computeIfAbsent(event, key -> new ConcurrentLinkedDeque<>());
    }
}
