/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Stream operator that applies a {@link FoldFunction} to incoming elements, keeping the
 * running accumulator in a {@link ValueState} obtained from the operator's partitioned state
 * (presumably scoped to the key of the current element; confirm against the state backend).
 *
 * <p>The user-supplied initial value is held in a {@code transient} field, so it is not part
 * of the Java-serialized operator. Instead, {@link #setOutputType} writes it to a byte array
 * with the output type's serializer, and {@link #open()} reads it back before any element is
 * processed.
 *
 * @param <IN> type of the input elements
 * @param <OUT> type of the accumulator and of the emitted elements
 * @param <KEY> key type parameter; not referenced inside this class
 */
@Internal
public class StreamGroupedFold<IN, OUT, KEY>
        extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OutputTypeConfigurable<OUT> {

    private static final long serialVersionUID = 1L;

    /** Name under which the accumulator state is registered. */
    private static final String STATE_NAME = "_op_state";

    /** Partitioned state holding the latest fold result; {@code null} until the first update. */
    private transient ValueState<OUT> values;

    /** Seed for the fold; set by the constructor and re-created from bytes in {@link #open()}. */
    private transient OUT initialValue;

    /** {@link #initialValue} serialized with {@link #outTypeSerializer}; set in {@link #setOutputType}. */
    private byte[] serializedInitialValue;

    /** Serializer for {@code OUT}; set in {@link #setOutputType}. */
    private TypeSerializer<OUT> outTypeSerializer;

    /**
     * Creates the operator.
     *
     * @param folder the fold function applied to each element
     * @param initialValue the accumulator used when no folded value has been stored yet
     */
    public StreamGroupedFold(FoldFunction<IN, OUT> folder, OUT initialValue) {
        super(folder);
        this.initialValue = initialValue;
    }

    /**
     * Restores the initial value from its serialized form and acquires the accumulator state.
     *
     * @throws RuntimeException if no serialized initial value is available, i.e.
     *     {@link #setOutputType} has not populated it
     * @throws Exception if the superclass, deserialization, or state lookup fails
     */
    @Override
    public void open() throws Exception {
        super.open();

        if (serializedInitialValue == null) {
            throw new RuntimeException("No initial value was serialized for the fold operator. Probably the setOutputType method was not called.");
        }

        try (ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
             DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
            initialValue = outTypeSerializer.deserialize(in);
        }

        ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer);
        values = getPartitionedState(stateId);
    }

    /**
     * Folds the element into the stored accumulator (or into the initial value when nothing
     * is stored yet), stores the result, and emits it.
     *
     * @param element the incoming record; it is reused for the emitted result via
     *     {@code replace}
     * @throws Exception if the state access or the user function fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        OUT current = values.value();
        // Nothing stored yet: start from the initial value.
        OUT seed = (current != null) ? current : initialValue;

        // The user function receives a copy, never the stored or initial instance itself.
        OUT folded = userFunction.fold(outTypeSerializer.copy(seed), element.getValue());
        values.update(folded);
        output.collect(element.replace(folded));
    }

    /**
     * Creates the output serializer and uses it to serialize the initial value, so that the
     * value can be restored in {@link #open()}.
     *
     * @param outTypeInfo type information of the output type
     * @param executionConfig configuration used to create the serializer
     * @throws RuntimeException if serializing the initial value fails with an I/O error
     */
    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        outTypeSerializer = outTypeInfo.createSerializer(executionConfig);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            outTypeSerializer.serialize(initialValue, out);
        } catch (IOException ioe) {
            // Build the message null-safely so the original I/O failure is never masked by an NPE.
            String typeName = (initialValue == null) ? "null" : initialValue.getClass().getSimpleName();
            throw new RuntimeException("Unable to serialize initial value of type " + typeName + " of fold operator.", ioe);
        }

        serializedInitialValue = baos.toByteArray();
    }
}

