/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.AcknowledgmentsGroupingTracker;
import org.apache.pulsar.client.impl.BatchMessageAckerDisabled;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.LastCumulativeAck;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Runnables;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistentAcknowledgmentsGroupingTracker
implements AcknowledgmentsGroupingTracker {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(PersistentAcknowledgmentsGroupingTracker.class);
    // NOTE(review): not referenced anywhere in this file; the size checks below compare against the
    // literal 1000 instead — presumably the decompiler inlined this constant. Confirm against the original source.
    private static final int MAX_ACK_GROUP_SIZE = 1000;
    // Consumer on whose behalf acknowledgments are tracked and written.
    private final ConsumerImpl<?> consumer;
    // Grouping interval taken from the consumer configuration, in microseconds.
    // When it is 0 the ack paths below send immediately and no periodic flush task is scheduled.
    private final long acknowledgementGroupTimeMicros;
    // Future returned to callers of grouped individual acks when ack receipt is in use;
    // replaced with a fresh instance in newMessageAckCommandAndWrite when a flush binds it to a request.
    private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
    // Same as above, for grouped cumulative acks.
    private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
    // Latest cumulative ack position (and optional bit set) waiting for the next flush.
    private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
    // Read lock: held while an ack is queued and the current future is read.
    // Write lock: held by flush() while the pending acks are written (ack-receipt mode only).
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Whole-entry individual acks waiting for the next flush.
    private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
    // Per-entry bit sets for batch-index individual acks waiting for the next flush;
    // doIndividualBatchAckAsync clears the bit of every acknowledged batch index.
    private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
    // Periodic flush task; null when acknowledgementGroupTimeMicros is not positive.
    private final ScheduledFuture<?> scheduledTask;
    // Copied from ConsumerConfigurationData#isBatchIndexAckEnabled().
    private final boolean batchIndexAckEnabled;
    // Copied from ConsumerConfigurationData#isAckReceiptEnabled(); see isAckReceiptEnabled(ClientCnx)
    // for the additional per-connection check.
    private final boolean ackReceiptEnabled;

    /**
     * Creates a tracker for {@code consumer}.
     *
     * <p>Grouping time, batch-index-ack and ack-receipt settings are read from {@code conf}.
     * When the grouping time is positive, a fixed-delay task that calls {@link #flush()} is
     * scheduled on one event loop of {@code eventLoopGroup}; otherwise no task is scheduled.
     *
     * @param consumer       consumer whose acknowledgments are tracked
     * @param conf           consumer configuration supplying the ack settings
     * @param eventLoopGroup event loop group used to schedule the periodic flush
     */
    public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, EventLoopGroup eventLoopGroup) {
        this.consumer = consumer;
        this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
        this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
        this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
        this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
        this.ackReceiptEnabled = conf.isAckReceiptEnabled();
        this.currentIndividualAckFuture = new TimedCompletableFuture<>();
        this.currentCumulativeAckFuture = new TimedCompletableFuture<>();

        if (this.acknowledgementGroupTimeMicros > 0L) {
            // Initial delay and period are both the grouping time; the wrapper logs
            // throwables raised by flush() instead of letting them escape the task.
            this.scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(
                    Runnables.catchingAndLoggingThrowables(this::flush),
                    this.acknowledgementGroupTimeMicros,
                    this.acknowledgementGroupTimeMicros,
                    TimeUnit.MICROSECONDS);
        } else {
            this.scheduledTask = null;
        }
    }

    /**
     * Tells whether {@code messageId} is already covered by an acknowledgment held in this tracker.
     *
     * @param messageId id to check
     * @return {@code true} if the id is at or before the last cumulative ack position, or if it
     *         is present in the pending individual acks
     */
    @Override
    public boolean isDuplicate(MessageId messageId) {
        MessageIdImpl lastCumulative = this.lastCumulativeAck.getMessageId();
        boolean coveredByCumulativeAck = lastCumulative != null && messageId.compareTo(lastCumulative) <= 0;
        // The pending set is only consulted when the cumulative position does not cover the id.
        return coveredByCumulativeAck || this.pendingIndividualAcks.contains((MessageIdImpl)messageId);
    }

    /**
     * Acknowledges a list of messages.
     *
     * <p>Cumulative acks are delegated one by one to
     * {@link #addAcknowledgment(MessageIdImpl, CommandAck.AckType, Map)}. Individual acks are
     * queued through {@link #addListAcknowledgment(List)} and flushed right away when grouping
     * is disabled or the pending individual set has reached 1000 entries.
     *
     * @param messageIds ids to acknowledge
     * @param ackType    individual or cumulative
     * @param properties properties forwarded to the per-message cumulative path
     * @return with ack receipt in use: a future over all per-message futures (cumulative) or the
     *         current individual-ack future (non-empty individual list); otherwise an already
     *         completed future
     */
    @Override
    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, CommandAck.AckType ackType, Map<String, Long> properties) {
        if (CommandAck.AckType.Cumulative == ackType) {
            if (!this.isAckReceiptEnabled(this.consumer.getClientCnx())) {
                for (MessageId id : messageIds) {
                    this.addAcknowledgment((MessageIdImpl)id, ackType, properties);
                }
                return CompletableFuture.completedFuture(null);
            }
            // A set, so that a future shared by several acks is only waited on once.
            HashSet<CompletableFuture<Void>> receipts = new HashSet<>();
            for (MessageId id : messageIds) {
                receipts.add(this.addAcknowledgment((MessageIdImpl)id, ackType, properties));
            }
            return FutureUtil.waitForAll(new ArrayList<CompletableFuture<Void>>(receipts));
        }

        if (!this.isAckReceiptEnabled(this.consumer.getClientCnx())) {
            this.addListAcknowledgment(messageIds);
            // 1000 is the same value as MAX_ACK_GROUP_SIZE.
            if (this.acknowledgementGroupTimeMicros == 0L || this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> result;
        // The read lock keeps flush() (which takes the write lock) from swapping the current
        // future between queuing the acks and reading the future handed back to the caller.
        this.lock.readLock().lock();
        try {
            if (messageIds.isEmpty()) {
                result = CompletableFuture.completedFuture(null);
            } else {
                this.addListAcknowledgment(messageIds);
                result = this.currentIndividualAckFuture;
            }
        } finally {
            this.lock.readLock().unlock();
            // Flush only after the read lock is released, since flush() needs the write lock.
            if (this.acknowledgementGroupTimeMicros == 0L || this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
        }
        return result;
    }

    /**
     * Queues individual acknowledgments for every id in {@code messageIds} without flushing.
     *
     * <p>For each id the consumer's {@code onAcknowledge} hook is invoked first. A non-batch id,
     * or a batch id whose {@code ackIndividual()} returns {@code true}, is queued as a
     * whole-entry ack. A batch id whose {@code ackIndividual()} returns {@code false} is queued
     * as a batch-index ack only when batch index acknowledgment is enabled, which matches the
     * single-message path in {@code addAcknowledgment}; otherwise nothing is queued for it.
     *
     * @param messageIds ids to acknowledge individually
     */
    private void addListAcknowledgment(List<MessageId> messageIds) {
        for (MessageId messageId : messageIds) {
            this.consumer.onAcknowledge(messageId, null);
            if (!(messageId instanceof BatchMessageIdImpl)) {
                MessageIdImpl plainId = (MessageIdImpl)messageId;
                this.modifyMessageIdStatesInConsumer(plainId);
                this.doIndividualAckAsync(plainId);
                continue;
            }
            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)messageId;
            if (batchMessageId.ackIndividual()) {
                this.doIndividualAckAsync(this.modifyBatchMessageIdAndStatesInConsumer(batchMessageId));
            } else if (this.batchIndexAckEnabled) {
                // Only track per-index acks when the feature is switched on.
                this.doIndividualBatchAckAsync(batchMessageId);
            }
        }
    }

    /**
     * Acknowledges a single message, individually or cumulatively.
     *
     * <p>The matching consumer hook ({@code onAcknowledge} / {@code onAcknowledgeCumulative}) is
     * invoked before the ack is dispatched. For batch ids the whole entry is acknowledged when
     * {@code ackIndividual()} / {@code ackCumulative()} returns {@code true}; otherwise a
     * batch-index ack is used if enabled. With batch index acks disabled, a cumulative ack inside
     * a batch cumulatively acks the previous batch position once per acker.
     *
     * @param msgId      id to acknowledge
     * @param ackType    individual or cumulative
     * @param properties properties forwarded to the ack helpers
     * @return the future produced by the selected ack helper, or an already completed future
     *         when nothing is sent for this call
     */
    @Override
    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, CommandAck.AckType ackType, Map<String, Long> properties) {
        boolean individual = ackType == CommandAck.AckType.Individual;

        if (!(msgId instanceof BatchMessageIdImpl)) {
            if (individual) {
                this.consumer.onAcknowledge(msgId, null);
                this.modifyMessageIdStatesInConsumer(msgId);
                return this.doIndividualAck(msgId, properties);
            }
            this.consumer.onAcknowledgeCumulative(msgId, null);
            return this.doCumulativeAck(msgId, properties, null);
        }

        BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl)msgId;
        if (individual) {
            this.consumer.onAcknowledge(msgId, null);
            if (batchMessageId.ackIndividual()) {
                MessageIdImpl entryId = this.modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
                return this.doIndividualAck(entryId, properties);
            }
            if (!this.batchIndexAckEnabled) {
                return CompletableFuture.completedFuture(null);
            }
            return this.doIndividualBatchAck(batchMessageId, properties);
        }

        this.consumer.onAcknowledgeCumulative(msgId, null);
        if (batchMessageId.ackCumulative()) {
            return this.doCumulativeAck(msgId, properties, null);
        }
        if (this.batchIndexAckEnabled) {
            return this.doCumulativeBatchIndexAck(batchMessageId, properties);
        }
        boolean prevBatchPending = CommandAck.AckType.Cumulative == ackType
                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked();
        if (prevBatchPending) {
            // The returned future is intentionally not propagated to the caller.
            this.doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds the entry-level id for a batch id and updates consumer-side bookkeeping.
     *
     * <p>The ack counter in the consumer stats is increased by the batch size, and the entry id
     * is removed from the un-acked tracker and the dead-letter candidates.
     *
     * @param batchMessageId batch id being acknowledged as a whole entry
     * @return a new {@link MessageIdImpl} with the same ledger id, entry id and partition index
     */
    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
        MessageIdImpl entryMessageId = new MessageIdImpl(
                batchMessageId.getLedgerId(),
                batchMessageId.getEntryId(),
                batchMessageId.getPartitionIndex());
        this.consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
        this.clearMessageIdFromUnAckTrackerAndDeadLetter(entryMessageId);
        return entryMessageId;
    }

    /**
     * Updates consumer-side bookkeeping for one acknowledged non-batch message: counts a single
     * ack in the consumer stats and drops the id from the un-acked tracker and the dead-letter
     * candidates.
     *
     * @param messageId id being acknowledged
     */
    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
        final long ackedMessages = 1L;
        this.consumer.getStats().incrementNumAcksSent(ackedMessages);
        this.clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
    }

    /**
     * Removes {@code messageId} from the consumer's un-acked message tracker and, when the
     * consumer exposes a dead-letter candidate collection, from that collection as well.
     *
     * @param messageId id to forget
     */
    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageId) {
        this.consumer.getUnAckedMessageTracker().remove(messageId);
        if (this.consumer.getPossibleSendToDeadLetterTopicMessages() == null) {
            return;
        }
        this.consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
    }

    /**
     * Dispatches an individual whole-entry acknowledgment.
     *
     * <p>The ack is sent immediately when grouping is disabled or {@code properties} is
     * non-empty. Otherwise it is queued, and a flush is triggered once the pending individual
     * set holds at least 1000 entries.
     *
     * @param messageId  entry to acknowledge
     * @param properties properties; a non-empty map forces an immediate ack
     * @return the immediate-ack future, the current individual-ack future (ack receipt in use),
     *         or an already completed future
     */
    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
        if (this.acknowledgementGroupTimeMicros == 0L || properties != null && !properties.isEmpty()) {
            return this.doImmediateAck(messageId, CommandAck.AckType.Individual, properties, null);
        }

        if (!this.isAckReceiptEnabled(this.consumer.getClientCnx())) {
            this.doIndividualAckAsync(messageId);
            // 1000 is the same value as MAX_ACK_GROUP_SIZE.
            if (this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> receipt;
        // Queue the ack and read the future under the read lock so that flush(), which takes
        // the write lock, cannot replace the future in between.
        this.lock.readLock().lock();
        try {
            this.doIndividualAckAsync(messageId);
            receipt = this.currentIndividualAckFuture;
        } finally {
            this.lock.readLock().unlock();
            if (this.pendingIndividualAcks.size() >= 1000) {
                this.flush();
            }
        }
        return receipt;
    }

    /**
     * Queues a whole-entry individual ack for the next flush and discards any batch-index ack
     * state recorded under the same id.
     *
     * @param messageId entry to acknowledge
     */
    private void doIndividualAckAsync(MessageIdImpl messageId) {
        ConcurrentSkipListSet<MessageIdImpl> wholeEntryAcks = this.pendingIndividualAcks;
        wholeEntryAcks.add(messageId);
        // The whole entry is being acknowledged, so its per-index bit set is no longer needed.
        this.pendingIndividualBatchIndexAcks.remove(messageId);
    }

    /**
     * Dispatches an individual batch-index acknowledgment: immediately when grouping is disabled
     * or {@code properties} is non-empty, grouped otherwise.
     *
     * @param batchMessageId batch id whose index is acknowledged
     * @param properties     properties; a non-empty map forces an immediate ack
     * @return the future of the selected path
     */
    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId, Map<String, Long> properties) {
        boolean grouped = !(this.acknowledgementGroupTimeMicros == 0L || properties != null && !properties.isEmpty());
        if (grouped) {
            return this.doIndividualBatchAck(batchMessageId);
        }
        return this.doImmediateBatchIndexAck(
                batchMessageId,
                batchMessageId.getBatchIndex(),
                batchMessageId.getBatchSize(),
                CommandAck.AckType.Individual,
                properties);
    }

    /**
     * Queues a grouped individual batch-index acknowledgment and flushes once at least 1000
     * entries have pending batch-index acks.
     *
     * @param batchMessageId batch id whose index is acknowledged
     * @return the current individual-ack future when ack receipt is in use, otherwise an already
     *         completed future
     */
    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId) {
        if (!this.isAckReceiptEnabled(this.consumer.getClientCnx())) {
            this.doIndividualBatchAckAsync(batchMessageId);
            // 1000 is the same value as MAX_ACK_GROUP_SIZE.
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                this.flush();
            }
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> receipt;
        // Read lock: flush() (write lock) must not swap the future between queuing and reading.
        this.lock.readLock().lock();
        try {
            this.doIndividualBatchAckAsync(batchMessageId);
            receipt = this.currentIndividualAckFuture;
        } finally {
            this.lock.readLock().unlock();
            if (this.pendingIndividualBatchIndexAcks.size() >= 1000) {
                this.flush();
            }
        }
        return receipt;
    }

    /**
     * Dispatches a cumulative acknowledgment up to {@code messageId}.
     *
     * <p>Messages up to {@code messageId} are first removed from the un-acked tracker and the
     * number removed is added to the ack counter. The ack is then sent immediately when grouping
     * is disabled or {@code properties} is non-empty, and recorded for the next flush otherwise.
     *
     * @param messageId  cumulative ack position
     * @param properties properties; a non-empty map forces an immediate ack
     * @param bitSet     optional batch-index bit set accompanying the position, may be null
     * @return the immediate-ack future, the current cumulative-ack future (ack receipt in use),
     *         or an already completed future
     */
    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<String, Long> properties, BitSetRecyclable bitSet) {
        long removedFromTracker = this.consumer.getUnAckedMessageTracker().removeMessagesTill(messageId);
        this.consumer.getStats().incrementNumAcksSent(removedFromTracker);

        if (this.acknowledgementGroupTimeMicros == 0L || properties != null && !properties.isEmpty()) {
            return this.doImmediateAck(messageId, CommandAck.AckType.Cumulative, properties, bitSet);
        }

        if (!this.isAckReceiptEnabled(this.consumer.getClientCnx())) {
            this.doCumulativeAckAsync(messageId, bitSet);
            return CompletableFuture.completedFuture(null);
        }

        CompletableFuture<Void> receipt;
        // Read lock: flush() (write lock) must not swap the future between recording and reading.
        this.lock.readLock().lock();
        try {
            this.doCumulativeAckAsync(messageId, bitSet);
            receipt = this.currentCumulativeAckFuture;
        } finally {
            this.lock.readLock().unlock();
        }
        return receipt;
    }

    /**
     * Records a batch-index individual ack for the next flush.
     *
     * <p>The bit set for the entry is created on first use: copied from the acker's bit set when
     * the id carries an acker that is not a {@link BatchMessageAckerDisabled}, otherwise with
     * bits {@code [0, originalBatchSize)} set. The bit at this id's batch index is then cleared.
     *
     * @param batchMessageId batch id whose index is acknowledged
     */
    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
        MessageIdImpl entryKey = new MessageIdImpl(
                batchMessageId.getLedgerId(),
                batchMessageId.getEntryId(),
                batchMessageId.getPartitionIndex());
        ConcurrentBitSetRecyclable pendingBits = this.pendingIndividualBatchIndexAcks.computeIfAbsent(entryKey, key -> {
            boolean ackerUnusable = batchMessageId.getAcker() == null
                    || batchMessageId.getAcker() instanceof BatchMessageAckerDisabled;
            if (ackerUnusable) {
                ConcurrentBitSetRecyclable allSet = ConcurrentBitSetRecyclable.create();
                allSet.set(0, batchMessageId.getOriginalBatchSize());
                return allSet;
            }
            return ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
        });
        pendingBits.clear(batchMessageId.getBatchIndex());
    }

    /**
     * Records a cumulative ack position for the next flush by updating {@code lastCumulativeAck}.
     *
     * @param msgId  cumulative ack position
     * @param bitSet optional batch-index bit set accompanying the position, may be null
     */
    private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
        LastCumulativeAck pendingCumulative = this.lastCumulativeAck;
        pendingCumulative.update(msgId, bitSet);
    }

    /**
     * Dispatches a cumulative acknowledgment that ends inside a batch.
     *
     * <p>Sent immediately when grouping is disabled or {@code properties} is non-empty.
     * Otherwise a bit set with bits {@code [0, batchSize)} set and bits
     * {@code [0, batchIndex]} cleared is handed to the grouped cumulative path.
     *
     * @param batchMessageId batch id marking the cumulative position
     * @param properties     properties; a non-empty map forces an immediate ack
     * @return the future of the selected path
     */
    private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId, Map<String, Long> properties) {
        boolean immediate = this.acknowledgementGroupTimeMicros == 0L || properties != null && !properties.isEmpty();
        if (immediate) {
            return this.doImmediateBatchIndexAck(
                    batchMessageId,
                    batchMessageId.getBatchIndex(),
                    batchMessageId.getBatchSize(),
                    CommandAck.AckType.Cumulative,
                    properties);
        }
        BitSetRecyclable remaining = BitSetRecyclable.create();
        remaining.set(0, batchMessageId.getBatchSize());
        // Indexes up to and including this one are acknowledged.
        remaining.clear(0, batchMessageId.getBatchIndex() + 1);
        return this.doCumulativeAck(batchMessageId, null, remaining);
    }

    /**
     * Sends an acknowledgment right away on the consumer's current connection.
     *
     * @param msgId      id to acknowledge
     * @param ackType    individual or cumulative
     * @param properties properties attached to the ack
     * @param bitSet     optional batch-index bit set, may be null
     * @return the future of the written ack, or a future failed with
     *         {@link PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId, CommandAck.AckType ackType, Map<String, Long> properties, BitSetRecyclable bitSet) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx != null) {
            return this.newImmediateAckAndFlush(this.consumer.consumerId, msgId, bitSet, ackType, properties, cnx);
        }
        String reason = "Consumer connect fail! consumer state:" + this.consumer.getState();
        return FutureUtil.failedFuture(new PulsarClientException.ConnectException(reason));
    }

    /**
     * Sends a batch-index acknowledgment right away on the consumer's current connection.
     *
     * <p>The bit set starts from a copy of the acker's bit set when the id carries an acker that
     * is not a {@link BatchMessageAckerDisabled}, otherwise from bits {@code [0, batchSize)} set.
     * For a cumulative ack bits {@code [0, batchIndex]} are cleared; for an individual ack only
     * bit {@code batchIndex}. The bit set is recycled after the command has been written.
     *
     * @param msgId      batch id being acknowledged
     * @param batchIndex index of the message inside the batch
     * @param batchSize  batch size used when no usable acker is present
     * @param ackType    individual or cumulative
     * @param properties properties attached to the ack
     * @return the future of the written ack, or a future failed with
     *         {@link PulsarClientException.ConnectException} when the consumer has no connection
     */
    private CompletableFuture<Void> doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize, CommandAck.AckType ackType, Map<String, Long> properties) {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            return FutureUtil.failedFuture(new PulsarClientException.ConnectException("Consumer connect fail! consumer state:" + this.consumer.getState()));
        }

        BitSetRecyclable ackSet;
        if (msgId.getAcker() == null || msgId.getAcker() instanceof BatchMessageAckerDisabled) {
            ackSet = BitSetRecyclable.create();
            ackSet.set(0, batchSize);
        } else {
            ackSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
        }

        if (ackType != CommandAck.AckType.Cumulative) {
            ackSet.clear(batchIndex);
        } else {
            ackSet.clear(0, batchIndex + 1);
        }

        CompletableFuture<Void> written = this.newMessageAckCommandAndWrite(cnx, this.consumer.consumerId, msgId.ledgerId, msgId.entryId, ackSet, ackType, null, properties, true, null, null);
        ackSet.recycle();
        return written;
    }

    /**
     * Writes all pending grouped acknowledgments to the broker.
     *
     * <p>Does nothing (apart from a debug log) when the consumer has no connection. When ack
     * receipt is in use on that connection, the write lock is held for the duration of the
     * flush so that callers queuing acks under the read lock observe a consistent current future.
     *
     * <p>The connection is read once and reused for both the ack-receipt decision and the
     * writes, so both always refer to the same connection.
     */
    @Override
    public void flush() {
        ClientCnx cnx = this.consumer.getClientCnx();
        if (cnx == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cannot flush pending acks since we're not connected to broker", this.consumer);
            }
            return;
        }
        if (!this.isAckReceiptEnabled(cnx)) {
            this.flushAsync(cnx);
            return;
        }
        this.lock.writeLock().lock();
        try {
            this.flushAsync(cnx);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Writes every pending acknowledgment to {@code cnx}, in this order: the pending cumulative
     * ack (if any), the pending whole-entry individual acks, then the pending batch-index acks.
     *
     * <p>Individual and batch-index acks are collected into one multi-message ack command when
     * the peer supports it; otherwise each whole-entry ack is written as its own command.
     * The channel is flushed at the end if anything was written.
     *
     * @param cnx connection to write to; callers in this file pass a non-null connection
     */
    private void flushAsync(ClientCnx cnx) {
        // flush() on LastCumulativeAck returns the pending cumulative ack, or null if there is none.
        LastCumulativeAck lastCumulativeAckToFlush = this.lastCumulativeAck.flush();
        boolean shouldFlush = false;
        if (lastCumulativeAckToFlush != null) {
            shouldFlush = true;
            MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
            // Written without flushing; bound to the current cumulative-ack future.
            this.newMessageAckCommandAndWrite(cnx, this.consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), lastCumulativeAckToFlush.getBitSetRecyclable(), CommandAck.AckType.Cumulative, null, Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
        }
        // Sized for the worst case of one entry per pending ack (chunked messages may add more).
        ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(this.pendingIndividualAcks.size() + this.pendingIndividualBatchIndexAcks.size());
        if (!this.pendingIndividualAcks.isEmpty()) {
            MessageIdImpl msgId;
            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                // Drain the pending set into the multi-message ack list.
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
                    if (chunkMsgIds != null && chunkMsgIds.length > 1) {
                        // Chunked message: ack every non-null chunk id instead of msgId itself.
                        for (MessageIdImpl cMsgId : chunkMsgIds) {
                            if (cMsgId == null) continue;
                            entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                        }
                        this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
                        continue;
                    }
                    entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
                }
            } else {
                // Peer lacks multi-message ack: one command per entry, written without flushing.
                while ((msgId = this.pendingIndividualAcks.pollFirst()) != null) {
                    this.newMessageAckCommandAndWrite(cnx, this.consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), null, CommandAck.AckType.Individual, null, Collections.emptyMap(), false, null, null);
                    shouldFlush = true;
                }
            }
        }
        if (!this.pendingIndividualBatchIndexAcks.isEmpty()) {
            // Move every per-entry bit set into the multi-message ack list, removing it from the map.
            Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> iterator = this.pendingIndividualBatchIndexAcks.entrySet().iterator();
            while (iterator.hasNext()) {
                Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = iterator.next();
                entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue()));
                iterator.remove();
            }
        }
        if (entriesToAck.size() > 0) {
            // Single multi-message ack, bound to the current individual-ack future.
            this.newMessageAckCommandAndWrite(cnx, this.consumer.consumerId, 0L, 0L, null, CommandAck.AckType.Individual, null, null, true, this.currentIndividualAckFuture, entriesToAck);
            shouldFlush = true;
        }
        if (shouldFlush) {
            if (log.isDebugEnabled()) {
                // NOTE(review): pendingIndividualAcks has already been drained above, so it is
                // logged empty unless new acks were added concurrently.
                log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}", new Object[]{this.consumer, this.lastCumulativeAck, this.pendingIndividualAcks, entriesToAck});
            }
            cnx.ctx().flush();
        }
    }

    /**
     * Flushes whatever can be sent and then discards all pending acknowledgment state.
     *
     * <p>{@link #flush()} returns without sending anything when the consumer has no connection,
     * so every pending collection is cleared explicitly afterwards: the cumulative position,
     * the whole-entry individual acks and the batch-index individual acks.
     */
    @Override
    public void flushAndClean() {
        this.flush();
        this.lastCumulativeAck.reset();
        this.pendingIndividualAcks.clear();
        // Without this, batch-index acks queued before a failed flush would survive the clean.
        this.pendingIndividualBatchIndexAcks.clear();
    }

    /**
     * Flushes pending acknowledgments and cancels the periodic flush task, if one was scheduled
     * and has not been cancelled already.
     */
    @Override
    public void close() {
        this.flush();
        ScheduledFuture<?> periodicFlush = this.scheduledTask;
        if (periodicFlush == null || periodicFlush.isCancelled()) {
            return;
        }
        periodicFlush.cancel(true);
    }

    /**
     * Writes and flushes an acknowledgment for {@code msgId} on {@code cnx}.
     *
     * <p>Any chunk-id sequence registered for {@code msgId} is removed from the consumer. For a
     * non-cumulative ack of a message with more than one chunk id, every non-null chunk id is
     * acknowledged: in one multi-message ack when the peer supports it, otherwise one command
     * per chunk. In every other case {@code msgId} itself is acknowledged. This is the same
     * "more than one chunk" rule {@code flushAsync} applies, so a missing or single-element
     * sequence never results in an empty multi-message ack.
     *
     * @param consumerId consumer id placed in the ack command
     * @param msgId      id to acknowledge
     * @param bitSet     optional batch-index bit set, may be null
     * @param ackType    individual or cumulative
     * @param map        properties attached to the ack
     * @param cnx        connection to write to
     * @return the future of the written ack; an already completed future on the per-chunk
     *         fallback path
     */
    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId, BitSetRecyclable bitSet, CommandAck.AckType ackType, Map<String, Long> map, ClientCnx cnx) {
        MessageIdImpl[] chunkMsgIds = (MessageIdImpl[])this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
        boolean ackChunks = chunkMsgIds != null && chunkMsgIds.length > 1 && ackType != CommandAck.AckType.Cumulative;
        if (!ackChunks) {
            return this.newMessageAckCommandAndWrite(cnx, consumerId, msgId.ledgerId, msgId.getEntryId(), bitSet, ackType, null, map, true, null, null);
        }

        if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
            List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<Triple<Long, Long, ConcurrentBitSetRecyclable>>(chunkMsgIds.length);
            for (MessageIdImpl cMsgId : chunkMsgIds) {
                if (cMsgId == null) {
                    continue;
                }
                entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
            }
            return this.newMessageAckCommandAndWrite(cnx, this.consumer.consumerId, 0L, 0L, null, ackType, null, null, true, null, entriesToAck);
        }

        // Peer lacks multi-message ack: one command per chunk. Null slots are skipped, exactly
        // as in the multi-message branch above, instead of failing with a NullPointerException.
        for (MessageIdImpl cMsgId : chunkMsgIds) {
            if (cMsgId == null) {
                continue;
            }
            this.newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), bitSet, ackType, null, map, true, null, null);
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Builds an ack command (single or multi-message) and writes it to {@code cnx}.
     *
     * <p>When ack receipt is in use on {@code cnx}, the command carries a fresh request id.
     * With no {@code timedCompletableFuture} the connection's receipt future is returned.
     * Otherwise the matching "current" future field is replaced with a new instance and the
     * supplied future is registered with the connection for this request.
     *
     * <p>When ack receipt is not in use on {@code cnx}, the command is written with request id
     * {@code -1}. If the consumer was configured for ack receipt, both current futures are
     * completed first, so callers already holding them are not left waiting.
     *
     * <p>The ack-receipt check is made against {@code cnx}, the connection the command is
     * written to, so the decision and the write cannot refer to different connections.
     *
     * @param cnx                    connection to write to
     * @param consumerId             consumer id placed in the command
     * @param ledgerId               ledger id for a single ack; unused when {@code entriesToAck} is given
     * @param entryId                entry id for a single ack; unused when {@code entriesToAck} is given
     * @param ackSet                 optional batch-index bit set for a single ack
     * @param ackType                individual or cumulative
     * @param validationError        NOTE(review): accepted but not forwarded; the command is
     *                               always built with a null validation error
     * @param properties             properties for a single ack
     * @param flush                  whether to flush the channel on the non-receipt path
     * @param timedCompletableFuture future to bind to the request on the receipt path, may be null
     * @param entriesToAck           entries for a multi-message ack, or null for a single ack
     * @return the receipt future on the receipt path, otherwise an already completed future
     */
    private CompletableFuture<Void> newMessageAckCommandAndWrite(ClientCnx cnx, long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> properties, boolean flush, TimedCompletableFuture<Void> timedCompletableFuture, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
        if (this.isAckReceiptEnabled(cnx)) {
            long requestId = this.consumer.getClient().newRequestId();
            ByteBuf cmd = entriesToAck == null ? Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, null, properties, requestId) : Commands.newMultiMessageAck(consumerId, entriesToAck, requestId);
            if (timedCompletableFuture == null) {
                return cnx.newAckForReceipt(cmd, requestId);
            }
            // The supplied future now belongs to this request; later acks get a fresh one.
            if (ackType == CommandAck.AckType.Individual) {
                this.currentIndividualAckFuture = new TimedCompletableFuture<>();
            } else {
                this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
            }
            cnx.newAckForReceiptWithFuture(cmd, requestId, timedCompletableFuture);
            return timedCompletableFuture;
        }
        if (this.ackReceiptEnabled) {
            // Receipt was requested but is not available on this connection.
            synchronized (this) {
                if (!this.currentCumulativeAckFuture.isDone()) {
                    this.currentCumulativeAckFuture.complete(null);
                }
                if (!this.currentIndividualAckFuture.isDone()) {
                    this.currentIndividualAckFuture.complete(null);
                }
            }
        }
        ByteBuf cmd = entriesToAck == null ? Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, null, properties, -1L) : Commands.newMultiMessageAck(consumerId, entriesToAck, -1L);
        if (flush) {
            cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
        } else {
            cnx.ctx().write(cmd, cnx.ctx().voidPromise());
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Tells whether ack receipt can be used on {@code cnx}.
     *
     * @param cnx connection to check, may be null
     * @return {@code true} only when ack receipt is enabled in the configuration, {@code cnx} is
     *         non-null, and the peer's protocol version supports ack receipt
     */
    private boolean isAckReceiptEnabled(ClientCnx cnx) {
        if (!this.ackReceiptEnabled || cnx == null) {
            return false;
        }
        return Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
    }
}

