/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Local cache for BLOBs that are served by a remote BLOB server.
 *
 * <p>Requested BLOBs are looked up in a local storage directory first. On a miss the cache
 * tries to obtain the BLOB through the supplied {@link BlobView} and, if that does not
 * produce the file, downloads it from the BLOB server at the configured address.
 *
 * <p>The storage directory is deleted when the cache is {@linkplain #close() closed}.
 */
public final class BlobCache
implements BlobService {
    private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class);

    /** Size in bytes of the buffer used to copy a BLOB from the server to the local file. */
    private static final int DOWNLOAD_BUFFER_SIZE = 65536;

    /** Address of the BLOB server that downloads are issued against. */
    private final InetSocketAddress serverAddress;

    /** Root directory under which cached BLOB files are stored. */
    private final File storageDir;

    /** Secondary BLOB source that is consulted before the BLOB server. */
    private final BlobView blobView;

    /** Set to {@code true} by the first call to {@link #close()}; makes later calls no-ops. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Hook returned by {@code BlobUtils.addShutdownHook}; unregistered again in {@link #close()}. */
    private final Thread shutdownHook;

    /** Number of additional download attempts after the first one failed. */
    private final int numFetchRetries;

    /** Configuration handed to every {@link BlobClient} created by this cache. */
    private final Configuration blobClientConfig;

    /**
     * Creates a new BLOB cache.
     *
     * @param serverAddress address of the BLOB server to download from; must not be null
     * @param blobClientConfig configuration for the storage directory
     *     ({@code blob.storage.directory}), the retry count ({@code blob.fetch.retries},
     *     default 5) and the BLOB clients; must not be null
     * @param blobView source that is tried before the BLOB server; must not be null
     * @throws IOException if initializing the storage directory fails
     */
    public BlobCache(InetSocketAddress serverAddress, Configuration blobClientConfig, BlobView blobView) throws IOException {
        this.serverAddress = Preconditions.checkNotNull(serverAddress);
        this.blobClientConfig = Preconditions.checkNotNull(blobClientConfig);
        this.blobView = Preconditions.checkNotNull(blobView, "blobStore");

        String storageDirectory = blobClientConfig.getString("blob.storage.directory", null);
        this.storageDir = BlobUtils.initStorageDirectory(storageDirectory);
        LOG.info("Created BLOB cache storage directory {}", this.storageDir);

        int fetchRetries = blobClientConfig.getInteger("blob.fetch.retries", 5);
        if (fetchRetries >= 0) {
            this.numFetchRetries = fetchRetries;
        } else {
            // A negative retry count makes no sense; fall back to a single attempt.
            LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", "blob.fetch.retries");
            this.numFetchRetries = 0;
        }

        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
    }

    /**
     * Returns the URL of the local file for the given BLOB, fetching the BLOB first if it is
     * not cached yet.
     *
     * <p>Lookup order: local storage directory, then the {@link BlobView}, then the BLOB
     * server. The server download is attempted once plus up to {@code numFetchRetries}
     * retries.
     *
     * @param requiredBlob key of the BLOB; a null key is rejected by the precondition check
     * @return URL pointing at the locally cached file
     * @throws IOException if the BLOB could not be downloaded within the allowed attempts
     */
    @Override
    public URL getURL(BlobKey requiredBlob) throws IOException {
        Preconditions.checkArgument(requiredBlob != null, "BLOB key cannot be null.");

        File localJarFile = BlobUtils.getStorageLocation(this.storageDir, requiredBlob);
        if (localJarFile.exists()) {
            return localJarFile.toURI().toURL();
        }

        // First try the blob view; any failure here is non-fatal because the
        // BLOB server is still available as a fallback.
        try {
            this.blobView.get(requiredBlob, localJarFile);
        }
        catch (Exception e) {
            LOG.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
        }
        if (localJarFile.exists()) {
            return localJarFile.toURI().toURL();
        }

        // Fallback: download from the BLOB server. The buffer is shared across attempts.
        byte[] buf = new byte[DOWNLOAD_BUFFER_SIZE];
        int attempt = 0;
        while (true) {
            if (attempt == 0) {
                LOG.info("Downloading {} from {}", requiredBlob, this.serverAddress);
            } else {
                LOG.info("Downloading {} from {} (retry {})", new Object[]{requiredBlob, this.serverAddress, attempt});
            }

            try {
                this.downloadFromServer(requiredBlob, localJarFile, buf);
                return localJarFile.toURI().toURL();
            }
            catch (IOException e) {
                String message = "Failed to fetch BLOB " + requiredBlob + " from " + this.serverAddress
                        + " and store it under " + localJarFile.getAbsolutePath();
                if (attempt >= this.numFetchRetries) {
                    LOG.error(message + " No retries left.", e);
                    throw new IOException(message, e);
                }

                ++attempt;
                // The stack trace of an attempt that will be retried is only shown at debug level.
                if (LOG.isDebugEnabled()) {
                    LOG.debug(message + " Retrying...", e);
                } else {
                    LOG.error(message + " Retrying...");
                }
            }
        }
    }

    /**
     * Performs one download attempt of the given BLOB from the BLOB server into the target file.
     *
     * <p>If the attempt fails after the target file was opened for writing, the file is
     * deleted so that an incomplete file is never mistaken for a cached BLOB later on.
     *
     * @param requiredBlob key of the BLOB to download
     * @param localJarFile file the BLOB content is written to
     * @param buf scratch buffer used for copying
     * @throws IOException if the attempt fails; non-I/O failures are wrapped in an IOException
     */
    private void downloadFromServer(BlobKey requiredBlob, File localJarFile, byte[] buf) throws IOException {
        boolean startedWriting = false;
        // Resources are closed in reverse order: output file, input stream, client.
        try (BlobClient bc = new BlobClient(this.serverAddress, this.blobClientConfig);
                InputStream is = bc.get(requiredBlob)) {
            try (OutputStream os = new FileOutputStream(localJarFile)) {
                startedWriting = true;
                int read;
                while ((read = is.read(buf)) >= 0) {
                    os.write(buf, 0, read);
                }
            }
        }
        catch (Throwable t) {
            // All streams are closed at this point, so the partial file can be removed safely.
            if (startedWriting && localJarFile.exists() && !localJarFile.delete()) {
                LOG.warn("Could not delete partially downloaded BLOB file {}", localJarFile.getAbsolutePath());
            }
            if (t instanceof IOException) {
                throw (IOException) t;
            }
            throw new IOException(t.getMessage(), t);
        }
    }

    /**
     * Deletes the locally cached file for the given BLOB, if one exists.
     *
     * <p>A failed deletion is only logged; it does not raise an exception.
     *
     * @param key key of the BLOB to remove from the local cache
     * @throws IOException declared by the interface
     */
    @Override
    public void delete(BlobKey key) throws IOException {
        File localFile = BlobUtils.getStorageLocation(this.storageDir, key);
        if (localFile.exists() && !localFile.delete()) {
            LOG.warn("Failed to delete locally cached BLOB {} at {}", key, localFile.getAbsolutePath());
        }
    }

    /**
     * Deletes the BLOB from the local cache and then issues a delete for the same key
     * through a freshly created {@link BlobClient}.
     *
     * @param key key of the BLOB to delete
     * @throws IOException if creating the client or the remote delete fails
     */
    public void deleteGlobal(BlobKey key) throws IOException {
        this.delete(key);
        try (BlobClient bc = this.createClient()) {
            bc.delete(key);
        }
    }

    /**
     * Returns the port of the BLOB server address this cache was created with.
     *
     * @return the server port
     */
    @Override
    public int getPort() {
        return this.serverAddress.getPort();
    }

    /**
     * Shuts the cache down: deletes the storage directory and unregisters the shutdown hook.
     *
     * <p>Only the first invocation has an effect; subsequent calls return immediately.
     *
     * @throws IOException if deleting the storage directory fails
     */
    @Override
    public void close() throws IOException {
        if (!this.shutdownRequested.compareAndSet(false, true)) {
            return;
        }

        LOG.info("Shutting down BlobCache");
        try {
            FileUtils.deleteDirectory(this.storageDir);
        }
        finally {
            // Skip unregistering when close() is being run by the shutdown hook thread itself.
            if (this.shutdownHook != null && this.shutdownHook != Thread.currentThread()) {
                try {
                    Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
                }
                catch (IllegalStateException ignored) {
                    // The JVM is already shutting down; the hook can no longer be removed.
                }
                catch (Throwable t) {
                    LOG.warn("Exception while unregistering BLOB cache's cleanup shutdown hook.", t);
                }
            }
        }
    }

    /**
     * Creates a new client connected to the BLOB server address of this cache.
     *
     * @return a new {@link BlobClient}; the caller is responsible for closing it
     * @throws IOException if the client could not be created
     */
    @Override
    public BlobClient createClient() throws IOException {
        return new BlobClient(this.serverAddress, this.blobClientConfig);
    }

    /**
     * Returns the directory in which this cache stores its files.
     *
     * @return the storage directory
     */
    public File getStorageDir() {
        return this.storageDir;
    }
}

