package org.dinky.shaded.paimon.utils;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.dinky.shaded.paimon.data.RandomAccessInputView;
import org.dinky.shaded.paimon.data.SimpleCollectingOutputView;
import org.dinky.shaded.paimon.data.serializer.Serializer;
import org.dinky.shaded.paimon.memory.ArraySegmentPool;
import org.dinky.shaded.paimon.memory.MemorySegment;
import org.dinky.shaded.paimon.reader.RecordReader;

/* loaded from: input_file:org/dinky/shaded/paimon/utils/ParallelExecution.class */
/**
 * Reads several {@link RecordReader}s concurrently on a private thread pool and hands the records
 * to a single consumer in serialized batches.
 *
 * <p>Each worker serializes records into a page taken from a bounded pool of {@link
 * MemorySegment}s. When a page is full (or the reader is exhausted) it is published as a {@link
 * ParallelBatch} that the consumer obtains through {@link #take()}. The consumer must call {@link
 * ParallelBatch#releaseBatch()} once it is done with a batch so that the page can be reused;
 * workers block in {@code idlePages.take()} while no page is available.
 *
 * <p>Workers serialize with their own {@code duplicate2()} copy of the serializer; batches
 * deserialize with the instance passed to the constructor.
 *
 * @param <T> record type produced by the readers
 * @param <E> type of the extra value that accompanies each reader and is attached to its batches
 */
public class ParallelExecution<T, E> implements Closeable {
    private final Serializer<T> serializer;
    /** Size handed to {@code MemorySegment.allocateHeapMemory} for every page; kept for error messages. */
    private final int pageSize;
    private final BlockingQueue<MemorySegment> idlePages;
    private final BlockingQueue<ParallelBatch<T, E>> results;
    private final ExecutorService executorService;
    /** First failure raised by any worker, or {@code null} while everything is healthy. */
    private final AtomicReference<Throwable> exception;
    /** Counts the readers that have not yet finished successfully. */
    private final CountDownLatch latch;

    /**
     * One page worth of records produced by a single reader.
     *
     * @param <T> record type
     * @param <E> type of the extra value attached to the batch
     */
    public interface ParallelBatch<T, E> {
        /**
         * Returns the next record of this batch, or {@code null} once every record was returned.
         *
         * @throws IOException if the record cannot be deserialized
         */
        @Nullable
        T next() throws IOException;

        /** Gives the page backing this batch back to the pool of idle pages. */
        void releaseBatch();

        /** Returns the extra value supplied together with the reader that produced this batch. */
        E extraMesage();
    }

    /**
     * Allocates the page pool, starts the thread pool and submits one read task per supplier.
     *
     * @param serializer serializer for the records; each task works on its own duplicate
     * @param pageSize size of each page, as passed to {@code MemorySegment.allocateHeapMemory}
     * @param parallelism number of worker threads; twice as many pages are allocated
     * @param readers suppliers creating the reader and the extra value attached to its batches
     */
    public ParallelExecution(Serializer<T> serializer, int pageSize, int parallelism, List<Supplier<Pair<RecordReader<T>, E>>> readers) {
        this.serializer = serializer;
        this.pageSize = pageSize;
        int totalPages = parallelism * 2;
        this.idlePages = new ArrayBlockingQueue<>(totalPages);
        for (int page = 0; page < totalPages; page++) {
            this.idlePages.add(MemorySegment.allocateHeapMemory(pageSize));
        }
        this.executorService = new ThreadPoolExecutor(
                parallelism,
                parallelism,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                new ExecutorThreadFactory(Thread.currentThread().getName() + "-parallel"));
        this.results = new LinkedBlockingQueue<>();
        this.exception = new AtomicReference<>();
        this.latch = new CountDownLatch(readers.size());
        for (Supplier<Pair<RecordReader<T>, E>> reader : readers) {
            Serializer<T> workerSerializer = this.serializer.duplicate2();
            this.executorService.submit(() -> {
                asyncRead(reader, workerSerializer);
            });
        }
    }

    /**
     * Waits for the next batch.
     *
     * @return the next batch, or {@code null} once every reader has finished and no batch is left
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IOException wrapping the first failure reported by a worker
     */
    @Nullable
    public ParallelBatch<T, E> take() throws InterruptedException, IOException {
        while (true) {
            if (this.latch.getCount() == 0 && this.results.isEmpty()) {
                return null;
            }
            // Poll with a timeout so that a worker failure is noticed even when no batch arrives.
            ParallelBatch<T, E> batch = this.results.poll(2L, TimeUnit.SECONDS);
            Throwable failure = this.exception.get();
            if (failure != null) {
                throw new IOException(failure);
            }
            if (batch != null) {
                return batch;
            }
        }
    }

    /**
     * Worker body: drains one reader, serializing its records into pages and publishing each full
     * page as a batch. Any failure is recorded in {@link #exception} for {@link #take()} to rethrow.
     */
    private void asyncRead(Supplier<Pair<RecordReader<T>, E>> supplier, Serializer<T> serializer) {
        try {
            // The supplier is invoked inside the guarded region: a failure here would otherwise be
            // swallowed by the executor and the consumer would wait forever on the latch.
            Pair<RecordReader<T>, E> pair = supplier.get();
            try (CloseableIterator<T> records = pair.getLeft().toCloseableIterator()) {
                int count = 0;
                SimpleCollectingOutputView outputView = null;
                while (records.hasNext()) {
                    T record = records.next();
                    while (true) {
                        if (outputView == null) {
                            outputView = newOutputView();
                            count = 0;
                        }
                        try {
                            serializer.serialize(record, outputView);
                            count++;
                            break;
                        } catch (EOFException e) {
                            if (count == 0) {
                                // Not even one record fits into an empty page; retrying with
                                // another page of the same size would never terminate.
                                throw new IllegalStateException(
                                        "Current page size (" + this.pageSize + ") is too small:"
                                                + " a single record does not fit into one page."
                                                + " Please increase the page size.",
                                        e);
                            }
                            // Page is full: publish it and retry the same record on a fresh page.
                            sendToResults(outputView, count, pair.getRight());
                            outputView = null;
                        }
                    }
                }
                if (outputView != null) {
                    sendToResults(outputView, count, pair.getRight());
                }
            }
            // Only reached when the reader was fully drained and closed without error.
            this.latch.countDown();
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            // Keep the first failure; later ones are usually just consequences of it.
            this.exception.compareAndSet(null, t);
        }
    }

    /** Blocks until an idle page is available and wraps it in a single-page output view. */
    private SimpleCollectingOutputView newOutputView() throws InterruptedException {
        MemorySegment page = this.idlePages.take();
        return new SimpleCollectingOutputView(new ArraySegmentPool(Collections.singletonList(page)), page.size());
    }

    /** Publishes the page behind {@code outputView}, holding {@code count} records, as a batch. */
    private void sendToResults(SimpleCollectingOutputView outputView, int count, E extra) {
        this.results.add(iterator(outputView.getCurrentSegment(), count, extra));
    }

    /** Stops the thread pool via {@code shutdownNow()}. */
    @Override
    public void close() throws IOException {
        this.executorService.shutdownNow();
    }

    /** Creates a batch that deserializes {@code count} records out of {@code segment}. */
    private ParallelBatch<T, E> iterator(final MemorySegment segment, final int count, final E extra) {
        final RandomAccessInputView input =
                new RandomAccessInputView(new ArrayList<>(Collections.singletonList(segment)), segment.size());
        return new ParallelBatch<T, E>() {
            private int numReturn = 0;

            @Override
            @Nullable
            public T next() throws IOException {
                if (this.numReturn >= count) {
                    return null;
                }
                this.numReturn++;
                return ParallelExecution.this.serializer.deserialize(input);
            }

            @Override
            public void releaseBatch() {
                ParallelExecution.this.idlePages.add(segment);
            }

            @Override
            public E extraMesage() {
                return extra;
            }
        };
    }
}
