/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.distributed.ConnectAssignor;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility;
import org.apache.kafka.connect.runtime.distributed.EagerAssignor;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor;
import org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

public class WorkerCoordinator
extends AbstractCoordinator
implements Closeable {
    // Sub-protocol name constant; not referenced elsewhere in this class.
    public static final String DEFAULT_SUBPROTOCOL = "default";
    // Logger obtained from the LogContext passed to the constructor.
    private final Logger log;
    // REST URL of this worker; embedded in the worker state built by metadata().
    private final String restUrl;
    // Source of configuration snapshots (see metadata() and configFreshSnapshot()).
    private final ConfigBackingStore configStorage;
    // Most recent assignment; replaced in onJoinComplete(), cleared in poll() when the
    // coordinator cannot be reached, and read by the metrics registered at construction.
    private volatile ExtendedAssignment assignmentSnapshot;
    // Config snapshot captured by metadata() or installed via configSnapshot(update).
    private ClusterConfigState configSnapshot;
    // Receives onAssigned/onRevoked callbacks.
    private final WorkerRebalanceListener listener;
    // Configured compatibility mode; selects the metadata format in metadata().
    private final ConnectProtocolCompatibility protocolCompatibility;
    // Leader-only ownership view; reset to null in onJoinPrepare(), set via leaderState(update).
    private LeaderState leaderState;
    // Set by requestRejoin(), cleared in onJoinComplete().
    private boolean rejoinRequested;
    // Starts as protocolCompatibility and is updated in onJoinComplete() from the
    // protocol name passed to that callback.
    private volatile ConnectProtocolCompatibility currentConnectProtocol;
    // Generation id recorded by the last onJoinComplete() call.
    private volatile int lastCompletedGenerationId;
    // Assignor used by performAssignment() when the protocol resolves to EAGER.
    private final ConnectAssignor eagerAssignor;
    // Assignor used by performAssignment() for every other protocol.
    private final ConnectAssignor incrementalAssignor;
    // Timeout used in poll() for coordinator discovery; initialised from config.heartbeatIntervalMs.
    private final int coordinatorDiscoveryTimeoutMs;

    /**
     * Builds a coordinator for a Connect worker.
     *
     * @param config                rebalance configuration; its heartbeat interval is also used
     *                              as the coordinator discovery timeout in {@link #poll(long)}
     * @param logContext            supplies this class's logger and is passed to both assignors
     * @param client                network client handed to the superclass
     * @param metrics               registry in which the assignment gauges are registered
     * @param metricGrpPrefix       prefix of the metric group name
     * @param time                  clock handed to the superclass and the incremental assignor
     * @param restUrl               REST URL of this worker
     * @param configStorage         backing store used to take config snapshots
     * @param listener              callback target for assignment and revocation events
     * @param protocolCompatibility configured protocol compatibility mode
     * @param maxDelay              delay argument passed to the incremental cooperative assignor
     */
    public WorkerCoordinator(GroupRebalanceConfig config, LogContext logContext, ConsumerNetworkClient client, Metrics metrics, String metricGrpPrefix, Time time, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener, ConnectProtocolCompatibility protocolCompatibility, int maxDelay) {
        super(config, logContext, client, metrics, metricGrpPrefix, time);

        // Collaborators supplied by the caller.
        this.log = logContext.logger(WorkerCoordinator.class);
        this.restUrl = restUrl;
        this.configStorage = configStorage;
        this.listener = listener;
        this.protocolCompatibility = protocolCompatibility;

        // Initial mutable state: no assignment, no pending rejoin request.
        this.assignmentSnapshot = null;
        this.rejoinRequested = false;
        this.currentConnectProtocol = protocolCompatibility;

        // Instantiated only for its side effect of registering the gauges.
        new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);

        this.incrementalAssignor = new IncrementalCooperativeAssignor(logContext, time, maxDelay);
        this.eagerAssignor = new EagerAssignor(logContext);

        this.coordinatorDiscoveryTimeoutMs = config.heartbeatIntervalMs;
        this.lastCompletedGenerationId = AbstractCoordinator.Generation.NO_GENERATION.generationId;
    }

    /**
     * Flags that a rejoin has been requested; the flag is consulted by
     * {@code rejoinNeededOrPending()} and cleared in {@code onJoinComplete}.
     */
    public void requestRejoin() {
        rejoinRequested = true;
    }

    /**
     * @return the protocol type string used by this coordinator, always {@code "connect"}
     */
    public String protocolType() {
        return "connect";
    }

    /**
     * Delegates to the superclass implementation while holding this object's monitor.
     *
     * @param timer bounds how long the superclass call may take
     * @return whatever the superclass implementation returns
     */
    protected synchronized boolean ensureCoordinatorReady(Timer timer) {
        final boolean ready = super.ensureCoordinatorReady(timer);
        return ready;
    }

    /**
     * Drives coordinator discovery, group (re)joining, heartbeats and network I/O until
     * {@code timeout} milliseconds have elapsed. The loop body always runs at least once.
     *
     * <p>If the coordinator is unknown and cannot be reached within the discovery timeout,
     * a present, non-failed assignment is revoked through the listener and the snapshot is
     * cleared.
     *
     * @param timeout total time budget in milliseconds
     */
    public void poll(long timeout) {
        final long start = time.milliseconds();
        long now = start;
        long remaining;
        do {
            if (coordinatorUnknown()) {
                log.debug("Broker coordinator is marked unknown. Attempting discovery with a timeout of {}ms", (Object) coordinatorDiscoveryTimeoutMs);
                final boolean coordinatorReady = ensureCoordinatorReady(time.timer((long) coordinatorDiscoveryTimeoutMs));
                if (coordinatorReady) {
                    log.debug("Broker coordinator is ready");
                } else {
                    log.debug("Can not connect to broker coordinator");
                    final ExtendedAssignment previous = assignmentSnapshot;
                    final boolean hasLiveAssignment = previous != null && !previous.failed();
                    if (hasLiveAssignment) {
                        log.info("Broker coordinator was unreachable for {}ms. Revoking previous assignment {} to avoid running tasks while not being a member the group", (Object) coordinatorDiscoveryTimeoutMs, (Object) previous);
                        listener.onRevoked(previous.leader(), previous.connectors(), previous.tasks());
                        assignmentSnapshot = null;
                    }
                }
                now = time.milliseconds();
            }

            if (rejoinNeededOrPending()) {
                ensureActiveGroup();
                now = time.milliseconds();
            }

            pollHeartbeat(now);

            // Never wait on the network longer than the remaining budget or the time
            // until the next heartbeat, whichever is smaller.
            remaining = timeout - (now - start);
            final long pollTimeout = Math.min(Math.max(0L, remaining), timeToNextHeartbeat(now));
            client.poll(time.timer(pollTimeout));

            now = time.milliseconds();
            remaining = timeout - (now - start);
        } while (remaining > 0L);
    }

    /**
     * Builds the join-group protocol metadata for this worker.
     *
     * <p>Takes a new config snapshot from the backing store, stores it in
     * {@code configSnapshot}, and combines its offset with the REST URL and the current
     * assignment snapshot into the worker state that is serialized.
     *
     * @return the protocol collection matching the configured compatibility mode
     * @throws IllegalStateException if the configured mode is none of EAGER, COMPATIBLE, SESSIONED
     */
    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
        configSnapshot = configStorage.snapshot();
        final ExtendedAssignment currentAssignment = assignmentSnapshot;
        final ExtendedWorkerState state = new ExtendedWorkerState(restUrl, configSnapshot.offset(), currentAssignment);
        switch (protocolCompatibility) {
            case EAGER:
                return ConnectProtocol.metadataRequest(state);
            case COMPATIBLE:
                return IncrementalCooperativeConnectProtocol.metadataRequest(state, false);
            case SESSIONED:
                return IncrementalCooperativeConnectProtocol.metadataRequest(state, true);
            default:
                throw new IllegalStateException("Unknown Connect protocol compatibility mode " + protocolCompatibility);
        }
    }

    /**
     * Handles a completed join: deserializes the member assignment, records the protocol in
     * use, clears the rejoin request flag, publishes the new assignment snapshot and notifies
     * the listener.
     *
     * <p>When the protocol is not EAGER, revocations carried by the new assignment are reported
     * to the listener first, and whatever remains of the previous snapshot after removing those
     * revocations is added to the new assignment.
     *
     * @param generation       generation id of the completed join; stored as the last completed one
     * @param memberId         not used by this implementation
     * @param protocol         protocol name, mapped to a {@code ConnectProtocolCompatibility}
     * @param memberAssignment serialized assignment for this member
     */
    protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
        ExtendedAssignment newAssignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(memberAssignment);
        this.log.debug("Deserialized new assignment: {}", (Object)newAssignment);
        this.currentConnectProtocol = ConnectProtocolCompatibility.fromProtocol(protocol);
        this.rejoinRequested = false;
        if (this.currentConnectProtocol != ConnectProtocolCompatibility.EAGER) {
            ExtendedAssignment localAssignmentSnapshot;
            // Report revocations before merging, and only when there is something to revoke.
            if (!newAssignment.revokedConnectors().isEmpty() || !newAssignment.revokedTasks().isEmpty()) {
                this.listener.onRevoked(newAssignment.leader(), newAssignment.revokedConnectors(), newAssignment.revokedTasks());
            }
            // The snapshot is read after the revocation callback has returned.
            if ((localAssignmentSnapshot = this.assignmentSnapshot) != null) {
                // Mutates the previous snapshot's collections in place: drop what was revoked...
                localAssignmentSnapshot.connectors().removeAll(newAssignment.revokedConnectors());
                localAssignmentSnapshot.tasks().removeAll(newAssignment.revokedTasks());
                this.log.debug("After revocations snapshot of assignment: {}", (Object)localAssignmentSnapshot);
                // ...then carry the surviving connectors and tasks over into the new assignment.
                newAssignment.connectors().addAll(localAssignmentSnapshot.connectors());
                newAssignment.tasks().addAll(localAssignmentSnapshot.tasks());
            }
            this.log.debug("Augmented new assignment: {}", (Object)newAssignment);
        }
        // Publish the snapshot and generation before invoking the listener.
        this.assignmentSnapshot = newAssignment;
        this.lastCompletedGenerationId = generation;
        this.listener.onAssigned(newAssignment, generation);
    }

    /**
     * Computes the assignment for all members by delegating to the eager assignor when the
     * protocol resolves to EAGER, and to the incremental assignor otherwise.
     *
     * @param leaderId          member id of the leader
     * @param protocol          protocol name used to pick the assignor
     * @param allMemberMetadata metadata of every member in the group
     * @return the map returned by the selected assignor
     */
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
        final ConnectAssignor assignor;
        if (ConnectProtocolCompatibility.fromProtocol(protocol) == ConnectProtocolCompatibility.EAGER) {
            assignor = eagerAssignor;
        } else {
            assignor = incrementalAssignor;
        }
        return assignor.performAssignment(leaderId, protocol, allMemberMetadata, this);
    }

    /**
     * Called before a (re)join: clears the leader state and, under the EAGER protocol only,
     * revokes a present, non-failed assignment through the listener. Under any other protocol
     * the current assignment is left untouched.
     *
     * @param generation not used by this implementation
     * @param memberId   not used by this implementation
     */
    protected void onJoinPrepare(int generation, String memberId) {
        log.info("Rebalance started");
        leaderState(null);
        final ExtendedAssignment previous = assignmentSnapshot;

        if (currentConnectProtocol != ConnectProtocolCompatibility.EAGER) {
            log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's explicitly revoked.", (Object) previous);
            return;
        }

        log.debug("Revoking previous assignment {}", (Object) previous);
        if (previous == null || previous.failed()) {
            return;
        }
        listener.onRevoked(previous.leader(), previous.connectors(), previous.tasks());
    }

    /**
     * @return {@code true} if the superclass reports a rejoin as needed or pending, if there is
     *         no assignment snapshot, if the snapshot is marked failed, or if a rejoin was
     *         explicitly requested
     */
    protected boolean rejoinNeededOrPending() {
        final ExtendedAssignment current = assignmentSnapshot;
        if (super.rejoinNeededOrPending()) {
            return true;
        }
        if (current == null || current.failed()) {
            return true;
        }
        return rejoinRequested;
    }

    /**
     * @return the member id of the generation returned by {@code generationIfStable()}, or the
     *         empty string when that call returns {@code null}
     */
    public String memberId() {
        final AbstractCoordinator.Generation stable = generationIfStable();
        return stable == null ? "" : stable.memberId;
    }

    /**
     * @return the generation id of the generation currently reported by the superclass
     */
    public int generationId() {
        final AbstractCoordinator.Generation current = super.generation();
        return current.generationId;
    }

    /**
     * @return the generation id recorded by the most recent {@code onJoinComplete} call, or the
     *         initial {@code NO_GENERATION} id if no join has completed yet
     */
    public int lastCompletedGenerationId() {
        return lastCompletedGenerationId;
    }

    /**
     * Reports the leader, connectors and tasks of the given assignment to the listener as revoked.
     *
     * @param assignment the assignment to revoke; must not be {@code null}
     */
    public void revokeAssignment(ExtendedAssignment assignment) {
        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
    }

    /**
     * @return {@code true} when an assignment snapshot exists and names this member as leader
     */
    private boolean isLeader() {
        final ExtendedAssignment current = assignmentSnapshot;
        if (current == null) {
            return false;
        }
        return memberId().equals(current.leader());
    }

    /**
     * Looks up the URL of the worker that owns the given connector.
     *
     * @param connector connector name
     * @return the owner's URL as reported by the leader state, or {@code null} when a rejoin is
     *         needed or pending or this worker is not the leader
     */
    public String ownerUrl(String connector) {
        final boolean canAnswer = !rejoinNeededOrPending() && isLeader();
        return canAnswer ? leaderState().ownerUrl(connector) : null;
    }

    /**
     * Looks up the URL of the worker that owns the given task.
     *
     * @param task task id
     * @return the owner's URL as reported by the leader state, or {@code null} when a rejoin is
     *         needed or pending or this worker is not the leader
     */
    public String ownerUrl(ConnectorTaskId task) {
        final boolean canAnswer = !rejoinNeededOrPending() && isLeader();
        return canAnswer ? leaderState().ownerUrl(task) : null;
    }

    /**
     * @return a snapshot taken from the config backing store at call time; the cached
     *         {@code configSnapshot} field is not updated
     */
    public ClusterConfigState configFreshSnapshot() {
        return configStorage.snapshot();
    }

    /**
     * @return the cached config snapshot, which may be {@code null} if none has been stored yet
     */
    public ClusterConfigState configSnapshot() {
        return configSnapshot;
    }

    /**
     * Replaces the cached config snapshot.
     *
     * @param update the snapshot to cache
     */
    public void configSnapshot(ClusterConfigState update) {
        configSnapshot = update;
    }

    /**
     * @return the current leader state, or {@code null} if none is set
     */
    private LeaderState leaderState() {
        return leaderState;
    }

    /**
     * Replaces the leader state; {@code onJoinPrepare} passes {@code null} to clear it.
     *
     * @param update the new leader state, possibly {@code null}
     */
    public void leaderState(LeaderState update) {
        leaderState = update;
    }

    /**
     * @return the protocol version of the Connect protocol currently in use
     */
    public short currentProtocolVersion() {
        final ConnectProtocolCompatibility active = currentConnectProtocol;
        return active.protocolVersion();
    }

    /**
     * Inverts a one-to-many assignment into a value-to-key lookup.
     *
     * <p>If the same value appears under several keys, the key visited last in the map's
     * iteration order wins.
     *
     * @param assignment map from an owner to the collection of items it owns
     * @param <K>        owner type
     * @param <V>        item type
     * @return a new mutable map from each item to its owner
     */
    public static <K, V> Map<V, K> invertAssignment(Map<K, Collection<V>> assignment) {
        final Map<V, K> ownerByItem = new HashMap<>();
        assignment.forEach((owner, items) -> items.forEach(item -> ownerByItem.put(item, owner)));
        return ownerByItem;
    }

    /**
     * The connectors and tasks associated with a single worker.
     *
     * <p>Instances are created through {@link Builder}. Depending on the builder method used,
     * the collections are either copies or the very collections supplied by the caller;
     * {@link #assign(String)} and {@link #assign(ConnectorTaskId)} add to them directly.
     */
    public static class WorkerLoad {
        /** Tie-breaker shared by both comparators: orders by worker name, {@code null} first. */
        private static final Comparator<WorkerLoad> WORKER_ORDER =
                Comparator.comparing((WorkerLoad load) -> load.worker, Comparator.nullsFirst(Comparator.<String>naturalOrder()));

        private final String worker;
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private WorkerLoad(String worker, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            this.worker = worker;
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /** @return the worker name */
        public String worker() {
            return this.worker;
        }

        /** @return the backing collection of connectors (not a copy) */
        public Collection<String> connectors() {
            return this.connectors;
        }

        /** @return the backing collection of tasks (not a copy) */
        public Collection<ConnectorTaskId> tasks() {
            return this.tasks;
        }

        /** @return the number of connectors */
        public int connectorsSize() {
            return this.connectors.size();
        }

        /** @return the number of tasks */
        public int tasksSize() {
            return this.tasks.size();
        }

        /**
         * Adds a connector to this load.
         *
         * @param connector connector name to add
         */
        public void assign(String connector) {
            this.connectors.add(connector);
        }

        /**
         * Adds a task to this load.
         *
         * @param task task id to add
         */
        public void assign(ConnectorTaskId task) {
            this.tasks.add(task);
        }

        /** @return the number of connectors plus the number of tasks */
        public int size() {
            return this.connectors.size() + this.tasks.size();
        }

        /** @return {@code true} when there are neither connectors nor tasks */
        public boolean isEmpty() {
            return this.connectors.isEmpty() && this.tasks.isEmpty();
        }

        /**
         * @return a comparator ordering loads by ascending connector count, then by worker name
         *         with a {@code null} name sorting first
         */
        public static Comparator<WorkerLoad> connectorComparator() {
            // comparingInt avoids the subtraction idiom; WORKER_ORDER is null-safe on both sides.
            return Comparator.<WorkerLoad>comparingInt(load -> load.connectors.size()).thenComparing(WORKER_ORDER);
        }

        /**
         * @return a comparator ordering loads by ascending task count, then by worker name
         *         with a {@code null} name sorting first
         */
        public static Comparator<WorkerLoad> taskComparator() {
            return Comparator.<WorkerLoad>comparingInt(load -> load.tasks.size()).thenComparing(WORKER_ORDER);
        }

        @Override
        public String toString() {
            return "{ worker=" + this.worker + ", connectorIds=" + this.connectors + ", taskIds=" + this.tasks + '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof WorkerLoad)) {
                return false;
            }
            WorkerLoad that = (WorkerLoad) o;
            // Objects.equals keeps equals null-tolerant, matching hashCode below.
            return Objects.equals(this.worker, that.worker)
                    && this.connectors.equals(that.connectors)
                    && this.tasks.equals(that.tasks);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.worker, this.connectors, this.tasks);
        }

        /**
         * Builder for {@link WorkerLoad}. If neither {@link #with} nor {@link #withCopies} is
         * called, the built load starts with empty, mutable lists.
         */
        public static class Builder {
            private String withWorker;
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * @param worker worker name
             * @throws NullPointerException if {@code worker} is {@code null}
             */
            public Builder(String worker) {
                this.withWorker = Objects.requireNonNull(worker, "worker cannot be null");
            }

            /**
             * Uses copies of the given collections, so later changes to the built load do not
             * affect the arguments.
             *
             * @param connectors connectors to copy; may be empty
             * @param tasks      tasks to copy; may be empty
             * @return this builder
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder withCopies(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                this.withConnectors = new ArrayList<String>(Objects.requireNonNull(connectors, "connectors may be empty but not null"));
                this.withTasks = new ArrayList<ConnectorTaskId>(Objects.requireNonNull(tasks, "tasks may be empty but not null"));
                return this;
            }

            /**
             * Uses the given collections directly, without copying; the built load shares them
             * with the caller.
             *
             * @param connectors connectors collection to adopt; may be empty
             * @param tasks      tasks collection to adopt; may be empty
             * @return this builder
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder with(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                this.withConnectors = Objects.requireNonNull(connectors, "connectors may be empty but not null");
                this.withTasks = Objects.requireNonNull(tasks, "tasks may be empty but not null");
                return this;
            }

            /** @return a new {@link WorkerLoad} built from the configured values */
            public WorkerLoad build() {
                Collection<String> connectors = this.withConnectors != null ? this.withConnectors : new ArrayList<String>();
                Collection<ConnectorTaskId> tasks = this.withTasks != null ? this.withTasks : new ArrayList<ConnectorTaskId>();
                return new WorkerLoad(this.withWorker, connectors, tasks);
            }
        }
    }

    /**
     * A pair of collections: connector names and task ids.
     *
     * <p>Instances are created through {@link Builder}. The accessors return the backing
     * collections, not copies.
     */
    public static class ConnectorsAndTasks {
        /** Shared empty instance, backed by {@link Collections#emptyList()}. */
        public static final ConnectorsAndTasks EMPTY = new ConnectorsAndTasks(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
        private final Collection<String> connectors;
        private final Collection<ConnectorTaskId> tasks;

        private ConnectorsAndTasks(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
            this.connectors = connectors;
            this.tasks = tasks;
        }

        /** @return the backing collection of connectors */
        public Collection<String> connectors() {
            return this.connectors;
        }

        /** @return the backing collection of tasks */
        public Collection<ConnectorTaskId> tasks() {
            return this.tasks;
        }

        /** @return the number of connectors plus the number of tasks */
        public int size() {
            return this.connectors.size() + this.tasks.size();
        }

        /** @return {@code true} when there are neither connectors nor tasks */
        public boolean isEmpty() {
            return this.connectors.isEmpty() && this.tasks.isEmpty();
        }

        @Override
        public String toString() {
            return "{ connectorIds=" + this.connectors + ", taskIds=" + this.tasks + '}';
        }

        /**
         * Builder for {@link ConnectorsAndTasks}. If no collections are supplied, the built
         * instance holds empty, mutable lists.
         */
        public static class Builder {
            private Collection<String> withConnectors;
            private Collection<ConnectorTaskId> withTasks;

            /**
             * Stores copies of the given collections.
             *
             * @param connectors connectors to copy; may be empty
             * @param tasks      tasks to copy; may be empty
             * @return this builder
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder withCopies(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                this.withConnectors = new ArrayList<String>(Objects.requireNonNull(connectors, "connectors may be empty but not null"));
                this.withTasks = new ArrayList<ConnectorTaskId>(Objects.requireNonNull(tasks, "tasks may be empty but not null"));
                return this;
            }

            /**
             * Stores copies of the given collections. In this class {@code with} copies its
             * arguments exactly like {@link #withCopies}; that behaviour is kept so existing
             * callers are unaffected.
             *
             * @param connectors connectors to copy; may be empty
             * @param tasks      tasks to copy; may be empty
             * @return this builder
             * @throws NullPointerException if either argument is {@code null}
             */
            public Builder with(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
                return this.withCopies(connectors, tasks);
            }

            /** @return a new {@link ConnectorsAndTasks} built from the configured values */
            public ConnectorsAndTasks build() {
                Collection<String> connectors = this.withConnectors != null ? this.withConnectors : new ArrayList<String>();
                Collection<ConnectorTaskId> tasks = this.withTasks != null ? this.withTasks : new ArrayList<ConnectorTaskId>();
                return new ConnectorsAndTasks(connectors, tasks);
            }
        }
    }

    /**
     * Ownership view built from the group's member states and the connector and task
     * assignments: maps every connector and task to the id of the member that owns it.
     */
    public static class LeaderState {
        private final Map<String, ExtendedWorkerState> allMembers;
        private final Map<String, String> connectorOwners;
        private final Map<ConnectorTaskId, String> taskOwners;

        /**
         * @param allMembers          worker state keyed by member id; stored as-is, not copied
         * @param connectorAssignment connectors keyed by owning member id
         * @param taskAssignment      tasks keyed by owning member id
         */
        public LeaderState(Map<String, ExtendedWorkerState> allMembers, Map<String, Collection<String>> connectorAssignment, Map<String, Collection<ConnectorTaskId>> taskAssignment) {
            this.allMembers = allMembers;
            this.connectorOwners = WorkerCoordinator.invertAssignment(connectorAssignment);
            this.taskOwners = WorkerCoordinator.invertAssignment(taskAssignment);
        }

        /** @return the URL of the member owning the task, or {@code null} if it has no owner */
        private String ownerUrl(ConnectorTaskId id) {
            return urlOfMember(taskOwners.get(id));
        }

        /** @return the URL of the member owning the connector, or {@code null} if it has no owner */
        private String ownerUrl(String connector) {
            return urlOfMember(connectorOwners.get(connector));
        }

        /** Resolves a member id to its URL; a {@code null} id yields {@code null}. */
        private String urlOfMember(String ownerId) {
            return ownerId == null ? null : allMembers.get(ownerId).url();
        }
    }

    /**
     * Registers two gauges, "assigned-connectors" and "assigned-tasks", that report the sizes
     * of the enclosing coordinator's current assignment snapshot (0 when there is none).
     */
    private class WorkerCoordinatorMetrics {
        public final String metricGrpName;

        /**
         * @param metrics         registry the gauges are added to
         * @param metricGrpPrefix prefix to which "-coordinator-metrics" is appended to form the group name
         */
        public WorkerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            // Each gauge reads the volatile snapshot once per measurement.
            Measurable numConnectors = (config, now) -> {
                final ExtendedAssignment current = WorkerCoordinator.this.assignmentSnapshot;
                return current == null ? 0.0 : current.connectors().size();
            };
            Measurable numTasks = (config, now) -> {
                final ExtendedAssignment current = WorkerCoordinator.this.assignmentSnapshot;
                return current == null ? 0.0 : current.tasks().size();
            };

            metrics.addMetric(metrics.metricName("assigned-connectors", this.metricGrpName, "The number of connector instances currently assigned to this consumer"), numConnectors);
            metrics.addMetric(metrics.metricName("assigned-tasks", this.metricGrpName, "The number of tasks currently assigned to this consumer"), numTasks);
        }
    }
}

