/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.seata.common.util.NetUtil;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.RemotingServer;
import io.seata.core.rpc.RpcContext;
import io.seata.core.rpc.netty.AbstractNettyRemoting;
import io.seata.core.rpc.netty.ChannelManager;
import io.seata.core.rpc.netty.NettyServerBootstrap;
import io.seata.core.rpc.netty.NettyServerConfig;
import io.seata.core.rpc.processor.Pair;
import io.seata.core.rpc.processor.RemotingProcessor;
import java.net.SocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNettyRemotingServer
extends AbstractNettyRemoting
implements RemotingServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNettyRemotingServer.class);
    private final NettyServerBootstrap serverBootstrap;

    @Override
    public void init() {
        super.init();
        this.serverBootstrap.start();
    }

    public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
        super(messageExecutor);
        this.serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
        this.serverBootstrap.setChannelHandlers(new ChannelHandler[]{new ServerHandler()});
    }

    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object msg, boolean tryOtherApp) throws TimeoutException {
        Channel channel = ChannelManager.getChannel(resourceId, clientId, tryOtherApp);
        if (channel == null) {
            throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId);
        }
        RpcMessage rpcMessage = this.buildRequestMessage(msg, (byte)0);
        return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
    }

    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel == null) {
            throw new RuntimeException("client is not connected");
        }
        RpcMessage rpcMessage = this.buildRequestMessage(msg, (byte)0);
        return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
    }

    @Override
    public void sendAsyncRequest(Channel channel, Object msg) {
        if (channel == null) {
            throw new RuntimeException("client is not connected");
        }
        RpcMessage rpcMessage = this.buildRequestMessage(msg, (byte)2);
        super.sendAsync(channel, rpcMessage);
    }

    @Override
    public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) {
        Channel clientChannel = channel;
        if (!(msg instanceof HeartbeatMessage)) {
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if (clientChannel == null) {
            throw new RuntimeException("channel is error.");
        }
        RpcMessage rpcMsg = this.buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? (byte)4 : 1);
        super.sendAsync(clientChannel, rpcMsg);
    }

    @Override
    public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<RemotingProcessor, ExecutorService>(processor, executor);
        this.processorTable.put(messageType, pair);
    }

    public int getListenPort() {
        return this.serverBootstrap.getListenPort();
    }

    @Override
    public void destroy() {
        this.serverBootstrap.shutdown();
        super.destroy();
    }

    protected void debugLog(String format, Object ... arguments) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(format, arguments);
        }
    }

    private void closeChannelHandlerContext(ChannelHandlerContext ctx) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
        }
        ctx.disconnect();
        ctx.close();
    }

    @ChannelHandler.Sharable
    class ServerHandler
    extends ChannelDuplexHandler {
        ServerHandler() {
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            AbstractNettyRemotingServer.this.processMessage(ctx, (RpcMessage)msg);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            Object object = AbstractNettyRemotingServer.this.lock;
            synchronized (object) {
                if (ctx.channel().isWritable()) {
                    AbstractNettyRemotingServer.this.lock.notifyAll();
                }
            }
            ctx.fireChannelWritabilityChanged();
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            AbstractNettyRemotingServer.this.debugLog("inactive:{}", ctx);
            if (AbstractNettyRemotingServer.this.messageExecutor.isShutdown()) {
                return;
            }
            this.handleDisconnect(ctx);
            super.channelInactive(ctx);
        }

        private void handleDisconnect(ChannelHandlerContext ctx) {
            String ipAndPort = NetUtil.toStringAddress((SocketAddress)ctx.channel().remoteAddress());
            RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ipAndPort + " to server channel inactive.");
            }
            if (rpcContext != null && rpcContext.getClientRole() != null) {
                rpcContext.release();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("remove channel:" + ctx.channel() + "context:" + rpcContext);
                }
            } else if (LOGGER.isInfoEnabled()) {
                LOGGER.info("remove unused channel:" + ctx.channel());
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            try {
                if (cause instanceof DecoderException && null == ChannelManager.getContextFromIdentified(ctx.channel())) {
                    return;
                }
                LOGGER.error("exceptionCaught:{}, channel:{}", (Object)cause.getMessage(), (Object)ctx.channel());
                super.exceptionCaught(ctx, cause);
            }
            finally {
                ChannelManager.releaseRpcContext(ctx.channel());
            }
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            if (evt instanceof IdleStateEvent) {
                AbstractNettyRemotingServer.this.debugLog("idle:{}", evt);
                IdleStateEvent idleStateEvent = (IdleStateEvent)evt;
                if (idleStateEvent.state() == IdleState.READER_IDLE) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("channel:" + ctx.channel() + " read idle.");
                    }
                    this.handleDisconnect(ctx);
                    try {
                        AbstractNettyRemotingServer.this.closeChannelHandlerContext(ctx);
                    }
                    catch (Exception e) {
                        LOGGER.error(e.getMessage());
                    }
                }
            }
        }

        public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(ctx + " will closed");
            }
            super.close(ctx, future);
        }
    }
}

