package org.apache.doris.flink.tools.cdc.mysql;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.deserialization.DorisJsonDebeziumDeserializationSchema;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * MySQL implementation of {@link DatabaseSync}.
 *
 * <p>It loads a MySQL JDBC driver, reads table metadata of the configured database over JDBC,
 * and assembles a {@link MySqlSource} CDC source from the options held in {@code config}.
 */
public class MysqlDatabaseSync extends DatabaseSync {
    private static final Logger LOG = LoggerFactory.getLogger(MysqlDatabaseSync.class);
    private static final String JDBC_URL = "jdbc:mysql://%s:%d?useInformationSchema=true";
    private static final String PROPERTIES_PREFIX = "jdbc.properties.";
    private static final String DEBEZIUM_PROPERTIES_PREFIX = "debezium.";

    /** One chunk-key-column entry: {@code <database>.<table>:<column>}. Compiled once. */
    private static final Pattern CHUNK_KEY_COLUMN_PATTERN =
            Pattern.compile("(\\S+)\\.(\\S+):(\\S+)");

    /**
     * Loads a MySQL JDBC driver class, trying {@code com.mysql.cj.jdbc.Driver} first and
     * falling back to {@code com.mysql.jdbc.Driver}.
     *
     * @throws SQLException if neither driver class can be loaded; the fallback failure is the
     *     cause and the first failure is attached as a suppressed exception
     */
    @Override
    public void registerDriver() throws SQLException {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException cjDriverMissing) {
            LOG.warn("can not found class com.mysql.cj.jdbc.Driver, use class com.mysql.jdbc.Driver");
            try {
                Class.forName("com.mysql.jdbc.Driver");
            } catch (Exception legacyDriverFailure) {
                // Keep both failures reachable from the thrown exception for diagnosis.
                SQLException failure =
                        new SQLException(
                                "No suitable driver found, can not found class com.mysql.cj.jdbc.Driver and com.mysql.jdbc.Driver",
                                legacyDriverFailure);
                failure.addSuppressed(cjDriverMissing);
                throw failure;
            }
        }
    }

    /**
     * Opens a JDBC connection to the configured host and port.
     *
     * <p>Every {@code jdbc.properties.*} option is appended to the URL as {@code &key=value}.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    @Override
    public Connection getConnection() throws SQLException {
        // Format host/port BEFORE appending user-supplied properties: a '%' inside a property
        // value must be taken literally and not be interpreted as a format specifier.
        StringBuilder jdbcUrl =
                new StringBuilder(
                        String.format(
                                JDBC_URL,
                                config.get(MySqlSourceOptions.HOSTNAME),
                                config.get(MySqlSourceOptions.PORT)));
        getJdbcProperties()
                .forEach((key, value) -> jdbcUrl.append("&").append(key).append("=").append(value));
        return DriverManager.getConnection(
                jdbcUrl.toString(),
                config.get(MySqlSourceOptions.USERNAME),
                config.get(MySqlSourceOptions.PASSWORD));
    }

    /**
     * Reads the schema of every table (JDBC table type {@code TABLE}) in the configured
     * database for which {@code isSyncNeeded} returns {@code true}.
     *
     * <p>A table with at least one primary key gets {@link DataModel#UNIQUE}; otherwise
     * {@link DataModel#DUPLICATE}.
     *
     * @return the collected schemas, possibly empty
     * @throws Exception if the metadata lookup or schema construction fails
     */
    @Override
    public List<SourceSchema> getSchemaList() throws Exception {
        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
        List<SourceSchema> schemaList = new ArrayList<>();
        try (Connection connection = getConnection()) {
            DatabaseMetaData metaData = connection.getMetaData();
            try (ResultSet tables =
                    metaData.getTables(databaseName, null, "%", new String[] {"TABLE"})) {
                while (tables.next()) {
                    String tableName = tables.getString("TABLE_NAME");
                    String tableComment = tables.getString("REMARKS");
                    if (!isSyncNeeded(tableName)) {
                        continue;
                    }
                    MysqlSchema schema =
                            new MysqlSchema(metaData, databaseName, tableName, tableComment);
                    schema.setModel(
                            schema.primaryKeys.isEmpty() ? DataModel.DUPLICATE : DataModel.UNIQUE);
                    schemaList.add(schema);
                }
            }
        }
        return schemaList;
    }

    /**
     * Builds the MySQL CDC source from the configuration and registers it on the given
     * environment under the name {@code "MySQL Source"} without watermarks.
     *
     * @param streamExecutionEnvironment environment the source is attached to
     * @return the resulting stream of change events serialized as strings
     * @throws NullPointerException if the database name option is not set
     */
    @Override
    public DataStreamSource<String> buildCdcSource(
            StreamExecutionEnvironment streamExecutionEnvironment) {
        MySqlSourceBuilder<String> sourceBuilder = MySqlSource.builder();
        String databaseName = config.get(MySqlSourceOptions.DATABASE_NAME);
        Preconditions.checkNotNull(databaseName, "database-name in mysql is required");

        sourceBuilder
                .hostname(config.get(MySqlSourceOptions.HOSTNAME))
                .port(config.get(MySqlSourceOptions.PORT))
                .username(config.get(MySqlSourceOptions.USERNAME))
                .password(config.get(MySqlSourceOptions.PASSWORD))
                .databaseList(databaseName)
                .tableList(config.get(MySqlSourceOptions.TABLE_NAME));

        applyOptionalSettings(sourceBuilder);
        setChunkColumns(sourceBuilder);
        applyStartupOptions(sourceBuilder);

        sourceBuilder.jdbcProperties(getJdbcProperties());
        sourceBuilder.debeziumProperties(getDebeziumProperties());

        MySqlSource<String> source =
                sourceBuilder
                        .deserializer(createDeserializer())
                        .includeSchemaChanges(true)
                        .build();
        return streamExecutionEnvironment.fromSource(
                source, WatermarkStrategy.noWatermarks(), "MySQL Source");
    }

    /**
     * Returns the configured database name, used as the table-list prefix.
     *
     * @return the value of the database name option
     */
    @Override
    public String getTableListPrefix() {
        return config.get(MySqlSourceOptions.DATABASE_NAME);
    }

    /** Copies each option that is explicitly present in the configuration onto the builder. */
    private void applyOptionalSettings(MySqlSourceBuilder<String> sourceBuilder) {
        config.getOptional(MySqlSourceOptions.SERVER_ID).ifPresent(sourceBuilder::serverId);
        config.getOptional(MySqlSourceOptions.SERVER_TIME_ZONE)
                .ifPresent(sourceBuilder::serverTimeZone);
        config.getOptional(MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)
                .ifPresent(sourceBuilder::fetchSize);
        config.getOptional(MySqlSourceOptions.CONNECT_TIMEOUT)
                .ifPresent(sourceBuilder::connectTimeout);
        config.getOptional(MySqlSourceOptions.CONNECT_MAX_RETRIES)
                .ifPresent(sourceBuilder::connectMaxRetries);
        config.getOptional(MySqlSourceOptions.CONNECTION_POOL_SIZE)
                .ifPresent(sourceBuilder::connectionPoolSize);
        config.getOptional(MySqlSourceOptions.HEARTBEAT_INTERVAL)
                .ifPresent(sourceBuilder::heartbeatInterval);
        config.getOptional(MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED)
                .ifPresent(sourceBuilder::scanNewlyAddedTableEnabled);
        config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)
                .ifPresent(sourceBuilder::splitSize);
        config.getOptional(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED)
                .ifPresent(sourceBuilder::closeIdleReaders);
    }

    /**
     * Translates the startup-mode option into {@link StartupOptions} on the builder.
     *
     * <p>Recognized modes (case-insensitive): {@code initial}, {@code earliest-offset},
     * {@code latest-offset}, {@code specific-offset} and {@code timestamp}. Any other value
     * leaves the builder untouched.
     */
    private void applyStartupOptions(MySqlSourceBuilder<String> sourceBuilder) {
        String startupMode = config.get(MySqlSourceOptions.SCAN_STARTUP_MODE);
        if ("initial".equalsIgnoreCase(startupMode)) {
            sourceBuilder.startupOptions(StartupOptions.initial());
        } else if ("earliest-offset".equalsIgnoreCase(startupMode)) {
            sourceBuilder.startupOptions(StartupOptions.earliest());
        } else if ("latest-offset".equalsIgnoreCase(startupMode)) {
            sourceBuilder.startupOptions(StartupOptions.latest());
        } else if ("specific-offset".equalsIgnoreCase(startupMode)) {
            sourceBuilder.startupOptions(StartupOptions.specificOffset(buildSpecificOffset()));
        } else if ("timestamp".equalsIgnoreCase(startupMode)) {
            Long startupTimestampMillis =
                    config.get(MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS);
            // Fail with an explanatory message instead of a bare unboxing NPE.
            Preconditions.checkNotNull(
                    startupTimestampMillis,
                    "%s is required when the startup mode is timestamp",
                    MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS.key());
            sourceBuilder.startupOptions(StartupOptions.timestamp(startupTimestampMillis));
        }
    }

    /** Builds the binlog offset for the {@code specific-offset} startup mode. */
    private BinlogOffset buildSpecificOffset() {
        BinlogOffsetBuilder offsetBuilder = BinlogOffset.builder();
        String offsetFile = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE);
        Long offsetPos = config.get(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_POS);
        // File and position are only meaningful as a pair.
        if (offsetFile != null && offsetPos != null) {
            offsetBuilder.setBinlogFilePosition(offsetFile, offsetPos);
        }
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET)
                .ifPresent(offsetBuilder::setGtidSet);
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_EVENTS)
                .ifPresent(offsetBuilder::setSkipEvents);
        config.getOptional(MySqlSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_SKIP_ROWS)
                .ifPresent(offsetBuilder::setSkipRows);
        return offsetBuilder.build();
    }

    /**
     * Chooses the deserializer: the Doris-specific schema when {@code ignoreDefaultValue} is
     * set, otherwise the stock JSON schema with {@code decimal.format=numeric}.
     */
    private DebeziumDeserializationSchema<String> createDeserializer() {
        if (ignoreDefaultValue) {
            return new DorisJsonDebeziumDeserializationSchema();
        }
        Map<String, Object> converterConfigs = new HashMap<>();
        converterConfigs.put("decimal.format", "numeric");
        return new JsonDebeziumDeserializationSchema(false, converterConfigs);
    }

    /** Registers every configured chunk key column on the builder. */
    private void setChunkColumns(MySqlSourceBuilder<String> mySqlSourceBuilder) {
        getChunkColumnMap().forEach(mySqlSourceBuilder::chunkKeyColumn);
    }

    /**
     * Parses the chunk-key-column option, a comma-separated list of
     * {@code <database>.<table>:<column>} entries.
     *
     * <p>Entries that do not match the pattern are skipped.
     *
     * @return table path to chunk key column; empty when the option is blank
     */
    private Map<ObjectPath, String> getChunkColumnMap() {
        Map<ObjectPath, String> chunkColumns = new HashMap<>();
        String chunkColumnSpec =
                config.getString(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        if (StringUtils.isNullOrWhitespaceOnly(chunkColumnSpec)) {
            return chunkColumns;
        }
        for (String entry : chunkColumnSpec.split(",")) {
            Matcher matcher = CHUNK_KEY_COLUMN_PATTERN.matcher(entry);
            if (matcher.find()) {
                chunkColumns.put(
                        new ObjectPath(matcher.group(1), matcher.group(2)), matcher.group(3));
            }
        }
        return chunkColumns;
    }

    /**
     * Collects all options whose key starts with {@code jdbc.properties.}, with that prefix
     * stripped from the key.
     */
    private Properties getJdbcProperties() {
        return collectPrefixedOptions(PROPERTIES_PREFIX, new Properties());
    }

    /**
     * Collects all options whose key starts with {@code debezium.}, with that prefix stripped,
     * layered on top of {@code DateToStringConverter.DEFAULT_PROPS}.
     */
    private Properties getDebeziumProperties() {
        Properties debeziumProperties = new Properties();
        debeziumProperties.putAll(DateToStringConverter.DEFAULT_PROPS);
        return collectPrefixedOptions(DEBEZIUM_PROPERTIES_PREFIX, debeziumProperties);
    }

    /** Adds every configuration entry with the given key prefix to {@code target}, sans prefix. */
    private Properties collectPrefixedOptions(String prefix, Properties target) {
        for (Map.Entry<String, String> option : config.toMap().entrySet()) {
            String key = option.getKey();
            if (key.startsWith(prefix)) {
                target.put(key.substring(prefix.length()), option.getValue());
            }
        }
        return target;
    }
}
