package com.jxdinfo.hussar.support.log.collect;

import com.jxdinfo.hussar.support.log.client.ElasticLowerClient;
import com.jxdinfo.hussar.support.log.core.constant.LogMessageConstant;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;

/* loaded from: input_file:BOOT-INF/lib/hussar-log-starter-0.0.7.jar:com/jxdinfo/hussar/support/log/collect/KafkaLogCollect.class */
public class KafkaLogCollect extends BaseLogCollect {
    private final Logger logger = LoggerFactory.getLogger((Class<?>) KafkaLogCollect.class);
    private KafkaConsumer<String, String> kafkaConsumer;

    public KafkaLogCollect(ElasticLowerClient elasticLowerClient, KafkaConsumer kafkaConsumer, ApplicationEventPublisher applicationEventPublisher) {
        this.elasticLowerClient = elasticLowerClient;
        this.kafkaConsumer = kafkaConsumer;
        this.kafkaConsumer.subscribe(Arrays.asList(LogMessageConstant.LOG_KEY, LogMessageConstant.LOG_KEY_TRACE));
        this.applicationEventPublisher = applicationEventPublisher;
        this.logger.info("kafkaConsumer subscribe ready!");
        this.logger.info("sending log ready!");
    }

    public void kafkaStart() {
        this.threadPoolExecutor.execute(() -> {
            collectRuningLog();
        });
        this.logger.info("KafkaLogCollect is starting!");
    }

    public void collectRuningLog() {
        while (true) {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            try {
                this.kafkaConsumer.poll(Duration.ofMillis(1000L)).forEach(consumerRecord -> {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("get log:" + ((String) consumerRecord.value()) + "  logType:" + consumerRecord.topic());
                    }
                    if (consumerRecord.topic().equals(LogMessageConstant.LOG_KEY)) {
                        arrayList.add(consumerRecord.value());
                    }
                    if (consumerRecord.topic().equals(LogMessageConstant.LOG_KEY_TRACE)) {
                        arrayList2.add(consumerRecord.value());
                    }
                });
            } catch (Exception e) {
                this.logger.error("get logs from kafka failed! ", (Throwable) e);
            }
            if (arrayList.size() > 0) {
                super.sendLog(super.getRunLogIndex(), arrayList);
                publisherMonitorEvent(arrayList);
            }
            if (arrayList2.size() > 0) {
                super.sendTraceLogList(super.getTraceLogIndex(), arrayList2);
            }
        }
    }
}
