/*
 * Decompiled with CFR 0.152.
 */
package com.moilioncircle.redis.replicator;

import com.moilioncircle.redis.replicator.AbstractReplicator;
import com.moilioncircle.redis.replicator.AbstractReplicatorRetrier;
import com.moilioncircle.redis.replicator.Configuration;
import com.moilioncircle.redis.replicator.DefaultExceptionListener;
import com.moilioncircle.redis.replicator.Status;
import com.moilioncircle.redis.replicator.cmd.BulkReplyHandler;
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.CommandParser;
import com.moilioncircle.redis.replicator.cmd.OffsetHandler;
import com.moilioncircle.redis.replicator.cmd.RedisCodec;
import com.moilioncircle.redis.replicator.cmd.ReplyParser;
import com.moilioncircle.redis.replicator.event.Event;
import com.moilioncircle.redis.replicator.event.PostCommandSyncEvent;
import com.moilioncircle.redis.replicator.event.PreCommandSyncEvent;
import com.moilioncircle.redis.replicator.io.AsyncBufferedInputStream;
import com.moilioncircle.redis.replicator.io.RateLimitInputStream;
import com.moilioncircle.redis.replicator.io.RedisInputStream;
import com.moilioncircle.redis.replicator.io.RedisOutputStream;
import com.moilioncircle.redis.replicator.net.RedisSocketFactory;
import com.moilioncircle.redis.replicator.rdb.RdbParser;
import com.moilioncircle.redis.replicator.util.Concurrents;
import com.moilioncircle.redis.replicator.util.Strings;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replicator that connects to a redis-server over a socket, performs the
 * replication handshake (AUTH, PING, REPLCONF, PSYNC/SYNC) and then turns the
 * incoming command stream into events.
 *
 * <p>The connection life cycle is driven by {@link #open()}, which delegates
 * to an internal {@link AbstractReplicatorRetrier} implementation.
 */
public class RedisSocketReplicator extends AbstractReplicator {
    protected static final Logger logger = LoggerFactory.getLogger(RedisSocketReplicator.class);

    /** Written to logs and error messages in place of the real AUTH password. */
    private static final String PASSWORD_MASK = "******";
    /** First byte of a multi-bulk (array) request: {@code '*'} (42). */
    private static final int ARRAY_PREFIX = '*';
    /** First byte of a bulk-string argument: {@code '$'} (36). */
    private static final int BULK_PREFIX = '$';

    protected final int port;
    protected final String host;
    protected Socket socket;
    protected ReplyParser replyParser;
    protected ScheduledFuture<?> heartbeat;
    protected RedisOutputStream outputStream;
    protected ScheduledExecutorService executor;
    protected final RedisSocketFactory socketFactory;

    /**
     * Creates a replicator for the given server address.
     *
     * @param host          redis-server host, must not be {@code null}
     * @param port          redis-server port, must be in {@code 1..65535}
     * @param configuration replicator configuration, must not be {@code null}
     * @throws NullPointerException     if {@code host} or {@code configuration} is {@code null}
     * @throws IllegalArgumentException if {@code port} is out of range
     */
    public RedisSocketReplicator(String host, int port, Configuration configuration) {
        Objects.requireNonNull(host);
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("illegal argument port: " + port);
        }
        Objects.requireNonNull(configuration);
        this.host = host;
        this.port = port;
        this.configuration = configuration;
        this.socketFactory = new RedisSocketFactory(configuration);
        this.builtInCommandParserRegister();
        if (configuration.isUseDefaultExceptionListener()) {
            this.addExceptionListener(new DefaultExceptionListener());
        }
    }

    /** @return the redis-server host given at construction time */
    public String getHost() {
        return this.host;
    }

    /** @return the redis-server port given at construction time */
    public int getPort() {
        return this.port;
    }

    /**
     * Runs the replicator: calls {@code super.open()}, creates a single-thread
     * scheduled executor and hands control to the retrier. When the retrier
     * returns or throws, the connection is closed, close listeners are
     * notified and the executor is terminated, waiting at most the configured
     * connection timeout.
     *
     * @throws IOException propagated from the retrier or from closing
     */
    @Override
    public void open() throws IOException {
        super.open();
        this.executor = Executors.newSingleThreadScheduledExecutor();
        try {
            new RedisSocketReplicatorRetrier().retry(this);
        } finally {
            this.doClose();
            this.doCloseListener(this);
            Concurrents.terminateQuietly(this.executor, this.configuration.getConnectionTimeout(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Interprets the server's reply to {@code PSYNC} and performs the
     * matching synchronisation step.
     *
     * <ul>
     *   <li>{@code FULLRESYNC <replid> <offset>}: parses the dump, then stores
     *       the replication id and offset in the configuration.</li>
     *   <li>{@code CONTINUE [<replid>]}: updates the stored replication id if
     *       the server announced a different one.</li>
     *   <li>{@code NOMASTERLINK} / {@code LOADING}: nothing is read; the caller
     *       gets {@link SyncMode#SYNC_LATER}.</li>
     *   <li>anything else: falls back to the {@code SYNC} command and parses
     *       the dump that follows.</li>
     * </ul>
     *
     * @param reply the textual reply to {@code PSYNC}
     * @return the synchronisation mode that was applied
     * @throws IOException on socket or dump-parsing failure
     */
    protected SyncMode trySync(String reply) throws IOException {
        logger.info(reply);
        if (reply.startsWith("FULLRESYNC")) {
            // The dump is consumed before the id/offset from the reply are recorded.
            this.parseDump(this);
            String[] parts = reply.split(" ");
            this.configuration.setReplId(parts[1]);
            this.configuration.setReplOffset(Long.parseLong(parts[2]));
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("CONTINUE")) {
            String[] parts = reply.split(" ");
            String knownReplId = this.configuration.getReplId();
            boolean serverSentNewId = parts.length > 1 && knownReplId != null && !knownReplId.equals(parts[1]);
            if (serverSentNewId) {
                this.configuration.setReplId(parts[1]);
            }
            return SyncMode.PSYNC;
        }
        if (reply.startsWith("NOMASTERLINK") || reply.startsWith("LOADING")) {
            return SyncMode.SYNC_LATER;
        }
        logger.info("SYNC");
        this.send("SYNC".getBytes());
        this.parseDump(this);
        return SyncMode.SYNC;
    }

    /**
     * Reads the RDB payload that follows a sync reply.
     *
     * <p>A length of {@code -1} is treated as disk-less replication. When a
     * length is known and the configuration asks to discard RDB events, the
     * payload is skipped; otherwise it is fed to an {@link RdbParser}.
     *
     * @param replicator replicator handed to the {@link RdbParser}
     * @throws IOException if reading fails or the handler result is not {@code "OK"}
     */
    protected void parseDump(final AbstractReplicator replicator) throws IOException {
        byte[] rawReply = this.reply(new BulkReplyHandler() {

            @Override
            public byte[] handle(long len, RedisInputStream in) throws IOException {
                boolean diskless = len == -1L;
                if (diskless) {
                    logger.info("Disk-less replication.");
                } else {
                    logger.info("RDB dump file size:{}", len);
                }
                if (!diskless && RedisSocketReplicator.this.configuration.isDiscardRdbEvent()) {
                    logger.info("discard {} bytes", len);
                    in.skip(len);
                } else {
                    new RdbParser(in, replicator).parse();
                    if (diskless) {
                        // NOTE(review): presumably the 40-byte end-of-stream marker of a
                        // disk-less transfer - confirm against the Redis replication docs.
                        in.skip(40L, false);
                    }
                }
                return "OK".getBytes();
            }
        });
        String reply = Strings.toString(rawReply);
        if ("OK".equals(reply)) {
            return;
        }
        throw new IOException("SYNC failed. reason : [" + reply + "]");
    }

    /**
     * Connects and performs the pre-sync handshake: optional AUTH, PING, then
     * REPLCONF listening-port, ip-address and the {@code eof} / {@code psync2}
     * capabilities.
     *
     * @throws IOException on socket failure
     */
    protected void establishConnection() throws IOException {
        this.connect();
        if (this.configuration.getAuthPassword() != null) {
            this.auth(this.configuration.getAuthPassword());
        }
        this.sendPing();
        this.sendSlavePort();
        this.sendSlaveIp();
        this.sendSlaveCapa("eof");
        this.sendSlaveCapa("psync2");
    }

    /**
     * Sends {@code AUTH <password>}; does nothing when {@code password} is
     * {@code null}.
     *
     * <p>The password itself is never written to the log or to the error
     * message; a fixed mask is used instead.
     *
     * @param password the password to authenticate with, may be {@code null}
     * @throws IOException    on socket failure
     * @throws AssertionError if the server rejects the password
     */
    protected void auth(String password) throws IOException {
        if (password == null) {
            return;
        }
        logger.info("AUTH {}", PASSWORD_MASK);
        this.send("AUTH".getBytes(), password.getBytes());
        String reply = Strings.toString(this.reply());
        logger.info(reply);
        if ("OK".equals(reply)) {
            return;
        }
        if (reply.contains("no password")) {
            // A reply containing "no password" is tolerated: warn and carry on.
            logger.warn("[AUTH {}] failed. {}", PASSWORD_MASK, reply);
            return;
        }
        throw new AssertionError("[AUTH " + PASSWORD_MASK + "] failed. " + reply);
    }

    /**
     * Sends {@code PING} and checks the reply. A reply indicating missing
     * authentication raises an {@link AssertionError}; any other non-PONG
     * reply is only logged as a warning.
     *
     * @throws IOException    on socket failure
     * @throws AssertionError if the server demands authentication
     */
    protected void sendPing() throws IOException {
        logger.info("PING");
        this.send("PING".getBytes());
        String reply = Strings.toString(this.reply());
        logger.info(reply);
        if ("PONG".equalsIgnoreCase(reply)) {
            return;
        }
        if (reply.contains("NOAUTH")) {
            throw new AssertionError(reply);
        }
        if (reply.contains("operation not permitted")) {
            throw new AssertionError("-NOAUTH Authentication required.");
        }
        logger.warn("[PING] failed. {}", reply);
    }

    /**
     * Sends {@code REPLCONF listening-port} with the socket's local port.
     * A non-OK reply is logged as a warning and otherwise ignored.
     *
     * @throws IOException on socket failure
     */
    protected void sendSlavePort() throws IOException {
        int localPort = this.socket.getLocalPort();
        logger.info("REPLCONF listening-port {}", localPort);
        this.send("REPLCONF".getBytes(), "listening-port".getBytes(), String.valueOf(localPort).getBytes());
        String reply = Strings.toString(this.reply());
        logger.info(reply);
        if ("OK".equals(reply)) {
            return;
        }
        logger.warn("[REPLCONF listening-port {}] failed. {}", localPort, reply);
    }

    /**
     * Sends {@code REPLCONF ip-address} with the socket's local address.
     * A non-OK reply is logged as a warning and otherwise ignored.
     *
     * @throws IOException on socket failure
     */
    protected void sendSlaveIp() throws IOException {
        String localAddress = this.socket.getLocalAddress().getHostAddress();
        logger.info("REPLCONF ip-address {}", localAddress);
        this.send("REPLCONF".getBytes(), "ip-address".getBytes(), localAddress.getBytes());
        String reply = Strings.toString(this.reply());
        logger.info(reply);
        if ("OK".equals(reply)) {
            return;
        }
        logger.warn("[REPLCONF ip-address {}] failed. {}", localAddress, reply);
    }

    /**
     * Sends {@code REPLCONF capa <cmd>}. A non-OK reply is logged as a
     * warning and otherwise ignored.
     *
     * @param cmd capability name to announce
     * @throws IOException on socket failure
     */
    protected void sendSlaveCapa(String cmd) throws IOException {
        logger.info("REPLCONF capa {}", cmd);
        this.send("REPLCONF".getBytes(), "capa".getBytes(), cmd.getBytes());
        String reply = Strings.toString(this.reply());
        logger.info(reply);
        if ("OK".equals(reply)) {
            return;
        }
        logger.warn("[REPLCONF capa {}] failed. {}", cmd, reply);
    }

    /**
     * Schedules the periodic {@code REPLCONF ACK <offset>} on the executor,
     * using the configured heartbeat period as both initial delay and delay
     * between runs.
     */
    protected void heartbeat() {
        assert (this.heartbeat == null || this.heartbeat.isCancelled());
        long period = this.configuration.getHeartbeatPeriod();
        this.heartbeat = this.executor.scheduleWithFixedDelay(new Runnable() {

            @Override
            public void run() {
                RedisSocketReplicator.this.ackCurrentOffset();
            }
        }, period, period, TimeUnit.MILLISECONDS);
        logger.info("heartbeat started.");
    }

    /** Sends {@code REPLCONF ACK} with the configuration's current replication offset, best-effort. */
    private void ackCurrentOffset() {
        this.sendQuietly("REPLCONF".getBytes(), "ACK".getBytes(), String.valueOf(this.configuration.getReplOffset()).getBytes());
    }

    /**
     * Sends a command without arguments.
     *
     * @param command command name bytes
     * @throws IOException on socket failure
     */
    protected void send(byte[] command) throws IOException {
        // The explicit empty array selects the varargs overload instead of recursing.
        this.send(command, new byte[0][]);
    }

    /**
     * Writes a command as a multi-bulk request ({@code *<count>} followed by
     * one {@code $<len>} bulk string per element) and flushes the stream.
     *
     * @param command command name bytes
     * @param args    argument bytes, one entry per argument
     * @throws IOException on socket failure
     */
    protected void send(byte[] command, byte[] ... args) throws IOException {
        this.outputStream.write(ARRAY_PREFIX);
        this.outputStream.write(String.valueOf(args.length + 1).getBytes());
        this.outputStream.writeCrLf();
        this.writeBulk(command);
        for (byte[] arg : args) {
            this.writeBulk(arg);
        }
        this.outputStream.flush();
    }

    /** Writes one bulk string: {@code $<len>CRLF<data>CRLF}. */
    private void writeBulk(byte[] data) throws IOException {
        this.outputStream.write(BULK_PREFIX);
        this.outputStream.write(String.valueOf(data.length).getBytes());
        this.outputStream.writeCrLf();
        this.outputStream.write(data);
        this.outputStream.writeCrLf();
    }

    /**
     * Best-effort variant of {@link #send(byte[], byte[][])}: an
     * {@link IOException} is not propagated, only reported at debug level.
     *
     * @param command command name bytes
     * @param args    argument bytes
     */
    protected void sendQuietly(byte[] command, byte[] ... args) {
        try {
            this.send(command, args);
        } catch (IOException e) {
            // Deliberately not rethrown; keep a trace so failures can be diagnosed.
            logger.debug("best-effort send failed, ignoring.", e);
        }
    }

    /**
     * Parses the next reply from the server.
     *
     * @param <T> type the caller expects; the cast is unchecked
     * @return the parsed reply
     * @throws IOException on socket failure
     */
    @SuppressWarnings("unchecked")
    protected <T> T reply() throws IOException {
        return (T)this.replyParser.parse();
    }

    /**
     * Parses the next reply, delegating bulk payloads to {@code handler}.
     *
     * @param handler receives the bulk payload
     * @param <T>     type the caller expects; the cast is unchecked
     * @return the parsed reply
     * @throws IOException on socket failure
     */
    @SuppressWarnings("unchecked")
    protected <T> T reply(BulkReplyHandler handler) throws IOException {
        return (T)this.replyParser.parse(handler);
    }

    /**
     * Opens the socket and builds the stream/parser stack. Does nothing unless
     * the status can be moved from {@code DISCONNECTED} to {@code CONNECTING}.
     *
     * <p>The input stream is optionally wrapped in an
     * {@link AsyncBufferedInputStream} and a {@link RateLimitInputStream},
     * depending on the configuration, before being handed to a
     * {@link RedisInputStream}.
     *
     * @throws IOException if the socket cannot be created
     */
    protected void connect() throws IOException {
        if (!this.compareAndSet(Status.DISCONNECTED, Status.CONNECTING)) {
            return;
        }
        try {
            this.socket = this.socketFactory.createSocket(this.host, this.port, this.configuration.getConnectionTimeout());
            this.outputStream = new RedisOutputStream(this.socket.getOutputStream());
            InputStream source = this.socket.getInputStream();
            if (this.configuration.getAsyncCachedBytes() > 0) {
                source = new AsyncBufferedInputStream(source, this.configuration.getAsyncCachedBytes());
            }
            if (this.configuration.getRateLimit() > 0) {
                source = new RateLimitInputStream(source, this.configuration.getRateLimit());
            }
            this.inputStream = new RedisInputStream(source, this.configuration.getBufferSize());
            this.inputStream.setRawByteListeners(this.rawByteListeners);
            this.replyParser = new ReplyParser(this.inputStream, new RedisCodec());
            logger.info("Connected to redis-server[{}:{}]", this.host, this.port);
        } finally {
            // NOTE(review): the status becomes CONNECTED even when the socket could not be
            // opened; doClose() resets it to DISCONNECTED. Presumably intentional so the
            // retrier's close path runs - confirm against AbstractReplicatorRetrier.
            this.setStatus(Status.CONNECTED);
        }
    }

    /**
     * Cancels the heartbeat and closes the input stream, output stream and
     * socket. Failures while closing any of them are ignored so the remaining
     * resources are still released. The status is always set to
     * {@code DISCONNECTED} at the end.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void doClose() throws IOException {
        this.compareAndSet(Status.CONNECTED, Status.DISCONNECTING);
        try {
            if (this.heartbeat != null) {
                if (!this.heartbeat.isCancelled()) {
                    this.heartbeat.cancel(true);
                }
                logger.info("heartbeat canceled.");
            }
            try {
                if (this.inputStream != null) {
                    this.inputStream.setRawByteListeners(null);
                    this.inputStream.close();
                }
            } catch (IOException ignored) {
                // closing is best-effort
            }
            try {
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
            } catch (IOException ignored) {
                // closing is best-effort
            }
            try {
                if (this.socket != null && !this.socket.isClosed()) {
                    this.socket.close();
                }
            } catch (IOException ignored) {
                // closing is best-effort
            }
            logger.info("socket closed. redis-server[{}:{}]", this.host, this.port);
        } finally {
            this.setStatus(Status.DISCONNECTED);
        }
    }

    /**
     * Retrier callbacks for the socket replicator. The meaning of the boolean
     * return values is defined by {@link AbstractReplicatorRetrier}, which is
     * not shown here.
     */
    private class RedisSocketReplicatorRetrier extends AbstractReplicatorRetrier {
        private RedisSocketReplicatorRetrier() {
        }

        /** Performs the connection handshake; always returns {@code true}. */
        @Override
        protected boolean connect() throws IOException {
            RedisSocketReplicator.this.establishConnection();
            return true;
        }

        /**
         * Closes the connection. When {@code reason} is non-null the error is
         * logged before closing and a reconnect notice afterwards.
         *
         * @param reason the failure that triggered the close, or {@code null}
         * @return always {@code true}
         */
        @Override
        protected boolean close(IOException reason) throws IOException {
            if (reason != null) {
                logger.error("[redis-replicator] socket error. redis-server[{}:{}]", RedisSocketReplicator.this.host, RedisSocketReplicator.this.port, reason);
            }
            RedisSocketReplicator.this.doClose();
            if (reason != null) {
                logger.info("reconnecting to redis-server[{}:{}]. retry times:{}", RedisSocketReplicator.this.host, RedisSocketReplicator.this.port, this.retries + 1);
            }
            return true;
        }

        @Override
        protected boolean isManualClosed() {
            return RedisSocketReplicator.this.isClosed();
        }

        /**
         * Sends {@code PSYNC}, synchronises according to the reply and then
         * reads commands for as long as the status stays {@code CONNECTED}.
         *
         * <p>For every parsed reply the number of consumed bytes reported via
         * the {@link OffsetHandler} is added to the configuration's offset.
         * {@code PING} is consumed silently, {@code REPLCONF GETACK} is
         * answered with an ACK (in PSYNC mode only), and every other
         * registered command is parsed and submitted as an event.
         *
         * @return {@code false} when the server asked to sync later while
         *         still connected; {@code true} otherwise
         */
        @Override
        protected boolean open() throws IOException {
            String replId = RedisSocketReplicator.this.configuration.getReplId();
            long replOffset = RedisSocketReplicator.this.configuration.getReplOffset();
            // A non-negative stored offset is advanced by one for the request; negative values go out as-is.
            String requestedOffset = String.valueOf(replOffset >= 0L ? replOffset + 1L : replOffset);
            logger.info("PSYNC {} {}", replId, requestedOffset);
            RedisSocketReplicator.this.send("PSYNC".getBytes(), replId.getBytes(), requestedOffset.getBytes());
            String reply = Strings.toString(RedisSocketReplicator.this.reply());
            SyncMode mode = RedisSocketReplicator.this.trySync(reply);
            if (mode == SyncMode.PSYNC && RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                RedisSocketReplicator.this.heartbeat();
            } else if (mode == SyncMode.SYNC_LATER && RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                return false;
            }
            if (RedisSocketReplicator.this.getStatus() != Status.CONNECTED) {
                return true;
            }
            RedisSocketReplicator.this.submitEvent(new PreCommandSyncEvent());
            // Single-element array so the anonymous handler can report the length back.
            final long[] offset = new long[1];
            while (RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                Object obj = RedisSocketReplicator.this.replyParser.parse(new OffsetHandler() {

                    @Override
                    public void handle(long len) {
                        offset[0] = len;
                    }
                });
                if (obj instanceof Object[]) {
                    if (RedisSocketReplicator.this.verbose() && logger.isDebugEnabled()) {
                        logger.debug(Strings.format((Object[])obj));
                    }
                    Object[] raw = (Object[])obj;
                    String commandName = Strings.toString(raw[0]);
                    CommandName name = CommandName.name(commandName);
                    CommandParser parser = (CommandParser)RedisSocketReplicator.this.commands.get(name);
                    if (parser == null) {
                        logger.warn("command [{}] not register. raw command:{}", name, Strings.format(raw));
                    } else if (Strings.isEquals(commandName, "PING")) {
                        // PING only advances the offset below; no event is submitted.
                    } else if (Strings.isEquals(commandName, "REPLCONF") && Strings.isEquals(Strings.toString(raw[1]), "GETACK")) {
                        if (mode == SyncMode.PSYNC) {
                            RedisSocketReplicator.this.executor.execute(new Runnable() {

                                @Override
                                public void run() {
                                    RedisSocketReplicator.this.ackCurrentOffset();
                                }
                            });
                        }
                    } else {
                        RedisSocketReplicator.this.submitEvent((Event)parser.parse(raw));
                    }
                } else {
                    logger.info("unexpected redis reply:{}", obj);
                }
                RedisSocketReplicator.this.configuration.addOffset(offset[0]);
                offset[0] = 0L;
            }
            if (RedisSocketReplicator.this.getStatus() == Status.CONNECTED) {
                RedisSocketReplicator.this.submitEvent(new PostCommandSyncEvent());
            }
            return true;
        }
    }

    /** Outcome of the sync negotiation performed by {@link #trySync(String)}. */
    protected enum SyncMode {
        SYNC,
        PSYNC,
        SYNC_LATER;

    }
}

