package org.apache.doris.flink.sink.writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/sink/writer/CacheRecordBuffer.class */
public class CacheRecordBuffer extends RecordBuffer {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CacheRecordBuffer.class);
    BlockingDeque<ByteBuffer> bufferCache;
    LinkedBlockingQueue<ByteBuffer> bufferPool;

    public CacheRecordBuffer(int i, int i2) {
        super(i, i2);
        this.bufferCache = new LinkedBlockingDeque();
        this.bufferPool = new LinkedBlockingQueue<>();
    }

    @Override // org.apache.doris.flink.sink.writer.RecordBuffer
    public void startBufferData() throws IOException {
        LOG.info("start buffer data, read queue size {}, write queue size {}, buffer cache size {}, buffer pool size {}", Integer.valueOf(this.readQueue.size()), Integer.valueOf(this.writeQueue.size()), Integer.valueOf(this.bufferCache.size()), Integer.valueOf(this.bufferPool.size()));
        try {
            if (this.currentReadBuffer != null && this.currentReadBuffer.limit() != 0) {
                this.currentReadBuffer.rewind();
                this.readQueue.putFirst(this.currentReadBuffer);
                this.currentReadBuffer = null;
            }
            ByteBuffer pollFirst = this.bufferCache.pollFirst();
            while (pollFirst != null) {
                pollFirst.rewind();
                this.readQueue.putFirst(pollFirst);
                pollFirst = this.bufferCache.pollFirst();
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    @Override // org.apache.doris.flink.sink.writer.RecordBuffer
    public int read(byte[] bArr) throws InterruptedException {
        if (this.currentReadBuffer == null) {
            this.currentReadBuffer = this.readQueue.take();
        }
        if (this.currentReadBuffer.limit() == 0) {
            Preconditions.checkState(this.readQueue.size() == 0);
            this.bufferCache.putFirst(this.currentReadBuffer);
            this.writeQueue.offer(allocate());
            this.currentReadBuffer = null;
            return -1;
        }
        int min = Math.min(this.currentReadBuffer.remaining(), bArr.length);
        this.currentReadBuffer.get(bArr, 0, min);
        if (this.currentReadBuffer.remaining() == 0) {
            this.bufferCache.putFirst(this.currentReadBuffer);
            this.writeQueue.offer(allocate());
            this.currentReadBuffer = null;
        }
        return min;
    }

    public void recycleCache() {
        Preconditions.checkState(this.readQueue.size() == 0);
        ByteBuffer poll = this.bufferCache.poll();
        while (true) {
            ByteBuffer byteBuffer = poll;
            if (byteBuffer == null) {
                return;
            }
            byteBuffer.clear();
            this.bufferPool.add(byteBuffer);
            poll = this.bufferCache.poll();
        }
    }

    private ByteBuffer allocate() {
        ByteBuffer poll = this.bufferPool.poll();
        return poll != null ? poll : ByteBuffer.allocate(this.bufferCapacity);
    }

    @VisibleForTesting
    public int getBufferCacheSize() {
        return this.bufferCache.size();
    }

    @VisibleForTesting
    public int getBufferPoolSize() {
        return this.bufferPool.size();
    }
}
