/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/**
 * {@link InternalTimerService} implementation that keeps all timers in heap data structures.
 *
 * <p>For each time domain (processing time and event time) the service holds one
 * {@link PriorityQueue} that is consulted when timers fire, plus one {@link HashSet} per local
 * key group. The sets are used to reject duplicate registrations and to snapshot/restore timers
 * one key group at a time.
 *
 * <p>The firing loops treat the head of each queue as the earliest pending timer; that relies on
 * the ordering defined by {@link InternalTimer}, which lives outside this file.
 *
 * @param <K> type of the keys the timers are scoped to
 * @param <N> type of the namespaces the timers are scoped to
 */
public class HeapInternalTimerService<K, N>
        implements InternalTimerService<N>, ProcessingTimeCallback {

    /** Supplies the current processing time and schedules the {@link #onProcessingTime} callback. */
    private final ProcessingTimeService processingTimeService;

    /** Source of the current key on registration; receives the timer's key before a timer fires. */
    private final KeyContext keyContext;

    /** Processing-time timers per local key group, indexed via {@link #getIndexForKeyGroup(int)}. */
    private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;

    /** All processing-time timers of this service, polled from the head when firing. */
    private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;

    /** Event-time timers per local key group, indexed via {@link #getIndexForKeyGroup(int)}. */
    private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup;

    /** All event-time timers of this service, polled from the head when firing. */
    private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;

    /** Key groups this service instance is responsible for. */
    private final KeyGroupsList localKeyGroupRange;

    /** Total number of key groups, used to map a key to its key group. */
    private final int totalKeyGroups;

    /** Smallest key-group index in {@link #localKeyGroupRange}; offset into the per-key-group arrays. */
    private final int localKeyGroupRangeStartIdx;

    /** Last value passed to {@link #advanceWatermark(long)}; {@code Long.MIN_VALUE} until then. */
    private long currentWatermark = Long.MIN_VALUE;

    /** Handle of the currently scheduled processing-time callback, or {@code null} if none is tracked. */
    private ScheduledFuture<?> nextTimer;

    /** Key serializer set by {@link #startTimerService}; {@code null} before that call. */
    private TypeSerializer<K> keySerializer;

    /** Namespace serializer set by {@link #startTimerService}; {@code null} before that call. */
    private TypeSerializer<N> namespaceSerializer;

    /** Serializer for whole timers, built from the key and namespace serializers on start. */
    private InternalTimer.TimerSerializer<K, N> timerSerializer;

    /** Receiver of fired timers, set by {@link #startTimerService}. */
    private Triggerable<K, N> triggerTarget;

    /** Set to {@code true} once {@link #startTimerService} has completed successfully. */
    private volatile boolean isInitialized;

    /** Key serializer read from restored state; cleared again once the service has been started. */
    private TypeSerializer<K> keyDeserializer;

    /** Namespace serializer read from restored state; cleared again once the service has been started. */
    private TypeSerializer<N> namespaceDeserializer;

    /**
     * Creates a timer service for the given local key groups. The service does not fire timers
     * or snapshot correctly until {@link #startTimerService} has been called.
     *
     * @param totalKeyGroups total number of key groups used to assign keys to key groups
     * @param localKeyGroupRange key groups owned by this instance; must not be {@code null}
     * @param keyContext context that exposes and receives the current key; must not be {@code null}
     * @param processingTimeService service used to read and schedule processing time; must not be {@code null}
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public HeapInternalTimerService(int totalKeyGroups, KeyGroupsList localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService) {
        this.keyContext = Preconditions.checkNotNull(keyContext);
        this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
        this.totalKeyGroups = totalKeyGroups;
        this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange);

        // The per-key-group arrays are addressed relative to the smallest local key-group index.
        int startIdx = Integer.MAX_VALUE;
        for (Integer keyGroupIdx : localKeyGroupRange) {
            startIdx = Math.min(keyGroupIdx, startIdx);
        }
        this.localKeyGroupRangeStartIdx = startIdx;

        int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups();

        // Generic arrays cannot be created directly; the raw arrays only ever hold sets created
        // by the lazy getters below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        Set<InternalTimer<K, N>>[] eventTimeSets = new HashSet[localKeyGroups];
        @SuppressWarnings("unchecked")
        Set<InternalTimer<K, N>>[] processingTimeSets = new HashSet[localKeyGroups];

        this.eventTimeTimersQueue = new PriorityQueue<InternalTimer<K, N>>(100);
        this.eventTimeTimersByKeyGroup = eventTimeSets;
        this.processingTimeTimersQueue = new PriorityQueue<InternalTimer<K, N>>(100);
        this.processingTimeTimersByKeyGroup = processingTimeSets;
    }

    /**
     * Starts the service with the serializers and the target that receives fired timers.
     *
     * <p>On the first call the arguments are validated, compared against any serializers read by
     * {@link #restoreTimersForKeyGroup}, and stored. If processing-time timers are already queued
     * (for example after a restore), a callback for the earliest one is scheduled. Subsequent
     * calls only verify that the same serializers are passed again.
     *
     * @param keySerializer serializer for timer keys; must not be {@code null}
     * @param namespaceSerializer serializer for timer namespaces; must not be {@code null}
     * @param triggerTarget receiver of fired timers; must not be {@code null}
     * @throws IllegalArgumentException if a serializer is {@code null}, or if the service is
     *     already started and different serializers are passed
     * @throws IllegalStateException if the service is in a partially initialized state, or if the
     *     serializers differ from those found in restored state
     * @throws NullPointerException if {@code triggerTarget} is {@code null}
     */
    public void startTimerService(TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerTarget) {
        if (this.isInitialized) {
            if (!this.keySerializer.equals(keySerializer) || !this.namespaceSerializer.equals(namespaceSerializer)) {
                throw new IllegalArgumentException("Already initialized Timer Service tried to be initialized with different key and namespace serializers.");
            }
            return;
        }

        if (keySerializer == null || namespaceSerializer == null) {
            throw new IllegalArgumentException("The TimersService serializers cannot be null.");
        }
        if (this.keySerializer != null || this.namespaceSerializer != null || this.triggerTarget != null) {
            throw new IllegalStateException("The TimerService has already been initialized.");
        }

        boolean keySerializerChanged = this.keyDeserializer != null && !this.keyDeserializer.equals(keySerializer);
        boolean namespaceSerializerChanged = this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(namespaceSerializer);
        if (keySerializerChanged || namespaceSerializerChanged) {
            throw new IllegalStateException("Tried to initialize restored TimerService with different serializers than those used to snapshot its state.");
        }

        // Validate the target before touching any field, so that a failed call does not leave
        // the service half-initialized (which would make every retry fail with
        // "already been initialized").
        Triggerable<K, N> checkedTarget = Preconditions.checkNotNull(triggerTarget);

        this.keySerializer = keySerializer;
        this.namespaceSerializer = namespaceSerializer;
        this.keyDeserializer = null;
        this.namespaceDeserializer = null;
        this.triggerTarget = checkedTarget;
        this.timerSerializer = new InternalTimer.TimerSerializer<K, N>(this.keySerializer, this.namespaceSerializer);

        // Timers that were queued before the start (restored ones) need their callback scheduled now.
        if (!this.processingTimeTimersQueue.isEmpty()) {
            this.nextTimer = this.processingTimeService.registerTimer(this.processingTimeTimersQueue.peek().getTimestamp(), this);
        }
        this.isInitialized = true;
    }

    /** Returns the current processing time as reported by the {@link ProcessingTimeService}. */
    @Override
    public long currentProcessingTime() {
        return this.processingTimeService.getCurrentProcessingTime();
    }

    /** Returns the watermark most recently passed to {@link #advanceWatermark(long)}. */
    @Override
    public long currentWatermark() {
        return this.currentWatermark;
    }

    /**
     * Registers a processing-time timer for the current key of the key context. Registering the
     * same (key, namespace, time) combination again is a no-op. If the new timer is earlier than
     * the previous queue head, the scheduled callback is replaced by one for the new time.
     *
     * @param namespace namespace the timer belongs to
     * @param time processing-time timestamp at which the timer should fire
     */
    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<K, N>(time, this.getCurrentKeyFromContext(), namespace);
        Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
        if (!timerSet.add(timer)) {
            // Already registered; nothing to schedule.
            return;
        }

        // Capture the head before inserting so we can tell whether the new timer becomes the earliest.
        InternalTimer<K, N> oldHead = this.processingTimeTimersQueue.peek();
        long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
        this.processingTimeTimersQueue.add(timer);

        if (time < nextTriggerTime) {
            if (this.nextTimer != null) {
                this.nextTimer.cancel(false);
            }
            this.nextTimer = this.processingTimeService.registerTimer(time, this);
        }
    }

    /**
     * Registers an event-time timer for the current key of the key context. Registering the same
     * (key, namespace, time) combination again is a no-op.
     *
     * @param namespace namespace the timer belongs to
     * @param time event-time timestamp at which the timer should fire
     */
    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<K, N>(time, this.getCurrentKeyFromContext(), namespace);
        Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
        if (timerSet.add(timer)) {
            this.eventTimeTimersQueue.add(timer);
        }
    }

    /**
     * Removes the processing-time timer for the current key, the given namespace and time, if it
     * is registered. An already scheduled callback is left in place.
     *
     * @param namespace namespace the timer belongs to
     * @param time timestamp the timer was registered for
     */
    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<K, N>(time, this.getCurrentKeyFromContext(), namespace);
        Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
        if (timerSet.remove(timer)) {
            this.processingTimeTimersQueue.remove(timer);
        }
    }

    /**
     * Removes the event-time timer for the current key, the given namespace and time, if it is
     * registered.
     *
     * @param namespace namespace the timer belongs to
     * @param time timestamp the timer was registered for
     */
    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        InternalTimer<K, N> timer = new InternalTimer<K, N>(time, this.getCurrentKeyFromContext(), namespace);
        Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
        if (timerSet.remove(timer)) {
            this.eventTimeTimersQueue.remove(timer);
        }
    }

    /**
     * Fires every queued processing-time timer whose timestamp is at most {@code time}, then
     * schedules a callback for the next pending timer if none was scheduled meanwhile.
     *
     * @param time processing time the callback was invoked for
     * @throws Exception whatever the trigger target throws while handling a timer
     */
    @Override
    public void onProcessingTime(long time) throws Exception {
        // Clear the handle first: the trigger target may register new timers from inside its
        // callback, and that path decides whether to (re)schedule based on this field.
        this.nextTimer = null;

        InternalTimer<K, N> timer;
        while ((timer = this.processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer<K, N>> timerSet = this.getProcessingTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            this.processingTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(timer.getKey());
            this.triggerTarget.onProcessingTime(timer);
        }

        // 'timer' is now the first timer that is not yet due (or null if the queue is empty).
        if (timer != null && this.nextTimer == null) {
            this.nextTimer = this.processingTimeService.registerTimer(timer.getTimestamp(), this);
        }
    }

    /**
     * Records the new watermark and fires every queued event-time timer whose timestamp is at
     * most {@code time}.
     *
     * @param time the new watermark
     * @throws Exception whatever the trigger target throws while handling a timer
     */
    public void advanceWatermark(long time) throws Exception {
        this.currentWatermark = time;

        InternalTimer<K, N> timer;
        while ((timer = this.eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            Set<InternalTimer<K, N>> timerSet = this.getEventTimeTimerSetForTimer(timer);
            timerSet.remove(timer);
            this.eventTimeTimersQueue.remove();
            this.keyContext.setCurrentKey(timer.getKey());
            this.triggerTarget.onEventTime(timer);
        }
    }

    /**
     * Writes the state of one key group to {@code stream}: the key serializer, the namespace
     * serializer, the event-time timers (count followed by the timers) and the processing-time
     * timers (same layout).
     *
     * <p>The serializers written here are the ones stored by {@link #startTimerService}; they are
     * {@code null} if the service has not been started.
     *
     * @param stream stream to write to
     * @param keyGroupIdx key group to snapshot; must belong to the local range
     * @throws IllegalArgumentException if {@code keyGroupIdx} is not in the local range
     * @throws Exception if serialization fails
     */
    public void snapshotTimersForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
        InstantiationUtil.serializeObject(stream, this.keySerializer);
        InstantiationUtil.serializeObject(stream, this.namespaceSerializer);

        // The set getters create an empty set on demand and never return null, so an untouched
        // key group is written as a count of zero.
        this.writeTimers(stream, this.getEventTimeTimerSetForKeyGroup(keyGroupIdx));
        this.writeTimers(stream, this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx));
    }

    /**
     * Reads the state of one key group from {@code stream}, in the layout produced by
     * {@link #snapshotTimersForKeyGroup}, and adds the timers to this service.
     *
     * <p>The serializers found in the stream are remembered so that {@link #startTimerService}
     * can verify it is started with matching ones.
     *
     * @param stream stream to read from
     * @param keyGroupIdx key group being restored; must belong to the local range
     * @param userCodeClassLoader class loader used to deserialize the serializers
     * @throws IllegalArgumentException if {@code keyGroupIdx} is not in the local range, or if the
     *     serializers differ from those of a previously restored key group
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if a serializer class cannot be loaded
     */
    public void restoreTimersForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
        // Reject foreign key groups before consuming the stream or touching any field.
        Preconditions.checkArgument(this.localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");

        // The stream carries the serializers as plain serialized objects; their type arguments
        // cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        TypeSerializer<K> restoredKeySerializer = (TypeSerializer<K>) InstantiationUtil.deserializeObject(stream, userCodeClassLoader);
        @SuppressWarnings("unchecked")
        TypeSerializer<N> restoredNamespaceSerializer = (TypeSerializer<N>) InstantiationUtil.deserializeObject(stream, userCodeClassLoader);

        boolean keySerializerChanged = this.keyDeserializer != null && !this.keyDeserializer.equals(restoredKeySerializer);
        boolean namespaceSerializerChanged = this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredNamespaceSerializer);
        if (keySerializerChanged || namespaceSerializerChanged) {
            throw new IllegalArgumentException("Tried to restore timers for the same service with different serializers.");
        }
        this.keyDeserializer = restoredKeySerializer;
        this.namespaceDeserializer = restoredNamespaceSerializer;

        InternalTimer.TimerSerializer<K, N> restoreSerializer = new InternalTimer.TimerSerializer<K, N>(this.keyDeserializer, this.namespaceDeserializer);

        // Only materialize the per-key-group set when there is something to put into it.
        int sizeOfEventTimeTimers = stream.readInt();
        if (sizeOfEventTimeTimers > 0) {
            readTimers(stream, restoreSerializer, sizeOfEventTimeTimers, this.getEventTimeTimerSetForKeyGroup(keyGroupIdx), this.eventTimeTimersQueue);
        }

        int sizeOfProcessingTimeTimers = stream.readInt();
        if (sizeOfProcessingTimeTimers > 0) {
            readTimers(stream, restoreSerializer, sizeOfProcessingTimeTimers, this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx), this.processingTimeTimersQueue);
        }
    }

    /** Writes the number of timers followed by each timer, using the service's timer serializer. */
    private void writeTimers(DataOutputViewStreamWrapper stream, Set<InternalTimer<K, N>> timers) throws IOException {
        stream.writeInt(timers.size());
        for (InternalTimer<K, N> timer : timers) {
            this.timerSerializer.serialize(timer, stream);
        }
    }

    /** Reads {@code count} timers from the stream and adds each one to both the set and the queue. */
    private static <K, N> void readTimers(DataInputViewStreamWrapper stream, InternalTimer.TimerSerializer<K, N> serializer, int count, Set<InternalTimer<K, N>> timerSet, PriorityQueue<InternalTimer<K, N>> queue) throws IOException {
        for (int i = 0; i < count; ++i) {
            InternalTimer<K, N> timer = serializer.deserialize(stream);
            timerSet.add(timer);
            queue.add(timer);
        }
    }

    /**
     * Returns the key context's current key as {@code K}. The context hands keys out untyped, so
     * the cast cannot be checked; callers are expected to use a context whose keys match
     * {@code K}.
     */
    @SuppressWarnings("unchecked")
    private K getCurrentKeyFromContext() {
        return (K) this.keyContext.getCurrentKey();
    }

    /** Returns the event-time timer set of the key group that the timer's key is assigned to. */
    private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> timer) {
        int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
        return this.getEventTimeTimerSetForKeyGroup(keyGroupIdx);
    }

    /** Returns the event-time timer set of the given key group, creating it on first access. */
    private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) {
        int localIdx = this.getIndexForKeyGroup(keyGroupIdx);
        Set<InternalTimer<K, N>> timers = this.eventTimeTimersByKeyGroup[localIdx];
        if (timers == null) {
            timers = new HashSet<InternalTimer<K, N>>();
            this.eventTimeTimersByKeyGroup[localIdx] = timers;
        }
        return timers;
    }

    /** Returns the processing-time timer set of the key group that the timer's key is assigned to. */
    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForTimer(InternalTimer<K, N> timer) {
        int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
        return this.getProcessingTimeTimerSetForKeyGroup(keyGroupIdx);
    }

    /** Returns the processing-time timer set of the given key group, creating it on first access. */
    private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForKeyGroup(int keyGroupIdx) {
        int localIdx = this.getIndexForKeyGroup(keyGroupIdx);
        Set<InternalTimer<K, N>> timers = this.processingTimeTimersByKeyGroup[localIdx];
        if (timers == null) {
            timers = new HashSet<InternalTimer<K, N>>();
            this.processingTimeTimersByKeyGroup[localIdx] = timers;
        }
        return timers;
    }

    /**
     * Translates a global key-group index into an index into the per-key-group arrays.
     *
     * @throws IllegalArgumentException if the key group is not part of the local range
     */
    private int getIndexForKeyGroup(int keyGroupIdx) {
        Preconditions.checkArgument(this.localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");
        return keyGroupIdx - this.localKeyGroupRangeStartIdx;
    }

    /** Returns the number of processing-time timers currently queued. */
    public int numProcessingTimeTimers() {
        return this.processingTimeTimersQueue.size();
    }

    /** Returns the number of event-time timers currently queued. */
    public int numEventTimeTimers() {
        return this.eventTimeTimersQueue.size();
    }

    /** Returns the number of queued processing-time timers whose namespace equals {@code namespace}. */
    public int numProcessingTimeTimers(N namespace) {
        return countTimersInNamespace(this.processingTimeTimersQueue, namespace);
    }

    /** Returns the number of queued event-time timers whose namespace equals {@code namespace}. */
    public int numEventTimeTimers(N namespace) {
        return countTimersInNamespace(this.eventTimeTimersQueue, namespace);
    }

    /** Counts the timers in {@code timers} whose namespace {@code equals} the given one. */
    private static <K, N> int countTimersInNamespace(PriorityQueue<InternalTimer<K, N>> timers, N namespace) {
        int count = 0;
        for (InternalTimer<K, N> timer : timers) {
            if (timer.getNamespace().equals(namespace)) {
                ++count;
            }
        }
        return count;
    }

    /** Returns the smallest key-group index of the local range (the offset of the per-key-group arrays). */
    @VisibleForTesting
    public int getLocalKeyGroupRangeStartIdx() {
        return this.localKeyGroupRangeStartIdx;
    }

    /** Returns the internal per-key-group event-time timer sets; entries may be {@code null}. */
    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getEventTimeTimersPerKeyGroup() {
        return this.eventTimeTimersByKeyGroup;
    }

    /** Returns the internal per-key-group processing-time timer sets; entries may be {@code null}. */
    @VisibleForTesting
    public Set<InternalTimer<K, N>>[] getProcessingTimeTimersPerKeyGroup() {
        return this.processingTimeTimersByKeyGroup;
    }
}

