package org.apache.iotdb.session.subscription;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.session.subscription.SubscriptionConsumer;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/session/subscription/SubscriptionPullConsumer.class */
/**
 * Subscription consumer whose messages are fetched by explicit {@code poll} calls.
 *
 * <p>When auto-commit is enabled, every polled message is recorded in a time bucket (one bucket
 * per auto-commit interval) and a {@link PullConsumerAutoCommitWorker} is scheduled at a fixed
 * rate with that interval. On {@link #close()} the worker is shut down and every message still
 * recorded as uncommitted is committed synchronously. When auto-commit is disabled, callers
 * commit via {@link #commitSync(SubscriptionMessage)} / {@link #commitSync(Iterable)}.
 */
public class SubscriptionPullConsumer extends SubscriptionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionPullConsumer.class);

    /** Property key for the auto-commit switch. */
    private static final String AUTO_COMMIT_KEY = "auto-commit";

    /** Property key for the auto-commit interval in milliseconds. */
    private static final String AUTO_COMMIT_INTERVAL_MS_KEY = "auto-commit-interval-ms";

    private static final boolean AUTO_COMMIT_DEFAULT_VALUE = true;
    private static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5000L;

    /** Smallest accepted auto-commit interval; smaller values are raised to this. */
    private static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500L;

    private final boolean autoCommit;
    private final long autoCommitIntervalMs;
    private ScheduledExecutorService autoCommitWorkerExecutor;

    /**
     * Polled-but-uncommitted messages keyed by time bucket index. Created each time the
     * auto-commit worker is launched, so it is {@code null} until the first {@link #open()} with
     * auto-commit enabled.
     */
    private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;

    /** {@code true} until {@link #open()} succeeds and again after {@link #close()}. */
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    /**
     * Builder for {@link SubscriptionPullConsumer}. The inherited setters are overridden only to
     * narrow their return type so that calls can be chained with the pull-specific options.
     */
    public static class Builder extends SubscriptionConsumer.Builder {
        private boolean autoCommit = AUTO_COMMIT_DEFAULT_VALUE;
        private long autoCommitIntervalMs = AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE;

        @Override
        public Builder host(String str) {
            super.host(str);
            return this;
        }

        @Override
        public Builder port(int i) {
            super.port(i);
            return this;
        }

        // The compiler generates the erased bridge for this covariant override itself;
        // declaring nodeUrls(List) explicitly would clash with this method's erasure.
        @Override
        public Builder nodeUrls(List<String> list) {
            super.nodeUrls(list);
            return this;
        }

        @Override
        public Builder username(String str) {
            super.username(str);
            return this;
        }

        @Override
        public Builder password(String str) {
            super.password(str);
            return this;
        }

        @Override
        public Builder consumerId(String str) {
            super.consumerId(str);
            return this;
        }

        @Override
        public Builder consumerGroupId(String str) {
            super.consumerGroupId(str);
            return this;
        }

        @Override
        public Builder heartbeatIntervalMs(long j) {
            super.heartbeatIntervalMs(j);
            return this;
        }

        @Override
        public Builder endpointsSyncIntervalMs(long j) {
            super.endpointsSyncIntervalMs(j);
            return this;
        }

        /**
         * Enables or disables auto-commit (enabled by default).
         *
         * @param z {@code true} to commit polled messages automatically
         * @return this builder
         */
        public Builder autoCommit(boolean z) {
            this.autoCommit = z;
            return this;
        }

        /**
         * Sets the auto-commit interval; values below 500 ms are raised to 500 ms.
         *
         * @param j requested interval in milliseconds
         * @return this builder
         */
        public Builder autoCommitIntervalMs(long j) {
            this.autoCommitIntervalMs = Math.max(j, AUTO_COMMIT_INTERVAL_MS_MIN_VALUE);
            return this;
        }

        @Override
        public SubscriptionPullConsumer buildPullConsumer() {
            return new SubscriptionPullConsumer(this);
        }

        /**
         * Always fails: this builder only produces pull consumers.
         *
         * @throws SubscriptionException always
         */
        @Override
        public SubscriptionPushConsumer buildPushConsumer() {
            throw new SubscriptionException("SubscriptionPullConsumer.Builder do not support build push consumer.");
        }
    }

    /**
     * Creates a consumer from a configured builder.
     *
     * @param builder source of the connection and auto-commit settings
     */
    public SubscriptionPullConsumer(Builder builder) {
        super(builder);
        this.autoCommit = builder.autoCommit;
        this.autoCommitIntervalMs = builder.autoCommitIntervalMs;
    }

    /**
     * Creates a consumer from properties. {@code "auto-commit"} (default {@code true}) and
     * {@code "auto-commit-interval-ms"} (default 5000) are read here; the properties are also
     * handed to the superclass constructor.
     *
     * @param properties consumer configuration
     * @throws ClassCastException if one of the two values has an unsupported type
     * @throws NumberFormatException if the interval is a String that is not a valid long
     */
    public SubscriptionPullConsumer(Properties properties) {
        this(properties, readAutoCommitProperty(properties), readAutoCommitIntervalMsProperty(properties));
    }

    private SubscriptionPullConsumer(Properties properties, boolean z, long j) {
        super(new Builder().autoCommit(z).autoCommitIntervalMs(j), properties);
        this.autoCommit = z;
        // Apply the same lower bound as Builder#autoCommitIntervalMs: a non-positive interval
        // would divide by zero in poll() and be rejected by scheduleAtFixedRate.
        this.autoCommitIntervalMs = Math.max(j, AUTO_COMMIT_INTERVAL_MS_MIN_VALUE);
    }

    /** Reads the auto-commit flag, accepting a Boolean or its String form. */
    private static boolean readAutoCommitProperty(Properties properties) {
        Object value = properties.getOrDefault(AUTO_COMMIT_KEY, AUTO_COMMIT_DEFAULT_VALUE);
        if (value instanceof String) {
            return Boolean.parseBoolean(((String) value).trim());
        }
        // Any other non-Boolean type fails with ClassCastException, as it always did.
        return ((Boolean) value).booleanValue();
    }

    /** Reads the auto-commit interval, accepting any Number or a decimal String. */
    private static long readAutoCommitIntervalMsProperty(Properties properties) {
        Object value = properties.getOrDefault(AUTO_COMMIT_INTERVAL_MS_KEY, AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE);
        if (value instanceof Number) {
            return ((Number) value).longValue();
        }
        if (value instanceof String) {
            return Long.parseLong(((String) value).trim());
        }
        // Any other type fails with ClassCastException, as it always did.
        return ((Long) value).longValue();
    }

    /**
     * Opens the consumer if it is currently closed; otherwise does nothing. With auto-commit
     * enabled the auto-commit worker is started after the superclass has been opened.
     */
    @Override
    public synchronized void open() throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        if (this.isClosed.get()) {
            super.open();
            if (this.autoCommit) {
                launchAutoCommitWorker();
            }
            this.isClosed.set(false);
        }
    }

    /**
     * Closes the consumer if it is currently open; otherwise does nothing. With auto-commit
     * enabled the worker is shut down and all recorded uncommitted messages are committed before
     * the superclass is closed. The consumer is marked closed even if closing fails.
     */
    @Override
    public synchronized void close() throws IoTDBConnectionException {
        if (this.isClosed.get()) {
            return;
        }
        try {
            if (this.autoCommit) {
                shutdownAutoCommitWorker();
                commitAllUncommittedMessages();
            }
            super.close();
        } finally {
            this.isClosed.set(true);
        }
    }

    /**
     * Polls with an empty topic-name set.
     *
     * @param duration timeout, converted to milliseconds
     * @return the polled messages
     */
    public List<SubscriptionMessage> poll(Duration duration) throws TException, IOException, StatementExecutionException {
        return poll(Collections.emptySet(), duration.toMillis());
    }

    /**
     * Polls with an empty topic-name set.
     *
     * @param j timeout in milliseconds
     * @return the polled messages
     */
    public List<SubscriptionMessage> poll(long j) throws TException, IOException, StatementExecutionException {
        return poll(Collections.emptySet(), j);
    }

    /**
     * Polls the given topics.
     *
     * @param set topic names passed to every provider
     * @param duration timeout, converted to milliseconds
     * @return the polled messages
     */
    public List<SubscriptionMessage> poll(Set<String> set, Duration duration) throws TException, IOException, StatementExecutionException {
        return poll(set, duration.toMillis());
    }

    /**
     * Polls every available provider in turn (each with the same timeout) and wraps the results
     * as {@link SubscriptionMessage}s. With auto-commit enabled the returned messages are also
     * recorded in the uncommitted bucket of the current time window.
     *
     * @param set topic names passed to every provider
     * @param j timeout in milliseconds passed to every provider
     * @return the messages from all providers, in provider iteration order
     */
    public List<SubscriptionMessage> poll(Set<String> set, long j) throws TException, IOException, StatementExecutionException {
        List<SubscriptionMessage> messages = new ArrayList<>();
        acquireReadLock();
        try {
            for (SubscriptionProvider provider : getAllAvailableProviders()) {
                // Map straight from the provider's typed result so the element type is
                // inferred rather than erased through a raw intermediate list.
                messages.addAll(
                        provider.getSessionConnection().poll(set, j).stream()
                                .map(SubscriptionMessage::new)
                                .collect(Collectors.toList()));
            }
            if (this.autoCommit) {
                long now = System.currentTimeMillis();
                long bucket = now / this.autoCommitIntervalMs;
                // A timestamp exactly on an interval boundary belongs to the preceding bucket.
                if (now % this.autoCommitIntervalMs == 0) {
                    bucket--;
                }
                this.uncommittedMessages
                        .computeIfAbsent(bucket, key -> new ConcurrentSkipListSet<>())
                        .addAll(messages);
            }
            return messages;
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Commits a single message synchronously.
     *
     * @param subscriptionMessage the message to commit
     */
    public void commitSync(SubscriptionMessage subscriptionMessage) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        commitSync(Collections.singletonList(subscriptionMessage));
    }

    /**
     * Commits messages synchronously. Commit ids are grouped by data node id and then by topic
     * name, and one commit request is issued per data node.
     *
     * @param iterable the messages to commit
     * @throws IoTDBConnectionException if the provider for a data node is missing or unavailable
     */
    public void commitSync(Iterable<SubscriptionMessage> iterable) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        Map<Integer, Map<String, List<String>>> commitIdsByNodeAndTopic = new HashMap<>();
        for (SubscriptionMessage message : iterable) {
            commitIdsByNodeAndTopic
                    .computeIfAbsent(message.parseDataNodeIdFromSubscriptionCommitId(), nodeId -> new HashMap<>())
                    .computeIfAbsent(message.getTopicName(), topic -> new ArrayList<>())
                    .add(message.getSubscriptionCommitId());
        }
        for (Map.Entry<Integer, Map<String, List<String>>> entry : commitIdsByNodeAndTopic.entrySet()) {
            commitSyncInternal(entry.getKey(), entry.getValue());
        }
    }

    /** Sends one commit request to the provider of the given data node, under the read lock. */
    private void commitSyncInternal(int i, Map<String, List<String>> map) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        acquireReadLock();
        try {
            SubscriptionProvider provider = getProvider(i);
            if (Objects.isNull(provider) || !provider.isAvailable()) {
                throw new IoTDBConnectionException(String.format("something unexpected happened when commit messages to subscription provider with data node id %s, the subscription provider may be unavailable or not existed", i));
            }
            provider.getSessionConnection().commitSync(map);
        } finally {
            releaseReadLock();
        }
    }

    /**
     * Resets the uncommitted-message map and schedules a {@link PullConsumerAutoCommitWorker} on
     * a single daemon thread, starting immediately and repeating every auto-commit interval.
     */
    private void launchAutoCommitWorker() {
        this.uncommittedMessages = new ConcurrentSkipListMap<>();
        this.autoCommitWorkerExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread worker = new Thread(Thread.currentThread().getThreadGroup(), runnable, "PullConsumerAutoCommitWorker", 0L);
            if (!worker.isDaemon()) {
                worker.setDaemon(true);
            }
            if (worker.getPriority() != Thread.NORM_PRIORITY) {
                worker.setPriority(Thread.NORM_PRIORITY);
            }
            return worker;
        });
        this.autoCommitWorkerExecutor.scheduleAtFixedRate(new PullConsumerAutoCommitWorker(this), 0L, this.autoCommitIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Requests an orderly shutdown of the worker executor; does not wait for it to finish. */
    private void shutdownAutoCommitWorker() {
        this.autoCommitWorkerExecutor.shutdown();
        this.autoCommitWorkerExecutor = null;
    }

    /**
     * Commits every recorded bucket, removing each bucket once its commit succeeded. A failing
     * bucket is logged and left in the map so the remaining buckets are still attempted.
     */
    private void commitAllUncommittedMessages() {
        // The map is a ConcurrentSkipListMap (see launchAutoCommitWorker), so removing entries
        // while iterating its entry set is safe.
        for (Map.Entry<Long, Set<SubscriptionMessage>> entry : this.uncommittedMessages.entrySet()) {
            try {
                commitSync(entry.getValue());
                this.uncommittedMessages.remove(entry.getKey());
            } catch (Exception e) {
                LOGGER.warn("something unexpected happened when commit messages during close", e);
            }
        }
    }

    /** Returns whether this consumer is currently closed. */
    @Override
    public boolean isClosed() {
        return this.isClosed.get();
    }

    /** Returns the effective auto-commit interval in milliseconds. */
    public long getAutoCommitIntervalMs() {
        return this.autoCommitIntervalMs;
    }

    /**
     * Returns the live map of uncommitted messages keyed by time bucket index; {@code null} if
     * the auto-commit worker has never been launched.
     */
    public SortedMap<Long, Set<SubscriptionMessage>> getUncommittedMessages() {
        return this.uncommittedMessages;
    }
}
