package org.apache.doris.flink.tools.cdc;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.LoadConstants;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/tools/cdc/DatabaseSync.class */
public abstract class DatabaseSync {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DatabaseSync.class);
    private static final String LIGHT_SCHEMA_CHANGE = "light_schema_change";
    private static final String TABLE_NAME_OPTIONS = "table-name";
    protected Configuration config;
    protected String database;
    protected TableNameConverter converter;
    protected Pattern includingPattern;
    protected Pattern excludingPattern;
    protected Map<Pattern, String> multiToOneRulesPattern;
    protected Configuration sinkConfig;
    protected boolean ignoreDefaultValue;
    public StreamExecutionEnvironment env;
    private boolean newSchemaChange;
    protected String includingTables;
    protected String excludingTables;
    protected String multiToOneOrigin;
    protected String multiToOneTarget;
    protected String tablePrefix;
    protected String tableSuffix;
    protected boolean singleSink;
    protected Map<String, String> tableConfig = new HashMap();
    private boolean createTableOnly = false;
    private Map<String, String> tableMapping = new HashMap();

    /* loaded from: input_file:org/apache/doris/flink/tools/cdc/DatabaseSync$TableNameConverter.class */
    public static class TableNameConverter implements Serializable {
        private static final long serialVersionUID = 1;
        private final String prefix;
        private final String suffix;
        private Map<Pattern, String> multiToOneRulesPattern;

        TableNameConverter() {
            this("", "");
        }

        TableNameConverter(String str, String str2) {
            this.prefix = str == null ? "" : str;
            this.suffix = str2 == null ? "" : str2;
        }

        TableNameConverter(String str, String str2, Map<Pattern, String> map) {
            this.prefix = str == null ? "" : str;
            this.suffix = str2 == null ? "" : str2;
            this.multiToOneRulesPattern = map;
        }

        public String convert(String str) {
            if (this.multiToOneRulesPattern == null) {
                return this.prefix + str + this.suffix;
            }
            String str2 = null;
            for (Map.Entry<Pattern, String> entry : this.multiToOneRulesPattern.entrySet()) {
                if (entry.getKey().matcher(str).matches()) {
                    str2 = entry.getValue();
                }
            }
            return str2 == null ? this.prefix + str + this.suffix : str2;
        }
    }

    public abstract void registerDriver() throws SQLException;

    public abstract Connection getConnection() throws SQLException;

    public abstract List<SourceSchema> getSchemaList() throws Exception;

    public abstract DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment streamExecutionEnvironment);

    public abstract String getTableListPrefix();

    public DatabaseSync() throws SQLException {
        registerDriver();
    }

    public void create() {
        this.includingPattern = this.includingTables == null ? null : Pattern.compile(this.includingTables);
        this.excludingPattern = this.excludingTables == null ? null : Pattern.compile(this.excludingTables);
        this.multiToOneRulesPattern = multiToOneRulesParser(this.multiToOneOrigin, this.multiToOneTarget);
        this.converter = new TableNameConverter(this.tablePrefix, this.tableSuffix, this.multiToOneRulesPattern);
        if (this.tableConfig.containsKey(LIGHT_SCHEMA_CHANGE)) {
            return;
        }
        this.tableConfig.put(LIGHT_SCHEMA_CHANGE, BooleanUtils.TRUE);
    }

    public void build() throws Exception {
        DorisSystem dorisSystem = new DorisSystem(getDorisConnectionOptions());
        List<SourceSchema> schemaList = getSchemaList();
        Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");
        if (!dorisSystem.databaseExists(this.database)) {
            LOG.info("database {} not exist, created", this.database);
            dorisSystem.createDatabase(this.database);
        }
        ArrayList arrayList = new ArrayList();
        ArrayList<String> arrayList2 = new ArrayList();
        for (SourceSchema sourceSchema : schemaList) {
            arrayList.add(sourceSchema.getTableName());
            String convert = this.converter.convert(sourceSchema.getTableName());
            this.tableMapping.put(sourceSchema.getTableIdentifier(), String.format("%s.%s", this.database, convert));
            if (!dorisSystem.tableExists(this.database, convert)) {
                TableSchema convertTableSchema = sourceSchema.convertTableSchema(this.tableConfig);
                convertTableSchema.setDatabase(this.database);
                convertTableSchema.setTable(convert);
                dorisSystem.createTable(convertTableSchema);
            }
            if (!arrayList2.contains(convert)) {
                arrayList2.add(convert);
            }
        }
        if (this.createTableOnly) {
            System.out.println("Create table finished.");
            System.exit(0);
        }
        this.config.setString(TABLE_NAME_OPTIONS, getSyncTableList(arrayList));
        DataStreamSource<String> buildCdcSource = buildCdcSource(this.env);
        if (this.singleSink) {
            buildCdcSource.sinkTo(buildDorisSink());
            return;
        }
        SingleOutputStreamOperator process = buildCdcSource.process(new ParsingProcessFunction(this.converter));
        for (String str : arrayList2) {
            SideOutputDataStream sideOutput = process.getSideOutput(ParsingProcessFunction.createRecordOutputTag(str));
            sideOutput.sinkTo(buildDorisSink(str)).setParallelism(this.sinkConfig.getInteger(DorisConfigOptions.SINK_PARALLELISM, sideOutput.getParallelism())).name(str).uid(str);
        }
    }

    private DorisConnectionOptions getDorisConnectionOptions() {
        String string = this.sinkConfig.getString(DorisConfigOptions.FENODES);
        String string2 = this.sinkConfig.getString(DorisConfigOptions.BENODES);
        String string3 = this.sinkConfig.getString(DorisConfigOptions.USERNAME);
        String string4 = this.sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
        String string5 = this.sinkConfig.getString(DorisConfigOptions.JDBC_URL);
        Preconditions.checkNotNull(string, "fenodes is empty in sink-conf");
        Preconditions.checkNotNull(string3, "username is empty in sink-conf");
        Preconditions.checkNotNull(string5, "jdbcurl is empty in sink-conf");
        return new DorisConnectionOptions.DorisConnectionOptionsBuilder().withFenodes(string).withBenodes(string2).withUsername(string3).withPassword(string4).withJdbcUrl(string5).build();
    }

    public DorisSink<String> buildDorisSink() {
        return buildDorisSink(null);
    }

    public DorisSink<String> buildDorisSink(String str) {
        String string = this.sinkConfig.getString(DorisConfigOptions.FENODES);
        String string2 = this.sinkConfig.getString(DorisConfigOptions.BENODES);
        String string3 = this.sinkConfig.getString(DorisConfigOptions.USERNAME);
        String string4 = this.sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
        DorisSink.Builder builder = DorisSink.builder();
        DorisOptions.Builder builder2 = DorisOptions.builder();
        builder2.setFenodes(string).setBenodes(string2).setUsername(string3).setPassword(string4);
        Optional optional = this.sinkConfig.getOptional(DorisConfigOptions.AUTO_REDIRECT);
        builder2.getClass();
        optional.ifPresent((v1) -> {
            r1.setAutoRedirect(v1);
        });
        if (!this.singleSink && !StringUtils.isNullOrWhitespaceOnly(str)) {
            builder2.setTableIdentifier(this.database + "." + str);
        }
        Properties properties = new Properties();
        properties.setProperty(LoadConstants.FORMAT_KEY, LoadConstants.JSON);
        properties.setProperty(LoadConstants.READ_JSON_BY_LINE, BooleanUtils.TRUE);
        properties.putAll(DorisConfigOptions.getStreamLoadProp(this.sinkConfig.toMap()));
        DorisExecutionOptions.Builder streamLoadProp = DorisExecutionOptions.builder().setStreamLoadProp(properties);
        Optional optional2 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_LABEL_PREFIX);
        streamLoadProp.getClass();
        optional2.ifPresent(streamLoadProp::setLabelPrefix);
        Optional optional3 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_DELETE);
        streamLoadProp.getClass();
        optional3.ifPresent(streamLoadProp::setDeletable);
        Optional optional4 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_COUNT);
        streamLoadProp.getClass();
        optional4.ifPresent((v1) -> {
            r1.setBufferCount(v1);
        });
        Optional optional5 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_SIZE);
        streamLoadProp.getClass();
        optional5.ifPresent((v1) -> {
            r1.setBufferSize(v1);
        });
        Optional optional6 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_CHECK_INTERVAL);
        streamLoadProp.getClass();
        optional6.ifPresent(streamLoadProp::setCheckInterval);
        Optional optional7 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_MAX_RETRIES);
        streamLoadProp.getClass();
        optional7.ifPresent(streamLoadProp::setMaxRetries);
        Optional optional8 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE);
        streamLoadProp.getClass();
        optional8.ifPresent((v1) -> {
            r1.setIgnoreUpdateBefore(v1);
        });
        if (!this.sinkConfig.getBoolean(DorisConfigOptions.SINK_ENABLE_2PC)) {
            streamLoadProp.disable2PC();
        } else if (this.sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_2PC).isPresent()) {
            streamLoadProp.enable2PC();
        }
        Optional optional9 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_ENABLE_BATCH_MODE);
        streamLoadProp.getClass();
        optional9.ifPresent(streamLoadProp::setBatchMode);
        Optional optional10 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE);
        streamLoadProp.getClass();
        optional10.ifPresent((v1) -> {
            r1.setFlushQueueSize(v1);
        });
        Optional optional11 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_ROWS);
        streamLoadProp.getClass();
        optional11.ifPresent((v1) -> {
            r1.setBufferFlushMaxRows(v1);
        });
        Optional optional12 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_MAX_BYTES);
        streamLoadProp.getClass();
        optional12.ifPresent((v1) -> {
            r1.setBufferFlushMaxBytes(v1);
        });
        this.sinkConfig.getOptional(DorisConfigOptions.SINK_BUFFER_FLUSH_INTERVAL).ifPresent(duration -> {
            streamLoadProp.setBufferFlushIntervalMs(duration.toMillis());
        });
        Optional optional13 = this.sinkConfig.getOptional(DorisConfigOptions.SINK_USE_CACHE);
        streamLoadProp.getClass();
        optional13.ifPresent((v1) -> {
            r1.setUseCache(v1);
        });
        DorisExecutionOptions build = streamLoadProp.build();
        builder.setDorisReadOptions(DorisReadOptions.builder().build()).setDorisExecutionOptions(build).setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(builder2.build()).setNewSchemaChange(this.newSchemaChange).setExecutionOptions(build).setTableMapping(this.tableMapping).setTableProperties(this.tableConfig).setTargetDatabase(this.database).build()).setDorisOptions(builder2.build());
        return builder.build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isSyncNeeded(String str) {
        boolean z = true;
        if (this.includingPattern != null) {
            z = this.includingPattern.matcher(str).matches();
        }
        if (this.excludingPattern != null) {
            z = z && !this.excludingPattern.matcher(str).matches();
        }
        LOG.debug("table {} is synchronized? {}", str, Boolean.valueOf(z));
        return z;
    }

    protected String getSyncTableList(List<String> list) {
        if (!this.singleSink) {
            return (String) list.stream().map(str -> {
                return getTableListPrefix() + "\\." + str;
            }).collect(Collectors.joining("|"));
        }
        String format = String.format("(%s)\\.(%s)", getTableListPrefix(), this.includingTables);
        return StringUtils.isNullOrWhitespaceOnly(this.excludingTables) ? format : String.format("(%s)(%s)", format, String.format("?!(%s\\.(%s))$", getTableListPrefix(), this.excludingTables));
    }

    protected HashMap<Pattern, String> multiToOneRulesParser(String str, String str2) {
        if (StringUtils.isNullOrWhitespaceOnly(str) || StringUtils.isNullOrWhitespaceOnly(str2)) {
            return null;
        }
        HashMap<Pattern, String> hashMap = new HashMap<>();
        String[] split = str.split("\\|");
        String[] split2 = str2.split("\\|");
        if (split.length != split2.length) {
            System.out.println("param error : multi to one params length are not equal,please check your params.");
            System.exit(1);
        }
        for (int i = 0; i < split.length; i++) {
            try {
                hashMap.put(Pattern.compile(split[i]), split2[i]);
            } catch (Exception e) {
                System.out.println("param error : Your regular expression is incorrect,please check.");
                System.exit(1);
            }
        }
        return hashMap;
    }

    public DatabaseSync setEnv(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.env = streamExecutionEnvironment;
        return this;
    }

    public DatabaseSync setConfig(Configuration configuration) {
        this.config = configuration;
        return this;
    }

    public DatabaseSync setDatabase(String str) {
        this.database = str;
        return this;
    }

    public DatabaseSync setIncludingTables(String str) {
        this.includingTables = str;
        return this;
    }

    public DatabaseSync setExcludingTables(String str) {
        this.excludingTables = str;
        return this;
    }

    public DatabaseSync setMultiToOneOrigin(String str) {
        this.multiToOneOrigin = str;
        return this;
    }

    public DatabaseSync setMultiToOneTarget(String str) {
        this.multiToOneTarget = str;
        return this;
    }

    public DatabaseSync setTableConfig(Map<String, String> map) {
        this.tableConfig = map;
        return this;
    }

    public DatabaseSync setSinkConfig(Configuration configuration) {
        this.sinkConfig = configuration;
        return this;
    }

    public DatabaseSync setIgnoreDefaultValue(boolean z) {
        this.ignoreDefaultValue = z;
        return this;
    }

    public DatabaseSync setCreateTableOnly(boolean z) {
        this.createTableOnly = z;
        return this;
    }

    public DatabaseSync setNewSchemaChange(boolean z) {
        this.newSchemaChange = z;
        return this;
    }

    public DatabaseSync setSingleSink(boolean z) {
        this.singleSink = z;
        return this;
    }

    public DatabaseSync setTablePrefix(String str) {
        this.tablePrefix = str;
        return this;
    }

    public DatabaseSync setTableSuffix(String str) {
        this.tableSuffix = str;
        return this;
    }
}
