/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.util.concurrent.LinkedBlockingQueue;
import kafka.api.RequestOrResponse;
import kafka.cluster.Broker;
import kafka.controller.ControllerBrokerStateInfo;
import kafka.controller.ControllerContext;
import kafka.controller.RequestSendThread;
import kafka.network.BlockingChannel;
import kafka.network.BlockingChannel$;
import kafka.server.KafkaConfig;
import kafka.utils.Log4jController$;
import kafka.utils.Logging;
import kafka.utils.Logging$class;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 * Duplicate member names - consider using --renamedupmembers true
 */
@ScalaSignature(bytes="\u0006\u0001\u0005ma\u0001B\u0001\u0003\u0001\u001d\u0011\u0001dQ8oiJ|G\u000e\\3s\u0007\"\fgN\\3m\u001b\u0006t\u0017mZ3s\u0015\t\u0019A!\u0001\u0006d_:$(o\u001c7mKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\u0011\u0001\u0001\u0002\u0005\f\u0011\u0005%qQ\"\u0001\u0006\u000b\u0005-a\u0011\u0001\u00027b]\u001eT\u0011!D\u0001\u0005U\u00064\u0018-\u0003\u0002\u0010\u0015\t1qJ\u00196fGR\u0004\"!\u0005\u000b\u000e\u0003IQ!a\u0005\u0003\u0002\u000bU$\u0018\u000e\\:\n\u0005U\u0011\"a\u0002'pO\u001eLgn\u001a\t\u0003/ii\u0011\u0001\u0007\u0006\u00023\u0005)1oY1mC&\u00111\u0004\u0007\u0002\f'\u000e\fG.Y(cU\u0016\u001cG\u000f\u0003\u0005\u001e\u0001\t\u0015\r\u0011\"\u0003\u001f\u0003E\u0019wN\u001c;s_2dWM]\"p]R,\u0007\u0010^\u000b\u0002?A\u0011\u0001%I\u0007\u0002\u0005%\u0011!E\u0001\u0002\u0012\u0007>tGO]8mY\u0016\u00148i\u001c8uKb$\b\u0002\u0003\u0013\u0001\u0005\u0003\u0005\u000b\u0011B\u0010\u0002%\r|g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\t\u0005\tM\u0001\u0011\t\u0011)A\u0005O\u000511m\u001c8gS\u001e\u0004\"\u0001K\u0016\u000e\u0003%R!A\u000b\u0003\u0002\rM,'O^3s\u0013\ta\u0013FA\u0006LC\u001a\\\u0017mQ8oM&<\u0007\"\u0002\u0018\u0001\t\u0003y\u0013A\u0002\u001fj]&$h\bF\u00021cI\u0002\"\u0001\t\u0001\t\u000bui\u0003\u0019A\u0010\t\u000b\u0019j\u0003\u0019A\u0014\t\u000fQ\u0002!\u0019!C\u0005k\u0005y!M]8lKJ\u001cF/\u0019;f\u0013:4w.F\u00017!\u00119DHP!\u000e\u0003aR!!\u000f\u001e\u0002\u000f5,H/\u00192mK*\u00111\bG\u0001\u000bG>dG.Z2uS>t\u0017BA\u001f9\u0005\u001dA\u0015m\u001d5NCB\u0004\"aF 
\n\u0005\u0001C\"aA%oiB\u0011\u0001EQ\u0005\u0003\u0007\n\u0011\u0011dQ8oiJ|G\u000e\\3s\u0005J|7.\u001a:Ti\u0006$X-\u00138g_\"1Q\t\u0001Q\u0001\nY\n\u0001C\u0019:pW\u0016\u00148\u000b^1uK&sgm\u001c\u0011\t\u000f\u001d\u0003!\u0019!C\u0005\u0011\u0006Q!M]8lKJdunY6\u0016\u0003!AaA\u0013\u0001!\u0002\u0013A\u0011a\u00032s_.,'\u000fT8dW\u0002BQ\u0001\u0014\u0001\u0005\u00025\u000bqa\u001d;beR,\b\u000fF\u0001O!\t9r*\u0003\u0002Q1\t!QK\\5u\u0011\u0015\u0011\u0006\u0001\"\u0001N\u0003!\u0019\b.\u001e;e_^t\u0007\"\u0002+\u0001\t\u0003)\u0016aC:f]\u0012\u0014V-];fgR$BA\u0014,YA\")qk\u0015a\u0001}\u0005A!M]8lKJLE\rC\u0003Z'\u0002\u0007!,A\u0004sKF,Xm\u001d;\u0011\u0005msV\"\u0001/\u000b\u0005u#\u0011aA1qS&\u0011q\f\u0018\u0002\u0012%\u0016\fX/Z:u\u001fJ\u0014Vm\u001d9p]N,\u0007bB1T!\u0003\u0005\rAY\u0001\tG\u0006dGNY1dWB!qc\u0019.O\u0013\t!\u0007DA\u0005Gk:\u001cG/[8oc!)a\r\u0001C\u0001O\u0006I\u0011\r\u001a3Ce>\\WM\u001d\u000b\u0003\u001d\"DQ![3A\u0002)\faA\u0019:pW\u0016\u0014\bCA6o\u001b\u0005a'BA7\u0005\u0003\u001d\u0019G.^:uKJL!a\u001c7\u0003\r\t\u0013xn[3s\u0011\u0015\t\b\u0001\"\u0001s\u00031\u0011X-\\8wK\n\u0013xn[3s)\tq5\u000fC\u0003Xa\u0002\u0007a\bC\u0003v\u0001\u0011%a/\u0001\u0007bI\u0012tUm\u001e\"s_.,'\u000f\u0006\u0002Oo\")\u0011\u000e\u001ea\u0001U\")\u0011\u0010\u0001C\u0005u\u0006!\"/Z7pm\u0016,\u00050[:uS:<'I]8lKJ$\"AT>\t\u000b]C\b\u0019\u0001 \t\u000bu\u0004A\u0011\u0002@\u0002-M$\u0018M\u001d;SKF,Xm\u001d;TK:$G\u000b\u001b:fC\u0012$\"AT@\t\u000b]c\b\u0019\u0001 \t\u0013\u0005\r\u0001!%A\u0005\u0002\u0005\u0015\u0011!F:f]\u0012\u0014V-];fgR$C-\u001a4bk2$HeM\u000b\u0003\u0003\u000fQ3AYA\u0005W\t\tY\u0001\u0005\u0003\u0002\u000e\u0005]QBAA\b\u0015\u0011\t\t\"a\u0005\u0002\u0013Ut7\r[3dW\u0016$'bAA\u000b1\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005e\u0011q\u0002\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
public class ControllerChannelManager
implements Logging {
    // Controller context passed to the constructor; its liveBrokers() seed brokerStateInfo and it
    // is handed to every RequestSendThread created in addNewBroker.
    private final ControllerContext controllerContext;
    // Broker configuration passed to the constructor. Read here for brokerId(),
    // controllerMessageQueueSize() and controllerSocketTimeoutMs(); also read through $outer by
    // the debug-message closure in addNewBroker.
    public final KafkaConfig kafka$controller$ControllerChannelManager$$config;
    // Per-broker channel/queue/thread state, keyed by the boxed broker id.
    private final HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo;
    // Monitor held by startup, shutdown, sendRequest, addBroker and removeBroker while they use
    // brokerStateInfo.
    private final Object brokerLock;
    // Written by kafka$utils$Logging$_setter_$loggerName_$eq and returned by loggerName().
    private final String loggerName;
    // Assigned on first call to logger(); bit 0 of bitmap$0 records that it has been set.
    private final Logger logger;
    // Log prefix; set in the constructor to "[Channel manager on controller <brokerId>]: ".
    private String logIdent;
    // Written by the matching Logging _setter_ method and returned by its accessor.
    private final Log4jController$ kafka$utils$Logging$$log4jController;
    // Lazy-initialisation flags; only bit 0 (logger initialised) is used in this class.
    public volatile int bitmap$0;

    /** Returns the logger name stored by {@code kafka$utils$Logging$_setter_$loggerName_$eq}. */
    @Override
    public String loggerName() {
        return this.loggerName;
    }

    /**
     * Returns the log4j logger of this instance, creating it on first use.
     *
     * <p>Bit 0 of {@code bitmap$0} records whether {@code logger} has been assigned. The bit is
     * tested once without the monitor and once more while holding the monitor of {@code this},
     * so the field is only assigned while the bit is still clear.
     */
    @Override
    public Logger logger() {
        if ((this.bitmap$0 & 1) == 0) {
            synchronized (this) {
                boolean notYetCreated = (this.bitmap$0 & 1) == 0;
                if (notYetCreated) {
                    this.logger = Logging$class.logger(this);
                    this.bitmap$0 = this.bitmap$0 | 1;
                }
            }
        }
        return this.logger;
    }

    /** Returns the current log prefix. */
    @Override
    public String logIdent() {
        return this.logIdent;
    }

    /** Replaces the log prefix (Scala setter {@code logIdent_=}); the constructor calls this once. */
    @Override
    public void logIdent_$eq(String string) {
        this.logIdent = string;
    }

    /** Returns the {@code Log4jController$} instance stored by the matching {@code _setter_} method. */
    @Override
    public final Log4jController$ kafka$utils$Logging$$log4jController() {
        return this.kafka$utils$Logging$$log4jController;
    }

    /**
     * Stores the logger name.
     * NOTE(review): presumably invoked from {@code Logging$class.$init$} during construction - confirm.
     */
    @Override
    public void kafka$utils$Logging$_setter_$loggerName_$eq(String string) {
        this.loggerName = string;
    }

    /**
     * Stores the {@code Log4jController$} reference.
     * NOTE(review): presumably invoked from {@code Logging$class.$init$} during construction - confirm.
     */
    @Override
    public void kafka$utils$Logging$_setter_$kafka$utils$Logging$$log4jController_$eq(Log4jController$ log4jController$) {
        this.kafka$utils$Logging$$log4jController = log4jController$;
    }

    /** Forwards to {@code Logging$class.trace(this, msg)}; {@code msg} is passed on unevaluated. */
    @Override
    public void trace(Function0<String> msg) {
        Logging$class.trace(this, msg);
    }

    /** Forwards to {@code Logging$class.trace(this, e)} and returns that call's result. */
    @Override
    public Object trace(Function0<Throwable> e) {
        return Logging$class.trace(this, e);
    }

    /** Forwards the message and throwable suppliers to {@code Logging$class.trace(this, msg, e)}. */
    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.trace(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowTrace}.
     * NOTE(review): presumably runs the action and logs any failure at trace level - confirm in Logging.
     */
    @Override
    public void swallowTrace(Function0<BoxedUnit> action) {
        Logging$class.swallowTrace(this, action);
    }

    /** Forwards to {@code Logging$class.debug(this, msg)}; {@code msg} is passed on unevaluated. */
    @Override
    public void debug(Function0<String> msg) {
        Logging$class.debug(this, msg);
    }

    /** Forwards to {@code Logging$class.debug(this, e)} and returns that call's result. */
    @Override
    public Object debug(Function0<Throwable> e) {
        return Logging$class.debug(this, e);
    }

    /** Forwards the message and throwable suppliers to {@code Logging$class.debug(this, msg, e)}. */
    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.debug(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowDebug}.
     * NOTE(review): presumably runs the action and logs any failure at debug level - confirm in Logging.
     */
    @Override
    public void swallowDebug(Function0<BoxedUnit> action) {
        Logging$class.swallowDebug(this, action);
    }

    /** Forwards to {@code Logging$class.info(this, msg)}; {@code msg} is passed on unevaluated. */
    @Override
    public void info(Function0<String> msg) {
        Logging$class.info(this, msg);
    }

    /** Forwards to {@code Logging$class.info(this, e)} and returns that call's result. */
    @Override
    public Object info(Function0<Throwable> e) {
        return Logging$class.info(this, e);
    }

    /** Forwards the message and throwable suppliers to {@code Logging$class.info(this, msg, e)}. */
    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.info(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowInfo}.
     * NOTE(review): presumably runs the action and logs any failure at info level - confirm in Logging.
     */
    @Override
    public void swallowInfo(Function0<BoxedUnit> action) {
        Logging$class.swallowInfo(this, action);
    }

    /** Forwards to {@code Logging$class.warn(this, msg)}; used by sendRequest for offline brokers. */
    @Override
    public void warn(Function0<String> msg) {
        Logging$class.warn(this, msg);
    }

    /** Forwards to {@code Logging$class.warn(this, e)} and returns that call's result. */
    @Override
    public Object warn(Function0<Throwable> e) {
        return Logging$class.warn(this, e);
    }

    /** Forwards the message and throwable suppliers to {@code Logging$class.warn(this, msg, e)}. */
    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.warn(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowWarn}.
     * NOTE(review): presumably runs the action and logs any failure at warn level - confirm in Logging.
     */
    @Override
    public void swallowWarn(Function0<BoxedUnit> action) {
        Logging$class.swallowWarn(this, action);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallow}.
     * NOTE(review): the level used for a swallowed failure is decided in Logging - confirm there.
     */
    @Override
    public void swallow(Function0<BoxedUnit> action) {
        Logging$class.swallow(this, action);
    }

    /** Forwards to {@code Logging$class.error(this, msg)}; {@code msg} is passed on unevaluated. */
    @Override
    public void error(Function0<String> msg) {
        Logging$class.error(this, msg);
    }

    /** Forwards to {@code Logging$class.error(this, e)} and returns that call's result. */
    @Override
    public Object error(Function0<Throwable> e) {
        return Logging$class.error(this, e);
    }

    /**
     * Forwards the message and throwable suppliers to {@code Logging$class.error(this, msg, e)};
     * removeExistingBroker uses this overload to report a failed removal.
     */
    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.error(this, msg, e);
    }

    /**
     * Forwards {@code action} to {@code Logging$class.swallowError}.
     * NOTE(review): presumably runs the action and logs any failure at error level - confirm in Logging.
     */
    @Override
    public void swallowError(Function0<BoxedUnit> action) {
        Logging$class.swallowError(this, action);
    }

    /** Forwards to {@code Logging$class.fatal(this, msg)}; {@code msg} is passed on unevaluated. */
    @Override
    public void fatal(Function0<String> msg) {
        Logging$class.fatal(this, msg);
    }

    /** Forwards to {@code Logging$class.fatal(this, e)} and returns that call's result. */
    @Override
    public Object fatal(Function0<Throwable> e) {
        return Logging$class.fatal(this, e);
    }

    /** Forwards the message and throwable suppliers to {@code Logging$class.fatal(this, msg, e)}. */
    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging$class.fatal(this, msg, e);
    }

    /** Returns the controller context given to the constructor. */
    private ControllerContext controllerContext() {
        return this.controllerContext;
    }

    /** Returns the mutable map of per-broker state, keyed by boxed broker id. */
    private HashMap<Object, ControllerBrokerStateInfo> brokerStateInfo() {
        return this.brokerStateInfo;
    }

    /** Returns the monitor object the public methods synchronize on. */
    private Object brokerLock() {
        return this.brokerLock;
    }

    /**
     * Starts the request send thread of every broker currently present in
     * {@code brokerStateInfo}. The whole traversal runs while {@code brokerLock} is held.
     */
    public void startup() {
        synchronized (this.brokerLock()) {
            this.brokerStateInfo().foreach((Function1)new Serializable(this){
                public static final long serialVersionUID;
                private final ControllerChannelManager $outer;

                static {
                    long l = serialVersionUID = 0L;
                }

                // Invoked once per (brokerId, state) entry; only the id is needed.
                public final void apply(Tuple2<Object, ControllerBrokerStateInfo> entry) {
                    this.$outer.kafka$controller$ControllerChannelManager$$startRequestSendThread(entry._1$mcI$sp());
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
        }
    }

    /**
     * Tears down the connection state of every registered broker by calling
     * {@code removeExistingBroker} for each id, while holding {@code brokerLock}.
     *
     * <p>{@code removeExistingBroker} removes the broker's entry from {@code brokerStateInfo}.
     * Traversing the mutable map directly while its entries are being removed relies on
     * unspecified iterator behaviour, so the entries are first copied into an immutable list
     * and the removal is driven from that snapshot.
     */
    public void shutdown() {
        synchronized (this.brokerLock()) {
            // toList() materialises the current entries; the map itself can then shrink safely.
            this.brokerStateInfo().toList().foreach((Function1)new Serializable(this){
                public static final long serialVersionUID;
                private final ControllerChannelManager $outer;

                static {
                    long l = serialVersionUID = 0L;
                }

                // Invoked once per snapshotted (brokerId, state) entry; only the id is needed.
                public final void apply(Tuple2<Object, ControllerBrokerStateInfo> brokerState) {
                    this.$outer.kafka$controller$ControllerChannelManager$$removeExistingBroker(brokerState._1$mcI$sp());
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            });
        }
    }

    /**
     * Queues {@code request$1} (paired with {@code callback}) for the given broker.
     *
     * <p>Runs entirely under {@code brokerLock}. If the broker has an entry in
     * {@code brokerStateInfo}, the (request, callback) tuple is {@code put} on that broker's
     * message queue. The queues created by {@code addNewBroker} are bounded, so the
     * {@code put} may wait while the lock is held. If the broker has no entry, a warning is
     * logged and the request is dropped.
     *
     * @param brokerId$1 id of the destination broker
     * @param request$1  request to enqueue
     * @param callback   stored alongside the request; {@code sendRequest$default$3} supplies
     *                   {@code null} when the Scala caller omits it
     * @throws MatchError if the map lookup yields something that is neither {@code Some} nor {@code None}
     */
    public void sendRequest(int brokerId$1, RequestOrResponse request$1, Function1<RequestOrResponse, BoxedUnit> callback) {
        synchronized (this.brokerLock()) {
            Option stateInfoOpt = this.brokerStateInfo().get((Object)BoxesRunTime.boxToInteger((int)brokerId$1));

            // Known broker: hand the request to its queue and finish.
            if (stateInfoOpt instanceof Some) {
                ControllerBrokerStateInfo stateInfo = (ControllerBrokerStateInfo)((Some)stateInfoOpt).x();
                stateInfo.messageQueue().put((Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>)new Tuple2((Object)request$1, callback));
                return;
            }

            // Anything other than None is an unexpected lookup result.
            None$ none = None$.MODULE$;
            boolean lookupIsNone = none == null ? stateInfoOpt == null : none.equals(stateInfoOpt);
            if (!lookupIsNone) {
                throw new MatchError((Object)stateInfoOpt);
            }

            // Unknown broker: report and drop the request.
            this.warn((Function0<String>)new Serializable(this, brokerId$1, request$1){
                public static final long serialVersionUID;
                private final int brokerId$1;
                private final RequestOrResponse request$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return Predef$.MODULE$.augmentString("Not sending request %s to broker %d, since it is offline.").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.request$1, BoxesRunTime.boxToInteger((int)this.brokerId$1)}));
                }
                {
                    this.brokerId$1 = n;
                    this.request$1 = requestOrResponse;
                }
            });
        }
    }

    /**
     * Scala-generated supplier of the default value for the third parameter ({@code callback})
     * of {@code sendRequest}: no callback, i.e. {@code null}.
     */
    public Function1 sendRequest$default$3() {
        return null;
    }

    /**
     * Registers {@code broker} and starts its request send thread, unless a broker with the
     * same id is already present in {@code brokerStateInfo}, in which case nothing happens.
     * Runs under {@code brokerLock}.
     *
     * @param broker broker to connect to
     */
    public void addBroker(Broker broker) {
        synchronized (this.brokerLock()) {
            boolean alreadyKnown = this.brokerStateInfo().contains((Object)BoxesRunTime.boxToInteger((int)broker.id()));
            if (alreadyKnown) {
                return;
            }
            this.kafka$controller$ControllerChannelManager$$addNewBroker(broker);
            this.kafka$controller$ControllerChannelManager$$startRequestSendThread(broker.id());
        }
    }

    /**
     * Removes the connection state of the given broker by delegating to
     * {@code removeExistingBroker} while holding {@code brokerLock}.
     *
     * @param brokerId id of the broker to drop
     */
    public void removeBroker(int brokerId) {
        synchronized (this.brokerLock()) {
            this.kafka$controller$ControllerChannelManager$$removeExistingBroker(brokerId);
        }
    }

    /**
     * Creates the queue, channel and send thread for {@code broker$2} and records them in
     * {@code brokerStateInfo} under the broker's id. The thread is created but not started
     * here; {@code startRequestSendThread} does that. This method takes no lock itself.
     *
     * @param broker$2 broker to create connection state for
     */
    public final void kafka$controller$ControllerChannelManager$$addNewBroker(Broker broker$2) {
        KafkaConfig cfg = this.kafka$controller$ControllerChannelManager$$config;

        // Bounded queue of (request, callback) pairs waiting to be sent to this broker.
        LinkedBlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> pendingRequests =
            new LinkedBlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>>(cfg.controllerMessageQueueSize());

        this.debug((Function0<String>)new Serializable(this, broker$2){
            public static final long serialVersionUID;
            private final ControllerChannelManager $outer;
            private final Broker broker$2;

            static {
                long l = serialVersionUID = 0L;
            }

            public final String apply() {
                return Predef$.MODULE$.augmentString("Controller %d trying to connect to broker %d").format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.kafka$controller$ControllerChannelManager$$config.brokerId()), BoxesRunTime.boxToInteger((int)this.broker$2.id())}));
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
                this.broker$2 = broker;
            }
        });

        // Channel uses the default buffer size for both buffer arguments and the configured timeout.
        BlockingChannel brokerChannel = new BlockingChannel(broker$2.host(), broker$2.port(), BlockingChannel$.MODULE$.UseDefaultBufferSize(), BlockingChannel$.MODULE$.UseDefaultBufferSize(), cfg.controllerSocketTimeoutMs());

        RequestSendThread sender = new RequestSendThread(cfg.brokerId(), this.controllerContext(), broker$2, pendingRequests, brokerChannel);
        sender.setDaemon(false);

        HashMap<Object, ControllerBrokerStateInfo> registry = this.brokerStateInfo();
        Object key = BoxesRunTime.boxToInteger((int)broker$2.id());
        registry.put(key, new ControllerBrokerStateInfo(brokerChannel, broker$2, pendingRequests, sender));
    }

    /**
     * Disconnects the broker's channel, clears its pending requests, shuts down its send
     * thread and finally removes its entry from {@code brokerStateInfo}, in that order.
     *
     * <p>Any {@code Throwable} raised along the way (including the lookup failing because the
     * id is not registered) is logged at error level and not rethrown; the steps after the
     * failing one are skipped.
     *
     * @param brokerId id of the broker whose state is discarded
     */
    public final void kafka$controller$ControllerChannelManager$$removeExistingBroker(int brokerId) {
        try {
            Object key = BoxesRunTime.boxToInteger((int)brokerId);
            // Single lookup; apply() throws if the id is absent, which lands in the catch below.
            ControllerBrokerStateInfo stateInfo = (ControllerBrokerStateInfo)this.brokerStateInfo().apply(key);
            stateInfo.channel().disconnect();
            stateInfo.messageQueue().clear();
            stateInfo.requestSendThread().shutdown();
            this.brokerStateInfo().remove(key);
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(this){
                public static final long serialVersionUID;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final String apply() {
                    return "Error while removing broker by the controller";
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID;
                private final Throwable e$1;

                static {
                    long l = serialVersionUID = 0L;
                }

                public final Throwable apply() {
                    return this.e$1;
                }
                {
                    this.e$1 = throwable;
                }
            });
        }
    }

    /**
     * Starts the send thread registered for {@code brokerId}, but only if that thread is
     * still in state {@code NEW}; a thread in any other state is left untouched.
     * The map lookup uses {@code apply}, so an unregistered id makes it throw.
     *
     * @param brokerId id of the broker whose send thread should run
     */
    public final void kafka$controller$ControllerChannelManager$$startRequestSendThread(int brokerId) {
        ControllerBrokerStateInfo stateInfo = (ControllerBrokerStateInfo)this.brokerStateInfo().apply((Object)BoxesRunTime.boxToInteger((int)brokerId));
        RequestSendThread sender = stateInfo.requestSendThread();
        boolean neverStarted = Thread.State.NEW.equals((Object)sender.getState());
        if (neverStarted) {
            sender.start();
        }
    }

    /**
     * Builds the channel manager and creates (but does not start) connection state for every
     * broker returned by {@code controllerContext.liveBrokers()}. Send threads are started
     * later by {@code startup()}.
     *
     * @param controllerContext source of the live broker set; also passed to each send thread
     * @param config            supplies the controller's broker id, queue size and socket timeout
     */
    public ControllerChannelManager(ControllerContext controllerContext, KafkaConfig config) {
        this.controllerContext = controllerContext;
        this.kafka$controller$ControllerChannelManager$$config = config;
        // Trait initialiser for Logging; runs before logIdent is set below.
        // NOTE(review): presumably populates loggerName / log4jController via the _setter_ methods - confirm.
        Logging$class.$init$(this);
        this.brokerStateInfo = new HashMap();
        this.brokerLock = new Object();
        // Log prefix becomes "[Channel manager on controller <brokerId>]: ".
        this.logIdent_$eq(new StringBuilder().append((Object)"[Channel manager on controller ").append((Object)BoxesRunTime.boxToInteger((int)config.brokerId())).append((Object)"]: ").toString());
        // Seed brokerStateInfo from the live brokers; brokerLock is not taken here.
        controllerContext.liveBrokers().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID;
            private final ControllerChannelManager $outer;

            static {
                long l = serialVersionUID = 0L;
            }

            // Called once per live broker.
            public final void apply(Broker broker) {
                this.$outer.kafka$controller$ControllerChannelManager$$addNewBroker(broker);
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }
}

