package com.alibaba.cloud.stream.binder.rocketmq.integration.inbound;

import com.alibaba.cloud.stream.binder.rocketmq.metrics.Instrumentation;
import com.alibaba.cloud.stream.binder.rocketmq.metrics.InstrumentationManager;
import com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties;
import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport;
import com.alibaba.cloud.stream.binder.rocketmq.utils.RocketMQUtils;
import java.util.List;
import java.util.function.Supplier;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/alibaba/cloud/stream/binder/rocketmq/integration/inbound/RocketMQInboundChannelAdapter.class */
public class RocketMQInboundChannelAdapter extends MessageProducerSupport implements OrderlyShutdownCapable {
    private static final Logger log = LoggerFactory.getLogger(RocketMQInboundChannelAdapter.class);
    private RetryTemplate retryTemplate;
    private RecoveryCallback<Object> recoveryCallback;
    private DefaultMQPushConsumer pushConsumer;
    private final String topic;
    private final ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties;

    public RocketMQInboundChannelAdapter(String str, ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
        this.topic = str;
        this.extendedConsumerProperties = extendedConsumerProperties;
    }

    protected void onInit() {
        if (this.extendedConsumerProperties.getExtension() == null || !((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getEnabled()) {
            return;
        }
        try {
            super.onInit();
            if (this.retryTemplate != null) {
                Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
                this.retryTemplate.registerListener(new RetryListener() { // from class: com.alibaba.cloud.stream.binder.rocketmq.integration.inbound.RocketMQInboundChannelAdapter.1
                    public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                        return true;
                    }

                    public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
                    }

                    public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
                    }
                });
            }
            this.pushConsumer = RocketMQConsumerFactory.initPushConsumer(this.extendedConsumerProperties);
            if (((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getPush().getOrderly()) {
                this.pushConsumer.registerMessageListener((list, consumeOrderlyContext) -> {
                    return (ConsumeOrderlyStatus) consumeMessage(list, () -> {
                        consumeOrderlyContext.setSuspendCurrentQueueTimeMillis(((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getPush().getSuspendCurrentQueueTimeMillis());
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }, () -> {
                        return ConsumeOrderlyStatus.SUCCESS;
                    });
                });
            } else {
                this.pushConsumer.registerMessageListener((list2, consumeConcurrentlyContext) -> {
                    return (ConsumeConcurrentlyStatus) consumeMessage(list2, () -> {
                        consumeConcurrentlyContext.setDelayLevelWhenNextConsume(((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getPush().getDelayLevelWhenNextConsume());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }, () -> {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    });
                });
            }
        } catch (Exception e) {
            log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
            throw new MessagingException(MessageBuilder.withPayload("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()).build(), e);
        }
    }

    private <R> R consumeMessage(List<MessageExt> list, Supplier<R> supplier, Supplier<R> supplier2) {
        if (CollectionUtils.isEmpty(list)) {
            throw new MessagingException("DefaultMQPushConsumer consuming failed, Caused by messageExtList is empty");
        }
        for (MessageExt messageExt : list) {
            try {
                Message convertMessage2Spring = RocketMQMessageConverterSupport.convertMessage2Spring(messageExt);
                if (this.retryTemplate != null) {
                    this.retryTemplate.execute(retryContext -> {
                        sendMessage(convertMessage2Spring);
                        return convertMessage2Spring;
                    }, this.recoveryCallback);
                } else {
                    sendMessage(convertMessage2Spring);
                }
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                return supplier.get();
            }
        }
        return supplier2.get();
    }

    protected void doStart() {
        if (this.extendedConsumerProperties.getExtension() == null || !((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getEnabled()) {
            return;
        }
        Instrumentation instrumentation = new Instrumentation(this.topic, this);
        try {
            try {
                this.pushConsumer.subscribe(this.topic, RocketMQUtils.getMessageSelector(((RocketMQConsumerProperties) this.extendedConsumerProperties.getExtension()).getSubscription()));
                this.pushConsumer.start();
                instrumentation.markStartedSuccessfully();
                InstrumentationManager.addHealthInstrumentation(instrumentation);
            } catch (Exception e) {
                instrumentation.markStartFailed(e);
                log.error("DefaultMQPushConsumer init failed, Caused by " + e.getMessage());
                throw new MessagingException(MessageBuilder.withPayload("DefaultMQPushConsumer init failed, Caused by " + e.getMessage()).build(), e);
            }
        } catch (Throwable th) {
            InstrumentationManager.addHealthInstrumentation(instrumentation);
            throw th;
        }
    }

    protected void doStop() {
        if (this.pushConsumer != null) {
            this.pushConsumer.shutdown();
        }
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public int beforeShutdown() {
        stop();
        return 0;
    }

    public int afterShutdown() {
        return 0;
    }
}
