package org.apache.rocketmq.broker.pop;

import com.alibaba.fastjson.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.pop.PopConsumerRecord;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/broker/pop/PopConsumerService.class */
public class PopConsumerService extends ServiceThread {
    private static final long OFFSET_NOT_EXIST = -1;
    private static final String ROCKSDB_DIRECTORY = "kvStore";
    private final BrokerConfig brokerConfig;
    private final BrokerController brokerController;
    private final PopConsumerCache popConsumerCache;
    private final PopConsumerKVStore popConsumerStore;
    private static final Logger log = LoggerFactory.getLogger("RocketmqPop");
    private static final int[] REWRITE_INTERVALS_IN_SECONDS = {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
    private final AtomicBoolean consumerRunning = new AtomicBoolean(false);
    private final ConcurrentMap<String, AtomicLong> requestCountTable = new ConcurrentHashMap();
    private final AtomicLong currentTime = new AtomicLong(TimeUnit.SECONDS.toMillis(3));
    private final AtomicLong lastCleanupLockTime = new AtomicLong(System.currentTimeMillis());
    private final PopConsumerLockService consumerLockService = new PopConsumerLockService(TimeUnit.MINUTES.toMillis(2));

    public PopConsumerService(BrokerController brokerController) {
        this.brokerController = brokerController;
        this.brokerConfig = brokerController.getBrokerConfig();
        this.popConsumerStore = new PopConsumerRocksdbStore(Paths.get(brokerController.getMessageStoreConfig().getStorePathRootDir(), ROCKSDB_DIRECTORY).toString());
        this.popConsumerCache = this.brokerConfig.isEnablePopBufferMerge() ? new PopConsumerCache(brokerController, this.popConsumerStore, this.consumerLockService, this::revive) : null;
        log.info("PopConsumerService init, buffer={}, rocksdb filePath={}", Boolean.valueOf(this.brokerConfig.isEnablePopBufferMerge()), this.popConsumerStore.getFilePath());
    }

    public boolean isPopShouldStop(String str, String str2, int i) {
        return this.brokerConfig.isEnablePopMessageThreshold() && this.popConsumerCache != null && this.popConsumerCache.getPopInFlightMessageCount(str, str2, i) >= this.brokerConfig.getPopInflightMessageThreshold();
    }

    public long getPendingFilterCount(String str, String str2, int i) {
        try {
            return this.brokerController.getMessageStore().getMaxOffsetInQueue(str2, i) - this.brokerController.getConsumerOffsetManager().queryOffset(str, str2, i);
        } catch (ConsumeQueueException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public GetMessageResult recodeRetryMessage(GetMessageResult getMessageResult, String str, long j, long j2, long j3) {
        if (getMessageResult.getMessageCount() == 0 || getMessageResult.getMessageMapedList().isEmpty()) {
            return getMessageResult;
        }
        GetMessageResult getMessageResult2 = new GetMessageResult(getMessageResult.getMessageCount());
        getMessageResult2.setStatus(GetMessageStatus.FOUND);
        String brokerName = this.brokerConfig.getBrokerName();
        for (SelectMappedBufferResult selectMappedBufferResult : getMessageResult.getMessageMapedList()) {
            List<MessageExt> decodesBatch = MessageDecoder.decodesBatch(selectMappedBufferResult.getByteBuffer(), true, false, true);
            selectMappedBufferResult.release();
            for (MessageExt messageExt : decodesBatch) {
                try {
                    messageExt.getProperties().putIfAbsent("POP_CK", ExtraInfoUtil.buildExtraInfo(j, j2, j3, 0, messageExt.getTopic(), brokerName, messageExt.getQueueId(), messageExt.getQueueOffset()));
                    messageExt.setTopic(str);
                    messageExt.setStoreSize(0);
                    byte[] encode = MessageDecoder.encode(messageExt, false);
                    getMessageResult2.addMessage(new SelectMappedBufferResult(selectMappedBufferResult.getStartOffset(), ByteBuffer.wrap(encode), encode.length, (MappedFile) null));
                } catch (Exception e) {
                    log.error("PopConsumerService exception in recode retry message, topic={}", str, e);
                }
            }
        }
        return getMessageResult2;
    }

    public PopConsumerContext addGetMessageResult(PopConsumerContext popConsumerContext, GetMessageResult getMessageResult, String str, int i, PopConsumerRecord.RetryType retryType, long j) {
        if (getMessageResult.getStatus() == GetMessageStatus.FOUND && !getMessageResult.getMessageQueueOffset().isEmpty()) {
            if (popConsumerContext.isFifo()) {
                setFifoBlocked(popConsumerContext, popConsumerContext.getGroupId(), str, i, getMessageResult.getMessageQueueOffset());
            }
            popConsumerContext.addGetMessageResult(getMessageResult, str, i, retryType, j);
            if (this.brokerConfig.isPopConsumerKVServiceLog()) {
                log.info("PopConsumerService pop, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}, attemptId={}", new Object[]{Long.valueOf(popConsumerContext.getPopTime()), Long.valueOf(popConsumerContext.getInvisibleTime()), popConsumerContext.getGroupId(), str, Integer.valueOf(i), getMessageResult.getMessageQueueOffset(), popConsumerContext.getAttemptId()});
            }
        }
        if (!popConsumerContext.isFifo() && getMessageResult.getNextBeginOffset() > OFFSET_NOT_EXIST) {
            this.brokerController.getConsumerOffsetManager().commitPullOffset(popConsumerContext.getClientHost(), popConsumerContext.getGroupId(), str, i, getMessageResult.getNextBeginOffset());
            long nextBeginOffset = getMessageResult.getStatus() == GetMessageStatus.FOUND ? j : getMessageResult.getNextBeginOffset();
            if (this.brokerConfig.isEnablePopBufferMerge() && this.popConsumerCache != null) {
                long minOffsetInCache = this.popConsumerCache.getMinOffsetInCache(popConsumerContext.getGroupId(), str, i);
                if (minOffsetInCache != OFFSET_NOT_EXIST) {
                    nextBeginOffset = minOffsetInCache;
                }
            }
            this.brokerController.getConsumerOffsetManager().commitOffset(popConsumerContext.getClientHost(), popConsumerContext.getGroupId(), str, i, nextBeginOffset);
        }
        return popConsumerContext;
    }

    public Long getPopOffset(String str, String str2, int i) {
        Long queryThenEraseResetOffset = this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(str2, str, Integer.valueOf(i));
        if (queryThenEraseResetOffset != null) {
            clearCache(str, str2, i);
            this.brokerController.getConsumerOrderInfoManager().clearBlock(str2, str, i);
            this.brokerController.getConsumerOffsetManager().commitOffset("ResetPopOffset", str, str2, i, queryThenEraseResetOffset.longValue());
        }
        return queryThenEraseResetOffset;
    }

    public CompletableFuture<GetMessageResult> getMessageAsync(String str, String str2, String str3, int i, long j, int i2, MessageFilter messageFilter) {
        Logger logger = log;
        Object[] objArr = new Object[6];
        objArr[0] = str2;
        objArr[1] = str3;
        objArr[2] = Long.valueOf(j);
        objArr[3] = Integer.valueOf(i);
        objArr[4] = Integer.valueOf(i2);
        objArr[5] = Boolean.valueOf(messageFilter != null);
        logger.debug("PopConsumerService getMessageAsync, groupId={}, topicId={}, queueId={}, offset={}, batchSize={}, filter={}", objArr);
        Long popOffset = getPopOffset(str2, str3, i);
        long longValue = popOffset != null ? popOffset.longValue() : j;
        return this.brokerController.getMessageStore().getMessageAsync(str2, str3, i, j, i2, messageFilter).thenCompose(getMessageResult -> {
            if (getMessageResult == null) {
                return CompletableFuture.completedFuture(null);
            }
            if (!GetMessageStatus.OFFSET_TOO_SMALL.equals(getMessageResult.getStatus()) && !GetMessageStatus.OFFSET_OVERFLOW_BADLY.equals(getMessageResult.getStatus()) && !GetMessageStatus.OFFSET_FOUND_NULL.equals(getMessageResult.getStatus())) {
                return CompletableFuture.completedFuture(getMessageResult);
            }
            this.brokerController.getConsumerOffsetManager().commitOffset(str, str2, str3, i, getMessageResult.getNextBeginOffset());
            log.warn("PopConsumerService getMessageAsync, initial offset because store is no correct, groupId={}, topicId={}, queueId={}, batchSize={}, offset={}->{}", new Object[]{str2, str3, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(longValue), Long.valueOf(getMessageResult.getNextBeginOffset())});
            return this.brokerController.getMessageStore().getMessageAsync(str2, str3, i, getMessageResult.getNextBeginOffset(), i2, messageFilter);
        }).whenComplete((getMessageResult2, th) -> {
            if (th != null) {
                log.error("Pop getMessageAsync error", th);
            }
        });
    }

    public void setFifoBlocked(PopConsumerContext popConsumerContext, String str, String str2, int i, List<Long> list) {
        this.brokerController.getConsumerOrderInfoManager().update(popConsumerContext.getAttemptId(), false, str2, str, i, popConsumerContext.getPopTime(), popConsumerContext.getInvisibleTime(), list, popConsumerContext.getOrderCountInfoBuilder());
    }

    public boolean isFifoBlocked(PopConsumerContext popConsumerContext, String str, String str2, int i) {
        return this.brokerController.getConsumerOrderInfoManager().checkBlock(popConsumerContext.getAttemptId(), str2, str, i, popConsumerContext.getInvisibleTime());
    }

    protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFuture<PopConsumerContext> completableFuture, String str, String str2, String str3, int i, int i2, MessageFilter messageFilter, PopConsumerRecord.RetryType retryType) {
        return completableFuture.thenCompose(popConsumerContext -> {
            if (isPopShouldStop(str2, str3, i)) {
                return CompletableFuture.completedFuture(popConsumerContext);
            }
            if (popConsumerContext.isFifo() && isFifoBlocked(popConsumerContext, str2, str3, i)) {
                return CompletableFuture.completedFuture(popConsumerContext);
            }
            int messageCount = i2 - popConsumerContext.getMessageCount();
            if (messageCount <= 0) {
                popConsumerContext.addRestCount(getPendingFilterCount(str2, str3, i));
                return CompletableFuture.completedFuture(popConsumerContext);
            }
            long queryPullOffset = this.brokerController.getConsumerOffsetManager().queryPullOffset(str2, str3, i);
            return getMessageAsync(str, str2, str3, i, queryPullOffset, messageCount, messageFilter).thenApply(getMessageResult -> {
                return addGetMessageResult(popConsumerContext, getMessageResult, str3, i, retryType, queryPullOffset);
            });
        });
    }

    public CompletableFuture<PopConsumerContext> popAsync(String str, long j, long j2, String str2, String str3, int i, int i2, boolean z, String str4, MessageFilter messageFilter) {
        PopConsumerContext popConsumerContext = new PopConsumerContext(str, j, j2, str2, z, str4);
        TopicConfig selectTopicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(str3);
        if (selectTopicConfig == null || !this.consumerLockService.tryLock(str2, str3)) {
            return CompletableFuture.completedFuture(popConsumerContext);
        }
        log.debug("PopConsumerService popAsync, groupId={}, topicId={}, queueId={}, batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", new Object[]{str2, str3, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(j2), Boolean.valueOf(z), str4, messageFilter});
        String str5 = str2 + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + str3;
        String buildPopRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(str3, str2);
        String buildPopRetryTopicV2 = KeyBuilder.buildPopRetryTopicV2(str3, str2);
        long andIncrement = ((AtomicLong) Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(this.requestCountTable, str5, str6 -> {
            return new AtomicLong(0L);
        }))).getAndIncrement();
        boolean z2 = andIncrement % 5 == 0;
        CompletableFuture<PopConsumerContext> completedFuture = CompletableFuture.completedFuture(popConsumerContext);
        if (!z && z2) {
            try {
                if (this.brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                    completedFuture = getMessageAsync(completedFuture, str, str2, buildPopRetryTopicV1, 0, i2, messageFilter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
                }
                if (this.brokerConfig.isEnableRetryTopicV2()) {
                    completedFuture = getMessageAsync(completedFuture, str, str2, buildPopRetryTopicV2, 0, i2, messageFilter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
                }
            } catch (Throwable th) {
                log.error("PopConsumerService popAsync error", th);
                return completedFuture;
            }
        }
        if (i != -1) {
            completedFuture = getMessageAsync(completedFuture, str, str2, str3, i, i2, messageFilter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
        } else {
            for (int i3 = 0; i3 < selectTopicConfig.getReadQueueNums(); i3++) {
                completedFuture = getMessageAsync(completedFuture, str, str2, str3, (int) ((andIncrement + i3) % selectTopicConfig.getReadQueueNums()), i2, messageFilter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
            }
            if (!z && !z2) {
                if (this.brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
                    completedFuture = getMessageAsync(completedFuture, str, str2, buildPopRetryTopicV1, 0, i2, messageFilter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
                }
                if (this.brokerConfig.isEnableRetryTopicV2()) {
                    completedFuture = getMessageAsync(completedFuture, str, str2, buildPopRetryTopicV2, 0, i2, messageFilter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
                }
            }
        }
        return completedFuture.thenCompose(popConsumerContext2 -> {
            if (popConsumerContext2.isFound() && !popConsumerContext2.isFifo()) {
                if (!this.brokerConfig.isEnablePopBufferMerge() || this.popConsumerCache == null || this.popConsumerCache.isCacheFull()) {
                    this.popConsumerStore.writeRecords(popConsumerContext2.getPopConsumerRecordList());
                } else {
                    this.popConsumerCache.writeRecords(popConsumerContext2.getPopConsumerRecordList());
                }
                for (int i4 = 0; i4 < popConsumerContext2.getGetMessageResultList().size(); i4++) {
                    GetMessageResult getMessageResult = popConsumerContext2.getGetMessageResultList().get(i4);
                    PopConsumerRecord popConsumerRecord = popConsumerContext2.getPopConsumerRecordList().get(i4);
                    if (this.brokerConfig.isPopResponseReturnActualRetryTopic() && popConsumerRecord.isRetry()) {
                        popConsumerContext2.getGetMessageResultList().set(i4, recodeRetryMessage(getMessageResult, popConsumerRecord.getTopicId(), popConsumerRecord.getQueueId(), popConsumerContext2.getPopTime(), j2));
                    }
                }
            }
            return CompletableFuture.completedFuture(popConsumerContext2);
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (popConsumerContext3, th2) -> {
            if (th2 != null) {
                try {
                    log.error("PopConsumerService popAsync get message error", th2 instanceof CompletionException ? th2.getCause() : th2);
                } finally {
                    this.consumerLockService.unlock(str2, str3);
                }
            }
            if (popConsumerContext3.getMessageCount() > 0) {
                log.debug("PopConsumerService popAsync result, found={}, groupId={}, topicId={}, queueId={}, batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}", new Object[]{Integer.valueOf(popConsumerContext3.getMessageCount()), str2, str3, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(j2), Boolean.valueOf(z), str4, messageFilter});
            }
        });
    }

    public CompletableFuture<Boolean> ackAsync(long j, long j2, String str, String str2, int i, long j3) {
        if (this.brokerConfig.isPopConsumerKVServiceLog()) {
            log.info("PopConsumerService ack, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}", new Object[]{Long.valueOf(j), Long.valueOf(j2), str, str2, Integer.valueOf(i), Long.valueOf(j3)});
        }
        PopConsumerRecord popConsumerRecord = new PopConsumerRecord(j, str, str2, i, 0, j2, j3, null);
        if (this.brokerConfig.isEnablePopBufferMerge() && this.popConsumerCache != null && this.popConsumerCache.deleteRecords(Collections.singletonList(popConsumerRecord)).isEmpty()) {
            return CompletableFuture.completedFuture(true);
        }
        this.popConsumerStore.deleteRecords(Collections.singletonList(popConsumerRecord));
        return CompletableFuture.completedFuture(true);
    }

    public void changeInvisibilityDuration(long j, long j2, long j3, long j4, String str, String str2, int i, long j5) {
        if (this.brokerConfig.isPopConsumerKVServiceLog()) {
            log.info("PopConsumerService change, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}, new time={}, new invisible={}", new Object[]{Long.valueOf(j), Long.valueOf(j2), str, str2, Integer.valueOf(i), Long.valueOf(j5), Long.valueOf(j3), Long.valueOf(j4)});
        }
        PopConsumerRecord popConsumerRecord = new PopConsumerRecord(j3, str, str2, i, 0, j4, j5, null);
        PopConsumerRecord popConsumerRecord2 = new PopConsumerRecord(j, str, str2, i, 0, j2, j5, null);
        this.popConsumerStore.writeRecords(Collections.singletonList(popConsumerRecord));
        if (this.brokerConfig.isEnablePopBufferMerge() && this.popConsumerCache != null && this.popConsumerCache.deleteRecords(Collections.singletonList(popConsumerRecord2)).isEmpty()) {
            return;
        }
        this.popConsumerStore.deleteRecords(Collections.singletonList(popConsumerRecord2));
    }

    public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(PopConsumerRecord popConsumerRecord) {
        return this.brokerController.getEscapeBridge().getMessageAsync(popConsumerRecord.getTopicId(), popConsumerRecord.getOffset(), popConsumerRecord.getQueueId(), this.brokerConfig.getBrokerName(), false);
    }

    public CompletableFuture<Boolean> revive(PopConsumerRecord popConsumerRecord) {
        return getMessageAsync(popConsumerRecord).thenCompose(triple -> {
            if (triple == null) {
                log.error("PopConsumerService revive error, message may be lost, record={}", popConsumerRecord);
                return CompletableFuture.completedFuture(false);
            }
            if (triple.getLeft() != null) {
                return CompletableFuture.completedFuture(Boolean.valueOf(reviveRetry(popConsumerRecord, (MessageExt) triple.getLeft())));
            }
            log.info("PopConsumerService revive no need retry, record={}", popConsumerRecord);
            return CompletableFuture.completedFuture(Boolean.valueOf(!((Boolean) triple.getRight()).booleanValue()));
        });
    }

    /* JADX WARN: Code restructure failed: missing block: B:12:0x002c, code lost:
    
        r9 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:14:0x002f, code lost:
    
        r5.consumerLockService.unlock(r6, r7);
     */
    /* JADX WARN: Code restructure failed: missing block: B:15:0x0039, code lost:
    
        throw r9;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void clearCache(java.lang.String r6, java.lang.String r7, int r8) {
        /*
            r5 = this;
        L0:
            r0 = r5
            org.apache.rocketmq.broker.pop.PopConsumerLockService r0 = r0.consumerLockService
            r1 = r6
            r2 = r7
            boolean r0 = r0.tryLock(r1, r2)
            if (r0 == 0) goto Lf
            goto L0
        Lf:
            r0 = r5
            org.apache.rocketmq.broker.pop.PopConsumerCache r0 = r0.popConsumerCache     // Catch: java.lang.Throwable -> L2c
            if (r0 == 0) goto L20
            r0 = r5
            org.apache.rocketmq.broker.pop.PopConsumerCache r0 = r0.popConsumerCache     // Catch: java.lang.Throwable -> L2c
            r1 = r6
            r2 = r7
            r3 = r8
            r0.removeRecords(r1, r2, r3)     // Catch: java.lang.Throwable -> L2c
        L20:
            r0 = r5
            org.apache.rocketmq.broker.pop.PopConsumerLockService r0 = r0.consumerLockService
            r1 = r6
            r2 = r7
            r0.unlock(r1, r2)
            goto L3a
        L2c:
            r9 = move-exception
            r0 = r5
            org.apache.rocketmq.broker.pop.PopConsumerLockService r0 = r0.consumerLockService
            r1 = r6
            r2 = r7
            r0.unlock(r1, r2)
            r0 = r9
            throw r0
        L3a:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.broker.pop.PopConsumerService.clearCache(java.lang.String, java.lang.String, int):void");
    }

    public long revive(AtomicLong atomicLong, int i) {
        Stopwatch createStarted = Stopwatch.createStarted();
        long currentTimeMillis = System.currentTimeMillis() - 50;
        List<PopConsumerRecord> scanExpiredRecords = this.popConsumerStore.scanExpiredRecords(atomicLong.get() - TimeUnit.SECONDS.toMillis(3L), currentTimeMillis, i);
        long elapsed = createStarted.elapsed(TimeUnit.MILLISECONDS);
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        ArrayList arrayList = new ArrayList(scanExpiredRecords.size());
        for (PopConsumerRecord popConsumerRecord : scanExpiredRecords) {
            arrayList.add(revive(popConsumerRecord).thenAccept(bool -> {
                if (bool.booleanValue()) {
                    return;
                }
                if (popConsumerRecord.getAttemptTimes() >= this.brokerConfig.getPopReviveMaxAttemptTimes()) {
                    log.error("PopConsumerService drop record, message may be lost, record={}", popConsumerRecord);
                    return;
                }
                PopConsumerRecord popConsumerRecord2 = new PopConsumerRecord(System.currentTimeMillis(), popConsumerRecord.getGroupId(), popConsumerRecord.getTopicId(), popConsumerRecord.getQueueId(), popConsumerRecord.getRetryFlag(), popConsumerRecord.getInvisibleTime() + (1000 * REWRITE_INTERVALS_IN_SECONDS[Math.min(REWRITE_INTERVALS_IN_SECONDS.length, popConsumerRecord.getAttemptTimes())]), popConsumerRecord.getOffset(), popConsumerRecord.getAttemptId());
                popConsumerRecord2.setAttemptTimes(popConsumerRecord.getAttemptTimes() + 1);
                linkedBlockingQueue.add(popConsumerRecord2);
                log.warn("PopConsumerService revive backoff retry, record={}", popConsumerRecord2);
            }));
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).join();
        this.popConsumerStore.writeRecords(new ArrayList(linkedBlockingQueue));
        this.popConsumerStore.deleteRecords(scanExpiredRecords);
        atomicLong.set(scanExpiredRecords.isEmpty() ? currentTimeMillis : scanExpiredRecords.get(scanExpiredRecords.size() - 1).getVisibilityTimeout());
        if (this.brokerConfig.isEnablePopBufferMerge()) {
            log.info("PopConsumerService, key size={}, cache size={}, revive count={}, failure count={}, behindInMillis={}, scanInMillis={}, costInMillis={}", new Object[]{Integer.valueOf(this.popConsumerCache.getCacheKeySize()), Integer.valueOf(this.popConsumerCache.getCacheSize()), Integer.valueOf(scanExpiredRecords.size()), Integer.valueOf(linkedBlockingQueue.size()), Long.valueOf(currentTimeMillis - atomicLong.get()), Long.valueOf(elapsed), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))});
        } else {
            log.info("PopConsumerService, revive count={}, failure count={}, behindInMillis={}, scanInMillis={}, costInMillis={}", new Object[]{Integer.valueOf(scanExpiredRecords.size()), Integer.valueOf(linkedBlockingQueue.size()), Long.valueOf(currentTimeMillis - atomicLong.get()), Long.valueOf(elapsed), Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS))});
        }
        return scanExpiredRecords.size();
    }

    public void createRetryTopicIfNeeded(String str, String str2) {
        if (this.brokerController.getTopicConfigManager().selectTopicConfig(str2) != null) {
            return;
        }
        TopicConfig topicConfig = new TopicConfig(str2, 1, 1, 6, 0);
        topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
        this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
        if (this.brokerController.getConsumerOffsetManager().queryOffset(str, str2, 0) < 0) {
            this.brokerController.getConsumerOffsetManager().commitOffset("InitPopOffset", str, str2, 0, 0L);
        }
    }

    public boolean reviveRetry(PopConsumerRecord popConsumerRecord, MessageExt messageExt) {
        if (this.brokerConfig.isPopConsumerKVServiceLog()) {
            log.info("PopConsumerService revive, time={}, invisible={}, groupId={}, topic={}, queueId={}, offset={}", new Object[]{Long.valueOf(popConsumerRecord.getPopTime()), Long.valueOf(popConsumerRecord.getInvisibleTime()), popConsumerRecord.getGroupId(), popConsumerRecord.getTopicId(), Integer.valueOf(popConsumerRecord.getQueueId()), Long.valueOf(popConsumerRecord.getOffset())});
        }
        String topicId = StringUtils.startsWith(popConsumerRecord.getTopicId(), "%RETRY%") ? popConsumerRecord.getTopicId() : KeyBuilder.buildPopRetryTopic(popConsumerRecord.getTopicId(), popConsumerRecord.getGroupId(), this.brokerConfig.isEnableRetryTopicV2());
        createRetryTopicIfNeeded(popConsumerRecord.getGroupId(), topicId);
        MessageExtBrokerInner messageExtBrokerInner = new MessageExtBrokerInner();
        messageExtBrokerInner.setTopic(topicId);
        messageExtBrokerInner.setBody(messageExt.getBody() != null ? messageExt.getBody() : new byte[0]);
        messageExtBrokerInner.setQueueId(0);
        if (messageExt.getTags() != null) {
            messageExtBrokerInner.setTags(messageExt.getTags());
        } else {
            MessageAccessor.setProperties(messageExtBrokerInner, new HashMap());
        }
        messageExtBrokerInner.setBornTimestamp(messageExt.getBornTimestamp());
        messageExtBrokerInner.setFlag(messageExt.getFlag());
        messageExtBrokerInner.setSysFlag(messageExt.getSysFlag());
        messageExtBrokerInner.setBornHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setStoreHost(this.brokerController.getStoreHost());
        messageExtBrokerInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
        messageExtBrokerInner.getProperties().putAll(messageExt.getProperties());
        if (messageExt.getReconsumeTimes() == 0 || messageExtBrokerInner.getProperties().get("1ST_POP_TIME") == null) {
            messageExtBrokerInner.getProperties().put("1ST_POP_TIME", String.valueOf(popConsumerRecord.getPopTime()));
        }
        messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
        PutMessageResult putMessageToSpecificQueue = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(messageExtBrokerInner);
        if (putMessageToSpecificQueue.getAppendMessageResult() == null || putMessageToSpecificQueue.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
            log.error("PopConsumerService revive retry msg error, put status={}, ck={}, delay={}ms", new Object[]{putMessageToSpecificQueue, JSON.toJSONString(popConsumerRecord), Long.valueOf(System.currentTimeMillis() - popConsumerRecord.getVisibilityTimeout())});
            return false;
        }
        if (this.brokerController.getBrokerStatsManager() == null) {
            return true;
        }
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(messageExtBrokerInner.getTopic(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutNums(messageExtBrokerInner.getTopic());
        this.brokerController.getBrokerStatsManager().incTopicPutSize(messageExtBrokerInner.getTopic(), putMessageToSpecificQueue.getAppendMessageResult().getWroteBytes());
        return true;
    }

    public synchronized void transferToFsStore() {
        List<PopConsumerRecord> scanExpiredRecords;
        Stopwatch createStarted = Stopwatch.createStarted();
        while (true) {
            try {
                scanExpiredRecords = this.popConsumerStore.scanExpiredRecords(0L, Long.MAX_VALUE, this.brokerConfig.getPopReviveMaxReturnSizePerRead());
            } catch (Throwable th) {
                log.error("PopConsumerStore transfer from kvStore to fsStore failure", th);
            }
            if (scanExpiredRecords == null || scanExpiredRecords.isEmpty()) {
                break;
            }
            for (PopConsumerRecord popConsumerRecord : scanExpiredRecords) {
                PopCheckPoint popCheckPoint = new PopCheckPoint();
                popCheckPoint.setBitMap(0);
                popCheckPoint.setNum((byte) 1);
                popCheckPoint.setPopTime(popConsumerRecord.getPopTime());
                popCheckPoint.setInvisibleTime(popConsumerRecord.getInvisibleTime());
                popCheckPoint.setStartOffset(popConsumerRecord.getOffset());
                popCheckPoint.setCId(popConsumerRecord.getGroupId());
                popCheckPoint.setTopic(popConsumerRecord.getTopicId());
                popCheckPoint.setQueueId(popConsumerRecord.getQueueId());
                popCheckPoint.setBrokerName(this.brokerConfig.getBrokerName());
                popCheckPoint.addDiff(0);
                popCheckPoint.setRePutTimes(popCheckPoint.getRePutTimes());
                this.brokerController.getMessageStore().asyncPutMessage(this.brokerController.getPopMessageProcessor().buildCkMsg(popCheckPoint, ((int) popConsumerRecord.getOffset()) % this.brokerConfig.getReviveQueueNum())).join();
            }
            log.info("PopConsumerStore transfer from kvStore to fsStore, count={}", Integer.valueOf(scanExpiredRecords.size()));
            this.popConsumerStore.deleteRecords(scanExpiredRecords);
            waitForRunning(1L);
        }
        log.info("PopConsumerStore transfer to fsStore finish, cost={}ms", Long.valueOf(createStarted.elapsed(TimeUnit.MILLISECONDS)));
    }

    public String getServiceName() {
        return PopConsumerService.class.getSimpleName();
    }

    @VisibleForTesting
    protected PopConsumerKVStore getPopConsumerStore() {
        return this.popConsumerStore;
    }

    public PopConsumerLockService getConsumerLockService() {
        return this.consumerLockService;
    }

    public void start() {
        if (!this.popConsumerStore.start()) {
            throw new RuntimeException("PopConsumerStore init error");
        }
        if (this.popConsumerCache != null) {
            this.popConsumerCache.start();
        }
        super.start();
    }

    public void shutdown() {
        super.shutdown();
        do {
            waitForRunning(10L);
        } while (this.consumerRunning.get());
        if (this.popConsumerCache != null) {
            this.popConsumerCache.shutdown();
        }
        if (this.popConsumerStore != null) {
            this.popConsumerStore.shutdown();
        }
    }

    public void run() {
        this.consumerRunning.set(true);
        while (!isStopped()) {
            try {
                long revive = revive(this.currentTime, this.brokerConfig.getPopReviveMaxReturnSizePerRead());
                long currentTimeMillis = System.currentTimeMillis();
                if (this.lastCleanupLockTime.get() + TimeUnit.MINUTES.toMillis(1L) < currentTimeMillis) {
                    this.consumerLockService.removeTimeout();
                    this.lastCleanupLockTime.set(currentTimeMillis);
                }
                if (revive < this.brokerConfig.getPopReviveMaxReturnSizePerRead()) {
                    waitForRunning(500L);
                }
            } catch (Exception e) {
                log.error("PopConsumerService revive error", e);
                waitForRunning(500L);
            }
        }
        this.consumerRunning.set(false);
    }
}
