/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Collection;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.com.google.common.base.Function;
import org.apache.flink.shaded.com.google.common.collect.FluentIterable;
import org.apache.flink.shaded.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link WindowOperator} that buffers every element of a window in a {@link ListState} and
 * lets an {@link Evictor} prune the buffered elements immediately before and immediately after
 * the window function is applied.
 *
 * <p>After each firing the list state is rewritten with the elements that survived eviction.
 *
 * @param <K> type of the key
 * @param <IN> type of the incoming elements
 * @param <OUT> type of the elements emitted by the window function
 * @param <W> type of {@link Window} produced by the window assigner
 */
@Internal
public class EvictingWindowOperator<K, IN, OUT, W extends Window>
        extends WindowOperator<K, IN, Iterable<IN>, OUT, W> {
    private static final long serialVersionUID = 1L;

    /** Evictor consulted around every invocation of the window function. */
    private final Evictor<? super IN, ? super W> evictor;

    /** Context handed to the evictor; created in {@link #open()}, dropped on close/dispose. */
    private transient EvictorContext evictorContext;

    /** Descriptor of the per-window list state that holds the raw stream records. */
    private final StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor;

    /**
     * Creates the operator.
     *
     * <p>{@code null} is passed to the super constructor in the position that corresponds to
     * {@code windowStateDescriptor} here; this class keeps and uses its own descriptor field.
     *
     * @param windowAssigner assigns elements to windows
     * @param windowSerializer serializer for windows (used as state namespace serializer)
     * @param keySelector extracts the key from an element
     * @param keySerializer serializer for keys
     * @param windowStateDescriptor descriptor of the list state buffering the window contents
     * @param windowFunction function applied to the window contents on firing
     * @param trigger decides when a window fires and/or is purged
     * @param evictor evictor applied before and after the window function; must not be null
     * @param allowedLateness allowed lateness, forwarded unchanged to the super class
     * @throws NullPointerException if {@code evictor} is null
     */
    public EvictingWindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            StateDescriptor<? extends ListState<StreamRecord<IN>>, ?> windowStateDescriptor,
            InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
            Trigger<? super IN, ? super W> trigger,
            Evictor<? super IN, ? super W> evictor,
            long allowedLateness) {
        super(windowAssigner, windowSerializer, keySelector, keySerializer, null, windowFunction, trigger, allowedLateness);
        this.evictor = Objects.requireNonNull(evictor);
        this.windowStateDescriptor = windowStateDescriptor;
    }

    /**
     * Assigns the element to its windows, appends it to the state of every window that is not
     * late and evaluates the trigger for each of those windows.
     *
     * <p>For merging assigners the element's windows are first added to the merging window set
     * (which may merge trigger state and list state of existing windows) and the set is
     * persisted once all windows have been handled.
     *
     * @param element the incoming record
     * @throws IllegalStateException if a merged window has no state window in the merging set
     */
    @Override
    @SuppressWarnings("unchecked")
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows =
                windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext);
        final K key = (K) getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();
            for (W window : elementWindows) {
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        // Let the trigger merge its state into the resulting window first ...
                        context.key = key;
                        context.window = mergeResult;
                        context.onMerge(mergedWindows);
                        // ... then drop trigger state and cleanup timers of the absorbed windows.
                        for (W m : mergedWindows) {
                            context.window = m;
                            context.clear();
                            deleteCleanupTimer(m);
                        }
                        // Finally fold the list states of the absorbed windows into the result's state.
                        getKeyedStateBackend().mergePartitionedStates(stateWindowResult, mergedStateWindows, windowSerializer, windowStateDescriptor);
                    }
                });

                if (isLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                ListState<StreamRecord<IN>> windowState =
                        getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
                storeAndEvaluate(key, actualWindow, windowState, element);
            }
            mergingWindows.persist();
        } else {
            for (W window : elementWindows) {
                if (isLate(window)) {
                    continue;
                }
                ListState<StreamRecord<IN>> windowState =
                        getPartitionedState(window, windowSerializer, windowStateDescriptor);
                storeAndEvaluate(key, window, windowState, element);
            }
        }
    }

    /**
     * Appends {@code element} to {@code windowState}, points both contexts at
     * {@code (key, window)}, evaluates the trigger and reacts to its result.
     *
     * <p>If the trigger fires but the state reports no contents, the method returns right away
     * without purging or registering the cleanup timer.
     */
    private void storeAndEvaluate(K key, W window, ListState<StreamRecord<IN>> windowState, StreamRecord<IN> element) throws Exception {
        windowState.add(element);

        context.key = key;
        context.window = window;
        evictorContext.key = key;
        evictorContext.window = window;

        TriggerResult triggerResult = context.onElement(element);
        if (triggerResult.isFire()) {
            Iterable<StreamRecord<IN>> contents = windowState.get();
            if (contents == null) {
                return;
            }
            emitWindowContents(window, contents, windowState);
        }
        if (triggerResult.isPurge()) {
            windowState.clear();
        }
        registerCleanupTimer(window);
    }

    /**
     * Handles an event-time timer: evaluates the trigger if the window has contents and clears
     * all window state when the assigner is event-time based and the timer is the cleanup timer.
     *
     * <p>A timer for a merging window that no longer has a state window is ignored (apart from
     * persisting the merging window set); previously the cleanup path dereferenced the missing
     * state and failed with a {@link NullPointerException}.
     *
     * @param timer the timer that fired; its namespace is the window
     */
    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        bindContexts(timer);

        MergingWindowSet<W> mergingWindows =
                windowAssigner instanceof MergingWindowAssigner ? getMergingWindowSet() : null;
        ListState<StreamRecord<IN>> windowState = resolveTimerWindowState(mergingWindows);

        Iterable<StreamRecord<IN>> contents = windowState == null ? null : windowState.get();
        if (contents != null) {
            applyTimerTriggerResult(context.onEventTime(timer.getTimestamp()), contents, windowState);
        }

        // Without a state window there is nothing left to clean up for this timer.
        if (windowState != null
                && windowAssigner.isEventTime()
                && isCleanupTime(context.window, timer.getTimestamp())) {
            clearAllState(context.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /**
     * Handles a processing-time timer: evaluates the trigger if the window has contents and
     * clears all window state when the assigner is <em>not</em> event-time based and the timer
     * is the cleanup timer.
     *
     * <p>A timer for a merging window that no longer has a state window is ignored (apart from
     * persisting the merging window set); previously the cleanup path dereferenced the missing
     * state and failed with a {@link NullPointerException}.
     *
     * @param timer the timer that fired; its namespace is the window
     */
    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        bindContexts(timer);

        MergingWindowSet<W> mergingWindows =
                windowAssigner instanceof MergingWindowAssigner ? getMergingWindowSet() : null;
        ListState<StreamRecord<IN>> windowState = resolveTimerWindowState(mergingWindows);

        Iterable<StreamRecord<IN>> contents = windowState == null ? null : windowState.get();
        if (contents != null) {
            applyTimerTriggerResult(context.onProcessingTime(timer.getTimestamp()), contents, windowState);
        }

        // Without a state window there is nothing left to clean up for this timer.
        if (windowState != null
                && !windowAssigner.isEventTime()
                && isCleanupTime(context.window, timer.getTimestamp())) {
            clearAllState(context.window, windowState, mergingWindows);
        }

        if (mergingWindows != null) {
            mergingWindows.persist();
        }
    }

    /** Points the trigger context and the evictor context at the key and window of {@code timer}. */
    private void bindContexts(InternalTimer<K, W> timer) {
        context.key = timer.getKey();
        context.window = timer.getNamespace();
        evictorContext.key = timer.getKey();
        evictorContext.window = timer.getNamespace();
    }

    /**
     * Looks up the list state for {@code context.window}.
     *
     * @param mergingWindows the merging window set, or {@code null} for non-merging assigners
     * @return the window state, or {@code null} if the merging set has no state window for
     *     the current window
     */
    private ListState<StreamRecord<IN>> resolveTimerWindowState(MergingWindowSet<W> mergingWindows) throws Exception {
        W stateNamespace = context.window;
        if (mergingWindows != null) {
            stateNamespace = mergingWindows.getStateWindow(context.window);
            if (stateNamespace == null) {
                return null;
            }
        }
        return getPartitionedState(stateNamespace, windowSerializer, windowStateDescriptor);
    }

    /** Emits the window on FIRE and clears the list state on PURGE, in that order. */
    private void applyTimerTriggerResult(TriggerResult triggerResult, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
        if (triggerResult.isFire()) {
            emitWindowContents(context.window, contents, windowState);
        }
        if (triggerResult.isPurge()) {
            windowState.clear();
        }
    }

    /**
     * Runs the evictor, applies the window function to the surviving elements, runs the evictor
     * again and rewrites {@code windowState} with whatever is left.
     *
     * @param window window whose max timestamp is stamped on the emitted records
     * @param contents current contents of {@code windowState}
     * @param windowState list state backing the window
     */
    private void emitWindowContents(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

        // Lazy view over the stored records, exposed to the evictor as timestamped values.
        FluentIterable<TimestampedValue<IN>> recordsWithTimestamp =
                FluentIterable.from(contents).transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {

                    @Override
                    public TimestampedValue<IN> apply(StreamRecord<IN> input) {
                        return TimestampedValue.from(input);
                    }
                });
        evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // Second lazy view that strips the timestamps for the user function.
        FluentIterable<IN> projectedContents =
                recordsWithTimestamp.transform(new Function<TimestampedValue<IN>, IN>() {

                    @Override
                    public IN apply(TimestampedValue<IN> input) {
                        return input.getValue();
                    }
                });

        userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
        evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));

        // ListState offers no per-element removal here, so drop everything and re-add the
        // records that are still present after eviction.
        windowState.clear();
        for (TimestampedValue<IN> survivor : recordsWithTimestamp) {
            windowState.add(survivor.getStreamRecord());
        }
    }

    /**
     * Clears the list state and the trigger state of {@code window}; for merging assigners the
     * window is additionally retired from the merging set, which is then persisted.
     *
     * @param window the window being cleaned up
     * @param windowState list state of the window; must not be null
     * @param mergingWindows merging window set, or {@code null} for non-merging assigners
     */
    private void clearAllState(W window, ListState<StreamRecord<IN>> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();
        context.clear();
        if (mergingWindows != null) {
            mergingWindows.retireWindow(window);
            mergingWindows.persist();
        }
    }

    /** Opens the super class and creates a fresh evictor context with no key or window bound. */
    @Override
    public void open() throws Exception {
        super.open();
        evictorContext = new EvictorContext(null, null);
    }

    /** Closes the super class and releases the evictor context. */
    @Override
    public void close() throws Exception {
        super.close();
        evictorContext = null;
    }

    /** Disposes the super class and releases the evictor context. */
    @Override
    public void dispose() throws Exception {
        super.dispose();
        evictorContext = null;
    }

    /** Returns the evictor this operator was constructed with. */
    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return evictor;
    }

    /**
     * Returns the list-state descriptor of this operator, viewed through the signature
     * inherited from the super class.
     */
    @Override
    @VisibleForTesting
    @SuppressWarnings({"unchecked", "rawtypes"})
    public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
        // The descriptor is declared for ListState<StreamRecord<IN>>; the cast only adapts it
        // to the return type required by the overridden method.
        return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) (StateDescriptor) windowStateDescriptor;
    }

    /**
     * Context passed to the {@link Evictor}. It is an inner class so that it can reach the
     * enclosing operator's timer service, metric group and evictor.
     */
    class EvictorContext implements Evictor.EvictorContext {
        /** Key the context is currently bound to. */
        protected K key;

        /** Window the context is currently bound to. */
        protected W window;

        /**
         * @param key initial key, may be null
         * @param window initial window, may be null
         */
        public EvictorContext(K key, W window) {
            this.key = key;
            this.window = window;
        }

        @Override
        public long getCurrentProcessingTime() {
            return internalTimerService.currentProcessingTime();
        }

        @Override
        public long getCurrentWatermark() {
            return internalTimerService.currentWatermark();
        }

        @Override
        public MetricGroup getMetricGroup() {
            return EvictingWindowOperator.this.getMetricGroup();
        }

        /** Returns the key this context is currently bound to. */
        public K getKey() {
            return key;
        }

        /** Delegates to {@code evictor.evictBefore} for the currently bound window. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictBefore(Iterable<TimestampedValue<IN>> elements, int size) {
            // Raw cast: the evictor is declared with "? super IN", which the compiler cannot
            // match against Iterable<TimestampedValue<IN>> directly.
            evictor.evictBefore((Iterable) elements, size, window, this);
        }

        /** Delegates to {@code evictor.evictAfter} for the currently bound window. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        void evictAfter(Iterable<TimestampedValue<IN>> elements, int size) {
            evictor.evictAfter((Iterable) elements, size, window, this);
        }
    }
}

