/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StateSnapshotTransformers;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapAggregatingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapMapState;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapSnapshotStrategy;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    /**
     * Lookup table from the concrete state descriptor class to the factory that builds the matching
     * heap state object. Covers value, list, map, aggregating and reducing descriptors; the lookup in
     * {@code createInternalState} is by exact class.
     */
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, HeapValueState::create), Tuple2.of(ListStateDescriptor.class, HeapListState::create), Tuple2.of(MapStateDescriptor.class, HeapMapState::create), Tuple2.of(AggregatingStateDescriptor.class, HeapAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, HeapReducingState::create)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    /** Registered key/value state tables, keyed by state name. */
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
    /** Registered priority-queue states (wrapped for snapshot/restore), keyed by state name. */
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
    /** Local recovery configuration handed in at construction; exposed via {@code getLocalRecoveryConfig()}. */
    private final LocalRecoveryConfig localRecoveryConfig;
    /** Strategy that creates new state tables and performs snapshots for this backend. */
    private final HeapSnapshotStrategy<K> snapshotStrategy;
    /** Factory used to create the priority queues backing newly registered priority-queue states. */
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;

    /**
     * Builds the backend from pre-assembled components.
     *
     * <p>The first seven arguments and {@code keyContext} are forwarded unchanged to the superclass
     * constructor; the remaining ones are stored in this instance. The two state maps are kept by
     * reference, not copied.
     *
     * @param registeredKVStates key/value state tables by state name
     * @param registeredPQStates priority-queue state wrappers by state name
     * @param localRecoveryConfig local recovery configuration
     * @param priorityQueueSetFactory factory for new priority queues
     * @param snapshotStrategy strategy used for snapshots and new state tables
     * @param keyContext key context passed on to the superclass
     */
    public HeapKeyedStateBackend(
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            ClassLoader userCodeClassLoader,
            ExecutionConfig executionConfig,
            TtlTimeProvider ttlTimeProvider,
            CloseableRegistry cancelStreamRegistry,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            Map<String, StateTable<K, ?, ?>> registeredKVStates,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
            LocalRecoveryConfig localRecoveryConfig,
            HeapPriorityQueueSetFactory priorityQueueSetFactory,
            HeapSnapshotStrategy<K> snapshotStrategy,
            InternalKeyContext<K> keyContext) {
        super(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                executionConfig,
                ttlTimeProvider,
                cancelStreamRegistry,
                keyGroupCompressionDecorator,
                keyContext);

        // State registries and recovery settings.
        this.localRecoveryConfig = localRecoveryConfig;
        this.registeredPQStates = registeredPQStates;
        this.registeredKVStates = registeredKVStates;

        LOG.info("Initializing heap keyed state backend with stream factory.");

        // Collaborators for snapshots and priority-queue creation.
        this.snapshotStrategy = snapshotStrategy;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
    }

    /**
     * Returns the priority queue registered under {@code stateName}, creating and registering one
     * if none exists yet.
     *
     * <p>For an already registered state the element serializer is updated on its meta info; if
     * that reports incompatibility the call fails. Otherwise the registry entry is replaced by a
     * wrapper for the updated serializer and the queue of the previously registered wrapper is
     * returned.
     *
     * @param stateName name of the priority-queue state
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @return the priority queue for the given state name
     * @throws FlinkRuntimeException wrapping a {@link StateMigrationException} when the new
     *     serializer is incompatible with the registered one
     */
    @Override
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        final HeapPriorityQueueSnapshotRestoreWrapper registered = registeredPQStates.get(stateName);

        // Nothing registered under this name yet: build and register a fresh queue.
        if (registered == null) {
            final RegisteredPriorityQueueStateBackendMetaInfo<T> freshMetaInfo =
                    new RegisteredPriorityQueueStateBackendMetaInfo<T>(stateName, byteOrderedElementSerializer);
            return createInternal(freshMetaInfo);
        }

        final TypeSerializerSchemaCompatibility<?> compatibility =
                registered.getMetaInfo().updateElementSerializer(byteOrderedElementSerializer);
        if (compatibility.isIncompatible()) {
            throw new FlinkRuntimeException(
                    new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
        }

        // Swap in a wrapper that carries the updated serializer, then hand back the existing queue.
        registeredPQStates.put(stateName, registered.forUpdatedSerializer(byteOrderedElementSerializer));
        return registered.getPriorityQueue();
    }

    /**
     * Creates a new priority queue for the given meta info and registers it, wrapped for
     * snapshot/restore, under the meta info's name.
     *
     * @param metaInfo meta info (name and element serializer) of the state to create
     * @return the newly created priority queue
     */
    @Nonnull
    private <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
        final String name = metaInfo.getName();
        final KeyGroupedInternalPriorityQueue queue =
                priorityQueueSetFactory.create(name, (TypeSerializer) metaInfo.getElementSerializer());

        final HeapPriorityQueueSnapshotRestoreWrapper<T> restoreWrapper =
                new HeapPriorityQueueSnapshotRestoreWrapper<T>(
                        queue,
                        metaInfo,
                        KeyExtractorFunction.forKeyedObjects(),
                        keyGroupRange,
                        numberOfKeyGroups);
        registeredPQStates.put(name, restoreWrapper);

        return queue;
    }

    /**
     * Looks up the state table registered under the descriptor's name, or creates and registers a
     * new one.
     *
     * <p>For an existing table the stored meta info is updated with the new snapshot transform
     * factory, namespace serializer and state serializer, and is then set back on the table.
     * The namespace serializer must be compatible without migration; the state serializer must
     * merely not be incompatible.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state being registered
     * @param snapshotTransformFactory factory for the snapshot transformer of this state
     * @param <N> namespace type
     * @param <V> state value type
     * @return the existing or newly created state table
     * @throws StateMigrationException if the namespace serializer requires migration or is
     *     incompatible, or if the state serializer is incompatible
     */
    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {
        // The registry stores tables with wildcard namespace/value types; the descriptor and
        // namespace serializer supplied by the caller determine N and V for this state name.
        @SuppressWarnings("unchecked")
        StateTable<K, N, V> stateTable = (StateTable<K, N, V>) this.registeredKVStates.get(stateDesc.getName());
        final TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();
        if (stateTable != null) {
            final RegisteredKeyValueStateBackendMetaInfo<N, V> restoredKvMetaInfo = stateTable.getMetaInfo();
            restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);

            // Capture the previous serializer before the update so it can be named in the error.
            final TypeSerializer<?> previousNamespaceSerializer = restoredKvMetaInfo.getNamespaceSerializer();
            final TypeSerializerSchemaCompatibility<?> namespaceCompatibility = restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
            if (namespaceCompatibility.isCompatibleAfterMigration() || namespaceCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new namespace serializer (" + namespaceSerializer + ") must be compatible with the old namespace serializer (" + previousNamespaceSerializer + ").");
            }

            restoredKvMetaInfo.checkStateMetaInfo(stateDesc);

            final TypeSerializer<?> previousStateSerializer = restoredKvMetaInfo.getStateSerializer();
            final TypeSerializerSchemaCompatibility<?> stateCompatibility = restoredKvMetaInfo.updateStateSerializer(newStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new state serializer (" + newStateSerializer + ") must not be incompatible with the old state serializer (" + previousStateSerializer + ").");
            }

            stateTable.setMetaInfo(restoredKvMetaInfo);
        } else {
            final RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<N, V>(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, newStateSerializer, snapshotTransformFactory);
            stateTable = this.snapshotStrategy.newStateTable(this.keyContext, newMetaInfo, this.keySerializer);
            this.registeredKVStates.put(stateDesc.getName(), stateTable);
        }
        return stateTable;
    }

    /**
     * Returns the keys stored under the given namespace in the named key/value state.
     *
     * @param state name of the key/value state
     * @param namespace namespace whose keys are requested
     * @param <N> namespace type
     * @return the key stream obtained from the state's table, or an empty stream when no table is
     *     registered under {@code state}
     */
    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        // Single lookup instead of containsKey + get. The registry holds wildcard-typed tables;
        // the caller-supplied namespace fixes N.
        @SuppressWarnings("unchecked")
        final StateTable<K, N, ?> table = (StateTable<K, N, ?>) this.registeredKVStates.get(state);
        if (table == null) {
            return Stream.empty();
        }
        return table.getKeys(namespace);
    }

    /**
     * Returns all (key, namespace) pairs of the named key/value state.
     *
     * @param state name of the key/value state
     * @param <N> namespace type
     * @return the stream obtained from the state's table, or an empty stream when no table is
     *     registered under {@code state}
     */
    @Override
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        // Single lookup instead of containsKey + get. The registry holds wildcard-typed tables;
        // N is chosen by the caller.
        @SuppressWarnings("unchecked")
        final StateTable<K, N, ?> table = (StateTable<K, N, ?>) this.registeredKVStates.get(state);
        if (table == null) {
            return Stream.empty();
        }
        return table.getKeysAndNamespaces();
    }

    /**
     * Creates the internal state object for the given descriptor.
     *
     * <p>The factory is selected by the descriptor's exact class from {@code STATE_FACTORIES};
     * the backing state table is then looked up or registered, and the factory builds the state
     * on top of it.
     *
     * @param namespaceSerializer serializer for the namespace
     * @param stateDesc descriptor of the state to create
     * @param snapshotTransformFactory factory for the per-element snapshot transformer
     * @return the created internal state
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's class
     * @throws Exception if registering the state table or creating the state fails
     */
    @Override
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        final StateFactory factory = STATE_FACTORIES.get(stateDesc.getClass());
        if (factory == null) {
            throw new FlinkRuntimeException(
                    String.format("State %s is not supported by %s", stateDesc.getClass(), this.getClass()));
        }

        // Adapt the element-level transform factory to the state's value type, then register.
        final StateSnapshotTransformer.StateSnapshotTransformFactory<SV> valueTransformFactory =
                getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory);
        final StateTable<K, N, SV> table =
                tryRegisterStateTable(namespaceSerializer, stateDesc, valueTransformFactory);

        return factory.createState(stateDesc, table, getKeySerializer());
    }

    /**
     * Adapts an element-level snapshot transform factory to the value type of the given state.
     *
     * <p>List and map state descriptors get the factory wrapped in the corresponding
     * list/map transform factory; for every other descriptor the factory is returned as is.
     *
     * @param stateDesc descriptor of the state
     * @param snapshotTransformFactory element-level transform factory
     * @param <SV> state value type
     * @param <SEV> state element type
     * @return a transform factory typed for the state's value
     */
    @SuppressWarnings("unchecked")
    private <SV, SEV> StateSnapshotTransformer.StateSnapshotTransformFactory<SV> getStateSnapshotTransformFactory(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        // The casts are unchecked: the compiler cannot relate SV to the wrapped element type SEV;
        // the relation is implied by the descriptor kind tested in each branch.
        if (stateDesc instanceof ListStateDescriptor) {
            return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>) new StateSnapshotTransformers.ListStateSnapshotTransformFactory<>(snapshotTransformFactory);
        }
        if (stateDesc instanceof MapStateDescriptor) {
            return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>) new StateSnapshotTransformers.MapStateSnapshotTransformFactory<>(snapshotTransformFactory);
        }
        return (StateSnapshotTransformer.StateSnapshotTransformFactory<SV>) snapshotTransformFactory;
    }

    /**
     * Triggers a snapshot through the configured snapshot strategy.
     *
     * <p>The wall-clock time taken before delegating is passed to the strategy's
     * {@code logSyncCompleted} once the strategy's {@code snapshot} call has returned.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp checkpoint timestamp
     * @param streamFactory factory for the checkpoint output streams
     * @param checkpointOptions options for this checkpoint
     * @return the runnable future returned by the snapshot strategy
     * @throws IOException declared by the overridden signature
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
        final long syncStartMillis = System.currentTimeMillis();

        final RunnableFuture<SnapshotResult<KeyedStateHandle>> pendingSnapshot =
                snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartMillis);
        return pendingSnapshot;
    }

    /**
     * Checkpoint-completion callback. The body is empty: this backend takes no action here.
     *
     * @param checkpointId id of the checkpoint (unused)
     */
    public void notifyCheckpointComplete(long checkpointId) {
    }

    /**
     * Checkpoint-abort callback. The body is empty: this backend takes no action here.
     *
     * @param checkpointId id of the checkpoint (unused)
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }

    /**
     * Applies {@code function} to every key that has an entry for the given state and namespace.
     *
     * <p>For each key the backend's current key is set before the function is invoked with that
     * key and the partitioned state.
     *
     * @param namespace namespace to iterate
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the state to visit
     * @param function function invoked once per key
     * @throws Exception if obtaining the state or the function itself fails
     */
    @Override
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        try (Stream<K> keyStream = this.getKeys(stateDescriptor.getName(), namespace)) {
            // Materialize the keys before iterating, so the loop does not read from the live
            // stream while the function runs (presumably to tolerate state changes made by
            // the function -- confirm against StateTable semantics).
            final List<K> keys = keyStream.collect(Collectors.toList());
            final S state = this.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
            for (K key : keys) {
                this.setCurrentKey(key);
                function.process(key, state);
            }
        }
    }

    /**
     * Returns the fixed name of this backend type.
     *
     * @return the constant string {@code "HeapKeyedStateBackend"}
     */
    @Override
    public String toString() {
        return "HeapKeyedStateBackend";
    }

    /**
     * Sums the sizes of all registered key/value state tables.
     *
     * @return total number of entries reported by the tables' {@code size()}
     */
    @Override
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        return registeredKVStates.values().stream()
                .mapToInt(StateTable::size)
                .sum();
    }

    /**
     * Sums, over all registered key/value state tables, the number of entries in one namespace.
     *
     * @param namespace namespace to count entries for
     * @return total of the tables' {@code sizeOfNamespace(namespace)}
     */
    @VisibleForTesting
    public int numKeyValueStateEntries(Object namespace) {
        return registeredKVStates.values().stream()
                .mapToInt(table -> table.sizeOfNamespace(namespace))
                .sum();
    }

    /**
     * Reports whether snapshots are taken asynchronously, as answered by the snapshot strategy.
     *
     * @return the strategy's {@code isAsynchronous()} result
     */
    @Override
    public boolean supportsAsynchronousSnapshots() {
        return snapshotStrategy.isAsynchronous();
    }

    /**
     * Returns the local recovery configuration this backend was constructed with.
     *
     * @return the stored {@link LocalRecoveryConfig}
     */
    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return localRecoveryConfig;
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, StateTable<K, N, SV> var2, TypeSerializer<K> var3) throws Exception;
    }
}

