/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.test.listener;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.clientinterface.MQCollector;
import org.apache.rocketmq.test.util.TestUtil;

/**
 * Base message listener for tests that also acts as an {@link MQCollector}.
 *
 * <p>Besides the collector behaviour inherited from {@link MQCollector}, this class offers a
 * family of {@code waitForMessageConsume} helpers that poll the inherited {@code msgBodys}
 * collector until the expected messages have shown up or a timeout elapses.
 *
 * <p>NOTE(review): {@code msgBodys} and {@code lockCollectors()} are defined in
 * {@link MQCollector}, which is not visible here; their exact semantics should be confirmed
 * there.
 */
public class AbstractListener
extends MQCollector
implements MessageListener {
    public static final Logger LOGGER = LoggerFactory.getLogger(AbstractListener.class);

    /** Pause between two polls of the collector, passed to {@code TestUtil.waitForMonment}. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    /** Debug flag; this class only exposes it through {@link #isDebug()} / {@link #setDebug}. */
    protected boolean isDebug = true;

    /** Name used to identify this listener in log messages; may be {@code null}. */
    protected String listenerName = null;

    /** The collection most recently passed to {@link #waitForMessageConsume(Collection, int)}. */
    protected Collection<Object> allSendMsgs = null;

    /** Creates a listener without a name, using the default {@link MQCollector} constructor. */
    public AbstractListener() {
    }

    /**
     * Creates a named listener.
     *
     * @param listenerName name reported in this listener's log messages
     */
    public AbstractListener(String listenerName) {
        this.listenerName = listenerName;
    }

    /**
     * Creates a listener whose collectors are configured by the superclass.
     * The listener name is left {@code null}.
     *
     * @param originMsgCollector forwarded unchanged to the {@link MQCollector} constructor
     * @param msgBodyCollector forwarded unchanged to the {@link MQCollector} constructor
     */
    public AbstractListener(String originMsgCollector, String msgBodyCollector) {
        super(originMsgCollector, msgBodyCollector);
    }

    /**
     * @return the current value of the debug flag
     */
    public boolean isDebug() {
        return this.isDebug;
    }

    /**
     * @param debug new value of the debug flag
     */
    public void setDebug(boolean debug) {
        this.isDebug = debug;
    }

    /**
     * Delegates to {@code TestUtil.waitForMonment(timeoutMills)} without checking any collector.
     *
     * @param timeoutMills value handed to {@code TestUtil.waitForMonment}; presumably the
     *     number of milliseconds to pause (confirm in {@code TestUtil})
     */
    public void waitForMessageConsume(int timeoutMills) {
        TestUtil.waitForMonment(timeoutMills);
    }

    /** Invokes the superclass's {@code lockCollectors()}. */
    public void stopRecv() {
        super.lockCollectors();
    }

    /**
     * Polls until every message in {@code allSendMessages} is contained in
     * {@code msgBodys.getAllData()}, or until {@code timeoutMills} has elapsed.
     *
     * <p>The given collection is remembered in {@link #allSendMsgs} and is not modified; the
     * method works on a copy. The timeout is only checked between polls, so the call may last
     * longer than {@code timeoutMills} by up to one polling pause.
     *
     * @param allSendMessages the messages expected to be received; must not be {@code null}
     * @param timeoutMills maximum time to keep polling, in milliseconds
     * @return the messages that were still missing when the method returned; empty if all
     *     were received
     */
    public Collection<Object> waitForMessageConsume(Collection<Object> allSendMessages, int timeoutMills) {
        this.allSendMsgs = allSendMessages;
        Collection<Object> pending = new ArrayList<>(allSendMessages);
        long startTime = System.currentTimeMillis();
        while (!pending.isEmpty()) {
            // Drop everything that has arrived since the previous poll.
            pending.removeIf(msg -> this.msgBodys.getAllData().contains(msg));
            if (pending.isEmpty()) {
                break;
            }
            if (System.currentTimeMillis() - startTime >= (long) timeoutMills) {
                LOGGER.error("timeout but [{}] not recv all send messages!", this.listenerName);
                break;
            }
            LOGGER.info("[{}] still [{}] msg not recv!", this.listenerName, pending.size());
            TestUtil.waitForMonment(POLL_INTERVAL_MILLIS);
        }
        return pending;
    }

    /**
     * Polls until {@code msgBodys.getDataSize()} reaches {@code size}, or until
     * {@code timeoutMills} has elapsed.
     *
     * <p>The timeout is only checked between polls, so the call may last longer than
     * {@code timeoutMills} by up to one polling pause.
     *
     * @param size number of entries to wait for
     * @param timeoutMills maximum time to keep polling, in milliseconds
     * @return {@code msgBodys.getDataSize()} at the time the method returns
     */
    public long waitForMessageConsume(int size, int timeoutMills) {
        long startTime = System.currentTimeMillis();
        while (this.msgBodys.getDataSize() < (long) size) {
            if (System.currentTimeMillis() - startTime >= (long) timeoutMills) {
                LOGGER.error("timeout but  [{}]  not recv all send messages!", this.listenerName);
                break;
            }
            LOGGER.info("[{}] still [{}] msg not recv!", this.listenerName, (long) size - this.msgBodys.getDataSize());
            TestUtil.waitForMonment(POLL_INTERVAL_MILLIS);
        }
        return this.msgBodys.getDataSize();
    }

    /**
     * Waits for the keys of {@code sendMsgIndex} via
     * {@link #waitForMessageConsume(Collection, int)} and logs, at info level, the mapped value
     * of every key that was not received in time.
     *
     * @param sendMsgIndex map whose keys are the expected messages; must not be {@code null}
     * @param timeoutMills maximum time to keep polling, in milliseconds
     */
    public void waitForMessageConsume(Map<Object, Object> sendMsgIndex, int timeoutMills) {
        Collection<Object> notRecvMsgs = this.waitForMessageConsume(sendMsgIndex.keySet(), timeoutMills);
        for (Object missingKey : notRecvMsgs) {
            LOGGER.info("{}", sendMsgIndex.get(missingKey));
        }
    }
}

