/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable<K, W> {
    private static final long serialVersionUID = 1L;
    // ---- Configuration supplied through the constructor ----
    // Assigns every incoming element to its windows (see processElement).
    protected final WindowAssigner<? super IN, W> windowAssigner;
    // Extracts the key of an element; used when replaying records restored from a legacy aligned operator.
    protected final KeySelector<IN, K> keySelector;
    // Consulted (through Context) for every element and timer to decide whether to fire and/or purge.
    protected final Trigger<? super IN, ? super W> trigger;
    // Descriptor of the per-window contents state. The constructor accepts null for this value.
    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    // Descriptor of the "merging-window-set" list state; null unless the assigner is a MergingWindowAssigner.
    protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
    // Serializer for keys; also used to read keys back from legacy state streams.
    protected final TypeSerializer<K> keySerializer;
    // Serializer for windows; windows act as the state/timer namespace.
    protected final TypeSerializer<W> windowSerializer;
    // Non-negative amount added to a window's max timestamp to obtain its event-time cleanup time.
    protected final long allowedLateness;
    // ---- Runtime members, (re)created in open() and cleared in close()/dispose() ----
    // Collector wrapping the operator output; emitted results get the window's max timestamp.
    protected transient TimestampedCollector<OUT> timestampedCollector;
    // Reusable trigger context; its key/window fields are overwritten before each use.
    // NOTE(review): the three-argument constructor call is decompiler output — confirm against the real source.
    protected transient Context context = new Context(this, null, null);
    // Context handed to the window assigner; exposes the current processing time.
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    // Timer service obtained under the name "window-timers" in open().
    protected transient InternalTimerService<W> internalTimerService;
    // ---- Legacy state restoring ----
    // Selects which legacy state layout restoreState()/registerRestoredLegacyStateState() handle.
    private final LegacyWindowOperatorType legacyWindowOperatorType;
    // Records read from a legacy aligned window operator, ordered by timestamp, replayed in open().
    private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
    // Processing-time timers read from a legacy window operator, re-registered in open().
    private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
    // Event-time timers read from a legacy window operator, re-registered in open().
    private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
    // Marker expected at the start of legacy aligned-operator state (0x0FF1CE42).
    private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 267505218;
    // Marker expected at the start of every pane in legacy aligned-operator state (0xBADF00D5).
    private static final int BEGIN_OF_PANE_MAGIC_NUMBER = -1159790379;

    /**
     * Creates a window operator that does not restore state of a legacy aligned window operator.
     *
     * <p>Delegates to the full constructor with {@link LegacyWindowOperatorType#NONE}.
     */
    public WindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
            InternalWindowFunction<ACC, OUT, K, W> windowFunction,
            Trigger<? super IN, ? super W> trigger,
            long allowedLateness) {
        this(
                windowAssigner,
                windowSerializer,
                keySelector,
                keySerializer,
                windowStateDescriptor,
                windowFunction,
                trigger,
                allowedLateness,
                LegacyWindowOperatorType.NONE);
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, long allowedLateness, LegacyWindowOperatorType legacyWindowOperatorType) {
        super(windowFunction);
        Preconditions.checkArgument((!(windowAssigner instanceof BaseAlignedWindowAssigner) ? 1 : 0) != 0, (Object)("The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " + "This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " + "the AggregatingProcessingTimeWindowOperator"));
        Preconditions.checkArgument((allowedLateness >= 0L ? 1 : 0) != 0);
        Preconditions.checkArgument((windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized() ? 1 : 0) != 0, (Object)"window state serializer is not properly initialized");
        this.windowAssigner = (WindowAssigner)Preconditions.checkNotNull(windowAssigner);
        this.windowSerializer = (TypeSerializer)Preconditions.checkNotNull(windowSerializer);
        this.keySelector = (KeySelector)Preconditions.checkNotNull(keySelector);
        this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
        this.windowStateDescriptor = windowStateDescriptor;
        this.trigger = (Trigger)Preconditions.checkNotNull(trigger);
        this.allowedLateness = allowedLateness;
        this.legacyWindowOperatorType = legacyWindowOperatorType;
        if (windowAssigner instanceof MergingWindowAssigner) {
            TupleSerializer tupleSerializer = new TupleSerializer(Tuple2.class, new TypeSerializer[]{windowSerializer, windowSerializer});
            this.mergingWindowsDescriptor = new ListStateDescriptor("merging-window-set", (TypeSerializer)tupleSerializer);
        } else {
            this.mergingWindowsDescriptor = null;
        }
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Creates the runtime helpers (collector, timer service, trigger context, assigner context)
     * and then re-registers any state that was restored from a legacy operator.
     */
    @Override
    public void open() throws Exception {
        super.open();

        timestampedCollector = new TimestampedCollector<>(output);
        // This operator is passed as the callback target for the "window-timers" service.
        internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
        context = new Context(this, null, null);

        // The assigner context reads the timer service field at call time.
        windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
            @Override
            public long getCurrentProcessingTime() {
                return WindowOperator.this.internalTimerService.currentProcessingTime();
            }
        };

        registerRestoredLegacyStateState();
    }

    /**
     * Closes the operator and drops the references to the runtime helpers created in {@code open()}.
     */
    @Override
    public void close() throws Exception {
        super.close();

        // Release the per-run helpers; the timer service reference is left untouched.
        windowAssignerContext = null;
        context = null;
        timestampedCollector = null;
    }

    /**
     * Disposes the operator and drops the references to the runtime helpers created in {@code open()}.
     */
    @Override
    public void dispose() throws Exception {
        super.dispose();

        // Same references that close() releases.
        windowAssignerContext = null;
        context = null;
        timestampedCollector = null;
    }

    /**
     * Processes one element: assigns it to windows, adds it to the state of every window that is
     * not late, invokes the trigger and, depending on the trigger result, emits and/or purges the
     * window contents. A cleanup timer is registered for every window the element was added to.
     *
     * <p>With a {@link MergingWindowAssigner} the windows are first merged into the in-flight
     * window set and the (possibly changed) set is persisted at the end.
     *
     * @param element the incoming record
     * @throws Exception if the assigner, trigger, state access or window function fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        // The key is captured once so that the merge callback below can restore it on the context.
        final Object key = this.getKeyedStateBackend().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<Window> mergingWindows = this.getMergingWindowSet();
            for (Window window : elementWindows) {
                // Adding a window may merge it with existing ones; the callback is invoked in that case.
                Window actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>(){

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        // Let the trigger merge its own state into the merge result first ...
                        WindowOperator.this.context.key = key;
                        WindowOperator.this.context.window = mergeResult;
                        WindowOperator.this.context.onMerge(mergedWindows);
                        // ... then clear trigger state and cleanup timers of every window that was merged away.
                        for (Window m : mergedWindows) {
                            WindowOperator.this.context.window = m;
                            WindowOperator.this.context.clear();
                            WindowOperator.this.deleteCleanupTimer(m);
                        }
                        // Finally merge the window contents into the state window of the merge result.
                        WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(stateWindowResult, mergedStateWindows, WindowOperator.this.windowSerializer, WindowOperator.this.windowStateDescriptor);
                    }
                });
                // A late (merged) window is removed from the in-flight set and the element is dropped for it.
                if (this.isLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                // Contents are stored under the state window, which may differ from the actual window.
                Window stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                AppendingState<IN, ACC> windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element.getValue());
                this.context.key = key;
                this.context.window = actualWindow;
                TriggerResult triggerResult = this.context.onElement(element);
                if (triggerResult.isFire()) {
                    Object contents = windowState.get();
                    // NOTE(review): this continue also skips the purge and the cleanup-timer registration below.
                    if (contents == null) continue;
                    this.emitWindowContents(actualWindow, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                this.registerCleanupTimer(actualWindow);
            }
            // Write back the merging window set after all windows of this element were handled.
            mergingWindows.persist();
        } else {
            for (Window window : elementWindows) {
                // Elements for late windows are dropped.
                if (this.isLate(window)) continue;
                AppendingState<IN, ACC> windowState = this.getPartitionedState(window, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element.getValue());
                this.context.key = key;
                this.context.window = window;
                TriggerResult triggerResult = this.context.onElement(element);
                if (triggerResult.isFire()) {
                    Object contents = windowState.get();
                    // NOTE(review): this continue also skips the purge and the cleanup-timer registration below.
                    if (contents == null) continue;
                    this.emitWindowContents(window, contents);
                }
                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                this.registerCleanupTimer(window);
            }
        }
    }

    /**
     * Handles an event-time timer for the key and window carried by {@code timer}.
     *
     * <p>If the window has contents, the trigger's event-time callback is invoked and the window
     * is emitted and/or purged according to the result. If the assigner is event-time based and
     * the timer timestamp equals the window's cleanup time, all state of the window is cleared.
     *
     * <p>With a merging assigner, a timer may fire for a window that no longer has a state
     * window in the in-flight set. Such a timer is ignored; without this guard the cleanup path
     * would dereference a null window state.
     *
     * @param timer the timer that fired
     * @throws Exception if the trigger, state access or window function fails
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        this.context.key = timer.getKey();
        this.context.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;
        AppendingState<IN, ACC> windowState;
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = this.getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.context.window);
            if (stateWindow == null) {
                // Timer for a window that is not (or no longer) in flight: there is no contents
                // state to fire, purge or clean up. Persist as the regular path would, then stop.
                mergingWindows.persist();
                return;
            }
            windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
        } else {
            windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
        }

        ACC contents = windowState != null ? windowState.get() : null;
        if (contents != null) {
            TriggerResult triggerResult = this.context.onEventTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                this.emitWindowContents(this.context.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        // Event-time windows are garbage collected by an event-time timer at their cleanup time.
        if (this.windowAssigner.isEventTime() && this.isCleanupTime(this.context.window, timer.getTimestamp())) {
            this.clearAllState(this.context.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // Make sure changes to the merging window set end up in state.
            mergingWindows.persist();
        }
    }

    /**
     * Handles a processing-time timer for the key and window carried by {@code timer}.
     *
     * <p>If the window has contents, the trigger's processing-time callback is invoked and the
     * window is emitted and/or purged according to the result. If the assigner is not event-time
     * based and the timer timestamp equals the window's cleanup time, all state of the window is
     * cleared.
     *
     * <p>With a merging assigner, a timer may fire for a window that no longer has a state
     * window in the in-flight set. Such a timer is ignored; without this guard the cleanup path
     * would dereference a null window state.
     *
     * @param timer the timer that fired
     * @throws Exception if the trigger, state access or window function fails
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        this.context.key = timer.getKey();
        this.context.window = timer.getNamespace();

        MergingWindowSet<W> mergingWindows = null;
        AppendingState<IN, ACC> windowState;
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindows = this.getMergingWindowSet();
            W stateWindow = mergingWindows.getStateWindow(this.context.window);
            if (stateWindow == null) {
                // Timer for a window that is not (or no longer) in flight: there is no contents
                // state to fire, purge or clean up. Persist as the regular path would, then stop.
                mergingWindows.persist();
                return;
            }
            windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
        } else {
            windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
        }

        ACC contents = windowState != null ? windowState.get() : null;
        if (contents != null) {
            TriggerResult triggerResult = this.context.onProcessingTime(timer.getTimestamp());
            if (triggerResult.isFire()) {
                this.emitWindowContents(this.context.window, contents);
            }
            if (triggerResult.isPurge()) {
                windowState.clear();
            }
        }

        // Processing-time windows are garbage collected by a processing-time timer at their cleanup time.
        if (!this.windowAssigner.isEventTime() && this.isCleanupTime(this.context.window, timer.getTimestamp())) {
            this.clearAllState(this.context.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            // Make sure changes to the merging window set end up in state.
            mergingWindows.persist();
        }
    }

    /**
     * Drops everything kept for a window: its contents state, the trigger state (via the current
     * context) and, for merging assigners, its entry in the merging window set.
     *
     * @param window the window being cleaned up
     * @param windowState contents state of the window
     * @param mergingWindows the merging window set, or null for non-merging assigners
     */
    private void clearAllState(W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        context.clear();

        if (mergingWindows == null) {
            return;
        }
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }

    /**
     * Applies the window function to the given contents. Emitted records are stamped with the
     * window's max timestamp; key and window are taken from the current context.
     */
    private void emitWindowContents(W window, ACC contents) throws Exception {
        final long emissionTimestamp = window.maxTimestamp();
        timestampedCollector.setAbsoluteTimestamp(emissionTimestamp);

        final InternalWindowFunction<ACC, OUT, K, W> windowFunction = userFunction;
        windowFunction.apply(context.key, context.window, contents, timestampedCollector);
    }

    /**
     * Builds the merging window set for the current key from the list state registered under
     * {@code mergingWindowsDescriptor} in the void namespace.
     */
    protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
        ListState mergeState = (ListState) getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergingWindowsDescriptor);
        return new MergingWindowSet((MergingWindowAssigner) windowAssigner, mergeState);
    }

    /**
     * Returns true if the assigner is event-time based and the window's cleanup time is not
     * after the current watermark.
     */
    protected boolean isLate(W window) {
        if (!windowAssigner.isEventTime()) {
            return false;
        }
        return cleanupTime(window) <= internalTimerService.currentWatermark();
    }

    /**
     * Registers a timer at the window's cleanup time, in the time domain of the assigner.
     * Nothing is registered when the cleanup time is {@code Long.MAX_VALUE}.
     */
    protected void registerCleanupTimer(W window) {
        final long cleanupAt = cleanupTime(window);
        if (cleanupAt != Long.MAX_VALUE) {
            if (windowAssigner.isEventTime()) {
                context.registerEventTimeTimer(cleanupAt);
            } else {
                context.registerProcessingTimeTimer(cleanupAt);
            }
        }
    }

    /**
     * Deletes the timer at the window's cleanup time, in the time domain of the assigner.
     * Nothing is deleted when the cleanup time is {@code Long.MAX_VALUE}.
     */
    protected void deleteCleanupTimer(W window) {
        final long cleanupAt = cleanupTime(window);
        if (cleanupAt != Long.MAX_VALUE) {
            if (windowAssigner.isEventTime()) {
                context.deleteEventTimeTimer(cleanupAt);
            } else {
                context.deleteProcessingTimeTimer(cleanupAt);
            }
        }
    }

    /**
     * Computes when the state of a window may be dropped: the window's max timestamp for
     * processing-time assigners, max timestamp plus allowed lateness for event-time assigners.
     * If that sum overflows, {@code Long.MAX_VALUE} is returned.
     */
    private long cleanupTime(W window) {
        if (!windowAssigner.isEventTime()) {
            return window.maxTimestamp();
        }
        final long windowEnd = window.maxTimestamp();
        final long withLateness = windowEnd + allowedLateness;
        // A sum smaller than the window end means the addition wrapped around.
        return withLateness < windowEnd ? Long.MAX_VALUE : withLateness;
    }

    /**
     * Returns true if {@code time} is exactly the cleanup time of the given window.
     */
    protected final boolean isCleanupTime(W window, long time) {
        final long cleanupAt = cleanupTime(window);
        return cleanupAt == time;
    }

    /**
     * Restores state written by an older Flink version. Depending on the configured legacy
     * operator type, the stream is read either in the layout of the legacy window operator
     * (timers) or in the layout of the legacy aligned window operators (panes of elements).
     *
     * @param in stream positioned at the legacy operator state
     * @throws Exception if the parent restore or reading the stream fails
     */
    @Override
    public void restoreState(FSDataInputStream in) throws Exception {
        super.restoreState(in);

        // Arguments follow the placeholder order: class name, subtask index, legacy operator type.
        LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.", new Object[]{this.getClass().getSimpleName(), this.getRuntimeContext().getIndexOfThisSubtask(), this.legacyWindowOperatorType});

        DataInputViewStreamWrapper streamWrapper = new DataInputViewStreamWrapper(in);
        switch (this.legacyWindowOperatorType) {
            case NONE: {
                this.restoreFromLegacyWindowOperator(streamWrapper);
                break;
            }
            case FAST_ACCUMULATING:
            case FAST_AGGREGATING: {
                this.restoreFromLegacyAlignedWindowOperator(streamWrapper);
                break;
            }
        }
    }

    /**
     * Re-registers whatever was read from legacy state, choosing the routine that matches the
     * configured legacy operator type.
     */
    public void registerRestoredLegacyStateState() throws Exception {
        switch (legacyWindowOperatorType) {
            case FAST_ACCUMULATING:
            case FAST_AGGREGATING:
                reregisterStateFromLegacyAlignedWindowOperator();
                break;
            case NONE:
                reregisterStateFromLegacyWindowOperator();
                break;
            default:
                break;
        }
    }

    /**
     * Reads the state of a legacy aligned window operator: two longs (next evaluation time and
     * next slide time), the begin-of-state magic number and then the panes. The restored
     * elements are collected in a queue ordered by timestamp.
     *
     * @param in view on the legacy state stream
     * @throws IOException if reading fails or a magic number does not match
     */
    private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper in) throws IOException {
        Preconditions.checkArgument(legacyWindowOperatorType != LegacyWindowOperatorType.NONE);

        // The next evaluation time is part of the layout but not needed; it is read and dropped.
        in.readLong();
        final long nextSlideTime = in.readLong();
        validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, in.readInt());

        restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42, new Comparator<StreamRecord<IN>>() {
            @Override
            public int compare(StreamRecord<IN> left, StreamRecord<IN> right) {
                return Long.compare(left.getTimestamp(), right.getTimestamp());
            }
        });

        switch (legacyWindowOperatorType) {
            case FAST_ACCUMULATING:
                restoreElementsFromLegacyAccumulatingAlignedWindowOperator(in, nextSlideTime);
                break;
            case FAST_AGGREGATING:
                restoreElementsFromLegacyAggregatingAlignedWindowOperator(in, nextSlideTime);
                break;
            default:
                break;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.", new Object[]{getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredFromLegacyAlignedOpRecords.size(), legacyWindowOperatorType});
        }
    }

    /**
     * Reads the panes of a legacy accumulating aligned window operator. Every pane starts with
     * the begin-of-pane magic number and an element count, followed by (key, list of values)
     * entries. Each value becomes a record in {@code restoredFromLegacyAlignedOpRecords}.
     *
     * @param in view on the legacy state stream, positioned at the pane count
     * @param nextSlideTime next slide time read from the legacy state; timestamps are derived from it
     * @throws IOException if reading fails or a pane magic number does not match
     */
    private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
        // The pane count has to be known before the first element timestamp can be computed.
        int numPanes = in.readInt();
        final long paneSize = this.getPaneSize();
        long nextElementTimestamp = nextSlideTime - (long) numPanes * paneSize;

        @SuppressWarnings("unchecked")
        ArrayListSerializer<IN> ser = new ArrayListSerializer<>((TypeSerializer<IN>) this.getStateDescriptor().getSerializer());

        while (numPanes > 0) {
            WindowOperator.validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());

            // NOTE(review): the "- 1" is added once per pane and therefore accumulates — confirm intended.
            nextElementTimestamp += paneSize - 1L;

            final int numElementsInPane = in.readInt();
            for (int i = numElementsInPane - 1; i >= 0; --i) {
                // The key must be consumed to advance the stream; it is derived again from the
                // record through the key selector when the records are replayed.
                this.keySerializer.deserialize(in);

                ArrayList<IN> valueList = ser.deserialize(in);
                for (IN record : valueList) {
                    this.restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(record, nextElementTimestamp));
                }
            }
            --numPanes;
        }
    }

    /**
     * Reads the panes of a legacy aggregating aligned window operator. Every pane starts with
     * the begin-of-pane magic number and an element count, followed by (key, value) entries.
     * Each value becomes a record in {@code restoredFromLegacyAlignedOpRecords}.
     *
     * @param in view on the legacy state stream, positioned at the pane count
     * @param nextSlideTime next slide time read from the legacy state; timestamps are derived from it
     * @throws IOException if reading fails or a pane magic number does not match
     */
    private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView in, long nextSlideTime) throws IOException {
        // The pane count has to be known before the first element timestamp can be computed.
        int numPanes = in.readInt();
        final long paneSize = this.getPaneSize();
        long nextElementTimestamp = nextSlideTime - (long) numPanes * paneSize;

        while (numPanes > 0) {
            WindowOperator.validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, in.readInt());

            // NOTE(review): the "- 1" is added once per pane and therefore accumulates — confirm intended.
            nextElementTimestamp += paneSize - 1L;

            final int numElementsInPane = in.readInt();
            for (int i = numElementsInPane - 1; i >= 0; --i) {
                // The key must be consumed to advance the stream; it is derived again from the
                // record through the key selector when the records are replayed.
                this.keySerializer.deserialize(in);

                // The serializer is looked up per element so that it is only touched when there is data.
                @SuppressWarnings("unchecked")
                IN value = (IN) this.getStateDescriptor().getSerializer().deserialize(in);
                this.restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(value, nextElementTimestamp));
            }
            --numPanes;
        }
    }

    /**
     * Returns the pane size of the legacy aligned window operator: the greatest common divisor
     * of size and slide for sliding processing-time windows, otherwise the size of the tumbling
     * processing-time windows. Only valid for the two aligned legacy operator types.
     */
    private long getPaneSize() {
        final boolean restoresAlignedOperator =
                legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING
                        || legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING;
        Preconditions.checkArgument(restoresAlignedOperator);

        if (windowAssigner instanceof SlidingProcessingTimeWindows) {
            SlidingProcessingTimeWindows sliding = (SlidingProcessingTimeWindows) windowAssigner;
            return ArithmeticUtils.gcd((long) sliding.getSize(), (long) sliding.getSlide());
        }

        TumblingProcessingTimeWindows tumbling = (TumblingProcessingTimeWindows) windowAssigner;
        return tumbling.getSize();
    }

    /**
     * Verifies a marker read from a legacy state stream.
     *
     * @param expected the marker that should be at this position
     * @param found the value actually read
     * @throws IOException if the two values differ; the message shows both in hexadecimal
     */
    private static void validateMagicNumber(int expected, int found) throws IOException {
        if (expected == found) {
            return;
        }
        final String message = "Corrupt state stream - wrong magic number. Expected '" + Integer.toHexString(expected) + "', found '" + Integer.toHexString(found) + '\'';
        throw new IOException(message);
    }

    /**
     * Reads the state of a legacy (non-aligned) window operator. The layout is: a counted list
     * of event-time timers, a counted list of processing-time timers (each timer being key,
     * window, timestamp) and a counted list of (long, int) pairs that is read and discarded.
     *
     * @param in view on the legacy state stream
     * @throws IOException if reading from the stream fails
     */
    private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper in) throws IOException {
        Preconditions.checkArgument(legacyWindowOperatorType == LegacyWindowOperatorType.NONE);

        // Event-time ("watermark") timers.
        final int eventTimerCount = in.readInt();
        restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(eventTimerCount, 1));
        for (int read = 0; read < eventTimerCount; read++) {
            K timerKey = keySerializer.deserialize(in);
            W timerWindow = windowSerializer.deserialize(in);
            long timerTimestamp = in.readLong();
            restoredFromLegacyEventTimeTimers.add(new Timer<>(timerTimestamp, timerKey, timerWindow));
        }

        // Processing-time timers.
        final int processingTimerCount = in.readInt();
        restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(processingTimerCount, 1));
        for (int read = 0; read < processingTimerCount; read++) {
            K timerKey = keySerializer.deserialize(in);
            W timerWindow = windowSerializer.deserialize(in);
            long timerTimestamp = in.readLong();
            restoredFromLegacyProcessingTimeTimers.add(new Timer<>(timerTimestamp, timerKey, timerWindow));
        }

        // Trailing (long, int) entries are consumed so the stream is read to the end of this section.
        final int trailingEntryCount = in.readInt();
        for (int read = 0; read < trailingEntryCount; read++) {
            in.readLong();
            in.readInt();
        }

        if (!LOG.isDebugEnabled()) {
            return;
        }
        final int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        if (restoredFromLegacyEventTimeTimers != null && !restoredFromLegacyEventTimeTimers.isEmpty()) {
            LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}", new Object[]{getClass().getSimpleName(), subtaskIdx, restoredFromLegacyEventTimeTimers.size(), restoredFromLegacyEventTimeTimers});
        }
        if (restoredFromLegacyProcessingTimeTimers != null && !restoredFromLegacyProcessingTimeTimers.isEmpty()) {
            LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}", new Object[]{getClass().getSimpleName(), subtaskIdx, restoredFromLegacyProcessingTimeTimers.size(), restoredFromLegacyProcessingTimeTimers});
        }
    }

    /**
     * Registers the timers that were read from legacy window operator state with the timer
     * service, setting the current key for each timer first, and then drops both restored queues.
     */
    public void reregisterStateFromLegacyWindowOperator() {
        final PriorityQueue<Timer<K, W>> eventTimers = restoredFromLegacyEventTimeTimers;
        if (eventTimers != null && !eventTimers.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.", (Object) getClass().getSimpleName(), (Object) getRuntimeContext().getIndexOfThisSubtask());
            for (Timer<K, W> restored : eventTimers) {
                setCurrentKey(restored.key);
                internalTimerService.registerEventTimeTimer(restored.window, restored.timestamp);
            }
        }

        final PriorityQueue<Timer<K, W>> processingTimers = restoredFromLegacyProcessingTimeTimers;
        if (processingTimers != null && !processingTimers.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.", (Object) getClass().getSimpleName(), (Object) getRuntimeContext().getIndexOfThisSubtask());
            for (Timer<K, W> restored : processingTimers) {
                setCurrentKey(restored.key);
                internalTimerService.registerProcessingTimeTimer(restored.window, restored.timestamp);
            }
        }

        // The queues are only needed once.
        restoredFromLegacyEventTimeTimers = null;
        restoredFromLegacyProcessingTimeTimers = null;
    }

    /**
     * Replays the records that were read from legacy aligned window operator state, in queue
     * order, through {@code processElement}. The key of each record is obtained from the key
     * selector. Afterwards the restored queue is dropped.
     */
    public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
        final PriorityQueue<StreamRecord<IN>> pending = restoredFromLegacyAlignedOpRecords;
        if (pending != null && !pending.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.", new Object[]{getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), legacyWindowOperatorType});

            // poll() returns null only once the queue is drained; the queue cannot hold null elements.
            StreamRecord<IN> next;
            while ((next = pending.poll()) != null) {
                setCurrentKey(keySelector.getKey(next.getValue()));
                processElement(next);
            }
        }
        restoredFromLegacyAlignedOpRecords = null;
    }

    /** Returns the trigger this operator was created with. */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /** Returns the key selector this operator was created with. */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    /** Returns the window assigner this operator was created with. */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /** Returns the descriptor of the window contents state; may be null if none was supplied. */
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return windowStateDescriptor;
    }

    /**
     * A timer restored from legacy state: a timestamp together with the key and window it
     * belongs to. Timers are ordered by timestamp only; equality covers all three fields.
     */
    protected static class Timer<K, W extends Window>
    implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long timestamp, K key, W window) {
            this.timestamp = timestamp;
            this.key = key;
            this.window = window;
        }

        /** Orders timers by ascending timestamp. */
        @Override
        public int compareTo(Timer<K, W> o) {
            return Long.compare(timestamp, o.timestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            Timer<?, ?> other = (Timer<?, ?>) o;
            if (timestamp != other.timestamp) {
                return false;
            }
            if (!key.equals(other.key)) {
                return false;
            }
            return window.equals(other.window);
        }

        @Override
        public int hashCode() {
            // Fold the upper and lower half of the timestamp, then mix in key and window.
            int hash = (int) (timestamp ^ (timestamp >>> 32));
            hash = hash * 31 + key.hashCode();
            hash = hash * 31 + window.hashCode();
            return hash;
        }

        @Override
        public String toString() {
            return "Timer{timestamp=" + timestamp + ", key=" + key + ", window=" + window + '}';
        }
    }

    /**
     * Trigger context handed to the trigger callbacks. It carries the key and window that are
     * currently being processed; state and timers accessed through it use {@code window} as
     * the namespace. The operator overwrites {@code key} and {@code window} before each use.
     *
     * NOTE(review): this class is decompiler output. It is declared static yet refers to the
     * type parameters K and W and to a synthetic {@code this$0} field — it presumably was a
     * non-static inner class in the original source; confirm before relying on the declaration.
     */
    public static class Context
    implements Trigger.OnMergeContext {
        // Key of the element or timer currently being processed.
        protected K key;
        // Window currently being processed; used as namespace for state and timers.
        protected W window;
        // Windows that were merged into `window`; set by onMerge and read by mergePartitionedState.
        protected Collection<W> mergedWindows;
        // Reference to the enclosing operator (synthetic field produced by the compiler).
        final /* synthetic */ WindowOperator this$0;

        // NOTE(review): `var1_1` is not declared anywhere in this file and callers pass three
        // arguments (operator, key, window) — looks like a lost outer-instance parameter; confirm.
        public Context(K key, W window) {
            this.this$0 = var1_1;
            this.key = key;
            this.window = window;
        }

        /** Returns the metric group of the enclosing operator. */
        @Override
        public MetricGroup getMetricGroup() {
            return this.this$0.getMetricGroup();
        }

        /** Returns the current watermark as reported by the operator's timer service. */
        @Override
        public long getCurrentWatermark() {
            return this.this$0.internalTimerService.currentWatermark();
        }

        /**
         * Returns a value state for the current window, deriving the type information from
         * the given class.
         *
         * @throws RuntimeException if type information cannot be extracted from the class
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
            TypeInformation typeInfo;
            Preconditions.checkNotNull(stateType, (String)"The state type class must not be null");
            try {
                typeInfo = TypeExtractor.getForClass(stateType);
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + stateType.getName() + "' from the class alone, due to generic type parameters. " + "Please specify the TypeInformation directly.", e);
            }
            return this.getKeyValueState(name, typeInfo, defaultState);
        }

        /**
         * Returns a value state for the current window, using a serializer created from the
         * given type information and the operator's execution config.
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
            Preconditions.checkNotNull((Object)name, (String)"The name of the state must not be null");
            Preconditions.checkNotNull(stateType, (String)"The state type information must not be null");
            ValueStateDescriptor stateDesc = new ValueStateDescriptor(name, stateType.createSerializer(this.this$0.getExecutionConfig()), defaultState);
            return (ValueState)this.getPartitionedState((StateDescriptor<S, ?>)stateDesc);
        }

        /**
         * Returns the state described by the descriptor, scoped to the current window.
         *
         * @throws RuntimeException wrapping any exception raised while retrieving the state
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S)this.this$0.getPartitionedState(this.window, this.this$0.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /**
         * Merges the state of the merged windows into the state of the current window. Does
         * nothing when no merged windows are set.
         *
         * @throws RuntimeException wrapping any exception raised while merging
         */
        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows != null && this.mergedWindows.size() > 0) {
                try {
                    this.this$0.getKeyedStateBackend().mergePartitionedStates(this.window, this.mergedWindows, this.this$0.windowSerializer, stateDescriptor);
                }
                catch (Exception e) {
                    throw new RuntimeException("Error while merging state.", e);
                }
            }
        }

        /** Returns the current processing time as reported by the operator's timer service. */
        @Override
        public long getCurrentProcessingTime() {
            return this.this$0.internalTimerService.currentProcessingTime();
        }

        /** Registers a processing-time timer for the current window. */
        @Override
        public void registerProcessingTimeTimer(long time) {
            this.this$0.internalTimerService.registerProcessingTimeTimer(this.window, time);
        }

        /** Registers an event-time timer for the current window. */
        @Override
        public void registerEventTimeTimer(long time) {
            this.this$0.internalTimerService.registerEventTimeTimer(this.window, time);
        }

        /** Deletes a processing-time timer of the current window. */
        @Override
        public void deleteProcessingTimeTimer(long time) {
            this.this$0.internalTimerService.deleteProcessingTimeTimer(this.window, time);
        }

        /** Deletes an event-time timer of the current window. */
        @Override
        public void deleteEventTimeTimer(long time) {
            this.this$0.internalTimerService.deleteEventTimeTimer(this.window, time);
        }

        /** Forwards an element to the trigger for the current window. */
        public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            return this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
        }

        /** Forwards a processing-time timer to the trigger for the current window. */
        public TriggerResult onProcessingTime(long time) throws Exception {
            return this.this$0.trigger.onProcessingTime(time, this.window, this);
        }

        /** Forwards an event-time timer to the trigger for the current window. */
        public TriggerResult onEventTime(long time) throws Exception {
            return this.this$0.trigger.onEventTime(time, this.window, this);
        }

        /** Remembers the merged windows and lets the trigger react to the merge into the current window. */
        public void onMerge(Collection<W> mergedWindows) throws Exception {
            this.mergedWindows = mergedWindows;
            this.this$0.trigger.onMerge(this.window, this);
        }

        /** Lets the trigger clear whatever it keeps for the current window. */
        public void clear() throws Exception {
            this.this$0.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }
}

