/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Objects;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Migration;

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT>,
StreamCheckpointedOperator {
    private static final long serialVersionUID = 1L;
    protected final F userFunction;
    private transient boolean functionsClosed = false;

    public AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = (Function)Objects.requireNonNull(userFunction);
        this.checkUdfCheckpointingPreconditions();
    }

    public F getUserFunction() {
        return this.userFunction;
    }

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        FunctionUtils.setFunctionRuntimeContext(this.userFunction, (RuntimeContext)this.getRuntimeContext());
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, this.getOperatorStateBackend(), this.userFunction);
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        StreamingFunctionUtils.restoreFunctionState(context, this.userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(this.userFunction, (Configuration)new Configuration());
    }

    @Override
    public void close() throws Exception {
        super.close();
        this.functionsClosed = true;
        FunctionUtils.closeFunction(this.userFunction);
    }

    @Override
    public void dispose() throws Exception {
        super.dispose();
        if (!this.functionsClosed) {
            this.functionsClosed = true;
            FunctionUtils.closeFunction(this.userFunction);
        }
    }

    @Override
    public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
        if (this.userFunction instanceof Checkpointed) {
            Checkpointed chkFunction = (Checkpointed)this.userFunction;
            try {
                Object udfState = chkFunction.snapshotState(checkpointId, timestamp);
                if (udfState != null) {
                    out.write(1);
                    InstantiationUtil.serializeObject((OutputStream)out, udfState);
                } else {
                    out.write(0);
                }
            }
            catch (Exception e) {
                throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e);
            }
        }
    }

    @Override
    public void restoreState(FSDataInputStream in) throws Exception {
        int hasUdfState;
        if (this.userFunction instanceof Checkpointed || this.userFunction instanceof CheckpointedRestoring && in instanceof Migration) {
            Serializable functionState;
            CheckpointedRestoring chkFunction = (CheckpointedRestoring)this.userFunction;
            int hasUdfState2 = in.read();
            if (hasUdfState2 == 1 && (functionState = (Serializable)InstantiationUtil.deserializeObject((InputStream)in, (ClassLoader)this.getUserCodeClassloader())) != null) {
                try {
                    chkFunction.restoreState(functionState);
                }
                catch (Exception e) {
                    throw new Exception("Failed to restore state to function: " + e.getMessage(), e);
                }
            }
        } else if (in instanceof Migration && (hasUdfState = in.read()) == 1) {
            throw new Exception("Found UDF state but operator is not instance of CheckpointedRestoring");
        }
    }

    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        super.notifyOfCompletedCheckpoint(checkpointId);
        if (this.userFunction instanceof CheckpointListener) {
            ((CheckpointListener)this.userFunction).notifyCheckpointComplete(checkpointId);
        }
    }

    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        StreamingFunctionUtils.setOutputType(this.userFunction, outTypeInfo, executionConfig);
    }

    public Configuration getUserFunctionParameters() {
        return new Configuration();
    }

    private void checkUdfCheckpointingPreconditions() {
        boolean newCheckpointInferface = false;
        if (this.userFunction instanceof CheckpointedFunction) {
            newCheckpointInferface = true;
        }
        if (this.userFunction instanceof ListCheckpointed) {
            if (newCheckpointInferface) {
                throw new IllegalStateException("User functions are not allowed to implement CheckpointedFunction AND ListCheckpointed.");
            }
            newCheckpointInferface = true;
        }
        if (newCheckpointInferface && this.userFunction instanceof Checkpointed) {
            throw new IllegalStateException("User functions are not allowed to implement Checkpointed AND CheckpointedFunction/ListCheckpointed.");
        }
    }
}

