/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.codegen.WatermarkGeneratorCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.runtime.generated.GeneratedWatermarkGenerator;
import scala.Option;

public abstract class PushWatermarkIntoTableSourceScanRuleBase
extends RelOptRule {
    public PushWatermarkIntoTableSourceScanRuleBase(RelOptRuleOperand operand, String description) {
        super(operand, description);
    }

    protected FlinkLogicalTableSourceScan getNewScan(FlinkLogicalWatermarkAssigner watermarkAssigner, RexNode watermarkExpr, FlinkLogicalTableSourceScan scan, TableConfig tableConfig, boolean useWatermarkAssignerRowType) {
        GeneratedWatermarkGenerator generatedWatermarkGenerator = WatermarkGeneratorCodeGenerator.generateWatermarkGenerator(tableConfig, FlinkTypeFactory.toLogicalRowType(scan.getRowType()), watermarkExpr, (Option<String>)Option.apply((Object)"context"));
        Configuration configuration = tableConfig.getConfiguration();
        DefaultWatermarkGeneratorSupplier supplier = new DefaultWatermarkGeneratorSupplier(configuration, generatedWatermarkGenerator);
        String digest = String.format("watermark=[%s]", watermarkExpr);
        WatermarkStrategy watermarkStrategy = WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier)supplier);
        Duration idleTimeout = (Duration)configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
        if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
            watermarkStrategy.withIdleness(idleTimeout);
            digest = String.format("%s, idletimeout=[%s]", digest, idleTimeout.toMillis());
        }
        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
        DynamicTableSource newDynamicTableSource = tableSourceTable.tableSource().copy();
        ((SupportsWatermarkPushDown)newDynamicTableSource).applyWatermark(watermarkStrategy);
        TableSourceTable newTableSourceTable = useWatermarkAssignerRowType ? tableSourceTable.copy(newDynamicTableSource, watermarkAssigner.getRowType(), new String[]{digest}) : tableSourceTable.copy(newDynamicTableSource, scan.getRowType(), new String[]{digest});
        return FlinkLogicalTableSourceScan.create(scan.getCluster(), newTableSourceTable);
    }

    protected boolean supportsWatermarkPushDown(FlinkLogicalTableSourceScan scan) {
        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
        return tableSourceTable != null && tableSourceTable.tableSource() instanceof SupportsWatermarkPushDown;
    }

    private static class DefaultWatermarkGeneratorSupplier
    implements WatermarkGeneratorSupplier<RowData> {
        private static final long serialVersionUID = 1L;
        private final Configuration configuration;
        private final GeneratedWatermarkGenerator generatedWatermarkGenerator;

        public DefaultWatermarkGeneratorSupplier(Configuration configuration, GeneratedWatermarkGenerator generatedWatermarkGenerator) {
            this.configuration = configuration;
            this.generatedWatermarkGenerator = generatedWatermarkGenerator;
        }

        public WatermarkGenerator<RowData> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            ArrayList<Object> references = new ArrayList<Object>(Arrays.asList(this.generatedWatermarkGenerator.getReferences()));
            references.add(context);
            org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator = (org.apache.flink.table.runtime.generated.WatermarkGenerator)new GeneratedWatermarkGenerator(this.generatedWatermarkGenerator.getClassName(), this.generatedWatermarkGenerator.getCode(), references.toArray()).newInstance(Thread.currentThread().getContextClassLoader());
            try {
                innerWatermarkGenerator.open(this.configuration);
            }
            catch (Exception e) {
                throw new RuntimeException("Fail to instantiate generated watermark generator.", e);
            }
            return new DefaultWatermarkGenerator(innerWatermarkGenerator);
        }

        private static class DefaultWatermarkGenerator
        implements WatermarkGenerator<RowData> {
            private static final long serialVersionUID = 1L;
            private final org.apache.flink.table.runtime.generated.WatermarkGenerator innerWatermarkGenerator;
            private Long currentWatermark = Long.MIN_VALUE;

            public DefaultWatermarkGenerator(org.apache.flink.table.runtime.generated.WatermarkGenerator watermarkGenerator) {
                this.innerWatermarkGenerator = watermarkGenerator;
            }

            public void onEvent(RowData event, long eventTimestamp, WatermarkOutput output) {
                try {
                    Long watermark = this.innerWatermarkGenerator.currentWatermark(event);
                    if (watermark != null) {
                        this.currentWatermark = watermark;
                    }
                }
                catch (Exception e) {
                    throw new RuntimeException(String.format("Generated WatermarkGenerator fails to generate for row: %s.", event), e);
                }
            }

            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(this.currentWatermark.longValue()));
            }
        }
    }
}

