/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

public abstract class AbstractAsyncSnapshotIOCallable<H extends StateObject>
extends AbstractAsyncIOCallable<H, CheckpointStreamFactory.CheckpointStateOutputStream> {
    protected final long checkpointId;
    protected final long timestamp;
    protected final CheckpointStreamFactory streamFactory;
    protected final CloseableRegistry closeStreamOnCancelRegistry;
    protected final AtomicBoolean open;

    public AbstractAsyncSnapshotIOCallable(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory, CloseableRegistry closeStreamOnCancelRegistry) {
        this.streamFactory = (CheckpointStreamFactory)Preconditions.checkNotNull((Object)streamFactory);
        this.closeStreamOnCancelRegistry = (CloseableRegistry)Preconditions.checkNotNull((Object)closeStreamOnCancelRegistry);
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.open = new AtomicBoolean(false);
    }

    @Override
    public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
        if (this.checkStreamClosedAndDoTransitionToOpen()) {
            CheckpointStreamFactory.CheckpointStateOutputStream stream = this.streamFactory.createCheckpointStateOutputStream(this.checkpointId, this.timestamp);
            try {
                this.closeStreamOnCancelRegistry.registerClosable((Closeable)((Object)stream));
                return stream;
            }
            catch (Exception ex) {
                this.open.set(false);
                throw ex;
            }
        }
        throw new IOException("Async snapshot: a checkpoint stream was already opened.");
    }

    @Override
    public void done(boolean canceled) {
        CheckpointStreamFactory.CheckpointStateOutputStream stream;
        if (this.checkStreamOpenAndDoTransitionToClose() && (stream = (CheckpointStreamFactory.CheckpointStateOutputStream)((Object)this.getIoHandle())) != null) {
            this.closeStreamOnCancelRegistry.unregisterClosable((Closeable)((Object)stream));
            IOUtils.closeQuietly((OutputStream)((Object)stream));
        }
    }

    protected boolean checkStreamClosedAndDoTransitionToOpen() {
        return this.open.compareAndSet(false, true);
    }

    protected boolean checkStreamOpenAndDoTransitionToClose() {
        return this.open.compareAndSet(true, false);
    }

    protected StreamStateHandle closeStreamAndGetStateHandle() throws IOException {
        if (this.checkStreamOpenAndDoTransitionToClose()) {
            CheckpointStreamFactory.CheckpointStateOutputStream stream = (CheckpointStreamFactory.CheckpointStateOutputStream)((Object)this.getIoHandle());
            try {
                StreamStateHandle streamStateHandle = stream.closeAndGetHandle();
                return streamStateHandle;
            }
            finally {
                this.closeStreamOnCancelRegistry.unregisterClosable((Closeable)((Object)stream));
            }
        }
        throw new IOException("Checkpoint stream already closed.");
    }
}

