package org.apache.ratis.server.impl;

import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/ratis/server/impl/ReadIndexHeartbeats.class */
public class ReadIndexHeartbeats {
    private static final Logger LOG = LoggerFactory.getLogger(ReadIndexHeartbeats.class);
    private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
    private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", -1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/server/impl/ReadIndexHeartbeats$AppendEntriesListener.class */
    public static class AppendEntriesListener {
        private final long commitIndex;
        private final CompletableFuture<Long> future = new CompletableFuture<>();
        private final ConcurrentHashMap<RaftPeerId, HeartbeatAck> replies = new ConcurrentHashMap<>();

        /* JADX INFO: Access modifiers changed from: package-private */
        public AppendEntriesListener(long j) {
            this.commitIndex = j;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public CompletableFuture<Long> getFuture() {
            return this.future;
        }

        boolean receive(LogAppender logAppender, RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto, Predicate<Predicate<RaftPeerId>> predicate) {
            if (isCompletedNormally()) {
                return true;
            }
            if (!this.replies.computeIfAbsent(logAppender.getFollowerId(), raftPeerId -> {
                return new HeartbeatAck(logAppender);
            }).receive(appendEntriesReplyProto) || !predicate.test(this::isAcknowledged)) {
                return isCompletedNormally();
            }
            this.future.complete(Long.valueOf(this.commitIndex));
            return true;
        }

        boolean isAcknowledged(RaftPeerId raftPeerId) {
            return Optional.ofNullable(this.replies.get(raftPeerId)).filter((v0) -> {
                return v0.isAcknowledged();
            }).isPresent();
        }

        boolean isCompletedNormally() {
            return (!this.future.isDone() || this.future.isCancelled() || this.future.isCompletedExceptionally()) ? false : true;
        }
    }

    /* loaded from: input_file:org/apache/ratis/server/impl/ReadIndexHeartbeats$AppendEntriesListeners.class */
    class AppendEntriesListeners {
        private final NavigableMap<Long, AppendEntriesListener> sorted = new TreeMap();

        AppendEntriesListeners() {
        }

        synchronized AppendEntriesListener add(long j, Function<Long, AppendEntriesListener> function) {
            return (AppendEntriesListener) this.sorted.computeIfAbsent(Long.valueOf(j), function);
        }

        synchronized void onAppendEntriesReply(LogAppender logAppender, RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto, Predicate<Predicate<RaftPeerId>> predicate) {
            long followerCommit = appendEntriesReplyProto.getFollowerCommit();
            Iterator<Map.Entry<Long, AppendEntriesListener>> it = this.sorted.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, AppendEntriesListener> next = it.next();
                if (next.getKey().longValue() > followerCommit) {
                    return;
                }
                AppendEntriesListener value = next.getValue();
                if (value != null && value.receive(logAppender, appendEntriesReplyProto, predicate)) {
                    ReadIndexHeartbeats.this.ackedCommitIndex.updateToMax(value.commitIndex, obj -> {
                        ReadIndexHeartbeats.LOG.debug("{}: {}", this, obj);
                    });
                    it.remove();
                }
            }
        }

        synchronized void failAll(Exception exc) {
            this.sorted.forEach((l, appendEntriesListener) -> {
                appendEntriesListener.getFuture().completeExceptionally(exc);
            });
            this.sorted.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/ratis/server/impl/ReadIndexHeartbeats$HeartbeatAck.class */
    public static class HeartbeatAck {
        private final LogAppender appender;
        private final long minCallId;
        private volatile boolean acknowledged = false;

        HeartbeatAck(LogAppender logAppender) {
            this.appender = logAppender;
            this.minCallId = logAppender.getCallId();
        }

        boolean isAcknowledged() {
            return this.acknowledged;
        }

        boolean receive(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            if (this.acknowledged) {
                return false;
            }
            synchronized (this) {
                if (this.acknowledged || !isValid(appendEntriesReplyProto)) {
                    return false;
                }
                this.acknowledged = true;
                return true;
            }
        }

        private boolean isValid(RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto) {
            return appendEntriesReplyProto != null && appendEntriesReplyProto.getServerReply().getSuccess() && this.appender.getCallIdComparator().compare(Long.valueOf(appendEntriesReplyProto.getServerReply().getCallId()), Long.valueOf(this.minCallId)) >= 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AppendEntriesListener addAppendEntriesListener(long j, Function<Long, AppendEntriesListener> function) {
        if (j <= this.ackedCommitIndex.get()) {
            return null;
        }
        return this.appendEntriesListeners.add(j, function);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onAppendEntriesReply(LogAppender logAppender, RaftProtos.AppendEntriesReplyProto appendEntriesReplyProto, Predicate<Predicate<RaftPeerId>> predicate) {
        this.appendEntriesListeners.onAppendEntriesReply(logAppender, appendEntriesReplyProto, predicate);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void failListeners(Exception exc) {
        this.appendEntriesListeners.failAll(exc);
    }
}
