package com.starrocks.connector.flink.table.sink;

import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.connector.flink.tools.JsonWrapper;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import com.starrocks.shade.com.google.common.base.Strings;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.statement.truncate.Truncate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.class */
public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunctionBase<T> {
    private static final long serialVersionUID = 1;
    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);
    private static final int NESTED_ROW_DATA_HEADER_SIZE = 256;
    private final StarRocksSinkOptions sinkOptions;
    private final StreamLoadManagerV2 sinkManager;
    private final StarRocksISerializer serializer;
    private final StarRocksIRowTransformer<T> rowTransformer;
    private volatile transient ListState<StarrocksSnapshotState> snapshotStates;
    private transient long restoredCheckpointId;
    private transient List<ExactlyOnceLabelGeneratorSnapshot> restoredGeneratorSnapshots;
    private transient Map<Long, List<StreamLoadSnapshot>> snapshotMap;
    private transient StarRocksStreamLoadListener streamLoadListener;

    @Nullable
    private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory;

    @Deprecated
    private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;

    @Deprecated
    private transient List<StarRocksSinkBufferEntity> legacyData;
    private transient long totalReceivedRows;
    private transient JsonWrapper jsonWrapper;

    /**
     * Creates a sink function that serializes records itself before handing them to the
     * stream load manager.
     *
     * <p>A {@link StarRocksSinkTable} is built from the options and asked to validate its
     * structure against {@code tableSchema}; the serializer is created from the schema's field
     * names; and the row transformer receives the table's field mapping and the schema.
     *
     * @param starRocksSinkOptions sink configuration, also used to build the stream load manager
     * @param tableSchema schema whose field names drive the serializer
     * @param starRocksIRowTransformer transformer applied to each record before serialization;
     *     it is configured here, so it must not be {@code null}
     */
    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions starRocksSinkOptions, TableSchema tableSchema, StarRocksIRowTransformer<T> starRocksIRowTransformer) {
        this.sinkOptions = starRocksSinkOptions;
        this.rowTransformer = starRocksIRowTransformer;

        StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
                .sinkOptions(starRocksSinkOptions)
                .build();
        sinkTable.validateTableStructure(starRocksSinkOptions, tableSchema);

        String[] fieldNames = tableSchema.getFieldNames();
        this.serializer = StarRocksSerializerFactory.createSerializer(starRocksSinkOptions, fieldNames);

        starRocksIRowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
        starRocksIRowTransformer.setTableSchema(tableSchema);

        // The second argument is true only for the AT_LEAST_ONCE semantic.
        this.sinkManager = new StreamLoadManagerV2(
                starRocksSinkOptions.getProperties(sinkTable),
                starRocksSinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
    }

    /**
     * Creates a sink function without a serializer or row transformer.
     *
     * <p>In this mode {@code invoke} forwards records to the stream load manager without
     * serializing them (see the {@code serializer == null} path there).
     *
     * @param starRocksSinkOptions sink configuration, also used to build the stream load manager
     */
    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions starRocksSinkOptions) {
        this.sinkOptions = starRocksSinkOptions;
        this.serializer = null;
        this.rowTransformer = null;
        // No sink table is available here, so properties are requested with a null table.
        // The second argument is true only for the AT_LEAST_ONCE semantic.
        this.sinkManager = new StreamLoadManagerV2(
                starRocksSinkOptions.getProperties(null),
                starRocksSinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
    }

    /**
     * Writes one incoming record through the stream load manager.
     *
     * <p>Without a serializer the record is forwarded as-is (see
     * {@link #writeWithoutSerializer(Object)}). With a serializer the record is first filtered:
     * {@link NestedRowData} records are inspected by {@link #shouldDropNestedRow(NestedRowData)}
     * and {@link RowData} records by {@link #shouldDropByRowKind(RowData)}. Surviving records are
     * transformed, serialized and written to the configured database and table, after any
     * restored legacy data has been flushed.
     *
     * @param t the record to write
     * @param context sink context; not used by this method
     * @throws Exception if deserialization, DDL parsing, transformation or writing fails
     */
    public void invoke(T t, SinkFunction.Context context) throws Exception {
        if (this.serializer == null) {
            writeWithoutSerializer(t);
            return;
        }
        if (t instanceof NestedRowData && shouldDropNestedRow((NestedRowData) t)) {
            return;
        }
        if (t instanceof RowData && shouldDropByRowKind((RowData) t)) {
            return;
        }
        flushLegacyData();
        String serialized = this.serializer.serialize(this.rowTransformer.transform(t, this.sinkOptions.supportUpsertDelete()));
        this.sinkManager.write(null, this.sinkOptions.getDatabaseName(), this.sinkOptions.getTableName(), serialized);
        this.totalReceivedRows++;
        // Sample the 1st, 101st, 201st, ... record so debug logging stays cheap.
        if (this.totalReceivedRows % 100 == 1) {
            log.debug("Received raw record: {}", t);
            log.debug("Received serialized record: {}", serialized);
        }
    }

    /**
     * Forwards a record when no serializer is configured: records that carry their own
     * database/table are written to that target (incomplete ones are logged and skipped), and
     * any other record is written via {@code toString()} to the configured database and table.
     */
    private void writeWithoutSerializer(T t) throws Exception {
        if (t instanceof StarRocksSinkRowDataWithMeta) {
            StarRocksSinkRowDataWithMeta rowsWithMeta = (StarRocksSinkRowDataWithMeta) t;
            if (Strings.isNullOrEmpty(rowsWithMeta.getDatabase()) || Strings.isNullOrEmpty(rowsWithMeta.getTable()) || rowsWithMeta.getDataRows() == null) {
                log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", rowsWithMeta.getDatabase(), rowsWithMeta.getTable(), Arrays.toString(rowsWithMeta.getDataRows())));
                return;
            }
            this.sinkManager.write(null, rowsWithMeta.getDatabase(), rowsWithMeta.getTable(), rowsWithMeta.getDataRows());
            return;
        }
        if (t instanceof StarRocksRowData) {
            StarRocksRowData rowData = (StarRocksRowData) t;
            if (Strings.isNullOrEmpty(rowData.getDatabase()) || Strings.isNullOrEmpty(rowData.getTable()) || rowData.getRow() == null) {
                log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", rowData.getDatabase(), rowData.getTable(), rowData.getRow()));
                return;
            }
            this.sinkManager.write(rowData.getUniqueKey(), rowData.getDatabase(), rowData.getTable(), rowData.getRow());
            return;
        }
        this.sinkManager.write(null, this.sinkOptions.getDatabaseName(), this.sinkOptions.getTableName(), t.toString());
    }

    /**
     * Decides whether a {@link NestedRowData} record must be skipped. The bytes after the
     * first {@code NESTED_ROW_DATA_HEADER_SIZE} bytes of its single segment are deserialized
     * into a map, and the map's {@code ddl} entry is parsed as SQL.
     *
     * @return {@code true} if the record is malformed, is flagged as a snapshot, lacks a DDL
     *     or database name, or is a TRUNCATE of a table other than the configured one
     */
    private boolean shouldDropNestedRow(NestedRowData nestedRowData) throws Exception {
        if (nestedRowData.getSegments().length != 1 || nestedRowData.getSegments()[0].size() < NESTED_ROW_DATA_HEADER_SIZE) {
            return true;
        }
        byte[] payload = new byte[nestedRowData.getSegments()[0].size() - NESTED_ROW_DATA_HEADER_SIZE];
        nestedRowData.getSegments()[0].get(NESTED_ROW_DATA_HEADER_SIZE, payload);
        // NOTE(review): Java-native deserialization of bytes taken from the record; confirm the
        // upstream producer is trusted.
        Map<?, ?> ddlInfo = (Map<?, ?>) InstantiationUtil.deserializeObject(payload, HashMap.class.getClassLoader());
        if (ddlInfo == null || "true".equals(ddlInfo.get("snapshot")) || Strings.isNullOrEmpty((String) ddlInfo.get("ddl")) || Strings.isNullOrEmpty((String) ddlInfo.get("databaseName"))) {
            return true;
        }
        // Held as Object so the parse result can be type-tested without assuming it is a
        // Truncate; any other statement kind must not be narrowed.
        Object statement = CCJSqlParserUtil.parse((String) ddlInfo.get("ddl"));
        if (statement instanceof Truncate) {
            String truncatedTable = ((Truncate) statement).getTable().getName();
            return !this.sinkOptions.getTableName().equalsIgnoreCase(truncatedTable);
        }
        // ALTER and all other statements get no special handling; the record continues on.
        return false;
    }

    /**
     * Decides whether a row must be skipped because of its {@link RowKind}: UPDATE_BEFORE rows
     * are skipped when upsert/delete is unsupported or update-before is ignored, and DELETE
     * rows are skipped when upsert/delete is unsupported.
     */
    private boolean shouldDropByRowKind(RowData row) {
        RowKind kind = row.getRowKind();
        if (RowKind.UPDATE_BEFORE.equals(kind) && (!this.sinkOptions.supportUpsertDelete() || this.sinkOptions.getIgnoreUpdateBefore())) {
            return true;
        }
        return !this.sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(kind);
    }

    /**
     * Prepares the sink for processing: resets the row counter, opens the serializer (if any),
     * attaches a stream load listener, selects a label generator factory, initializes the stream
     * load manager, configures the row transformer (if any) and, for the EXACTLY_ONCE semantic,
     * runs {@link #openForExactlyOnce()}.
     *
     * @param configuration Flink configuration; not used by this method
     * @throws Exception if any of the initialization steps fails
     */
    public void open(Configuration configuration) throws Exception {
        this.totalReceivedRows = 0L;
        if (this.serializer != null) {
            StarRocksISerializer.SerializerContext serializerContext = new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper());
            this.serializer.open(serializerContext);
        }

        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), this.sinkOptions);
        this.sinkManager.setStreamLoadListener(this.streamLoadListener);

        final String prefix = this.sinkOptions.getLabelPrefix();
        // The exactly-once label generator needs an explicit prefix, a semantic other than
        // AT_LEAST_ONCE, and the feature switch turned on; otherwise the default one is used.
        final boolean useExactlyOnceLabels = prefix != null
                && this.sinkOptions.getSemantic() != StarRocksSinkSemantic.AT_LEAST_ONCE
                && this.sinkOptions.isEnableExactlyOnceLabelGen();
        final LabelGeneratorFactory labelFactory;
        if (useExactlyOnceLabels) {
            this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory(
                    prefix,
                    getRuntimeContext().getNumberOfParallelSubtasks(),
                    getRuntimeContext().getIndexOfThisSubtask(),
                    this.restoredCheckpointId);
            this.exactlyOnceLabelFactory.restore(this.restoredGeneratorSnapshots);
            labelFactory = this.exactlyOnceLabelFactory;
        } else {
            String effectivePrefix = prefix != null ? prefix : "flink";
            labelFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(effectivePrefix);
        }
        this.sinkManager.setLabelGeneratorFactory(labelFactory);
        this.sinkManager.init();

        if (this.rowTransformer != null) {
            this.rowTransformer.setRuntimeContext(getRuntimeContext());
            this.rowTransformer.setFastJsonWrapper(getOrCreateJsonWrapper());
        }

        if (this.sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) {
            openForExactlyOnce();
        }
        log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
    }

    /**
     * Extra start-up work for the EXACTLY_ONCE semantic: optionally runs a
     * {@code LingeringTransactionAborter}, then commits every snapshot currently held in
     * {@code snapshotMap} by notifying completion with {@code Long.MAX_VALUE}.
     *
     * @throws Exception if aborting lingering transactions or committing snapshots fails
     */
    private void openForExactlyOnce() throws Exception {
        if (this.sinkOptions.isAbortLingeringTxns()) {
            LingeringTransactionAborter aborter = new LingeringTransactionAborter(
                    this.sinkOptions.getLabelPrefix(),
                    this.restoredCheckpointId,
                    getRuntimeContext().getIndexOfThisSubtask(),
                    this.sinkOptions.getAbortCheckNumTxns(),
                    this.sinkOptions.getDbTables(),
                    this.restoredGeneratorSnapshots,
                    this.sinkManager.getStreamLoader());
            aborter.execute();
        }
        // Long.MAX_VALUE is >= every checkpoint id, so all tracked snapshots are committed.
        notifyCheckpointComplete(Long.MAX_VALUE);
    }

    /**
     * Returns the cached {@link JsonWrapper}, creating it on first use.
     *
     * @return the wrapper instance held by this function
     */
    private JsonWrapper getOrCreateJsonWrapper() {
        JsonWrapper wrapper = this.jsonWrapper;
        if (wrapper != null) {
            return wrapper;
        }
        wrapper = new JsonWrapper();
        this.jsonWrapper = wrapper;
        return wrapper;
    }

    /**
     * Flushes the stream load manager.
     */
    public void finish() {
        sinkManager.flush();
    }

    /**
     * Shuts the sink down: flushes pending data, aborts whatever the manager's current
     * snapshot reports, and closes the stream load manager.
     *
     * <p>A flush failure is logged and rethrown, but clean-up still happens. The abort and the
     * close each run exactly once, and the manager is closed even if taking the snapshot or
     * aborting it throws.
     */
    public void close() {
        log.info("Close sink function");
        try {
            this.sinkManager.flush();
        } catch (Exception e) {
            log.error("Failed to flush when closing", e);
            throw e;
        } finally {
            try {
                this.sinkManager.abort(this.sinkManager.snapshot());
            } finally {
                // Always release the manager, even when the abort above fails.
                this.sinkManager.close();
            }
        }
    }

    /**
     * Flushes pending data and, for the EXACTLY_ONCE semantic, prepares the manager's current
     * snapshot and records it in operator state under the checkpoint id.
     *
     * @param functionSnapshotContext supplies the id of the checkpoint being taken
     * @throws Exception if flushing or state access fails; a {@link RuntimeException} is thrown
     *     (after aborting the snapshot) when the prepare step reports failure
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.sinkManager.flush();
        if (this.sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE) {
            StreamLoadSnapshot pendingSnapshot = this.sinkManager.snapshot();
            boolean prepared = this.sinkManager.prepare(pendingSnapshot);
            if (!prepared) {
                this.sinkManager.abort(pendingSnapshot);
                throw new RuntimeException("Snapshot state failed by prepare");
            }

            final long checkpointId = functionSnapshotContext.getCheckpointId();
            this.snapshotMap.put(checkpointId, Collections.singletonList(pendingSnapshot));
            // The list state always holds a single entry describing every uncommitted snapshot.
            this.snapshotStates.clear();
            this.snapshotStates.add(StarrocksSnapshotState.of(
                    this.snapshotMap,
                    this.exactlyOnceLabelFactory != null ? this.exactlyOnceLabelFactory.snapshot(checkpointId) : null));

            if (this.legacyState != null) {
                this.legacyState.clear();
            }
        }
    }

    /**
     * Registers operator state and, when the job is restored, reloads uncommitted snapshots,
     * label generator snapshots and legacy buffered rows. Does nothing beyond logging unless
     * the semantic is EXACTLY_ONCE.
     *
     * <p>Two list states are registered: {@code "starrocks-sink-transaction"} (raw bytes wrapped
     * with a {@code StarRocksVersionedSerializer}) and {@code "buffered-rows"} (the deprecated
     * legacy state).
     *
     * @param functionInitializationContext gives access to the operator state store and restore info
     * @throws Exception if state registration or reading fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        log.info("Initialize state");
        if (this.sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
            return;
        }

        ListStateDescriptor<byte[]> transactionDescriptor = new ListStateDescriptor<>(
                "starrocks-sink-transaction",
                TypeInformation.of(new TypeHint<byte[]>() {
                }));
        ListState<byte[]> transactionBytes = functionInitializationContext.getOperatorStateStore().getListState(transactionDescriptor);
        this.snapshotStates = new SimpleVersionedListState<>(transactionBytes, new StarRocksVersionedSerializer(getOrCreateJsonWrapper()));

        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor = new ListStateDescriptor<>(
                "buffered-rows",
                TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
                }));
        this.legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);

        // 0 is used as the checkpoint id when there is nothing to restore from.
        this.restoredCheckpointId = functionInitializationContext.getRestoredCheckpointId().orElse(0L);
        this.restoredGeneratorSnapshots = new ArrayList<>();
        this.snapshotMap = new ConcurrentHashMap<>();
        if (!functionInitializationContext.isRestored()) {
            return;
        }

        for (StarrocksSnapshotState restoredState : this.snapshotStates.get()) {
            // Several state entries may carry snapshots for the same checkpoint id; merge them.
            for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : restoredState.getData().entrySet()) {
                this.snapshotMap
                        .computeIfAbsent(entry.getKey(), checkpointId -> new ArrayList<>())
                        .addAll(entry.getValue());
            }
            if (restoredState.getLabelSnapshots() != null) {
                this.restoredGeneratorSnapshots.addAll(restoredState.getLabelSnapshots());
            }
        }

        this.legacyData = new ArrayList<>();
        for (Map<String, StarRocksSinkBufferEntity> bufferedRows : this.legacyState.get()) {
            this.legacyData.addAll(bufferedRows.values());
        }
        log.info("There are {} items from legacy state", this.legacyData.size());
    }

    /**
     * Commits every tracked snapshot whose checkpoint id is at most {@code j}, in ascending
     * checkpoint order, and removes each one from {@code snapshotMap} once all of its
     * transactions are committed. Does nothing unless the semantic is EXACTLY_ONCE.
     *
     * @param j id of the completed checkpoint
     * @throws Exception declared for the interface; failures surface as a
     *     {@link RuntimeException} wrapping the cause, thrown at the first checkpoint that
     *     cannot be fully committed
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
            return;
        }
        // Materialize the ids first so entries can be removed from the map while looping.
        List<Long> committableIds = this.snapshotMap.keySet().stream()
                .filter(checkpointId -> checkpointId <= j)
                .sorted()
                .collect(Collectors.toList());
        for (Long checkpointId : committableIds) {
            try {
                for (StreamLoadSnapshot snapshot : this.snapshotMap.get(checkpointId)) {
                    if (!this.sinkManager.commit(snapshot)) {
                        throw new RuntimeException(String.format("Failed to commit some transactions for snapshot %s, please check taskmanager logs for details", checkpointId));
                    }
                }
                this.snapshotMap.remove(checkpointId);
            } catch (Exception e) {
                log.error("Failed to notify checkpoint complete, checkpoint id : {}", j, e);
                throw new RuntimeException("Failed to notify checkpoint complete for checkpoint id " + j, e);
            }
        }
        // Drop the legacy state handle so later snapshotState calls no longer clear it.
        this.legacyState = null;
    }

    /**
     * Writes every row restored from the legacy state to its original database and table, then
     * empties the legacy buffer so the rows are written only once. A no-op when there is no
     * legacy data.
     */
    private void flushLegacyData() {
        if (this.legacyData != null && !this.legacyData.isEmpty()) {
            for (StarRocksSinkBufferEntity entity : this.legacyData) {
                for (byte[] rowBytes : entity.getBuffer()) {
                    String row = new String(rowBytes, StandardCharsets.UTF_8);
                    this.sinkManager.write(null, entity.getDatabase(), entity.getTable(), row);
                }
                log.info("Write {} legacy records from table '{}' of database '{}'",
                        entity.getBuffer().size(), entity.getDatabase(), entity.getTable());
            }
            this.legacyData.clear();
        }
    }
}
