/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.clustered.ClusterNodeInfo;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.eventbus.impl.clustered.ConnectionHolder;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TCPSSLOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import java.io.Serializable;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

public class ClusteredEventBus
extends EventBusImpl {
    private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class);
    /** System property that, when set, takes precedence over the public host from the event bus options. */
    public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";
    /** System property that, when set, takes precedence over the public port from the event bus options. */
    public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port";
    /** Single-byte buffer written back on the socket when a ping message is received. */
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});
    /** Key under which this node's host/port JSON is handed to the HA manager in {@code start}. */
    private static final String SERVER_ID_HA_KEY = "server_id";
    /** Name of the async multimap requested from the cluster manager to hold subscriptions. */
    private static final String SUBS_MAP_NAME = "__vertx.subs";
    private final ClusterManager clusterManager;
    private final HAManager haManager;
    /** Connection holders keyed by remote server id; entries are created lazily in {@code sendRemote}. */
    private final ConcurrentMap<ServerID, ConnectionHolder> connections = new ConcurrentHashMap<ServerID, ConnectionHolder>();
    /** Context used by {@code sendOrPub} to run the subscription lookup when no Vert.x context is current. */
    private final Context sendNoContext;
    private EventBusOptions options;
    /** Address to node-info multimap; assigned asynchronously in {@code start}, so it is null before that. */
    private AsyncMultiMap<String, ClusterNodeInfo> subs;
    /** Addresses this node has added to {@code subs}; re-added when the cluster view changes. */
    private Set<String> ownSubs = new ConcurrentHashSet<String>();
    /** Public host/port of this node's server; assigned once the server is listening. */
    private ServerID serverID;
    /** Cluster node id paired with {@code serverID}; assigned once the server is listening. */
    private ClusterNodeInfo nodeInfo;
    /** Net server created in {@code start} and closed in {@code close}. */
    private NetServer server;

    public ClusteredEventBus(VertxInternal vertx, VertxOptions options, ClusterManager clusterManager, HAManager haManager) {
        super(vertx);
        this.options = options.getEventBusOptions();
        this.clusterManager = clusterManager;
        this.haManager = haManager;
        this.sendNoContext = vertx.getOrCreateContext();
        this.setClusterViewChangedHandler(haManager);
    }

    /**
     * Derives the {@link NetServerOptions} for the event bus server from the event bus options:
     * the options' JSON form seeds the server options, then the key/cert and trust settings
     * are copied across explicitly.
     *
     * @return freshly built server options
     */
    private NetServerOptions getServerOptions() {
        EventBusOptions busOptions = this.options;
        NetServerOptions result = new NetServerOptions(busOptions.toJson());
        setCertOptions(result, busOptions.getKeyCertOptions());
        setTrustOptions(result, busOptions.getTrustOptions());
        return result;
    }

    /**
     * Applies the given key/cert options to {@code options} using the setter that matches
     * their concrete type. A {@code null} value leaves {@code options} untouched.
     *
     * @param options        the SSL options to configure
     * @param keyCertOptions the key/cert options to apply; may be {@code null}
     * @throws ClassCastException if {@code keyCertOptions} is non-null and is neither JKS, PFX nor PEM
     */
    static void setCertOptions(TCPSSLOptions options, KeyCertOptions keyCertOptions) {
        if (keyCertOptions instanceof JksOptions) {
            options.setKeyStoreOptions((JksOptions)keyCertOptions);
            return;
        }
        if (keyCertOptions instanceof PfxOptions) {
            options.setPfxKeyCertOptions((PfxOptions)keyCertOptions);
            return;
        }
        // Anything else that is non-null is treated as PEM (the cast fails otherwise).
        if (keyCertOptions != null) {
            options.setPemKeyCertOptions((PemKeyCertOptions)keyCertOptions);
        }
    }

    /**
     * Applies the given trust options to {@code sslOptions} using the setter that matches
     * their concrete type. A {@code null} value leaves {@code sslOptions} untouched.
     *
     * @param sslOptions the SSL options to configure
     * @param options    the trust options to apply; may be {@code null}
     * @throws ClassCastException if {@code options} is non-null and is neither JKS, PFX nor PEM
     */
    static void setTrustOptions(TCPSSLOptions sslOptions, TrustOptions options) {
        if (options instanceof JksOptions) {
            sslOptions.setTrustStoreOptions((JksOptions)options);
            return;
        }
        if (options instanceof PfxOptions) {
            sslOptions.setPfxTrustOptions((PfxOptions)options);
            return;
        }
        // Anything else that is non-null is treated as PEM (the cast fails otherwise).
        if (options != null) {
            sslOptions.setPemTrustOptions((PemTrustOptions)options);
        }
    }

    /**
     * Starts the clustered event bus.
     *
     * <p>Obtains the subscription multimap from the cluster manager, then creates the net server,
     * installs the server handler and starts listening. Once listening, the public server id and
     * node info are computed and the host/port are handed to the HA manager.
     *
     * <p>The bus is marked as started on every successful start, whether or not a result handler
     * was supplied.
     *
     * @param resultHandler notified with success or with the failure cause; may be {@code null},
     *                      in which case failures are logged instead
     */
    @Override
    public void start(Handler<AsyncResult<Void>> resultHandler) {
        this.clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, mapResult -> {
            if (mapResult.failed()) {
                if (resultHandler != null) {
                    resultHandler.handle(Future.failedFuture(mapResult.cause()));
                } else {
                    log.error(mapResult.cause());
                }
                return;
            }
            this.subs = mapResult.result();
            this.server = this.vertx.createNetServer(this.getServerOptions());
            this.server.connectHandler(this.getServerHandler());
            this.server.listen(listenResult -> {
                if (listenResult.failed()) {
                    if (resultHandler != null) {
                        resultHandler.handle(Future.failedFuture(listenResult.cause()));
                    } else {
                        log.error(listenResult.cause());
                    }
                    return;
                }
                int serverPort = this.getClusterPublicPort(this.options, this.server.actualPort());
                String serverHost = this.getClusterPublicHost(this.options);
                this.serverID = new ServerID(serverPort, serverHost);
                this.nodeInfo = new ClusterNodeInfo(this.clusterManager.getNodeID(), this.serverID);
                this.haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", this.serverID.host).put("port", this.serverID.port));
                // Mark the bus as started regardless of whether the caller wants a callback.
                this.started = true;
                if (resultHandler != null) {
                    resultHandler.handle(Future.succeededFuture());
                }
            });
        });
    }

    /**
     * Closes the event bus: first the superclass, then the net server (if one was created),
     * and finally every outgoing connection holder.
     *
     * <p>When there is no server, the superclass result is forwarded to the handler; otherwise
     * the server-close result is forwarded. A server-close failure is logged.
     *
     * @param completionHandler notified when closing has finished; may be {@code null}
     */
    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        super.close(superResult -> {
            if (this.server == null) {
                if (completionHandler != null) {
                    completionHandler.handle((AsyncResult<Void>)superResult);
                }
                return;
            }
            this.server.close(serverResult -> {
                if (serverResult.failed()) {
                    log.error((Object)"Failed to close server", serverResult.cause());
                }
                for (ConnectionHolder connection : this.connections.values()) {
                    connection.close();
                }
                if (completionHandler != null) {
                    completionHandler.handle((AsyncResult<Void>)serverResult);
                }
            });
        });
    }

    /**
     * Creates a {@link ClusteredMessage} originating from this node's server id.
     *
     * @param send      {@code true} for a point-to-point send, {@code false} for a publish
     * @param address   destination address; must not be {@code null}
     * @param headers   message headers
     * @param body      message body
     * @param codecName name of the codec to use, passed to the codec lookup together with the body
     * @return the new message
     * @throws NullPointerException if {@code address} is {@code null}
     */
    @Override
    protected MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName) {
        Objects.requireNonNull(address, "no null address accepted");
        MessageCodec bodyCodec = this.codecManager.lookupCodec(body, codecName);
        return new ClusteredMessage(this.serverID, address, null, headers, body, bodyCodec, send, this);
    }

    /**
     * Records a handler registration in the cluster-wide subscription map when appropriate.
     *
     * <p>Only a new, non-reply, non-local-only address is added to the map (and remembered in
     * {@code ownSubs}), and only once the map is available. In every other case the completion
     * handler is called straight away with success.
     *
     * @param newAddress        whether this is the first handler for the address
     * @param address           the address being registered
     * @param replyHandler      whether the handler is a reply handler
     * @param localOnly         whether the handler is local only
     * @param completionHandler notified when the registration has been processed
     */
    @Override
    protected <T> void addRegistration(boolean newAddress, String address, boolean replyHandler, boolean localOnly, Handler<AsyncResult<Void>> completionHandler) {
        boolean clusterWide = newAddress && this.subs != null && !replyHandler && !localOnly;
        if (!clusterWide) {
            completionHandler.handle(Future.succeededFuture());
            return;
        }
        this.subs.add(address, this.nodeInfo, completionHandler);
        this.ownSubs.add(address);
    }

    /**
     * Removes this node's subscription for {@code address} from the cluster-wide map when the
     * last holder for the address is not local only and the map is available. Otherwise the
     * completion handler is invoked through {@code callCompletionHandlerAsync}.
     *
     * @param lastHolder        the last handler holder for the address, or {@code null}
     * @param address           the address being unregistered
     * @param completionHandler notified when the removal has been processed
     */
    @Override
    protected <T> void removeRegistration(HandlerHolder lastHolder, String address, Handler<AsyncResult<Void>> completionHandler) {
        boolean clusterWide = lastHolder != null && this.subs != null && !lastHolder.isLocalOnly();
        if (!clusterWide) {
            this.callCompletionHandlerAsync(completionHandler);
            return;
        }
        this.ownSubs.remove(address);
        this.removeSub(address, this.nodeInfo, completionHandler);
    }

    /**
     * Sends a reply to the node that sent {@code replierMessage}.
     *
     * @param sendContext    context carrying the reply message
     * @param replierMessage the message being replied to; must be a {@link ClusteredMessage}
     */
    @Override
    protected <T> void sendReply(EventBusImpl.SendContextImpl<T> sendContext, MessageImpl replierMessage) {
        ClusteredMessage repliedTo = (ClusteredMessage)replierMessage;
        ServerID replyDest = repliedTo.getSender();
        this.clusteredSendReply(replyDest, sendContext);
    }

    /**
     * Sends or publishes the message held by {@code sendContext}.
     *
     * <p>Looks up the subscribers for the message address in the cluster-wide map. If there
     * are any, delivery is delegated to {@code sendToSubs}; otherwise the message is delivered
     * locally. A failed lookup is logged.
     *
     * <p>When no Vert.x context is current, the lookup is run on the {@code sendNoContext} context.
     *
     * @param sendContext context carrying the message to send
     */
    @Override
    protected <T> void sendOrPub(EventBusImpl.SendContextImpl<T> sendContext) {
        String address = sendContext.message.address();
        // Fully typed handler: with a raw Handler the lambda parameter would be Object and
        // succeeded()/result() would not resolve.
        Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> resultHandler = asyncResult -> {
            if (asyncResult.succeeded()) {
                ChoosableIterable<ClusterNodeInfo> serverIDs = asyncResult.result();
                if (serverIDs != null && !serverIDs.isEmpty()) {
                    this.sendToSubs(serverIDs, sendContext);
                } else {
                    if (this.metrics != null) {
                        this.metrics.messageSent(address, !sendContext.message.isSend(), true, false);
                    }
                    this.deliverMessageLocally(sendContext);
                }
            } else {
                log.error((Object)"Failed to send message", asyncResult.cause());
            }
        };
        if (Vertx.currentContext() == null) {
            this.sendNoContext.runOnContext(v -> this.subs.get(address, resultHandler));
        } else {
            this.subs.get(address, resultHandler);
        }
    }

    /**
     * Produces a reply address from a random UUID.
     *
     * @return the string form of a freshly generated random UUID
     */
    @Override
    protected String generateReplyAddress() {
        UUID replyId = UUID.randomUUID();
        return replyId.toString();
    }

    /**
     * Tells whether a message is local, i.e. it was not read from the wire.
     *
     * @param msg the message to inspect; must be a {@link ClusteredMessage}
     * @return {@code true} if the message did not arrive from the wire
     */
    @Override
    protected boolean isMessageLocal(MessageImpl msg) {
        return !((ClusteredMessage)msg).isFromWire();
    }

    /**
     * Installs a handler on the HA manager that repairs the subscription map whenever the
     * cluster view changes.
     *
     * <p>On each change, every address in {@code ownSubs} is re-added for this node, and all
     * entries whose node id is not among the current members are removed. Failures of either
     * operation are logged as warnings.
     *
     * <p>The subscription map is assigned asynchronously in {@code start}; a view change that
     * arrives before then is ignored, since there is nothing to repair yet.
     *
     * @param haManager the HA manager to register the handler with
     */
    private void setClusterViewChangedHandler(HAManager haManager) {
        haManager.setClusterViewChangedHandler((Set<String> members) -> {
            if (this.subs == null) {
                // Not started yet: the map does not exist, so there is nothing to repair.
                return;
            }
            this.ownSubs.forEach(address -> this.subs.add(address, this.nodeInfo, addResult -> {
                if (addResult.failed()) {
                    log.warn((Object)"Failed to update subs map with self", addResult.cause());
                }
            }));
            this.subs.removeAllMatching((Predicate<ClusterNodeInfo> & Serializable)ci -> !members.contains(ci.nodeId), removeResult -> {
                if (removeResult.failed()) {
                    log.warn((Object)"Error removing subs", removeResult.cause());
                }
            });
        });
    }

    /**
     * Resolves the port advertised to other nodes.
     *
     * <p>The {@code vertx.cluster.public.port} system property wins over the configured public
     * port; if the resulting value is {@code -1}, the actual listening port is used.
     *
     * @param options    event bus options holding the configured public port
     * @param actualPort the port the server is actually listening on
     * @return the port to advertise
     */
    private int getClusterPublicPort(EventBusOptions options, int actualPort) {
        int configuredPort = Integer.getInteger(CLUSTER_PUBLIC_PORT_PROP_NAME, options.getClusterPublicPort());
        return configuredPort == -1 ? actualPort : configuredPort;
    }

    /**
     * Resolves the host advertised to other nodes.
     *
     * <p>The {@code vertx.cluster.public.host} system property wins over the configured public
     * host; if neither yields a value, the options' host is used.
     *
     * @param options event bus options holding the configured public host and host
     * @return the host to advertise
     */
    private String getClusterPublicHost(EventBusOptions options) {
        String configuredHost = System.getProperty(CLUSTER_PUBLIC_HOST_PROP_NAME, options.getClusterPublicHost());
        return configuredHost != null ? configuredHost : options.getHost();
    }

    /**
     * Builds the connect handler for the event bus server.
     *
     * <p>Each socket gets its own fixed-size record parser that alternates between two states:
     * reading a 4-byte length prefix, then reading a body of that length. A completed body is
     * decoded into a {@link ClusteredMessage}; ping messages are answered with {@code PONG} on
     * the socket, everything else is delivered locally.
     *
     * @return the handler to install on the net server
     */
    private Handler<NetSocket> getServerHandler() {
        return socket -> {
            final RecordParser frameParser = RecordParser.newFixed(4, null);
            frameParser.setOutput(new Handler<Buffer>(){
                // -1 means the next record is a length prefix; otherwise the expected body size.
                int bodyLength = -1;

                @Override
                public void handle(Buffer buff) {
                    if (this.bodyLength == -1) {
                        this.bodyLength = buff.getInt(0);
                        frameParser.fixedSizeMode(this.bodyLength);
                        return;
                    }
                    ClusteredMessage incoming = new ClusteredMessage();
                    incoming.readFromWire(buff, ClusteredEventBus.this.codecManager);
                    if (ClusteredEventBus.this.metrics != null) {
                        ClusteredEventBus.this.metrics.messageRead(incoming.address(), buff.length());
                    }
                    // Go back to expecting a length prefix before dispatching the message.
                    frameParser.fixedSizeMode(4);
                    this.bodyLength = -1;
                    if (incoming.codec() == CodecManager.PING_MESSAGE_CODEC) {
                        socket.write(PONG);
                    } else {
                        ClusteredEventBus.this.deliverMessageLocally(incoming);
                    }
                }
            });
            socket.handler((Handler)frameParser);
        };
    }

    /**
     * Delivers the message to the given subscribers.
     *
     * <p>For a send, one subscriber is chosen: if it is another node the message goes over the
     * wire, otherwise (including when nothing was chosen) it is delivered locally. For a
     * publish, the message is sent to every remote subscriber and, if this node is among the
     * subscribers, delivered locally once.
     *
     * @param subs        subscribers registered for the message address
     * @param sendContext context carrying the message to deliver
     */
    private <T> void sendToSubs(ChoosableIterable<ClusterNodeInfo> subs, EventBusImpl.SendContextImpl<T> sendContext) {
        String address = sendContext.message.address();
        if (!sendContext.message.isSend()) {
            boolean hasLocal = false;
            boolean hasRemote = false;
            for (ClusterNodeInfo subscriber : subs) {
                if (subscriber.serverID.equals(this.serverID)) {
                    hasLocal = true;
                } else {
                    hasRemote = true;
                    this.sendRemote(subscriber.serverID, sendContext.message);
                }
            }
            if (this.metrics != null) {
                this.metrics.messageSent(address, true, hasLocal, hasRemote);
            }
            if (hasLocal) {
                this.deliverMessageLocally(sendContext);
            }
            return;
        }
        ClusterNodeInfo chosen = subs.choose();
        ServerID target = chosen == null ? null : chosen.serverID;
        boolean goesRemote = target != null && !target.equals(this.serverID);
        if (this.metrics != null) {
            this.metrics.messageSent(address, false, !goesRemote, goesRemote);
        }
        if (goesRemote) {
            this.sendRemote(target, sendContext.message);
        } else {
            this.deliverMessageLocally(sendContext);
        }
    }

    /**
     * Sends a reply either over the wire or locally, depending on whether the destination is
     * this node's own server id.
     *
     * @param replyDest   server id of the node that should receive the reply
     * @param sendContext context carrying the reply message
     */
    private <T> void clusteredSendReply(ServerID replyDest, EventBusImpl.SendContextImpl<T> sendContext) {
        MessageImpl reply = sendContext.message;
        String address = reply.address();
        boolean goesRemote = !replyDest.equals(this.serverID);
        if (this.metrics != null) {
            this.metrics.messageSent(address, false, !goesRemote, goesRemote);
        }
        if (goesRemote) {
            this.sendRemote(replyDest, reply);
        } else {
            this.deliverMessageLocally(sendContext);
        }
    }

    /**
     * Writes the message to the connection holder for the given server, creating and connecting
     * a holder first if none is registered yet.
     *
     * <p>If another thread registers a holder concurrently, that holder is used and the one
     * created here is discarded without being connected.
     *
     * @param theServerID destination server
     * @param message     message to write; must be a {@link ClusteredMessage}
     */
    private void sendRemote(ServerID theServerID, MessageImpl message) {
        ConnectionHolder target = this.connections.get(theServerID);
        if (target == null) {
            ConnectionHolder created = new ConnectionHolder(this, theServerID, this.options);
            ConnectionHolder raced = this.connections.putIfAbsent(theServerID, created);
            if (raced == null) {
                created.connect();
                target = created;
            } else {
                target = raced;
            }
        }
        target.writeMessage((ClusteredMessage)message);
    }

    /**
     * Removes a single subscription entry from the cluster-wide map.
     *
     * <p>The completion handler, when present, is always notified: with success if the entry
     * was removed, with a {@code "sub not found"} failure if it was not present, and with the
     * underlying cause if the map operation itself failed (which is also logged).
     *
     * @param subName           address whose subscription is removed
     * @param node              node info to remove for that address
     * @param completionHandler notified with the outcome; may be {@code null}
     */
    private void removeSub(String subName, ClusterNodeInfo node, Handler<AsyncResult<Void>> completionHandler) {
        this.subs.remove(subName, node, ar -> {
            if (!ar.succeeded()) {
                log.error((Object)"Failed to remove sub", ar.cause());
                // Propagate the failure so that callers waiting on unregistration do not hang.
                if (completionHandler != null) {
                    completionHandler.handle(Future.failedFuture(ar.cause()));
                }
            } else if (((Boolean)ar.result()).booleanValue()) {
                if (completionHandler != null) {
                    completionHandler.handle(Future.succeededFuture());
                }
            } else if (completionHandler != null) {
                completionHandler.handle(Future.failedFuture("sub not found"));
            }
        });
    }

    /**
     * @return the live map of connection holders keyed by remote server id (not a copy)
     */
    ConcurrentMap<ServerID, ConnectionHolder> connections() {
        return connections;
    }

    /**
     * @return the Vert.x instance this event bus belongs to
     */
    VertxInternal vertx() {
        return vertx;
    }

    /**
     * @return the event bus options this event bus was created with
     */
    EventBusOptions options() {
        return options;
    }
}

