package com.tencent.tsf.event.common;

import com.tencent.tsf.event.report.EventReport;
import com.tencent.tsf.event.report.EventReportManager;
import com.tencent.tsf.event.request.Event;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/tencent/tsf/event/common/AbstractEventCollector.class */
/**
 * Base {@link EventCollector} that buffers events in bounded in-memory queues and flushes
 * them to an {@link EventReport} on a background schedule.
 *
 * <p>Events are grouped by the value returned from {@code Event#getEvent()}; each distinct
 * value gets its own queue holding at most {@link #QUEUE_THRESHOLD} events. A single-threaded
 * scheduler drains every queue with a fixed delay of 1000 ms, posting at most
 * {@link #MAX_BATCH_SIZE} events per call. Events that do not fit into a full queue, and
 * batches whose posting fails, are logged and dropped.
 *
 * @param <E> concrete event type handled by this collector
 */
public abstract class AbstractEventCollector<E extends Event> implements EventCollector<E> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractEventCollector.class);

    /** Capacity of each per-key queue. */
    protected static final Integer QUEUE_THRESHOLD = 1000;

    /** Upper bound on the number of events passed to the reporter in one call. */
    protected static final Integer MAX_BATCH_SIZE = 50;

    /** Pending events, keyed by the value of {@code Event#getEvent()}. */
    protected final Map<String, BlockingQueue<E>> eventQueueMap = new ConcurrentHashMap<>();

    /** Scheduler that runs the periodic flush task registered in {@link #postConstruct()}. */
    protected ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    /** Reporters used for flushing; only the first element is used by this class. */
    protected List<EventReport> eventReport;

    /**
     * Creates the collector, resolves its reporter and starts the periodic flush task.
     *
     * <p>Both arguments are forwarded unchanged to
     * {@code EventReportManager.getEventReport(str, num)}; their meaning is defined there.
     *
     * <p>NOTE: this constructor calls the overridable {@link #postConstruct()}, so an override
     * runs before the subclass's own field initialisers and constructor body.
     *
     * @param str first argument for the reporter lookup
     * @param num second argument for the reporter lookup
     */
    public AbstractEventCollector(String str, Integer num) {
        this.eventReport = Collections.singletonList(EventReportManager.getEventReport(str, num));
        postConstruct();
    }

    /**
     * Hook invoked by {@link #addEvent(Event)} before the event is queued. The default
     * implementation does nothing.
     *
     * @param e the event about to be queued
     */
    protected void preEvent(E e) {
    }

    /**
     * Registers the flush task: after an initial delay of 1000 ms, and then 1000 ms after each
     * run finishes, every queue in {@link #eventQueueMap} is drained via a parallel stream.
     */
    protected void postConstruct() {
        this.scheduledExecutorService.scheduleWithFixedDelay(
                () -> this.eventQueueMap.values().parallelStream().forEach(this::drainAndReport),
                1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Empties one queue by posting its content to the first reporter in batches of at most
     * {@link #MAX_BATCH_SIZE} events.
     */
    private void drainAndReport(BlockingQueue<E> queue) {
        while (!queue.isEmpty()) {
            try {
                List<Event> batch = new ArrayList<>();
                queue.drainTo(batch, MAX_BATCH_SIZE);
                this.eventReport.get(0).postEvent(batch);
            } catch (Throwable th) {
                // Best effort: the drained batch is dropped. Throwable is caught on purpose,
                // because an exception escaping a scheduled task cancels all future runs.
                LOGGER.warn("eventReport post Event fail, ignore this error: {}", th.getMessage(), th);
            }
        }
    }

    /**
     * Runs {@link #preEvent(Event)} and then queues the event for the next flush.
     *
     * <p>If the target queue is full, or queuing fails for any other reason, the event is
     * logged and dropped; no exception from the queuing step reaches the caller.
     *
     * @param e the event to collect
     */
    @Override // com.tencent.tsf.event.common.EventCollector
    public void addEvent(E e) {
        preEvent(e);
        try {
            BlockingQueue<E> queue = this.eventQueueMap.computeIfAbsent(
                    e.getEvent(), key -> new LinkedBlockingQueue<>(QUEUE_THRESHOLD));
            // offer() reports a full queue through its return value instead of throwing.
            if (queue.offer(e)) {
                LOGGER.info("Add Event to EventQueue EventType: {}", e.getEvent());
            } else {
                LOGGER.error("[EVENT COLLECTOR] add to eventQueue is full. Log this event and drop it. event={}", e);
            }
        } catch (Exception ex) {
            // Anything other than a full queue (e.g. a null event or key) ends up here.
            LOGGER.error("[EVENT COLLECTOR] add to eventQueue failed. Log this event and drop it. event={}, error={} ",
                    e, ex.getMessage(), ex);
        }
    }
}
