package com.iohao.game.bolt.broker.cluster;

import com.iohao.game.bolt.broker.core.message.BrokerClusterMessage;
import com.iohao.game.bolt.broker.core.message.BrokerMessage;
import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterMessageHandler;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.net.Address;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Generated;
import org.jctools.maps.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/iohao/game/bolt/broker/cluster/BrokerClusterMessageHandler.class */
public final class BrokerClusterMessageHandler implements ClusterMessageHandler, Function<Cluster, ClusterMessageHandler> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger("ClusterTopic");
    final String name;
    final Broker localBroker;
    final ClusterMessageListener clusterMessageListener;
    Cluster cluster;
    Map<String, Broker> brokers = new NonBlockingHashMap();
    Sinks.Many<Collection<Broker>> brokersEmitterProcessor = Sinks.many().multicast().onBackpressureBuffer();

    public BrokerClusterMessageHandler(String str, Broker broker, ClusterMessageListener clusterMessageListener) {
        this.name = str;
        this.localBroker = broker;
        this.clusterMessageListener = clusterMessageListener;
    }

    @Override // java.util.function.Function
    public ClusterMessageHandler apply(Cluster cluster) {
        this.cluster = cluster;
        return this;
    }

    public void onMessage(Message message) {
        log.info("\n{}", this.name + " received: " + String.valueOf(message.data()));
    }

    public void onGossip(Message message) {
        log.info("\n{}", this.name + " received: " + String.valueOf(message.data()));
    }

    public void onMembershipEvent(MembershipEvent membershipEvent) {
        NonBlockingHashMap nonBlockingHashMap = new NonBlockingHashMap();
        this.cluster.members().forEach(member -> {
            this.cluster.metadata(member).ifPresent(brokerClusterMetadata -> {
                Broker localBroker = brokerClusterMetadata.getLocalBroker();
                Address address = member.address();
                Broker brokerAddress = new Broker(address.host()).setClusterAddress(address.toString()).setId(localBroker.getId()).setPort(localBroker.getPort()).setBrokerAddress(localBroker.getBrokerAddress());
                nonBlockingHashMap.put(brokerAddress.getClusterAddress(), brokerAddress);
            });
        });
        this.brokers = nonBlockingHashMap;
        this.brokersEmitterProcessor.tryEmitNext(this.brokers.values());
        inform();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BrokerClusterMessage getBrokerClusterMessage() {
        List list = (List) this.brokers.values().stream().map(broker -> {
            BrokerMessage brokerMessage = new BrokerMessage();
            brokerMessage.setAddress(broker.getBrokerAddress());
            brokerMessage.setId(broker.getId());
            return brokerMessage;
        }).collect(Collectors.toList());
        BrokerClusterMessage brokerClusterMessage = new BrokerClusterMessage();
        brokerClusterMessage.setBrokerMessageList(list);
        brokerClusterMessage.setName(this.localBroker.getBrokerAddress());
        return brokerClusterMessage;
    }

    private void inform() {
        if (Objects.isNull(this.clusterMessageListener)) {
            return;
        }
        this.clusterMessageListener.inform(getBrokerClusterMessage());
    }

    private void print(MembershipEvent membershipEvent) {
        int size = this.cluster.members().size();
        String str = (String) this.cluster.members().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining("\n"));
        log.info("\n{} {}", this.name + " received: " + membershipEvent.member().alias(), membershipEvent);
        log.info("size : {} {} \n{}", new Object[]{Integer.valueOf(size), membershipEvent.type(), str});
    }
}
