/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.batch;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.PriorityBlockingQueue;

public class TopicCache {
    private static final InternalLogger LOGGER = ClientLogger.getLog();
    private final String topic;
    private final ConcurrentMap<MessageQueue, PriorityBlockingQueue<MessageExt>> cache;

    public TopicCache(String topic) {
        this.topic = topic;
        this.cache = new ConcurrentHashMap<MessageQueue, PriorityBlockingQueue<MessageExt>>();
    }

    public int size() {
        int total = 0;
        for (Map.Entry entry : this.cache.entrySet()) {
            total += ((PriorityBlockingQueue)entry.getValue()).size();
        }
        return total;
    }

    public void put(Collection<MessageExt> messages) {
        for (MessageExt message : messages) {
            if (!this.topic.equals(message.getTopic())) {
                LOGGER.warn("Trying to put an message with mismatched topic name, expect: {}, actual: {}, actualFQN: {}", this.topic, message.getTopic(), ConsumeMessageConcurrentlyService.topicOf(message));
                continue;
            }
            MessageQueue messageQueue = new MessageQueue(ConsumeMessageConcurrentlyService.topicOf(message), message.getBrokerName(), message.getQueueId());
            if (!this.cache.containsKey(messageQueue)) {
                PriorityBlockingQueue<MessageExt> cacheItem = new PriorityBlockingQueue<MessageExt>(128, new Comparator<MessageExt>(){

                    @Override
                    public int compare(MessageExt o1, MessageExt o2) {
                        long rhs;
                        long lhs = o1.getQueueOffset();
                        return lhs < (rhs = o2.getQueueOffset()) ? -1 : (lhs == rhs ? 0 : 1);
                    }
                });
                this.cache.putIfAbsent(messageQueue, cacheItem);
            }
            ((PriorityBlockingQueue)this.cache.get(messageQueue)).add(message);
        }
    }

    public boolean take(int count, Collection<MessageExt> collection) {
        if (this.size() < count) {
            return false;
        }
        int remain = count;
        while (remain > 0) {
            MessageExt candidate = null;
            PriorityBlockingQueue targetQueue = null;
            for (Map.Entry entry : this.cache.entrySet()) {
                PriorityBlockingQueue messages = (PriorityBlockingQueue)entry.getValue();
                if (null == candidate) {
                    candidate = (MessageExt)messages.peek();
                    targetQueue = messages;
                    continue;
                }
                MessageExt challenger = (MessageExt)messages.peek();
                if (null == challenger || challenger.getDecodedTime() >= candidate.getDecodedTime()) continue;
                candidate = challenger;
                targetQueue = messages;
            }
            if (null == candidate) continue;
            collection.add((MessageExt)targetQueue.poll());
            --remain;
        }
        return true;
    }

    public long elapsed() {
        long earliest;
        long current = earliest = System.currentTimeMillis();
        for (Map.Entry entry : this.cache.entrySet()) {
            long decodedTime;
            MessageExt message = (MessageExt)((PriorityBlockingQueue)entry.getValue()).peek();
            if (null == message || (decodedTime = message.getDecodedTime()) >= earliest) continue;
            earliest = decodedTime;
        }
        return current - earliest;
    }
}

