package com.tencent.cloud.tsf.lane.instrument.kafka;

import com.tencent.cloud.tsf.lane.config.TsfLaneKafkaConfiguration;
import com.tencent.cloud.tsf.lane.service.LaneService;
import com.tencent.cloud.tsf.lane.service.TsfLaneIdHolder;
import com.tencent.cloud.tsf.lane.sync.TsfActiveLane;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.tsf.core.context.TsfCoreContextHolder;
import org.springframework.tsf.core.context.TsfCrossContextHolder;

/**
 * Aspect that carries the TSF lane id across Kafka.
 *
 * <p>On the producer side it intercepts {@code KafkaTemplate.send(..)} and adds the lane id of
 * the current thread to the outgoing record as the {@code tsf_laneId} header. On the consumer
 * side it intercepts methods annotated with {@code @KafkaListener} and decides, per record,
 * whether the record is handed to the listener, based on that header and on the lane ids
 * reported by {@link TsfLaneIdHolder#getCurrentGroupLaneIds()}.
 *
 * <p>Both advices are no-ops unless {@code TsfLaneKafkaConfiguration.getLaneOn()} is true.
 */
@Aspect
@Order(1)
public class KafkaLaneAspect {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaLaneAspect.class);

    /** Name of the Kafka record header that carries the lane id. */
    private static final String TSF_LANE_ID = "tsf_laneId";

    /** Header copied from the Spring message onto the record when the converter produced no headers. */
    private static final String KAFKA_CORRELATION_ID = "kafka_correlationId";

    @Autowired
    private TsfLaneKafkaConfiguration tsfLaneKafkaConfiguration;

    @Autowired
    private TsfActiveLane tsfActiveLane;

    @Pointcut("execution(* org.springframework.kafka.core.KafkaTemplate.send(..))")
    private void producerPointcut() {
    }

    /**
     * Adds the current thread's lane id to the record being sent.
     *
     * <p>The call is passed through untouched when the lane feature is off or the thread has no
     * lane id. Otherwise the arguments of the intercepted {@code send} overload are converted to
     * a {@link ProducerRecord}, the {@code tsf_laneId} header is added, and the record is sent
     * through the target template. If anything in that path throws an {@link Exception}
     * (including the send itself), the error is logged and the original invocation is executed
     * with the original arguments as a best-effort fallback.
     *
     * @param proceedingJoinPoint the intercepted {@code KafkaTemplate.send(..)} call
     * @return whatever the (replacement or original) send call returns
     * @throws Throwable anything thrown by the original invocation
     */
    @Around("producerPointcut()")
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Object aroundProducerMessage(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        Object[] args = proceedingJoinPoint.getArgs();
        // Boolean.TRUE.equals(..) treats an unset (null) flag as "off" instead of throwing on unboxing.
        if (!Boolean.TRUE.equals(this.tsfLaneKafkaConfiguration.getLaneOn())) {
            return proceedingJoinPoint.proceed(args);
        }
        LaneService.getInstance().checkAndRecordLane();
        String threadLaneId = getThreadLaneId();
        if (StringUtils.isBlank(threadLaneId)) {
            return proceedingJoinPoint.proceed(args);
        }
        LOG.debug("kafka producer lane before, args: {}, thread laneId: {}", args, threadLaneId);
        KafkaTemplate kafkaTemplate = (KafkaTemplate) proceedingJoinPoint.getTarget();
        try {
            ProducerRecord producerRecord = toProducerRecord(kafkaTemplate, args);
            if (producerRecord != null) {
                producerRecord.headers().add(new RecordHeader(TSF_LANE_ID, threadLaneId.getBytes(StandardCharsets.UTF_8)));
                LOG.debug("kafka producer lane after, args: {}, laneId: {}", producerRecord, threadLaneId);
                return kafkaTemplate.send(producerRecord);
            }
            // Cast to Object so the whole array is logged; passing Object[] alone would be
            // spread as the varargs list and only its first element would be printed.
            LOG.error("KafkaTemplate send message with wrong args: {}", (Object) args);
        } catch (Exception e) {
            LOG.error("add laneId to kafka message error", e);
        }
        // Fallback runs outside the try block so a failure of the original call is never retried.
        return proceedingJoinPoint.proceed(args);
    }

    /**
     * Converts the arguments of an intercepted {@code KafkaTemplate.send(..)} overload into a
     * {@link ProducerRecord}.
     *
     * @param kafkaTemplate the template the call was made on; used to convert a Spring
     *                      {@link Message} with the template's converter and default topic
     * @param args          the arguments of the intercepted call
     * @return the record to send, or {@code null} when the argument shape is not recognised
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ProducerRecord toProducerRecord(KafkaTemplate kafkaTemplate, Object[] args) {
        switch (args.length) {
            case 1:
                if (args[0] instanceof Message) {
                    Message<?> message = (Message<?>) args[0];
                    ProducerRecord converted = kafkaTemplate.getMessageConverter().fromMessage(message, kafkaTemplate.getDefaultTopic());
                    // Only when the converter produced no headers at all is the correlation id
                    // carried over by hand.
                    if (!converted.headers().iterator().hasNext()) {
                        byte[] correlationId = message.getHeaders().get(KAFKA_CORRELATION_ID, byte[].class);
                        if (correlationId != null) {
                            converted.headers().add(KAFKA_CORRELATION_ID, correlationId);
                        }
                    }
                    return converted;
                }
                if (args[0] instanceof ProducerRecord) {
                    return (ProducerRecord) args[0];
                }
                return null;
            case 2:
                return new ProducerRecord((String) args[0], args[1]);
            case 3:
                return new ProducerRecord((String) args[0], args[1], args[2]);
            case 4:
                return new ProducerRecord((String) args[0], (Integer) args[1], args[2], args[3]);
            case 5:
                return new ProducerRecord((String) args[0], (Integer) args[1], (Long) args[2], args[3], args[4]);
            default:
                return null;
        }
    }

    @Pointcut("@annotation(org.springframework.kafka.annotation.KafkaListener)")
    private void consumerPointcut() {
    }

    /**
     * Filters the records passed to a {@code @KafkaListener} method by lane id.
     *
     * <p>The listener arguments are scanned for an {@link Acknowledgment}, a single
     * {@link ConsumerRecord} and a {@link List}. For a list whose first element is a
     * {@code ConsumerRecord}, the list argument is replaced by a new list containing only the
     * records accepted by {@link #ifConsume(String, List)}. For a single record that is not
     * accepted, the listener is not invoked: the record is acknowledged (when an
     * {@code Acknowledgment} argument is present) and {@code null} is returned.
     *
     * <p>The lane id thread-locals of {@link TsfLaneIdHolder} are cleared once the lane-aware
     * path finishes, whether the listener returned normally or threw.
     *
     * @param proceedingJoinPoint the intercepted listener invocation
     * @return the listener's return value, or {@code null} when the record was skipped
     * @throws Throwable anything thrown by the listener
     */
    @Around("consumerPointcut()")
    @SuppressWarnings("rawtypes")
    public Object aroundConsumerMessage(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        Object[] args = proceedingJoinPoint.getArgs();
        if (!Boolean.TRUE.equals(this.tsfLaneKafkaConfiguration.getLaneOn())) {
            return proceedingJoinPoint.proceed(args);
        }
        List<String> currentGroupLaneIds = TsfLaneIdHolder.getCurrentGroupLaneIds();
        LOG.debug("kafka consumer lane before, args: {}, group laneId: {}", args, currentGroupLaneIds);

        ConsumerRecord singleRecord = null;
        List<?> batch = null;
        int payloadIndex = -1;
        Acknowledgment acknowledgment = null;
        // When several arguments match, the last one of each kind wins.
        for (int idx = 0; idx < args.length; idx++) {
            Object arg = args[idx];
            if (arg instanceof Acknowledgment) {
                acknowledgment = (Acknowledgment) arg;
            } else if (arg instanceof ConsumerRecord) {
                singleRecord = (ConsumerRecord) arg;
                payloadIndex = idx;
            } else if (arg instanceof List) {
                batch = (List<?>) arg;
                payloadIndex = idx;
            }
        }

        try {
            if (batch != null) {
                if (!batch.isEmpty() && (batch.get(0) instanceof ConsumerRecord)) {
                    args[payloadIndex] = filterBatch(batch, currentGroupLaneIds, acknowledgment);
                }
                return proceedingJoinPoint.proceed(args);
            }
            if (singleRecord != null && !ifConsume(getConsumerRecordLaneId(singleRecord), currentGroupLaneIds)) {
                if (acknowledgment != null) {
                    acknowledgment.acknowledge();
                }
                // null is assignable to any reference return type; a placeholder object would
                // fail the proxy's return cast for listeners that declare a return type.
                return null;
            }
            LOG.debug("kafka consumer lane after, args: {}, group laneId: {}", args, currentGroupLaneIds);
            return proceedingJoinPoint.proceed(args);
        } finally {
            // ifConsume may have stored a lane id on this thread; always clear it so it cannot
            // leak into the next invocation handled by the same thread.
            TsfLaneIdHolder.removeLaneId();
            TsfLaneIdHolder.removeCrossLaneId();
        }
    }

    /**
     * Returns the records of {@code batch} that {@link #ifConsume(String, List)} accepts.
     *
     * @param batch          the listener's list argument; its elements are cast to {@link ConsumerRecord}
     * @param groupLaneIds   lane ids of the current group, passed through to {@code ifConsume}
     * @param acknowledgment the listener's acknowledgment argument, or {@code null} if it has none
     * @return a new list with the accepted records, in their original order
     */
    @SuppressWarnings("rawtypes")
    private List<ConsumerRecord> filterBatch(List<?> batch, List<String> groupLaneIds, Acknowledgment acknowledgment) {
        List<ConsumerRecord> accepted = new ArrayList<>(batch.size());
        for (Object element : batch) {
            ConsumerRecord candidate = (ConsumerRecord) element;
            if (ifConsume(getConsumerRecordLaneId(candidate), groupLaneIds)) {
                accepted.add(candidate);
            } else if (acknowledgment != null) {
                // NOTE(review): acknowledge() is invoked once per rejected record; for a batch
                // listener this presumably acknowledges the whole batch before the accepted
                // records are processed - confirm this is intended.
                acknowledgment.acknowledge();
            }
        }
        return accepted;
    }

    /**
     * Reads the lane id from the first {@code tsf_laneId} header of a record.
     *
     * @param consumerRecord the record to inspect
     * @return the header value decoded as UTF-8, or {@code null} when the header is absent or
     *         has a null value
     */
    private String getConsumerRecordLaneId(ConsumerRecord<?, ?> consumerRecord) {
        for (Header header : consumerRecord.headers().headers(TSF_LANE_ID)) {
            byte[] value = header.value();
            // Only the first header is considered.
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
        return null;
    }

    /**
     * Decides whether a record carrying {@code laneId} should be handed to the listener.
     *
     * <ul>
     *   <li>No lane id on the record: accepted when there are no group lane ids, otherwise
     *       accepted only if {@code getLaneConsumeMain()} is true.</li>
     *   <li>Lane id present and group lane ids present: accepted iff the group lane ids
     *       contain the record's lane id.</li>
     *   <li>Lane id present and no group lane ids: accepted when the lane does not exist
     *       according to {@link TsfActiveLane}, or when {@code getMainConsumeLane()} is true
     *       and the lane is not active. In that case the lane id is stored in
     *       {@link TsfLaneIdHolder} and the core context is copied into
     *       {@link TsfCrossContextHolder} as a side effect.</li>
     * </ul>
     *
     * @param laneId       lane id read from the record, may be {@code null} or empty
     * @param groupLaneIds lane ids of the current group; {@code null} is treated as empty
     * @return {@code true} if the record should be consumed
     */
    private boolean ifConsume(String laneId, List<String> groupLaneIds) {
        boolean noGroupLanes = groupLaneIds == null || groupLaneIds.isEmpty();
        if (StringUtils.isEmpty(laneId)) {
            return noGroupLanes || Boolean.TRUE.equals(this.tsfLaneKafkaConfiguration.getLaneConsumeMain());
        }
        if (!noGroupLanes) {
            return groupLaneIds.contains(laneId);
        }
        boolean consumable = !this.tsfActiveLane.isLaneExist(laneId)
                || (Boolean.TRUE.equals(this.tsfLaneKafkaConfiguration.getMainConsumeLane())
                        && !this.tsfActiveLane.isActiveLane(laneId));
        if (!consumable) {
            return false;
        }
        TsfLaneIdHolder.setLaneId(laneId);
        TsfCrossContextHolder.set(TsfCoreContextHolder.get());
        return true;
    }

    /**
     * Returns the lane id associated with the current thread.
     *
     * @return the cross lane id if one is set, otherwise the plain lane id (which may be {@code null})
     */
    private String getThreadLaneId() {
        String crossLaneId = TsfLaneIdHolder.getCrossLaneId();
        return crossLaneId != null ? crossLaneId : TsfLaneIdHolder.getLaneId();
    }
}
