/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state.restore;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.PriorityQueueFlag;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDBException;

/**
 * Restore operation that rebuilds a RocksDB instance from a collection of
 * {@link KeyGroupsStateHandle}s by streaming every serialized key/value pair
 * of every key group back into the database through a write batch.
 *
 * <p>The instance keeps per-handle working state (current handle, stream, view,
 * column families, compression decorator) in mutable fields; the fields are
 * overwritten for each state handle processed by {@link #restore()}.
 *
 * @param <K> type of the keys handled by the backend being restored
 */
public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {

    /**
     * Value of the unsigned 16-bit state id that terminates a key group in the
     * serialized stream. Also used as the mask that converts the signed short
     * read from the stream into its unsigned value.
     */
    private static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

    /** State handle currently being restored. */
    private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    /** Raw input stream opened on {@link #currentKeyGroupsStateHandle}. */
    private FSDataInputStream currentStateHandleInStream;
    /** Data view over {@link #currentStateHandleInStream}, used to read the meta data. */
    private DataInputView currentStateHandleInView;
    /** Column family handles of the current state handle, indexed by the state id found in the stream. */
    private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
    /** Decorator used to wrap the per-key-group streams of the current state handle. */
    private StreamCompressionDecorator keygroupStreamCompressionDecorator;
    /** Batch size handed to the {@link RocksDBWriteBatchWrapper} used while restoring. */
    private final long writeBatchSize;
    /** Decides whether encountering priority-queue state in the snapshot is an error. */
    private final PriorityQueueFlag queueRestoreEnabled;

    /**
     * Creates the restore operation. All arguments except {@code writeBatchSize} and
     * {@code queueRestoreEnabled} are forwarded unchanged to the super class.
     *
     * @param writeBatchSize batch size for the write batch used during restore; must not be negative
     * @param queueRestoreEnabled if {@link PriorityQueueFlag#THROW_ON_PRIORITY_QUEUE}, restoring a
     *     snapshot that contains priority-queue state fails with a {@link StateMigrationException}
     * @throws IllegalArgumentException if {@code writeBatchSize} is negative
     */
    public RocksDBFullRestoreOperation(
            KeyGroupRange keyGroupRange,
            int keyGroupPrefixBytes,
            int numberOfTransferringThreads,
            CloseableRegistry cancelStreamRegistry,
            ClassLoader userCodeClassLoader,
            Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
            StateSerializerProvider<K> keySerializerProvider,
            File instanceBasePath,
            File instanceRocksDBPath,
            DBOptions dbOptions,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            RocksDBNativeMetricOptions nativeMetricOptions,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> restoreStateHandles,
            @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
            @Nonnegative long writeBatchSize,
            Long writeBufferManagerCapacity,
            PriorityQueueFlag queueRestoreEnabled) {
        super(
                keyGroupRange,
                keyGroupPrefixBytes,
                numberOfTransferringThreads,
                cancelStreamRegistry,
                userCodeClassLoader,
                kvStateInformation,
                keySerializerProvider,
                instanceBasePath,
                instanceRocksDBPath,
                dbOptions,
                columnFamilyOptionsFactory,
                nativeMetricOptions,
                metricGroup,
                restoreStateHandles,
                ttlCompactFiltersManager,
                writeBufferManagerCapacity);
        Preconditions.checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
        this.writeBatchSize = writeBatchSize;
        this.queueRestoreEnabled = queueRestoreEnabled;
    }

    /**
     * Opens the database and restores every non-null state handle into it.
     *
     * @return a result wrapping the opened DB, its default column family handle and the native
     *     metric monitor
     * @throws IllegalStateException (via {@link StateUtil#unexpectedStateHandleException}) if a
     *     handle is not a {@link KeyGroupsStateHandle} -- NOTE(review): exact exception type is
     *     defined by {@code StateUtil}; confirm there
     */
    @Override
    public RocksDBRestoreResult restore() throws IOException, StateMigrationException, RocksDBException {
        this.openDB();
        for (KeyedStateHandle keyedStateHandle : this.restoreStateHandles) {
            if (keyedStateHandle == null) {
                continue;
            }
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(KeyGroupsStateHandle.class, keyedStateHandle.getClass());
            }
            this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
            this.restoreKeyGroupsInStateHandle();
        }
        // NOTE(review): the trailing (-1, null, null) arguments presumably are bookkeeping values
        // that do not apply to a full restore -- confirm against RocksDBRestoreResult.
        return new RocksDBRestoreResult(this.db, this.defaultColumnFamilyHandle, this.nativeMetricMonitor, -1L, null, null);
    }

    /**
     * Restores the meta data and all key groups of {@link #currentKeyGroupsStateHandle}.
     * The opened stream is registered with the cancel-stream registry for the duration of the
     * restore and is closed here only if this method is the one that unregisters it.
     */
    private void restoreKeyGroupsInStateHandle() throws IOException, StateMigrationException, RocksDBException {
        try {
            this.logger.info("Starting to restore from state handle: {}.", this.currentKeyGroupsStateHandle);
            this.currentStateHandleInStream = this.currentKeyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable(this.currentStateHandleInStream);
            this.currentStateHandleInView = new DataInputViewStreamWrapper(this.currentStateHandleInStream);
            this.restoreKVStateMetaData();
            this.restoreKVStateData();
            this.logger.info("Finished restoring from state handle: {}.", this.currentKeyGroupsStateHandle);
        } finally {
            if (this.cancelStreamRegistry.unregisterCloseable(this.currentStateHandleInStream)) {
                IOUtils.closeQuietly(this.currentStateHandleInStream);
            }
        }
    }

    /**
     * Reads the serialized meta data of the current state handle, selects the key-group
     * compression decorator and resolves one column family handle per restored state, in the
     * order in which the states appear in the meta data.
     *
     * @throws StateMigrationException if the snapshot contains priority-queue state while
     *     {@link #queueRestoreEnabled} is {@link PriorityQueueFlag#THROW_ON_PRIORITY_QUEUE}
     */
    private void restoreKVStateMetaData() throws IOException, StateMigrationException {
        KeyedBackendSerializationProxy<?> serializationProxy = this.readMetaData(this.currentStateHandleInView);
        this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression()
                ? SnappyStreamCompressionDecorator.INSTANCE
                : UncompressedStreamCompressionDecorator.INSTANCE;

        List<StateMetaInfoSnapshot> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
        this.currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
        for (StateMetaInfoSnapshot restoredMetaInfo : restoredMetaInfos) {
            boolean isPriorityQueue =
                    restoredMetaInfo.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE;
            if (isPriorityQueue && this.queueRestoreEnabled == PriorityQueueFlag.THROW_ON_PRIORITY_QUEUE) {
                throw new StateMigrationException("Can not restore savepoint taken with RocksDB timers enabled with Heap timers!");
            }
            RocksDBKeyedStateBackend.RocksDbKvStateInfo registeredStateCFHandle =
                    this.getOrRegisterStateColumnFamilyHandle(null, restoredMetaInfo);
            this.currentStateHandleKVStateColumnFamilies.add(registeredStateCFHandle.columnFamilyHandle);
        }
    }

    /**
     * Restores the key/value data of every key group of the current state handle. Key groups
     * whose recorded offset is {@code 0} are skipped. Any failure while reading or writing a key
     * group propagates to the caller.
     *
     * @throws IllegalStateException if a key group of the handle is not contained in the
     *     backend's key-group range
     */
    private void restoreKVStateData() throws IOException, RocksDBException {
        try (RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(this.db, this.writeBatchSize)) {
            for (Tuple2<Integer, Long> keyGroupOffset : this.currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
                int keyGroup = keyGroupOffset.f0;
                Preconditions.checkState(this.keyGroupRange.contains(keyGroup), "The key group must belong to the backend");

                long offset = keyGroupOffset.f1;
                if (0L == offset) {
                    continue;
                }

                this.currentStateHandleInStream.seek(offset);
                // try-with-resources guarantees the decorated stream is closed AND that an
                // exception from the body is propagated (with close failures suppressed).
                try (InputStream compressedKgIn =
                        this.keygroupStreamCompressionDecorator.decorateWithCompression(this.currentStateHandleInStream)) {
                    this.restoreKeyGroup(new DataInputViewStreamWrapper(compressedKgIn), writeBatchWrapper);
                }
            }
        }
    }

    /**
     * Reads one key group from {@code keyGroupIn} and puts all its entries into the write batch.
     *
     * <p>Layout as consumed here: a 16-bit state id, followed by (key, value) byte-array pairs.
     * A key carrying the "meta data follows" flag is the last entry for the current state id; the
     * flag is cleared before the entry is written and another 16-bit value follows, which is
     * either the next state id or {@link #END_OF_KEY_GROUP_MARK}.
     *
     * <p>State ids are interpreted as unsigned 16-bit values for every read, including the first.
     */
    private void restoreKeyGroup(DataInputView keyGroupIn, RocksDBWriteBatchWrapper writeBatchWrapper)
            throws IOException, RocksDBException {
        int kvStateId = END_OF_KEY_GROUP_MARK & keyGroupIn.readShort();
        ColumnFamilyHandle handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);

        boolean keyGroupHasMoreKeys = true;
        while (keyGroupHasMoreKeys) {
            byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupIn);
            byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupIn);

            if (RocksSnapshotUtil.hasMetaDataFollowsFlag(key)) {
                // Strip the signal bit so the key is stored in its original form.
                RocksSnapshotUtil.clearMetaDataFollowsFlag(key);
                writeBatchWrapper.put(handle, key, value);

                kvStateId = END_OF_KEY_GROUP_MARK & keyGroupIn.readShort();
                if (END_OF_KEY_GROUP_MARK == kvStateId) {
                    keyGroupHasMoreKeys = false;
                } else {
                    handle = this.currentStateHandleKVStateColumnFamilies.get(kvStateId);
                }
            } else {
                writeBatchWrapper.put(handle, key, value);
            }
        }
    }
}

