package org.apache.doris.flink.tools.cdc.postgres;

import com.ververica.cdc.connectors.base.options.JdbcSourceOptions;
import com.ververica.cdc.connectors.base.options.SourceOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.cfg.ConfigurationOptions;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/tools/cdc/postgres/PostgresDatabaseSync.class */
public class PostgresDatabaseSync extends DatabaseSync {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PostgresDatabaseSync.class);
    private static final String JDBC_URL = "jdbc:postgresql://%s:%d/%s";

    @Override // org.apache.doris.flink.tools.cdc.DatabaseSync
    public void registerDriver() throws SQLException {
        try {
            Class.forName("org.postgresql.Driver");
        } catch (ClassNotFoundException e) {
            throw new SQLException("No suitable driver found, can not found class org.postgresql.Driver");
        }
    }

    @Override // org.apache.doris.flink.tools.cdc.DatabaseSync
    public Connection getConnection() throws SQLException {
        String format = String.format(JDBC_URL, this.config.get(PostgresSourceOptions.HOSTNAME), this.config.get(PostgresSourceOptions.PG_PORT), this.config.get(PostgresSourceOptions.DATABASE_NAME));
        Properties properties = new Properties();
        properties.setProperty("user", (String) this.config.get(PostgresSourceOptions.USERNAME));
        properties.setProperty(ConfigurationOptions.DORIS_PASSWORD, (String) this.config.get(PostgresSourceOptions.PASSWORD));
        return DriverManager.getConnection(format, properties);
    }

    @Override // org.apache.doris.flink.tools.cdc.DatabaseSync
    public List<SourceSchema> getSchemaList() throws Exception {
        String str = (String) this.config.get(PostgresSourceOptions.DATABASE_NAME);
        String str2 = (String) this.config.get(PostgresSourceOptions.SCHEMA_NAME);
        ArrayList arrayList = new ArrayList();
        LOG.info("database-name {}, schema-name {}", str, str2);
        Connection connection = getConnection();
        Throwable th = null;
        try {
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet tables = metaData.getTables(str, str2, "%", new String[]{"TABLE"});
            Throwable th2 = null;
            while (tables.next()) {
                try {
                    try {
                        String string = tables.getString("TABLE_NAME");
                        String string2 = tables.getString("REMARKS");
                        if (isSyncNeeded(string)) {
                            PostgresSchema postgresSchema = new PostgresSchema(metaData, str, str2, string, string2);
                            postgresSchema.setModel(postgresSchema.primaryKeys.size() > 0 ? DataModel.UNIQUE : DataModel.DUPLICATE);
                            arrayList.add(postgresSchema);
                        }
                    } catch (Throwable th3) {
                        if (tables != null) {
                            if (th2 != null) {
                                try {
                                    tables.close();
                                } catch (Throwable th4) {
                                    th2.addSuppressed(th4);
                                }
                            } else {
                                tables.close();
                            }
                        }
                        throw th3;
                    }
                } finally {
                }
            }
            if (tables != null) {
                if (0 != 0) {
                    try {
                        tables.close();
                    } catch (Throwable th5) {
                        th2.addSuppressed(th5);
                    }
                } else {
                    tables.close();
                }
            }
            return arrayList;
        } finally {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    @Override // org.apache.doris.flink.tools.cdc.DatabaseSync
    public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment streamExecutionEnvironment) {
        String str = (String) this.config.get(PostgresSourceOptions.DATABASE_NAME);
        String str2 = (String) this.config.get(PostgresSourceOptions.SCHEMA_NAME);
        String str3 = (String) this.config.get(PostgresSourceOptions.SLOT_NAME);
        Preconditions.checkNotNull(str, "database-name in postgres is required");
        Preconditions.checkNotNull(str2, "schema-name in postgres is required");
        Preconditions.checkNotNull(str3, "slot.name in postgres is required");
        String str4 = (String) this.config.get(PostgresSourceOptions.TABLE_NAME);
        String str5 = (String) this.config.get(PostgresSourceOptions.HOSTNAME);
        Integer num = (Integer) this.config.get(PostgresSourceOptions.PG_PORT);
        String str6 = (String) this.config.get(PostgresSourceOptions.USERNAME);
        String str7 = (String) this.config.get(PostgresSourceOptions.PASSWORD);
        StartupOptions initial = StartupOptions.initial();
        String str8 = (String) this.config.get(PostgresSourceOptions.SCAN_STARTUP_MODE);
        if ("initial".equalsIgnoreCase(str8)) {
            initial = StartupOptions.initial();
        } else if ("latest-offset".equalsIgnoreCase(str8)) {
            initial = StartupOptions.latest();
        }
        Properties properties = new Properties();
        properties.putAll(PostgresDateConverter.DEFAULT_PROPS);
        properties.put("decimal.handling.mode", "string");
        for (Map.Entry entry : this.config.toMap().entrySet()) {
            String str9 = (String) entry.getKey();
            String str10 = (String) entry.getValue();
            if (str9.startsWith("debezium.")) {
                properties.put(str9.substring("debezium.".length()), str10);
            }
        }
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema = new JsonDebeziumDeserializationSchema(false, new HashMap());
        return this.config.getBoolean(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false) ? streamExecutionEnvironment.fromSource(PostgresSourceBuilder.PostgresIncrementalSource.builder().hostname(str5).port(num.intValue()).database(str).schemaList(new String[]{str2}).tableList(new String[]{str4}).username(str6).password(str7).deserializer(jsonDebeziumDeserializationSchema).slotName(str3).decodingPluginName((String) this.config.get(PostgresSourceOptions.DECODING_PLUGIN_NAME)).includeSchemaChanges(true).debeziumProperties(properties).startupOptions(initial).splitSize(((Integer) this.config.get(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE)).intValue()).splitMetaGroupSize(((Integer) this.config.get(SourceOptions.CHUNK_META_GROUP_SIZE)).intValue()).fetchSize(((Integer) this.config.get(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE)).intValue()).connectTimeout((Duration) this.config.get(JdbcSourceOptions.CONNECT_TIMEOUT)).connectionPoolSize(((Integer) this.config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE)).intValue()).connectMaxRetries(((Integer) this.config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES)).intValue()).distributionFactorUpper(((Double) this.config.get(SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND)).doubleValue()).distributionFactorLower(((Double) this.config.get(SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND)).doubleValue()).heartbeatInterval((Duration) this.config.get(PostgresSourceOptions.HEARTBEAT_INTERVAL)).build(), WatermarkStrategy.noWatermarks(), "Postgres IncrSource") : streamExecutionEnvironment.addSource(PostgreSQLSource.builder().hostname(str5).port(num.intValue()).database(str).schemaList(new String[]{str2}).tableList(new String[]{str4}).username(str6).password(str7).debeziumProperties(properties).deserializer(jsonDebeziumDeserializationSchema).slotName(str3).decodingPluginName((String) this.config.get(PostgresSourceOptions.DECODING_PLUGIN_NAME)).build(), "Postgres Source");
    }

    @Override // org.apache.doris.flink.tools.cdc.DatabaseSync
    public String getTableListPrefix() {
        return (String) this.config.get(PostgresSourceOptions.SCHEMA_NAME);
    }
}
