package org.apache.rocketmq.store.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.store.DispatchRequest;
import org.rocksdb.RocksDBException;

/* loaded from: input_file:org/apache/rocketmq/store/queue/RocksGroupCommitService.class */
/**
 * Service thread that collects {@link DispatchRequest}s from producers and hands them to a
 * {@link RocksDBConsumeQueueStore} in batches.
 *
 * <p>Producers call {@link #putRequest}, which enqueues into a bounded blocking queue. The
 * service loop in {@link #run()} periodically drains that queue into a private batch list and
 * passes the list to {@code RocksDBConsumeQueueStore#putMessagePosition}.
 *
 * <p>{@code log}, {@code isStopped()}, {@code waitForRunning(long)} and {@code wakeup()} are not
 * declared in this class; presumably they are inherited from {@link ServiceThread}.
 */
public class RocksGroupCommitService extends ServiceThread {
    /** Capacity of {@link #buffer}; once reached, {@link #putRequest} waits for free space. */
    private static final int MAX_BUFFER_SIZE = 100000;

    /** Batch size at which pending requests are committed even if more are still queued. */
    private static final int PREFERRED_DISPATCH_REQUEST_COUNT = 256;

    /** Destination of the batched requests. */
    private final RocksDBConsumeQueueStore store;

    /**
     * Current batch. Only touched by {@link #doCommit()} and {@link #groupCommit()}, both of
     * which are reached exclusively through {@link #run()}.
     */
    private final List<DispatchRequest> requests = new ArrayList<>(PREFERRED_DISPATCH_REQUEST_COUNT);

    /** Bounded hand-off queue between callers of {@link #putRequest} and the service loop. */
    private final LinkedBlockingQueue<DispatchRequest> buffer = new LinkedBlockingQueue<>(MAX_BUFFER_SIZE);

    /**
     * @param rocksDBConsumeQueueStore store that receives the batched requests
     */
    public RocksGroupCommitService(RocksDBConsumeQueueStore rocksDBConsumeQueueStore) {
        this.store = rocksDBConsumeQueueStore;
    }

    /**
     * @return the fixed service name {@code "RocksGroupCommit"}
     */
    public String getServiceName() {
        return "RocksGroupCommit";
    }

    /**
     * Service loop: until {@code isStopped()} reports true, waits (up to the value passed to
     * {@code waitForRunning}, presumably milliseconds) and then commits whatever is buffered.
     * Any exception thrown by one iteration is logged and the loop continues.
     */
    public void run() {
        log.info("{} service started", getServiceName());
        while (!isStopped()) {
            try {
                waitForRunning(10L);
                doCommit();
            } catch (Exception e) {
                log.warn("{} service has exception. ", getServiceName(), e);
            }
        }
        log.info("{} service end", getServiceName());
    }

    /**
     * Enqueues a request for group commit, waiting as long as necessary for buffer space, and
     * then calls {@code wakeup()}.
     *
     * <p>A warning is logged every 3 seconds while the buffer stays full.
     *
     * @param dispatchRequest request to enqueue; must not be {@code null}
     *                        ({@link LinkedBlockingQueue} rejects null elements)
     * @throws InterruptedException if the calling thread is interrupted while waiting for space
     */
    public void putRequest(DispatchRequest dispatchRequest) throws InterruptedException {
        while (!this.buffer.offer(dispatchRequest, 3L, TimeUnit.SECONDS)) {
            log.warn("RocksGroupCommitService#buffer is full, 3s elapsed before space becomes available");
        }
        wakeup();
    }

    /**
     * Drains {@link #buffer} into {@link #requests} and commits in batches: a batch is flushed
     * when it reaches {@link #PREFERRED_DISPATCH_REQUEST_COUNT} entries or when the buffer has
     * momentarily run dry.
     *
     * <p>The inner loop only terminates once {@link #requests} is empty after a poll, so it
     * depends on a successful {@code putMessagePosition} call emptying the list it was given
     * (NOTE(review): not visible in this file; confirm against RocksDBConsumeQueueStore).
     */
    private void doCommit() {
        while (!this.buffer.isEmpty()) {
            while (true) {
                if (this.store.isStopped()) {
                    // groupCommit() refuses to write once the store is stopped, which would
                    // leave the batch non-empty and make this loop spin forever. Bail out and
                    // keep whatever is pending for a later pass instead.
                    return;
                }
                DispatchRequest polled = this.buffer.poll();
                if (null != polled) {
                    this.requests.add(polled);
                }
                if (this.requests.isEmpty()) {
                    // Nothing polled and nothing pending: the buffer has been drained.
                    break;
                }
                if (null == polled || this.requests.size() >= PREFERRED_DISPATCH_REQUEST_COUNT) {
                    groupCommit();
                }
            }
        }
    }

    /**
     * Passes the current batch to the store, retrying on {@link RocksDBException} until the call
     * succeeds or the store reports that it is stopped. When the store is stopped the batch is
     * left untouched.
     *
     * <p>NOTE(review): failed attempts are retried immediately with no pause in this method; if
     * {@code putMessagePosition} does not pace itself, a persistent failure would flood the log.
     */
    private void groupCommit() {
        while (!this.store.isStopped()) {
            try {
                this.store.putMessagePosition(this.requests);
                return;
            } catch (RocksDBException e) {
                log.error("Failed to build consume queue in RocksDB", e);
            }
        }
    }
}
