/*
 * Decompiled with CFR 0.152.
 */
package org.apache.omid.tso;

import com.google.protobuf.MessageLite;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
import org.apache.omid.tso.MonitoringContext;
import org.apache.omid.tso.MonitoringContextImpl;
import org.apache.omid.tso.RequestProcessor;
import org.apache.omid.tso.TSOChannelHandler;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Exercises {@link TSOChannelHandler} over real Netty sockets: opening and closing the
 * listening channel, accepting and dropping client connections, and dispatching decoded
 * protobuf requests to a mocked {@link RequestProcessor}.
 *
 * <p>The tests bind a fixed local port ({@value #TSO_PORT}), so they cannot run concurrently
 * with another process or test using that port.
 */
public class TestTSOChannelHandlerNetty {
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);

    /** Host the test client connects to. */
    private static final String TSO_HOST = "localhost";
    /** Port the channel handler under test is configured with. */
    private static final int TSO_PORT = 1434;
    /** Pause between two polls of the server-side channel group size. */
    private static final long POLL_INTERVAL_MS = 10L;

    @Mock
    private RequestProcessor requestProcessor;
    private TSOChannelHandler channelHandler;

    /** Creates fresh mocks and a not-yet-connected channel handler before every test. */
    @BeforeMethod
    public void beforeTestMethod() {
        MockitoAnnotations.initMocks(this);
        TSOServerConfig config = new TSOServerConfig();
        config.setPort(TSO_PORT);
        this.channelHandler = new TSOChannelHandler(config, this.requestProcessor, new NullMetricsProvider());
    }

    /** Closes the channel handler after every test so the port is free for the next one. */
    @AfterMethod
    public void afterTestMethod() throws IOException {
        this.channelHandler.close();
    }

    /**
     * Walks the handler through reconnect / closeConnection / close, checking the state of
     * the listening channel and the channel group after each call, including repeated calls.
     */
    @Test(timeOut=10000L)
    public void testMainAPI() throws Exception {
        // Nothing is created until the first reconnect().
        Assert.assertNull(this.channelHandler.listeningChannel);
        Assert.assertNull(this.channelHandler.channelGroup);

        this.channelHandler.reconnect();
        assertServerOpen();
        Assert.assertEquals(((InetSocketAddress) this.channelHandler.listeningChannel.getLocalAddress()).getPort(), TSO_PORT);

        this.channelHandler.closeConnection();
        assertServerClosed();
        // Closing twice must leave the same state.
        this.channelHandler.closeConnection();
        assertServerClosed();

        this.channelHandler.reconnect();
        assertServerOpen();
        // Reconnecting twice must leave the same state.
        this.channelHandler.reconnect();
        assertServerOpen();

        this.channelHandler.close();
        assertServerClosed();

        try {
            this.channelHandler.reconnect();
        } catch (ChannelException e) {
            // A reconnect after close() may be rejected; if it is, the handler must stay closed.
            // NOTE(review): the test also passes when no exception is thrown - confirm intended.
            assertServerClosed();
        }
    }

    /**
     * Connects a Netty client at several points of the handler life cycle and checks that
     * client channels are accepted only while the handler listens and are dropped when the
     * handler closes or re-opens its connection.
     */
    @Test(timeOut=10000L)
    public void testNettyConnectionToTSOFromClient() throws Exception {
        ClientBootstrap nettyClient = createNettyClientBootstrap();
        ChannelFuture channelF = null;
        try {
            // The handler is not listening yet, so the first connection attempt must fail.
            channelF = connectAndAwait(nettyClient);
            Assert.assertFalse(channelF.isSuccess());

            this.channelHandler.reconnect();
            assertServerOpen();

            // Now the connection succeeds and shows up in the server-side group.
            channelF = connectAndAwait(nettyClient);
            Assert.assertTrue(channelF.isSuccess());
            Assert.assertTrue(channelF.getChannel().isConnected());
            awaitChannelGroupSize(2);

            // Closing from the client side removes the channel from the group.
            channelF.getChannel().close().await();
            awaitChannelGroupSize(1);

            channelF = connectAndAwait(nettyClient);
            Assert.assertTrue(channelF.isSuccess());
            awaitChannelGroupSize(2);

            // Closing from the server side must also close the client channel.
            this.channelHandler.closeConnection();
            assertServerClosed();
            TimeUnit.SECONDS.sleep(1L);
            Assert.assertFalse(channelF.getChannel().isOpen());

            this.channelHandler.reconnect();
            assertServerOpen();
            channelF = connectAndAwait(nettyClient);
            Assert.assertTrue(channelF.isSuccess());
            awaitChannelGroupSize(2);

            // A reconnect while a client is attached drops that client.
            this.channelHandler.reconnect();
            assertServerOpen();
            TimeUnit.SECONDS.sleep(1L);
            Assert.assertFalse(channelF.getChannel().isOpen());

            this.channelHandler.close();
            assertServerClosed();
        } finally {
            shutdownClient(nettyClient, channelF);
        }
    }

    /**
     * Sends a handshake followed by timestamp, commit and fence requests over a client
     * channel and verifies each one reaches the matching {@link RequestProcessor} method.
     */
    @Test(timeOut=10000L)
    public void testNettyChannelWriting() throws Exception {
        this.channelHandler.reconnect();
        ClientBootstrap nettyClient = createNettyClientBootstrap();
        ChannelFuture channelF = null;
        try {
            channelF = connectAndAwait(nettyClient);
            Assert.assertTrue(channelF.isSuccess());
            Assert.assertTrue(channelF.getChannel().isConnected());
            Channel channel = channelF.getChannel();
            awaitChannelGroupSize(2);

            // The handshake is written first, before any other request on this channel.
            TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
            handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
            channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());

            testWritingTimestampRequest(channel);
            testWritingCommitRequest(channel);
            testWritingFenceRequest(channel);
        } finally {
            shutdownClient(nettyClient, channelF);
        }
    }

    /**
     * Writes a timestamp request and verifies (with a 100 ms Mockito timeout) that
     * {@code timestampRequest} is invoked once and {@code commitRequest} is not invoked.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
        // Forget interactions recorded by earlier requests on the shared mock.
        Mockito.reset(this.requestProcessor);
        TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
        tsBuilder.setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build());
        channel.write(tsBuilder.build()).await();
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .commitRequest(Matchers.anyLong(), Matchers.anyCollectionOf(Long.class), Matchers.anyCollectionOf(Long.class), Matchers.anyBoolean(), Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
    }

    /**
     * Writes a commit request (start timestamp 666, one cell id) and verifies (with a 100 ms
     * Mockito timeout) that {@code commitRequest} is invoked once with that start timestamp
     * and {@code false} as the boolean argument, and that {@code timestampRequest} is not
     * invoked.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingCommitRequest(Channel channel) throws InterruptedException {
        // Forget interactions recorded by earlier requests on the shared mock.
        Mockito.reset(this.requestProcessor);
        TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
        commitRequestBuilder.setStartTimestamp(666L);
        commitRequestBuilder.addCellId(666L);
        TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
        commitBuilder.setCommitRequest(commitRequestBuilder.build());
        TSOProto.Request r = commitBuilder.build();
        Assert.assertTrue(r.hasCommitRequest());
        channel.write(commitBuilder.build()).await();
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .commitRequest(Matchers.eq(666L), Matchers.anyCollectionOf(Long.class), Matchers.anyCollectionOf(Long.class), Matchers.eq(false), Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
    }

    /**
     * Writes a fence request for table id 666 and verifies (with a 100 ms Mockito timeout)
     * that {@code fenceRequest} is invoked once with that id and {@code timestampRequest}
     * is not invoked.
     *
     * @param channel connected client channel to write to
     * @throws InterruptedException if interrupted while waiting for the write to complete
     */
    private void testWritingFenceRequest(Channel channel) throws InterruptedException {
        // Forget interactions recorded by earlier requests on the shared mock.
        Mockito.reset(this.requestProcessor);
        TSOProto.FenceRequest.Builder fenceRequestBuilder = TSOProto.FenceRequest.newBuilder();
        fenceRequestBuilder.setTableId(666L);
        TSOProto.Request.Builder fenceBuilder = TSOProto.Request.newBuilder();
        fenceBuilder.setFenceRequest(fenceRequestBuilder.build());
        TSOProto.Request r = fenceBuilder.build();
        Assert.assertTrue(r.hasFenceRequest());
        channel.write(fenceBuilder.build()).await();
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).never())
                .timestampRequest(Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
        Mockito.verify(this.requestProcessor, Mockito.timeout(100).times(1))
                .fenceRequest(Matchers.eq(666L), Matchers.any(Channel.class), Matchers.any(MonitoringContextImpl.class));
    }

    /** Asserts the listening channel is open and the channel group holds exactly one channel. */
    private void assertServerOpen() {
        Assert.assertTrue(this.channelHandler.listeningChannel.isOpen());
        Assert.assertEquals(this.channelHandler.channelGroup.size(), 1);
    }

    /** Asserts the listening channel is closed and the channel group is empty. */
    private void assertServerClosed() {
        Assert.assertFalse(this.channelHandler.listeningChannel.isOpen());
        Assert.assertEquals(this.channelHandler.channelGroup.size(), 0);
    }

    /**
     * Starts a connection to the handler's host/port and blocks until the attempt completes,
     * successfully or not.
     *
     * @param client bootstrap to connect with
     * @return the completed connect future; callers check {@code isSuccess()}
     * @throws InterruptedException if interrupted while waiting (e.g. by the test timeout)
     */
    private static ChannelFuture connectAndAwait(ClientBootstrap client) throws InterruptedException {
        ChannelFuture connectFuture = client.connect(new InetSocketAddress(TSO_HOST, TSO_PORT));
        // Blocking wait instead of spinning on isDone(): no CPU burn, and interruptible.
        connectFuture.await();
        return connectFuture;
    }

    /**
     * Polls the handler's channel group until it holds {@code expected} channels.
     * There is no own deadline; the enclosing test's {@code timeOut} bounds the wait.
     *
     * @param expected channel count to wait for
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private void awaitChannelGroupSize(int expected) throws InterruptedException {
        while (this.channelHandler.channelGroup.size() != expected) {
            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Closes the channel of the most recent connection attempt (if any) and releases the
     * thread pools owned by the client bootstrap.
     *
     * @param client      bootstrap whose resources are released
     * @param lastConnect most recent connect future, or {@code null} if none was started
     */
    private static void shutdownClient(ClientBootstrap client, ChannelFuture lastConnect) {
        if (lastConnect != null) {
            // Channels should be closed before the factory resources are released.
            lastConnect.getChannel().close().awaitUninterruptibly();
        }
        client.releaseExternalResources();
    }

    /**
     * Builds a Netty client bootstrap that frames messages with a 4-byte length prefix,
     * encodes outgoing protobuf messages and decodes incoming {@code TSOProto.Response}s.
     * Connection attempts time out after 100 ms.
     *
     * @return a configured, not yet connected bootstrap; callers release its resources
     */
    private ClientBootstrap createNettyClientBootstrap() {
        NioClientSocketChannelFactory factory = new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()),
                1);
        ClientBootstrap bootstrap = new ClientBootstrap(factory);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("connectTimeoutMillis", 100);
        ChannelPipeline pipeline = bootstrap.getPipeline();
        pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
        pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
        pipeline.addLast("protobufencoder", new ProtobufEncoder());
        pipeline.addLast("handler", new SimpleChannelHandler() {

            @Override
            public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
                LOG.info("Channel {} connected", ctx.getChannel());
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
                LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
            }
        });
        return bootstrap;
    }
}

