/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.util.Preconditions;

/**
 * A {@link RecordWriter} that can additionally flush its outputs periodically from a
 * background thread.
 *
 * <p>Depending on the {@code timeout} passed to the constructor, the writer either starts an
 * {@link OutputFlusher} daemon thread that calls {@code flushAll()} every {@code timeout}
 * milliseconds, or runs without such a thread. Any failure inside the flusher thread is
 * remembered and re-thrown (wrapped in an {@link IOException}) by the next call to one of the
 * {@code emit} methods.
 *
 * @param <T> the type of the records written
 */
@Internal
public class StreamRecordWriter<T> extends RecordWriter<T> {

    /** Thread name used for the flusher when no task name is supplied. */
    private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";

    /** The periodic flusher thread, or {@code null} if no periodic flushing is configured. */
    private final OutputFlusher outputFlusher;

    /**
     * First failure observed by the flusher thread, or {@code null} if none occurred.
     *
     * <p>Written by the flusher thread and read by the thread calling the {@code emit} methods,
     * hence {@code volatile} so that the failure is guaranteed to become visible to the reader.
     */
    private volatile Throwable flusherException;

    /**
     * Creates a writer whose flusher thread (if any) uses the default thread name.
     *
     * @param writer the partition writer handed to the superclass
     * @param channelSelector the channel selector handed to the superclass
     * @param timeout flush interval in milliseconds; must be {@code >= -1}
     */
    public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout) {
        this(writer, channelSelector, timeout, null);
    }

    /**
     * Creates a writer and, if periodic flushing is required, starts the flusher thread.
     *
     * <p>No flusher thread is started when the target partition is of type
     * {@link ResultPartitionType#BLOCKING}, or when {@code timeout} is {@code -1} or {@code 0}.
     * A timeout of {@code 0} is additionally passed to the superclass as a {@code true} flag
     * (presumably "flush on every record" - confirm against {@code RecordWriter}).
     *
     * @param writer the partition writer handed to the superclass
     * @param channelSelector the channel selector handed to the superclass
     * @param timeout flush interval in milliseconds; must be {@code >= -1}
     * @param taskName name used to label the flusher thread, or {@code null} for the default name
     * @throws IllegalArgumentException if {@code timeout < -1}
     */
    public StreamRecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, long timeout, String taskName) {
        super(writer, channelSelector, channelSelector instanceof BroadcastPartitioner, timeout == 0L);
        Preconditions.checkArgument(timeout >= -1L);

        // After the check above, "timeout <= 0" covers exactly the two special values -1 and 0.
        // The partition type is queried first to keep the original evaluation order.
        boolean noPeriodicFlush =
                this.targetPartition.getResultPartitionType() == ResultPartitionType.BLOCKING
                        || timeout <= 0L;

        if (noPeriodicFlush) {
            this.outputFlusher = null;
        } else {
            String threadName = taskName == null
                    ? DEFAULT_OUTPUT_FLUSH_THREAD_NAME
                    : "OutputFlusher for " + taskName;
            this.outputFlusher = new OutputFlusher(threadName, timeout);
            this.outputFlusher.start();
        }
    }

    /**
     * Emits the record via the superclass after verifying that the flusher has not failed.
     *
     * @throws IOException if the flusher thread previously failed, or the superclass call fails
     * @throws InterruptedException if the superclass call is interrupted
     */
    public void emit(T record) throws IOException, InterruptedException {
        this.checkErroneous();
        super.emit(record);
    }

    /**
     * Broadcast-emits the record via the superclass after verifying that the flusher has not
     * failed.
     *
     * @throws IOException if the flusher thread previously failed, or the superclass call fails
     * @throws InterruptedException if the superclass call is interrupted
     */
    public void broadcastEmit(T record) throws IOException, InterruptedException {
        this.checkErroneous();
        super.broadcastEmit(record);
    }

    /**
     * Random-emits the record via the superclass after verifying that the flusher has not
     * failed.
     *
     * @throws IOException if the flusher thread previously failed, or the superclass call fails
     * @throws InterruptedException if the superclass call is interrupted
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        this.checkErroneous();
        super.randomEmit(record);
    }

    /**
     * Clears the buffers and stops the flusher thread, if one was started.
     *
     * <p>If the calling thread is interrupted while waiting for the flusher thread to finish,
     * the wait is abandoned and the interrupt status of the calling thread is restored.
     */
    public void close() {
        this.clearBuffers();
        if (this.outputFlusher != null) {
            this.outputFlusher.terminate();
            try {
                this.outputFlusher.join();
            } catch (InterruptedException ignored) {
                // Closing is best-effort, so do not fail here; but keep the interrupt status so
                // that subsequent blocking calls of the caller still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Records the given failure of the flusher thread, keeping only the first one reported.
     */
    private void notifyFlusherException(Throwable t) {
        if (this.flusherException == null) {
            this.flusherException = t;
        }
    }

    /**
     * Throws if the flusher thread has reported a failure.
     *
     * @throws IOException wrapping the recorded flusher failure
     */
    private void checkErroneous() throws IOException {
        // Read the volatile field once so the null check and the wrapped cause agree.
        Throwable failure = this.flusherException;
        if (failure != null) {
            throw new IOException("An exception happened while flushing the outputs", failure);
        }
    }

    /**
     * Daemon thread that repeatedly sleeps for the configured timeout and then calls
     * {@code flushAll()} on the enclosing writer, until {@link #terminate()} is called or a
     * failure occurs.
     */
    private class OutputFlusher extends Thread {

        /** Sleep interval between two flushes, in milliseconds. */
        private final long timeout;

        /** Cleared by {@link #terminate()} to make the loop in {@link #run()} exit. */
        private volatile boolean running;

        OutputFlusher(String name, long timeout) {
            super(name);
            this.running = true;
            this.setDaemon(true);
            this.timeout = timeout;
        }

        /** Requests the thread to stop and interrupts it in case it is currently sleeping. */
        public void terminate() {
            this.running = false;
            this.interrupt();
        }

        @Override
        public void run() {
            try {
                while (this.running) {
                    try {
                        Thread.sleep(this.timeout);
                    } catch (InterruptedException e) {
                        // An interrupt is only expected as part of terminate(). If the thread
                        // is still supposed to run, treat it as a failure and report it.
                        if (this.running) {
                            throw new Exception(e);
                        }
                    }
                    // Also reached once after a terminating interrupt (final flush). Any error
                    // raised here ends the thread and is handed to the enclosing writer.
                    StreamRecordWriter.this.flushAll();
                }
            } catch (Throwable t) {
                StreamRecordWriter.this.notifyFlusherException(t);
            }
        }
    }
}

