/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client;

import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.RecordReader;
import com.alibaba.hologres.client.model.ExportContext;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.org.postgresql.jdbc.TimestampUtils;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Reads {@link Record}s from the input streams of an {@link ExportContext}.
 *
 * <p>On construction, one {@link RecordReader} task per configured thread is submitted to an
 * internal thread pool; every task is handed the same bounded queue and the same
 * {@link #numOpened} counter. Callers drain the queue through {@link #getRecord()}.
 *
 * <p>NOTE(review): the readers are presumably expected to put parsed records on the shared queue
 * and to decrement {@link #numOpened} when their stream is exhausted; that logic lives in
 * {@code RecordReader} and should be confirmed there.
 */
public class RecordInputFormat {
    /** Capacity of the bounded queue shared with the reader tasks. */
    private static final int QUEUE_CAPACITY = 1024;
    /** How long a single poll in {@link #getRecord()} waits before re-checking the loop condition. */
    private static final long POLL_TIMEOUT_MS = 100L;

    private final TableSchema schema;
    private final TimestampUtils timestampUtils;
    private final ExportContext exportContext;
    /** Number of reader tasks to start, taken from the export context. */
    int threadSize;
    /** Counter initialised to {@link #threadSize} and shared with every reader task. */
    AtomicInteger numOpened;
    /** Bounded queue shared with every reader task and drained by {@link #getRecord()}. */
    BlockingQueue<Record> queue;
    /** Pool that runs the reader tasks. */
    ExecutorService threadPool = Executors.newCachedThreadPool();

    /**
     * Creates the input format and immediately submits the reader tasks.
     *
     * @param exportContext supplies the thread count, the per-thread input streams and the
     *     timestamp utilities
     * @param schema table schema passed to every reader task
     * @throws IOException declared for callers; kept for interface compatibility
     */
    public RecordInputFormat(ExportContext exportContext, TableSchema schema) throws IOException {
        this.exportContext = exportContext;
        this.schema = schema;
        this.threadSize = exportContext.getThreadSize();
        this.numOpened = new AtomicInteger(this.threadSize);
        this.timestampUtils = exportContext.getTimestampUtils();
        this.queue = new ArrayBlockingQueue<Record>(QUEUE_CAPACITY);
        this.start();
    }

    /**
     * Returns the next available record.
     *
     * <p>Polls the shared queue in short intervals for as long as {@link #numOpened} is positive
     * or the queue still holds records. Once neither holds, the thread pool is shut down and
     * {@code null} is returned.
     *
     * @return the next record, or {@code null} when the loop condition no longer holds or the
     *     calling thread was interrupted while waiting; in the latter case the thread's interrupt
     *     status is set so the caller can tell the two cases apart
     */
    public Record getRecord() {
        while (this.numOpened.get() > 0 || !this.queue.isEmpty()) {
            Record r;
            try {
                // Bounded wait so the loop condition is re-evaluated regularly.
                r = this.queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag: returning null alone would silently discard the
                // interruption request and look identical to a normal end of data.
                Thread.currentThread().interrupt();
                return null;
            }
            if (r != null) {
                return r;
            }
        }
        this.threadPool.shutdown();
        return null;
    }

    /** Submits one {@link RecordReader} per input stream index in {@code [0, threadSize)}. */
    private void start() {
        for (int i = 0; i < this.threadSize; ++i) {
            this.threadPool.execute(new RecordReader(this.exportContext.getInputStream(i), this.schema, this.queue, this.numOpened, this.timestampUtils));
        }
    }

    /**
     * Cancels the export: interrupts the reader tasks via {@code shutdownNow()}, resets
     * {@link #numOpened} to zero so {@link #getRecord()} only drains what is already queued, and
     * then cancels the underlying export context.
     *
     * @throws HoloClientException if cancelling the export context fails
     */
    public void cancel() throws HoloClientException {
        this.threadPool.shutdownNow();
        this.numOpened.set(0);
        this.exportContext.cancel();
    }
}

