package org.apache.seatunnel.connectors.seatunnel.rabbitmq.source;

import com.google.common.base.Preconditions;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.client.RabbitmqClient;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.exception.RabbitmqConnectorException;
import org.apache.seatunnel.connectors.seatunnel.rabbitmq.split.RabbitmqSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rabbitmq/source/RabbitmqSourceReader.class */
public class RabbitmqSourceReader<T> implements SourceReader<T, RabbitmqSplit> {
    private static final Logger log = LoggerFactory.getLogger(RabbitmqSourceReader.class);
    protected final SourceReader.Context context;
    protected transient Channel channel;
    private final boolean usesCorrelationId;
    protected transient boolean autoAck;
    protected transient Set<String> correlationIdsProcessedButNotAcknowledged;
    protected transient List<Long> deliveryTagsProcessedForCurrentSnapshot;
    private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private RabbitmqClient rabbitMQClient;
    private DefaultConsumer consumer;
    private final RabbitmqConfig config;
    protected final Handover<Delivery> handover = new Handover<>();
    protected final SortedMap<Long, List<Long>> pendingDeliveryTagsToCommit = Collections.synchronizedSortedMap(new TreeMap());
    protected final SortedMap<Long, Set<String>> pendingCorrelationIdsToCommit = Collections.synchronizedSortedMap(new TreeMap());

    public RabbitmqSourceReader(DeserializationSchema<SeaTunnelRow> deserializationSchema, SourceReader.Context context, RabbitmqConfig rabbitmqConfig) {
        this.context = context;
        this.deserializationSchema = deserializationSchema;
        this.config = rabbitmqConfig;
        this.rabbitMQClient = new RabbitmqClient(rabbitmqConfig);
        this.channel = this.rabbitMQClient.getChannel();
        this.usesCorrelationId = rabbitmqConfig.isUsesCorrelationId();
    }

    public void open() throws Exception {
        this.correlationIdsProcessedButNotAcknowledged = new HashSet();
        this.deliveryTagsProcessedForCurrentSnapshot = new ArrayList();
        this.consumer = this.rabbitMQClient.getQueueingConsumer(this.handover);
        if (Boundedness.UNBOUNDED.equals(this.context.getBoundedness())) {
            this.autoAck = false;
            this.channel.txSelect();
        } else {
            this.autoAck = true;
        }
        log.debug("Starting RabbitMQ source with autoAck status: " + this.autoAck);
        this.channel.basicConsume(this.config.getQueueName(), this.autoAck, this.consumer);
    }

    public void close() throws IOException {
        if (this.rabbitMQClient != null) {
            this.rabbitMQClient.close();
        }
    }

    public void pollNext(Collector collector) throws Exception {
        Optional pollNext = this.handover.pollNext();
        if (pollNext.isPresent()) {
            Delivery delivery = (Delivery) pollNext.get();
            AMQP.BasicProperties properties = delivery.getProperties();
            String correlationId = Objects.isNull(properties) ? null : properties.getCorrelationId();
            byte[] body = delivery.getBody();
            Envelope envelope = delivery.getEnvelope();
            synchronized (collector.getCheckpointLock()) {
                if (verifyMessageIdentifier(properties.getCorrelationId(), envelope.getDeliveryTag())) {
                    this.deliveryTagsProcessedForCurrentSnapshot.add(Long.valueOf(envelope.getDeliveryTag()));
                    this.deserializationSchema.deserialize(body, collector);
                    if (Boundedness.BOUNDED.equals(this.context.getBoundedness())) {
                        this.context.signalNoMoreElement();
                    }
                }
            }
        }
    }

    public List<RabbitmqSplit> snapshotState(long j) throws Exception {
        List<RabbitmqSplit> singletonList = Collections.singletonList(new RabbitmqSplit(this.deliveryTagsProcessedForCurrentSnapshot, this.correlationIdsProcessedButNotAcknowledged));
        List<Long> computeIfAbsent = this.pendingDeliveryTagsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
            return new ArrayList();
        });
        Set<String> computeIfAbsent2 = this.pendingCorrelationIdsToCommit.computeIfAbsent(Long.valueOf(j), l2 -> {
            return new HashSet();
        });
        for (RabbitmqSplit rabbitmqSplit : singletonList) {
            List<Long> deliveryTags = rabbitmqSplit.getDeliveryTags();
            Set<String> correlationIds = rabbitmqSplit.getCorrelationIds();
            if (deliveryTags != null) {
                computeIfAbsent.addAll(deliveryTags);
            }
            if (correlationIds != null) {
                computeIfAbsent2.addAll(correlationIds);
            }
        }
        this.deliveryTagsProcessedForCurrentSnapshot.clear();
        return singletonList;
    }

    public void addSplits(List list) {
    }

    public void handleNoMoreSplits() {
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        log.debug("Committing cursors for checkpoint {}", Long.valueOf(j));
        List<Long> remove = this.pendingDeliveryTagsToCommit.remove(Long.valueOf(j));
        Set<String> remove2 = this.pendingCorrelationIdsToCommit.remove(Long.valueOf(j));
        if (remove == null || remove2 == null) {
            log.debug("pending delivery tags or correlationIds checkpoint {} either do not exist or have already been committed.", Long.valueOf(j));
            return;
        }
        if (!this.autoAck) {
            acknowledgeDeliveryTags(remove);
        }
        this.correlationIdsProcessedButNotAcknowledged.removeAll(remove2);
    }

    protected void acknowledgeDeliveryTags(List<Long> list) {
        try {
            Iterator<Long> it = list.iterator();
            while (it.hasNext()) {
                this.channel.basicAck(it.next().longValue(), false);
            }
            this.channel.txCommit();
        } catch (IOException e) {
            throw new RabbitmqConnectorException(RabbitmqConnectorErrorCode.MESSAGE_ACK_FAILED, e);
        }
    }

    public boolean verifyMessageIdentifier(String str, long j) {
        if (this.autoAck || !this.usesCorrelationId) {
            return true;
        }
        Preconditions.checkNotNull(str, "RabbitMQ source was instantiated with usesCorrelationId set to true yet we couldn't extract the correlation id from it!");
        if (this.correlationIdsProcessedButNotAcknowledged.add(str)) {
            return true;
        }
        try {
            this.channel.basicReject(j, false);
            return false;
        } catch (IOException e) {
            throw new RabbitmqConnectorException(RabbitmqConnectorErrorCode.MESSAGE_ACK_REJECTED, e);
        }
    }
}
