/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.fs.hdfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.time.Duration;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFsRecoverable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

@Internal
class HadoopRecoverableFsDataOutputStream
extends RecoverableFsDataOutputStream {
    private static final long LEASE_TIMEOUT = 100000L;
    private static Method truncateHandle;
    private final FileSystem fs;
    private final Path targetFile;
    private final Path tempFile;
    private final FSDataOutputStream out;

    HadoopRecoverableFsDataOutputStream(FileSystem fs, Path targetFile, Path tempFile) throws IOException {
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
        this.targetFile = (Path)Preconditions.checkNotNull((Object)targetFile);
        this.tempFile = (Path)Preconditions.checkNotNull((Object)tempFile);
        this.out = fs.create(tempFile);
    }

    HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) throws IOException {
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
        this.targetFile = (Path)Preconditions.checkNotNull((Object)recoverable.targetFile());
        this.tempFile = (Path)Preconditions.checkNotNull((Object)recoverable.tempFile());
        HadoopRecoverableFsDataOutputStream.safelyTruncateFile(fs, this.tempFile, recoverable);
        this.out = fs.append(this.tempFile);
        long pos = this.out.getPos();
        if (pos != recoverable.offset()) {
            IOUtils.closeQuietly((AutoCloseable)this.out);
            throw new IOException("Truncate failed: " + this.tempFile + " (requested=" + recoverable.offset() + " ,size=" + pos + ')');
        }
    }

    public void write(int b) throws IOException {
        this.out.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        this.out.write(b, off, len);
    }

    public void flush() throws IOException {
        this.out.hflush();
    }

    public void sync() throws IOException {
        this.out.hflush();
        this.out.hsync();
    }

    public long getPos() throws IOException {
        return this.out.getPos();
    }

    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
        this.sync();
        return new HadoopFsRecoverable(this.targetFile, this.tempFile, this.getPos());
    }

    public RecoverableFsDataOutputStream.Committer closeForCommit() throws IOException {
        long pos = this.getPos();
        this.close();
        return new HadoopFsCommitter(this.fs, new HadoopFsRecoverable(this.targetFile, this.tempFile, pos));
    }

    public void close() throws IOException {
        this.out.close();
    }

    private static void safelyTruncateFile(FileSystem fileSystem, Path path, HadoopFsRecoverable recoverable) throws IOException {
        boolean truncated;
        HadoopRecoverableFsDataOutputStream.ensureTruncateInitialized();
        HadoopRecoverableFsDataOutputStream.waitUntilLeaseIsRevoked(fileSystem, path);
        try {
            truncated = HadoopRecoverableFsDataOutputStream.truncate(fileSystem, path, recoverable.offset());
        }
        catch (Exception e) {
            throw new IOException("Problem while truncating file: " + path, e);
        }
        if (!truncated) {
            HadoopRecoverableFsDataOutputStream.waitUntilLeaseIsRevoked(fileSystem, path);
        }
    }

    private static void ensureTruncateInitialized() throws FlinkRuntimeException {
        if (truncateHandle == null) {
            Method truncateMethod;
            try {
                truncateMethod = FileSystem.class.getMethod("truncate", Path.class, Long.TYPE);
            }
            catch (NoSuchMethodException e) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
            }
            if (!Modifier.isPublic(truncateMethod.getModifiers())) {
                throw new FlinkRuntimeException("Could not find a public truncate method on the Hadoop File System.");
            }
            truncateHandle = truncateMethod;
        }
    }

    private static boolean truncate(FileSystem hadoopFs, Path file, long length) throws IOException {
        block4: {
            if (truncateHandle != null) {
                try {
                    return (Boolean)truncateHandle.invoke((Object)hadoopFs, file, length);
                }
                catch (InvocationTargetException e) {
                    ExceptionUtils.rethrowIOException((Throwable)e.getTargetException());
                    break block4;
                }
                catch (Throwable t) {
                    throw new IOException("Truncation of file failed because of access/linking problems with Hadoop's truncate call. This is most likely a dependency conflict or class loading problem.");
                }
            }
            throw new IllegalStateException("Truncation handle has not been initialized");
        }
        return false;
    }

    private static boolean waitUntilLeaseIsRevoked(FileSystem fs, Path path) throws IOException {
        Preconditions.checkState((boolean)(fs instanceof DistributedFileSystem));
        DistributedFileSystem dfs = (DistributedFileSystem)fs;
        dfs.recoverLease(path);
        Deadline deadline = Deadline.now().plus(Duration.ofMillis(100000L));
        StopWatch sw = new StopWatch();
        sw.start();
        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && deadline.hasTimeLeft()) {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException e1) {
                throw new IOException("Recovering the lease failed: ", e1);
            }
            isClosed = dfs.isFileClosed(path);
        }
        return isClosed;
    }

    static class HadoopFsCommitter
    implements RecoverableFsDataOutputStream.Committer {
        private final FileSystem fs;
        private final HadoopFsRecoverable recoverable;

        HadoopFsCommitter(FileSystem fs, HadoopFsRecoverable recoverable) {
            this.fs = (FileSystem)Preconditions.checkNotNull((Object)fs);
            this.recoverable = (HadoopFsRecoverable)Preconditions.checkNotNull((Object)recoverable);
        }

        public void commit() throws IOException {
            FileStatus srcStatus;
            Path src = this.recoverable.tempFile();
            Path dest = this.recoverable.targetFile();
            long expectedLength = this.recoverable.offset();
            try {
                srcStatus = this.fs.getFileStatus(src);
            }
            catch (IOException e) {
                throw new IOException("Cannot clean commit: Staging file does not exist.");
            }
            if (srcStatus.getLen() != expectedLength) {
                throw new IOException("Cannot clean commit: File has trailing junk data.");
            }
            try {
                this.fs.rename(src, dest);
            }
            catch (IOException e) {
                throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
            }
        }

        public void commitAfterRecovery() throws IOException {
            Path src = this.recoverable.tempFile();
            Path dest = this.recoverable.targetFile();
            long expectedLength = this.recoverable.offset();
            FileStatus srcStatus = null;
            try {
                srcStatus = this.fs.getFileStatus(src);
            }
            catch (FileNotFoundException fileNotFoundException) {
            }
            catch (IOException e) {
                throw new IOException("Committing during recovery failed: Could not access status of source file.");
            }
            if (srcStatus != null) {
                if (srcStatus.getLen() > expectedLength) {
                    HadoopRecoverableFsDataOutputStream.safelyTruncateFile(this.fs, src, this.recoverable);
                }
                try {
                    this.fs.rename(src, dest);
                }
                catch (IOException e) {
                    throw new IOException("Committing file by rename failed: " + src + " to " + dest, e);
                }
            } else if (!this.fs.exists(dest)) {
                // empty if block
            }
        }

        public RecoverableWriter.CommitRecoverable getRecoverable() {
            return this.recoverable;
        }
    }
}

