/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.EventExecutorGroup;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.AbstractMessage;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.ResultCode;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.protocol.transaction.GlobalBeginResponse;
import io.seata.core.rpc.ClientMessageListener;
import io.seata.core.rpc.ClientMessageSender;
import io.seata.core.rpc.netty.AbstractRpcRemoting;
import io.seata.core.rpc.netty.NettyClientChannelManager;
import io.seata.core.rpc.netty.NettyClientConfig;
import io.seata.core.rpc.netty.NettyPoolKey;
import io.seata.core.rpc.netty.NettyPoolableFactory;
import io.seata.core.rpc.netty.RegisterMsgListener;
import io.seata.core.rpc.netty.RpcClientBootstrap;
import io.seata.discovery.loadbalance.LoadBalanceFactory;
import io.seata.discovery.registry.RegistryFactory;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRpcRemotingClient
extends AbstractRpcRemoting
implements RegisterMsgListener,
ClientMessageSender {
    /** Logger shared by this client and its inner merge-send task. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemotingClient.class);
    /** Label written before each merged message id in the merge debug dump. */
    private static final String MSG_ID_PREFIX = "msgId:";
    /** Label written before each pending-future id in the merge debug dump. */
    private static final String FUTURES_PREFIX = "futures:";
    /** Terminator appended after every id in the merge debug dump. */
    private static final String SINGLE_LOG_POSTFIX = ";";
    // Presumably the per-pass wait (in milliseconds) of the merge-send loop — TODO confirm.
    private static final int MAX_MERGE_SEND_MILLS = 1;
    // Presumably the separator between the merge thread prefix and the role name — TODO confirm.
    private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
    // Presumably the size of the merge-send thread pool — TODO confirm.
    private static final int MAX_MERGE_SEND_THREAD = 1;
    // Presumably the keep-alive handed to the merge-send thread pool — TODO confirm.
    private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
    // NOTE(review): the name says "MILLS", but the reconnect task in init() runs with a
    // delay/period of 5 expressed in TimeUnit.SECONDS — confirm which unit is intended.
    private static final int SCHEDULE_INTERVAL_MILLS = 5;
    // Presumably the base name of the merge-send worker thread — TODO confirm.
    private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
    /** Netty bootstrap wrapper; started in {@code init()} and shut down in {@code destroy()}. */
    private final RpcClientBootstrap clientBootstrap;
    /** Acquires, releases, invalidates and destroys the channels used by this client. */
    private NettyClientChannelManager clientChannelManager;
    /** Optional listener that receives requests handed to {@code dispatch}; may be {@code null}. */
    private ClientMessageListener clientMessageListener;
    /** Role given at construction; passed to the bootstrap and used in the merge thread name. */
    private final NettyPoolKey.TransactionRole transactionRole;
    /** Executor running the merge-send task; assigned in {@code init()}, so {@code null} before that. */
    private ExecutorService mergeSendExecutorService;

    public AbstractRpcRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
        super(messageExecutor);
        this.transactionRole = transactionRole;
        this.clientBootstrap = new RpcClientBootstrap(nettyClientConfig, eventExecutorGroup, (ChannelHandler)this, transactionRole);
        this.clientChannelManager = new NettyClientChannelManager(new NettyPoolableFactory(this, this.clientBootstrap), this.getPoolKeyFunction(), nettyClientConfig);
    }

    /**
     * Exposes the channel manager created by the constructor.
     *
     * @return the channel manager backing this client
     */
    public NettyClientChannelManager getClientChannelManager() {
        return clientChannelManager;
    }

    /**
     * Supplies the function that the channel manager uses to turn a string key into a
     * {@link NettyPoolKey}. Called from this class's constructor, so implementations
     * must not depend on state initialised in the subclass constructor.
     *
     * @return the pool-key function (the string argument is presumably a server address — TODO confirm)
     */
    protected abstract Function<String, NettyPoolKey> getPoolKeyFunction();

    /**
     * Names the transaction service group of this client. The value is passed to the
     * channel manager's periodic {@code reconnect} call and to the registry lookup done
     * when picking a server address.
     *
     * @return the transaction service group
     */
    protected abstract String getTransactionServiceGroup();

    /**
     * Starts the bootstrap, schedules the periodic reconnect task, launches the single
     * merge-send worker and finally delegates to the superclass initialisation.
     */
    @Override
    public void init() {
        clientBootstrap.start();

        // Periodically ask the channel manager to reconnect for our service group.
        timerExecutor.scheduleAtFixedRate(
            () -> clientChannelManager.reconnect(getTransactionServiceGroup()),
            SCHEDULE_INTERVAL_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.SECONDS);

        // One dedicated thread drains the per-address baskets and sends merged messages.
        ThreadFactory mergeThreadFactory = new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD);
        mergeSendExecutorService = new ThreadPoolExecutor(
            MAX_MERGE_SEND_THREAD, MAX_MERGE_SEND_THREAD,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), mergeThreadFactory);
        mergeSendExecutorService.submit(new MergedSendRunnable());

        super.init();
    }

    /**
     * Shuts down the bootstrap and stops the merge-send executor.
     *
     * <p>The executor is only created in {@code init()}, so it is null-checked to keep
     * {@code destroy()} safe on a client that was never initialised. {@code shutdownNow()}
     * is used because the only submitted task is an endless loop: an orderly
     * {@code shutdown()} would never end it, whereas {@code shutdownNow()} at least
     * interrupts the worker thread.
     */
    @Override
    public void destroy() {
        this.clientBootstrap.shutdown();
        if (this.mergeSendExecutorService != null) {
            this.mergeSendExecutorService.shutdownNow();
        }
    }

    /**
     * Handles inbound messages before the generic superclass handling.
     *
     * <ul>
     *   <li>Anything that is not an {@link RpcMessage} is ignored.</li>
     *   <li>A {@code PONG} heartbeat is only logged (at debug level).</li>
     *   <li>A {@link MergeResultMessage} is fanned out: the merged request registered under
     *       the same message id is removed from {@code mergeMsgMap}, and the i-th result is
     *       set on the future of the i-th merged message id.</li>
     *   <li>Everything else goes to {@code super.channelRead}.</li>
     * </ul>
     *
     * @param ctx the channel handler context the message arrived on
     * @param msg the inbound message
     * @throws Exception propagated from {@code super.channelRead}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        RpcMessage rpcMessage = (RpcMessage)msg;
        Object body = rpcMessage.getBody();
        if (body == HeartbeatMessage.PONG) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("received PONG from {}", (Object)ctx.channel().remoteAddress());
            }
            return;
        }
        if (body instanceof MergeResultMessage) {
            MergeResultMessage results = (MergeResultMessage)body;
            MergedWarpMessage mergeMessage = (MergedWarpMessage)this.mergeMsgMap.remove(rpcMessage.getId());
            if (mergeMessage == null) {
                // No merged request is registered under this id (e.g. it was already
                // consumed); without this guard the loop below would throw an NPE.
                LOGGER.warn("merge result msgId: {} has no matching merged request, ignored.", (Object)rpcMessage.getId());
                return;
            }
            for (int i = 0; i < mergeMessage.msgs.size(); ++i) {
                int msgId = mergeMessage.msgIds.get(i);
                MessageFuture future = (MessageFuture)this.futures.remove(msgId);
                if (future == null) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("msg: {} is not found in futures.", (Object)msgId);
                    }
                    continue;
                }
                future.setResultMessage(results.getMsgs()[i]);
            }
            return;
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Forwards a request to the registered {@link ClientMessageListener}, together with the
     * string form of the channel's remote address. Does nothing when no listener is set.
     *
     * @param request the request to hand over
     * @param ctx     context of the channel the request arrived on
     */
    @Override
    public void dispatch(RpcMessage request, ChannelHandlerContext ctx) {
        if (clientMessageListener == null) {
            return;
        }
        SocketAddress remote = ctx.channel().remoteAddress();
        clientMessageListener.onMessage(request, NetUtil.toStringAddress(remote), this);
    }

    /**
     * Releases a channel that became inactive and then delegates to the superclass.
     * Nothing is done once the message executor has been shut down.
     *
     * @param ctx context of the inactive channel
     * @throws Exception propagated from {@code super.channelInactive}
     */
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (messageExecutor.isShutdown()) {
            return;
        }
        Channel inactiveChannel = ctx.channel();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("channel inactive: {}", (Object)inactiveChannel);
        }
        String remoteAddress = NetUtil.toStringAddress((SocketAddress)inactiveChannel.remoteAddress());
        clientChannelManager.releaseChannel(inactiveChannel, remoteAddress);
        super.channelInactive(ctx);
    }

    /**
     * Reacts to Netty idle-state events; any other event is ignored.
     *
     * <ul>
     *   <li>Reader idle: the channel is invalidated in the channel manager and, whether or
     *       not that succeeds, released afterwards.</li>
     *   <li>Writer idle: a {@code PING} heartbeat is sent on the channel. A failure to send
     *       is logged and not rethrown.</li>
     * </ul>
     *
     * @param ctx context of the channel the event was raised for
     * @param evt the user event
     */
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (!(evt instanceof IdleStateEvent)) {
            return;
        }
        IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
        if (idleStateEvent.state() == IdleState.READER_IDLE) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("channel" + ctx.channel() + " read idle.");
            }
            try {
                String serverAddress = NetUtil.toStringAddress((SocketAddress)ctx.channel().remoteAddress());
                this.clientChannelManager.invalidateObject(serverAddress, ctx.channel());
            }
            catch (Exception exx) {
                // Pass the exception itself so the stack trace is not lost.
                LOGGER.error(exx.getMessage(), (Throwable)exx);
            }
            finally {
                this.clientChannelManager.releaseChannel(ctx.channel(), this.getAddressFromContext(ctx));
            }
        }
        // NOTE(review): this identity check matches only WRITER_IDLE_STATE_EVENT, not
        // FIRST_WRITER_IDLE_STATE_EVENT, unlike the state()-based reader check above.
        // Left unchanged to keep the heartbeat cadence — confirm whether that is intended.
        if (idleStateEvent == IdleStateEvent.WRITER_IDLE_STATE_EVENT) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("will send ping msg,channel" + ctx.channel());
                }
                this.sendRequest(ctx.channel(), HeartbeatMessage.PING);
            }
            catch (Throwable throwable) {
                // The previous call used an empty format string, which discarded the text.
                LOGGER.error("send request error: {}", (Object)throwable.getMessage(), (Object)throwable);
            }
        }
    }

    /**
     * Logs an exception raised on a channel, releases that channel in the channel manager
     * and then delegates to the superclass.
     *
     * @param ctx   context of the channel the exception was raised on
     * @param cause the exception
     * @throws Exception propagated from {@code super.exceptionCaught}
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String remoteAddress = NetUtil.toStringAddress((SocketAddress)ctx.channel().remoteAddress());
        // The error code used to be passed as the format string; having no placeholders,
        // it caused the address/"connect exception" text to be dropped from the log line.
        LOGGER.error("{} {}connect exception. {}", FrameworkErrorCode.ExceptionCaught.getErrCode(), remoteAddress, cause.getMessage(), cause);
        this.clientChannelManager.releaseChannel(ctx.channel(), this.getAddressFromChannel(ctx.channel()));
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("remove exception rm channel:" + ctx.channel());
        }
        super.exceptionCaught(ctx, cause);
    }

    /**
     * Sends a message to a load-balanced server of this client's transaction service group
     * and returns the response. When the response is a {@link GlobalBeginResponse} whose
     * result code is {@code Failed}, the channel that was used is released.
     *
     * @param msg     the message to send
     * @param timeout timeout passed through to the underlying send
     * @return the response object
     * @throws TimeoutException propagated from the underlying send
     */
    @Override
    public Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException {
        String validAddress = loadBalance(getTransactionServiceGroup());
        Channel channel = clientChannelManager.acquireChannel(validAddress);
        Object result = super.sendAsyncRequestWithResponse(validAddress, channel, msg, timeout);

        boolean beginFailed = result instanceof GlobalBeginResponse
            && ((GlobalBeginResponse)result).getResultCode() == ResultCode.Failed;
        if (beginFailed) {
            LOGGER.error("begin response error,release channel:" + channel);
            clientChannelManager.releaseChannel(channel, validAddress);
        }
        return result;
    }

    /**
     * Sends a message using the request timeout from {@link NettyClientConfig}.
     *
     * @param msg the message to send
     * @return the response object
     * @throws TimeoutException propagated from the underlying send
     */
    @Override
    public Object sendMsgWithResponse(Object msg) throws TimeoutException {
        long requestTimeout = NettyClientConfig.getRpcRequestTimeout();
        return sendMsgWithResponse(msg, requestTimeout);
    }

    /**
     * Sends a message to an explicitly chosen server and returns the response.
     *
     * @param serverAddress address of the server; also used to acquire the channel
     * @param msg           the message to send
     * @param timeout       timeout passed through to the underlying send
     * @return the response object
     * @throws TimeoutException propagated from the underlying send
     */
    @Override
    public Object sendMsgWithResponse(String serverAddress, Object msg, long timeout) throws TimeoutException {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return sendAsyncRequestWithResponse(serverAddress, channel, msg, timeout);
    }

    /**
     * Sends a response for the given request over a channel acquired for the server address.
     *
     * @param request       the request being answered
     * @param serverAddress address used to acquire the channel
     * @param msg           the response payload
     */
    @Override
    public void sendResponse(RpcMessage request, String serverAddress, Object msg) {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        super.sendResponse(request, channel, msg);
    }

    /**
     * Returns the listener that receives dispatched requests.
     *
     * @return the current listener, or {@code null} if none has been set
     */
    public ClientMessageListener getClientMessageListener() {
        return clientMessageListener;
    }

    /**
     * Registers the listener that receives dispatched requests.
     *
     * @param clientMessageListener the listener to use from now on
     */
    public void setClientMessageListener(ClientMessageListener clientMessageListener) {
        this.clientMessageListener = clientMessageListener;
    }

    /**
     * Asks the channel manager to destroy the given channel of the given server.
     *
     * @param serverAddress address the channel belongs to
     * @param channel       the channel to destroy
     */
    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        clientChannelManager.destroyChannel(serverAddress, channel);
    }

    /**
     * Looks up the servers registered for a transaction service group and lets the load
     * balancer pick one.
     *
     * <p>A failure during lookup or selection is logged (with its stack trace) and then
     * treated the same as "nothing selected".
     *
     * @param transactionServiceGroup the group to look up in the registry
     * @return the selected server address in string form
     * @throws FrameworkException with {@code NoAvailableService} when no address could be selected
     */
    private String loadBalance(String transactionServiceGroup) {
        InetSocketAddress address = null;
        try {
            List<InetSocketAddress> inetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
            address = (InetSocketAddress)LoadBalanceFactory.getInstance().select(inetSocketAddressList);
        }
        catch (Exception ex) {
            // Pass the exception itself: logging only getMessage() loses the stack trace
            // and prints "null" for exceptions that carry no message.
            LOGGER.error(ex.getMessage(), (Throwable)ex);
        }
        if (address == null) {
            throw new FrameworkException(FrameworkErrorCode.NoAvailableService);
        }
        return NetUtil.toStringAddress((InetSocketAddress)address);
    }

    /**
     * Builds the name prefix of the merge-send thread: the merge prefix, an underscore
     * and the name of this client's transaction role.
     *
     * @return the thread name prefix
     */
    private String getThreadPrefix() {
        return MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
    }

    private class MergedSendRunnable
    implements Runnable {
        private MergedSendRunnable() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            while (true) {
                Iterator iterator = AbstractRpcRemotingClient.this.mergeLock;
                synchronized (iterator) {
                    try {
                        AbstractRpcRemotingClient.this.mergeLock.wait(1L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                AbstractRpcRemotingClient.this.isSending = true;
                for (String address : AbstractRpcRemotingClient.this.basketMap.keySet()) {
                    BlockingQueue basket = (BlockingQueue)AbstractRpcRemotingClient.this.basketMap.get(address);
                    if (basket.isEmpty()) continue;
                    MergedWarpMessage mergeMessage = new MergedWarpMessage();
                    while (!basket.isEmpty()) {
                        RpcMessage msg = (RpcMessage)basket.poll();
                        mergeMessage.msgs.add((AbstractMessage)msg.getBody());
                        mergeMessage.msgIds.add(msg.getId());
                    }
                    if (mergeMessage.msgIds.size() > 1) {
                        this.printMergeMessageLog(mergeMessage);
                    }
                    Channel sendChannel = null;
                    try {
                        sendChannel = AbstractRpcRemotingClient.this.clientChannelManager.acquireChannel(address);
                        AbstractRpcRemotingClient.this.sendRequest(sendChannel, mergeMessage);
                    }
                    catch (FrameworkException e) {
                        if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
                            AbstractRpcRemotingClient.this.destroyChannel(address, sendChannel);
                        }
                        for (Integer msgId : mergeMessage.msgIds) {
                            MessageFuture messageFuture = (MessageFuture)AbstractRpcRemotingClient.this.futures.remove(msgId);
                            if (messageFuture == null) continue;
                            messageFuture.setResultMessage(null);
                        }
                        LOGGER.error("", (Object)"client merge call failed", (Object)e);
                    }
                }
                AbstractRpcRemotingClient.this.isSending = false;
            }
        }

        private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
            if (LOGGER.isDebugEnabled()) {
                long l;
                LOGGER.debug("merge msg size:" + mergeMessage.msgIds.size());
                for (AbstractMessage cm : mergeMessage.msgs) {
                    LOGGER.debug(cm.toString());
                }
                StringBuilder sb = new StringBuilder();
                Iterator<Integer> iterator = mergeMessage.msgIds.iterator();
                while (iterator.hasNext()) {
                    l = iterator.next().intValue();
                    sb.append(AbstractRpcRemotingClient.MSG_ID_PREFIX).append(l).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                sb.append("\n");
                iterator = ((ConcurrentHashMap.KeySetView)AbstractRpcRemotingClient.this.futures.keySet()).iterator();
                while (iterator.hasNext()) {
                    l = iterator.next().intValue();
                    sb.append(AbstractRpcRemotingClient.FUTURES_PREFIX).append(l).append(AbstractRpcRemotingClient.SINGLE_LOG_POSTFIX);
                }
                LOGGER.debug(sb.toString());
            }
        }
    }
}

