/*
 * Decompiled with CFR 0.152.
 */
package io.seata.core.rpc.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.GenericFutureListener;
import io.seata.common.exception.FrameworkErrorCode;
import io.seata.common.exception.FrameworkException;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.thread.PositiveAtomicCounter;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeMessage;
import io.seata.core.protocol.MessageFuture;
import io.seata.core.protocol.ProtocolConstants;
import io.seata.core.protocol.RpcMessage;
import io.seata.core.rpc.Disposable;
import io.seata.core.rpc.netty.NettyClientConfig;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRpcRemoting
extends ChannelDuplexHandler
implements Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRpcRemoting.class);
    /**
     * Single-threaded scheduler (thread name prefix "timeoutChecker") on which {@link #init()}
     * schedules the periodic sweep of timed-out futures.
     */
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory("timeoutChecker", 1, true));
    /** Pool that runs {@link #dispatch(RpcMessage, ChannelHandlerContext)} for inbound messages; shut down by {@link #destroy()}. */
    protected final ThreadPoolExecutor messageExecutor;
    /** Source of the ids assigned to outgoing messages. */
    protected final PositiveAtomicCounter idGenerator = new PositiveAtomicCounter();
    /** Futures of requests that are still outstanding, keyed by the request message id. */
    protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<Integer, MessageFuture>();
    /**
     * Per-address queues of messages offered by the address-based send path. The consumer of
     * these queues is not part of this class.
     */
    protected final ConcurrentHashMap<String, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<String, BlockingQueue<RpcMessage>>();
    /** Length, in milliseconds, of one wait slice while a channel is not writable. */
    private static final long NOT_WRITEABLE_CHECK_MILLS = 10L;
    /** Monitor notified after a message has been offered to a basket while {@link #isSending} is false. */
    protected final Object mergeLock = new Object();
    /** Wall-clock time recorded at the end of the most recent timeout sweep; 0 until the first sweep ran. */
    protected volatile long nowMills = 0L;
    /** Timeout sweep interval in milliseconds; equals the 3000 ms delay and period scheduled by {@link #init()}. */
    private static final int TIMEOUT_CHECK_INTERNAL = 3000;
    /** Monitor used to wait for, and signal, channel writability. */
    private final Object lock = new Object();
    /** Read before notifying {@link #mergeLock}; never written inside this class. */
    protected volatile boolean isSending = false;
    private String group = "DEFAULT";
    /** Merge messages sent through {@code sendRequest}, keyed by the id of the carrying message. */
    protected final Map<Integer, MergeMessage> mergeMsgMap = new ConcurrentHashMap<Integer, MergeMessage>();
    /** Handlers stored by {@code setChannelHandlers}; this class itself never reads them. */
    protected ChannelHandler[] channelHandlers;
    /**
     * When true, a rejected request dispatch in {@code channelRead} triggers one jstack dump
     * attempt, after which the flag is reset to false.
     */
    boolean allowDumpStack = false;

    /**
     * Creates the remoting base with the pool used to run inbound message dispatch.
     *
     * @param messageExecutor executor that {@code channelRead} submits dispatch tasks to and
     *                        that {@code destroy()} shuts down
     */
    public AbstractRpcRemoting(ThreadPoolExecutor messageExecutor) {
        this.messageExecutor = messageExecutor;
    }

    /**
     * Produces the id for the next outgoing message.
     *
     * @return the value obtained by incrementing {@code idGenerator}
     */
    public int getNextMessageId() {
        return idGenerator.incrementAndGet();
    }

    /**
     * Schedules the periodic timeout sweep on {@code timerExecutor}.
     *
     * <p>Every {@code TIMEOUT_CHECK_INTERNAL} milliseconds the sweep looks for futures that report
     * {@code isTimeout()}, unregisters them and completes them with a {@code null} result. A future
     * is only completed here when this sweep is the one that removed it from {@code futures}: if
     * another thread (a response arriving, or a failed write) removed it first, that thread has
     * already supplied the result and it must not be overwritten with {@code null}.
     *
     * <p>After each pass {@code nowMills} is refreshed with the current wall-clock time.
     */
    public void init() {
        this.timerExecutor.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
                for (MessageFuture future : AbstractRpcRemoting.this.futures.values()) {
                    if (!future.isTimeout()) {
                        continue;
                    }
                    // remove(key, value) succeeds only if this exact future is still registered.
                    boolean removedHere = AbstractRpcRemoting.this.futures.remove(future.getRequestMessage().getId(), future);
                    if (!removedHere) {
                        continue;
                    }
                    future.setResultMessage(null);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("timeout clear future : " + future.getRequestMessage().getBody());
                    }
                }
                AbstractRpcRemoting.this.nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
    }

    /**
     * Releases the executors owned by this instance: first the timeout scheduler, then the
     * message dispatch pool. Both are shut down with {@code shutdown()}, not {@code shutdownNow()}.
     */
    @Override
    public void destroy() {
        timerExecutor.shutdown();
        messageExecutor.shutdown();
    }

    /**
     * Wakes every thread waiting on the writability monitor once the channel reports that it is
     * writable again, then forwards the event to the next handler in the pipeline.
     *
     * @param ctx context of the channel whose writability changed
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        synchronized (lock) {
            boolean writableAgain = ctx.channel().isWritable();
            if (writableAgain) {
                lock.notifyAll();
            }
        }
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Sends {@code msg} directly on {@code channel} (no address, so no basket queueing) and waits
     * for the response using the timeout returned by {@code NettyClientConfig.getRpcRequestTimeout()}.
     *
     * @param channel channel to write to
     * @param msg     request body
     * @return the response result, or {@code null} when {@code channel} is {@code null}
     * @throws TimeoutException if no response arrives within the configured timeout
     */
    protected Object sendAsyncRequestWithResponse(Channel channel, Object msg) throws TimeoutException {
        final String noAddress = null;
        return sendAsyncRequestWithResponse(noAddress, channel, msg, NettyClientConfig.getRpcRequestTimeout());
    }

    /**
     * Sends {@code msg} and waits up to {@code timeout} milliseconds for the response.
     *
     * @param address when non-null, the message is queued in that address's basket instead of
     *                being written to the channel directly
     * @param channel target channel
     * @param msg     request body
     * @param timeout maximum wait in milliseconds; must be greater than zero
     * @return the response result, or {@code null} when {@code channel} is {@code null}
     * @throws FrameworkException if {@code timeout} is zero or negative
     * @throws TimeoutException   if no response arrives in time
     */
    protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws TimeoutException {
        if (timeout > 0L) {
            return sendAsyncRequest(address, channel, msg, timeout);
        }
        throw new FrameworkException("timeout should more than 0ms");
    }

    /**
     * Sends {@code msg} directly on {@code channel} without waiting for a response: the request is
     * issued with no address and a timeout of 0, so the call returns {@code null} once the write
     * has been handed to the channel.
     *
     * @param channel channel to write to
     * @param msg     request body
     * @return always {@code null}
     * @throws TimeoutException declared by the shared send path
     */
    protected Object sendAsyncRequestWithoutResponse(Channel channel, Object msg) throws TimeoutException {
        final long noWait = 0L;
        return sendAsyncRequest(null, channel, msg, noWait);
    }

    /**
     * Wraps {@code msg} in an {@link RpcMessage} with message type 2, registers a
     * {@link MessageFuture} for it and hands it off for sending.
     *
     * <ul>
     *   <li>With a non-null {@code address} the message is offered to that address's basket queue
     *       and, unless {@code isSending} is set, waiters on {@code mergeLock} are notified. The
     *       code draining the basket is not part of this class.</li>
     *   <li>With a null {@code address} the message is written to {@code channel} right away. If
     *       the write fails, the future is completed with the failure cause and the channel is
     *       destroyed.</li>
     * </ul>
     *
     * @param address basket key, or {@code null} to write directly
     * @param channel target channel; when {@code null} nothing is sent
     * @param msg     request body
     * @param timeout milliseconds to wait for the response; values {@code <= 0} mean "do not wait"
     * @return the response result; {@code null} when {@code channel} is {@code null} or no wait was requested
     * @throws TimeoutException if waiting for the response timed out
     * @throws RuntimeException wrapping any other failure raised while waiting
     */
    private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout) throws TimeoutException {
        if (channel == null) {
            LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
            return null;
        }
        final RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setId(this.getNextMessageId());
        rpcMessage.setMessageType((byte)2);
        rpcMessage.setCodec(ProtocolConstants.CONFIGURED_CODEC);
        rpcMessage.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
        rpcMessage.setBody(msg);

        // Register the future before sending so a fast response always finds it.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeout);
        this.futures.put(rpcMessage.getId(), messageFuture);

        if (address != null) {
            BlockingQueue<RpcMessage> basket = this.basketMap.get(address);
            if (basket == null) {
                // putIfAbsent + re-read keeps a single queue per address under concurrent senders.
                this.basketMap.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
                basket = this.basketMap.get(address);
            }
            basket.offer(rpcMessage);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("offer message: " + rpcMessage.getBody());
            }
            if (!this.isSending) {
                synchronized (this.mergeLock) {
                    this.mergeLock.notifyAll();
                }
            }
        } else {
            this.channelWriteableCheck(channel, msg);
            ChannelFuture writeFuture = channel.writeAndFlush(rpcMessage);
            writeFuture.addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        return;
                    }
                    MessageFuture pending = AbstractRpcRemoting.this.futures.remove(rpcMessage.getId());
                    if (pending != null) {
                        pending.setResultMessage(future.cause());
                    }
                    AbstractRpcRemoting.this.destroyChannel(future.channel());
                }
            });
        }

        if (timeout <= 0L) {
            return null;
        }
        try {
            return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception exx) {
            // Pass the exception itself so the stack trace is not lost.
            LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg, exx);
            if (exx instanceof InterruptedException) {
                // The exception is about to be wrapped; keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
            }
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            }
            throw new RuntimeException(exx);
        }
    }

    /**
     * Writes {@code msg} to {@code channel} as a fire-and-forget request; no future is registered.
     *
     * <p>The message type is 3 when {@code msg} is a {@link HeartbeatMessage} and 0 otherwise. A
     * {@link MergeMessage} is additionally remembered in {@code mergeMsgMap} under the new message id.
     * Before writing, the call waits for the channel to become writable.
     *
     * @param channel channel to write to
     * @param msg     request body
     */
    protected void sendRequest(Channel channel, Object msg) {
        byte messageType = 0;
        if (msg instanceof HeartbeatMessage) {
            messageType = 3;
        }

        RpcMessage outbound = new RpcMessage();
        outbound.setMessageType(messageType);
        outbound.setCodec(ProtocolConstants.CONFIGURED_CODEC);
        outbound.setCompressor(ProtocolConstants.CONFIGURED_COMPRESSOR);
        outbound.setBody(msg);
        outbound.setId(getNextMessageId());

        if (msg instanceof MergeMessage) {
            MergeMessage merged = (MergeMessage)msg;
            mergeMsgMap.put(outbound.getId(), merged);
        }

        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("write message:" + outbound.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
        }
        channel.writeAndFlush(outbound);
    }

    /**
     * Writes {@code msg} to {@code channel} as the response to {@code request}.
     *
     * <p>The response reuses the request's id, codec and compressor. Its message type is 4 when
     * {@code msg} is a {@link HeartbeatMessage} and 1 otherwise. Before writing, the call waits for
     * the channel to become writable.
     *
     * @param request the message being answered
     * @param channel channel to write to
     * @param msg     response body
     */
    protected void sendResponse(RpcMessage request, Channel channel, Object msg) {
        byte messageType = 1;
        if (msg instanceof HeartbeatMessage) {
            messageType = 4;
        }

        RpcMessage reply = new RpcMessage();
        reply.setMessageType(messageType);
        reply.setCodec(request.getCodec());
        reply.setCompressor(request.getCompressor());
        reply.setBody(msg);
        reply.setId(request.getId());

        channelWriteableCheck(channel, msg);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("send response:" + reply.getBody() + ",channel:" + channel);
        }
        channel.writeAndFlush(reply);
    }

    /**
     * Blocks until {@code channel} is writable, waiting on the writability monitor in slices of
     * {@code NOT_WRITEABLE_CHECK_MILLS} milliseconds. A slice ends early when
     * {@code channelWritabilityChanged} notifies the monitor.
     *
     * <p>Once more than {@code NettyClientConfig.getMaxNotWriteableRetry()} checks have found the
     * channel unwritable, the channel is destroyed and the call fails.
     *
     * <p>An interrupt does not abort the wait (the interrupted slice still counts as one attempt),
     * but the thread's interrupt status is restored before this method returns or throws, so
     * callers can still observe it.
     *
     * @param channel channel that is about to be written to
     * @param msg     message about to be sent; only used in the failure text
     * @throws FrameworkException with {@code FrameworkErrorCode.ChannelIsNotWritable} when the
     *                            retry budget is exhausted
     */
    private void channelWriteableCheck(Channel channel, Object msg) {
        int tryTimes = 0;
        boolean interrupted = false;
        try {
            synchronized (this.lock) {
                while (!channel.isWritable()) {
                    if (++tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                        this.destroyChannel(channel);
                        throw new FrameworkException("msg:" + (msg == null ? "null" : msg.toString()), FrameworkErrorCode.ChannelIsNotWritable);
                    }
                    try {
                        this.lock.wait(NOT_WRITEABLE_CHECK_MILLS);
                    }
                    catch (InterruptedException exx) {
                        // Re-interrupting here would make every following wait() throw at once,
                        // so remember the interrupt and restore it when leaving the method.
                        interrupted = true;
                        LOGGER.error(exx.getMessage(), exx);
                    }
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Routes an inbound {@link RpcMessage}.
     *
     * <ul>
     *   <li>Message types 0 and 2 are dispatched on {@code messageExecutor}. If the pool rejects
     *       the task and {@code allowDumpStack} is set, one thread dump is attempted.</li>
     *   <li>Any other type is matched against the pending futures by message id. A matching future
     *       is completed with the message body; a message with no pending future is dispatched on
     *       {@code messageExecutor} instead.</li>
     * </ul>
     *
     * <p>Objects that are not {@link RpcMessage}s are ignored and are not forwarded to later handlers.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the inbound object
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof RpcMessage)) {
            return;
        }
        final RpcMessage rpcMessage = (RpcMessage)msg;
        if (rpcMessage.getMessageType() == 0 || rpcMessage.getMessageType() == 2) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            if (!this.submitDispatch(rpcMessage, ctx)) {
                this.dumpStackIfAllowed();
            }
            return;
        }

        MessageFuture messageFuture = this.futures.remove(rpcMessage.getId());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture, rpcMessage.getBody()));
        }
        if (messageFuture != null) {
            messageFuture.setResultMessage(rpcMessage.getBody());
        } else {
            // No caller is waiting for this id; let the subclass decide what to do with it.
            this.submitDispatch(rpcMessage, ctx);
        }
    }

    /**
     * Submits {@code dispatch(rpcMessage, ctx)} to {@code messageExecutor}. Anything thrown by the
     * dispatch itself is logged on the worker thread and not propagated.
     *
     * @return {@code true} if the task was accepted, {@code false} if the pool rejected it (already logged)
     */
    private boolean submitDispatch(final RpcMessage rpcMessage, final ChannelHandlerContext ctx) {
        try {
            this.messageExecutor.execute(new Runnable() {

                @Override
                public void run() {
                    try {
                        AbstractRpcRemoting.this.dispatch(rpcMessage, ctx);
                    }
                    catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            });
            return true;
        }
        catch (RejectedExecutionException e) {
            // NOTE(review): the text says "max pool size" but the value logged is getActiveCount().
            LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(), "thread pool is full, current max pool size is " + this.messageExecutor.getActiveCount());
            return false;
        }
    }

    /**
     * Attempts a one-shot {@code jstack} dump of this JVM when {@code allowDumpStack} is set, then
     * clears the flag. The output goes to {@code seata-jstack-<pid>-<n>.log} in the directory named
     * by the {@code java.io.tmpdir} system property.
     *
     * <p>The redirect is configured on the {@link ProcessBuilder} because a {@code >} inside a
     * {@code Runtime.exec} command string is passed to the program as a plain argument, not
     * interpreted as a redirect.
     */
    private void dumpStackIfAllowed() {
        if (!this.allowDumpStack) {
            return;
        }
        String name = ManagementFactory.getRuntimeMXBean().getName();
        // The runtime name is conventionally "<pid>@<host>"; presumably that holds on the target JVMs.
        String pid = name.split("@")[0];
        int idx = new Random().nextInt(100);
        java.io.File dumpFile = new java.io.File(System.getProperty("java.io.tmpdir"), "seata-jstack-" + pid + "-" + idx + ".log");
        try {
            new ProcessBuilder("jstack", pid).redirectErrorStream(true).redirectOutput(dumpFile).start();
        }
        catch (IOException exx) {
            LOGGER.error(exx.getMessage(), exx);
        }
        this.allowDumpStack = false;
    }

    /**
     * Logs the failure reported for the channel and then destroys that channel. A failure while
     * destroying is itself logged and not rethrown.
     *
     * @param ctx   context of the failing channel
     * @param cause the reported failure
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        String failure = ctx.channel() + " connect exception. " + cause.getMessage();
        LOGGER.error(FrameworkErrorCode.ExceptionCaught.getErrCode(), failure, cause);
        try {
            destroyChannel(ctx.channel());
        }
        catch (Exception e) {
            String closeFailure = "close channel" + ctx.channel() + " fail.";
            LOGGER.error("", closeFailure, e);
        }
    }

    /**
     * Handles an inbound message. {@code channelRead} invokes this on {@code messageExecutor} for
     * messages of type 0 or 2, and for messages of any other type whose id has no pending future.
     * Anything thrown from here is caught and logged by the submitting task.
     *
     * @param var1 the inbound message
     * @param var2 context of the channel the message arrived on
     */
    public abstract void dispatch(RpcMessage var1, ChannelHandlerContext var2);

    /**
     * Logs the pending close at info level and then delegates to the superclass implementation.
     *
     * @param ctx    context of the channel being closed
     * @param future promise passed through to the superclass
     */
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
        boolean infoEnabled = LOGGER.isInfoEnabled();
        if (infoEnabled) {
            LOGGER.info(ctx + " will closed");
        }
        super.close(ctx, future);
    }

    /**
     * Appends {@code handlers} to the end of the channel's pipeline. Does nothing when either
     * argument is {@code null}.
     *
     * @param channel  channel whose pipeline is extended
     * @param handlers handlers to append, in order
     */
    protected void addChannelPipelineLast(Channel channel, ChannelHandler ... handlers) {
        if (channel == null || handlers == null) {
            return;
        }
        channel.pipeline().addLast(handlers);
    }

    /**
     * Stores the given handlers in {@code channelHandlers}. The array is kept by reference, not copied.
     *
     * @param handlers handlers to remember
     */
    protected void setChannelHandlers(ChannelHandler ... handlers) {
        channelHandlers = handlers;
    }

    /**
     * @return the configured group name; {@code "DEFAULT"} unless changed through {@code setGroup}
     */
    public String getGroup() {
        return group;
    }

    /**
     * Replaces the group name.
     *
     * @param group new group name; stored as given, without validation
     */
    public void setGroup(String group) {
        this.group = group;
    }

    /**
     * Destroys {@code channel}, deriving the address argument from the channel's remote address.
     *
     * @param channel channel to destroy
     */
    public void destroyChannel(Channel channel) {
        String address = getAddressFromChannel(channel);
        destroyChannel(address, channel);
    }

    /**
     * Destroys the given channel. The single-argument overload calls this with the address
     * obtained from {@code getAddressFromChannel}. What "destroy" entails is defined by the
     * implementing subclass.
     *
     * @param var1 address associated with the channel
     * @param var2 channel to destroy
     */
    public abstract void destroyChannel(String var1, Channel var2);

    /**
     * Resolves the remote address text of the channel behind {@code ctx}.
     *
     * @param ctx handler context whose channel is inspected
     * @return the same value {@code getAddressFromChannel} yields for {@code ctx.channel()}
     */
    protected String getAddressFromContext(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        return getAddressFromChannel(channel);
    }

    /**
     * Renders the channel's remote address as text, dropping the leading marker returned by
     * {@code NettyClientConfig.getSocketAddressStartChar()} when the text starts with it.
     *
     * @param channel channel whose remote address is read; the address must not be {@code null}
     * @return the remote address text without the leading marker
     */
    protected String getAddressFromChannel(Channel channel) {
        String rendered = channel.remoteAddress().toString();
        String marker = NettyClientConfig.getSocketAddressStartChar();
        return rendered.startsWith(marker) ? rendered.substring(marker.length()) : rendered;
    }
}

