/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.source;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.PulsarSplitEnumeratorState;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.NeverStopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.PulsarDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicListDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.discoverer.TopicPatternDiscoverer;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.format.PulsarCanalDecorator;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader.PulsarSourceReader;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;

public class PulsarSource
implements SeaTunnelSource<SeaTunnelRow, PulsarPartitionSplit, PulsarSplitEnumeratorState>,
SupportParallelism {
    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private CatalogTable catalogTable;
    private PulsarAdminConfig adminConfig;
    private PulsarClientConfig clientConfig;
    private PulsarConsumerConfig consumerConfig;
    private PulsarDiscoverer partitionDiscoverer;
    private long partitionDiscoveryIntervalMs;
    private StartCursor startCursor;
    private StopCursor stopCursor;
    protected int pollTimeout;
    protected long pollInterval;
    protected int batchSize;

    public PulsarSource(ReadonlyConfig config, CatalogTable catalogTable) {
        this.catalogTable = catalogTable;
        PulsarAdminConfig.Builder adminConfigBuilder = PulsarAdminConfig.builder().adminUrl((String)config.get(PulsarSourceOptions.ADMIN_SERVICE_URL));
        adminConfigBuilder.authPluginClassName((String)config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
        adminConfigBuilder.authParams((String)config.get(PulsarSourceOptions.AUTH_PARAMS));
        this.adminConfig = adminConfigBuilder.build();
        PulsarClientConfig.Builder clientConfigBuilder = PulsarClientConfig.builder().serviceUrl((String)config.get(PulsarSourceOptions.CLIENT_SERVICE_URL));
        clientConfigBuilder.authPluginClassName((String)config.get(PulsarSourceOptions.AUTH_PLUGIN_CLASS));
        clientConfigBuilder.authParams((String)config.get(PulsarSourceOptions.AUTH_PARAMS));
        this.clientConfig = clientConfigBuilder.build();
        PulsarConsumerConfig.Builder consumerConfigBuilder = PulsarConsumerConfig.builder().subscriptionName((String)config.get(PulsarSourceOptions.SUBSCRIPTION_NAME));
        this.consumerConfig = consumerConfigBuilder.build();
        this.partitionDiscoveryIntervalMs = (Long)config.get(PulsarSourceOptions.TOPIC_DISCOVERY_INTERVAL);
        this.pollTimeout = (Integer)config.get(PulsarSourceOptions.POLL_TIMEOUT);
        this.pollInterval = (Long)config.get(PulsarSourceOptions.POLL_INTERVAL);
        this.batchSize = (Integer)config.get(PulsarSourceOptions.POLL_BATCH_SIZE);
        this.setStartCursor(config);
        this.setStopCursor(config);
        this.setPartitionDiscoverer(config);
        this.setDeserialization(config);
        if (this.partitionDiscoverer instanceof TopicPatternDiscoverer && this.partitionDiscoveryIntervalMs > 0L && Boundedness.BOUNDED == this.stopCursor.getBoundedness()) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "Bounded streams do not support dynamic partition discovery.");
        }
    }

    public String getPluginName() {
        return "Pulsar";
    }

    private void setStartCursor(ReadonlyConfig config) {
        PulsarSourceOptions.StartMode startMode = (PulsarSourceOptions.StartMode)((Object)config.get(PulsarSourceOptions.CURSOR_STARTUP_MODE));
        switch (startMode) {
            case EARLIEST: {
                this.startCursor = StartCursor.earliest();
                break;
            }
            case LATEST: {
                this.startCursor = StartCursor.latest();
                break;
            }
            case SUBSCRIPTION: {
                PulsarSourceOptions.CursorResetStrategy resetStrategy = (PulsarSourceOptions.CursorResetStrategy)((Object)config.get(PulsarSourceOptions.CURSOR_RESET_MODE));
                this.startCursor = StartCursor.subscription(resetStrategy);
                break;
            }
            case TIMESTAMP: {
                if (!config.getOptional(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP).isPresent()) {
                    throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP.key(), PulsarSourceOptions.CURSOR_STARTUP_MODE.key()));
                }
                this.startCursor = StartCursor.timestamp((Long)config.get(PulsarSourceOptions.CURSOR_STARTUP_TIMESTAMP));
                break;
            }
            default: {
                throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The %s mode is not supported.", new Object[]{startMode}));
            }
        }
    }

    private void setStopCursor(ReadonlyConfig config) {
        PulsarSourceOptions.StopMode stopMode = (PulsarSourceOptions.StopMode)((Object)config.get(PulsarSourceOptions.CURSOR_STOP_MODE));
        switch (stopMode) {
            case LATEST: {
                this.stopCursor = StopCursor.latest();
                break;
            }
            case NEVER: {
                this.stopCursor = StopCursor.never();
                break;
            }
            case TIMESTAMP: {
                if (!config.getOptional(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP).isPresent()) {
                    throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The '%s' property is required when the '%s' is 'timestamp'.", PulsarSourceOptions.CURSOR_STOP_TIMESTAMP.key(), PulsarSourceOptions.CURSOR_STOP_MODE.key()));
                }
                this.stopCursor = StopCursor.timestamp((Long)config.get(PulsarSourceOptions.CURSOR_STOP_TIMESTAMP));
                break;
            }
            default: {
                throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("The %s mode is not supported.", new Object[]{stopMode}));
            }
        }
    }

    private void setPartitionDiscoverer(ReadonlyConfig config) {
        String topicPattern;
        String topic;
        if (config.getOptional(PulsarSourceOptions.TOPIC).isPresent() && StringUtils.isNotBlank(topic = (String)config.get(PulsarSourceOptions.TOPIC))) {
            this.partitionDiscoverer = new TopicListDiscoverer(Arrays.asList(StringUtils.split(topic, ",")));
        }
        if (config.getOptional(PulsarSourceOptions.TOPIC_PATTERN).isPresent() && StringUtils.isNotBlank(topicPattern = (String)config.get(PulsarSourceOptions.TOPIC_PATTERN))) {
            this.partitionDiscoverer = new TopicPatternDiscoverer(Pattern.compile(topicPattern));
        }
        if (this.partitionDiscoverer == null) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.OPTION_VALIDATION_FAILED, String.format("The properties '%s' or '%s' is required.", PulsarSourceOptions.TOPIC.key(), PulsarSourceOptions.TOPIC_PATTERN.key()));
        }
    }

    private void setDeserialization(ReadonlyConfig config) {
        String format = (String)config.get(PulsarSourceOptions.FORMAT);
        switch (format.toUpperCase()) {
            case "JSON": {
                this.deserializationSchema = new JsonDeserializationSchema(false, false, this.catalogTable.getSeaTunnelRowType());
                break;
            }
            case "CANAL_JSON": {
                this.deserializationSchema = new PulsarCanalDecorator(CanalJsonDeserializationSchema.builder(this.catalogTable).setIgnoreParseErrors(true).build());
                break;
            }
            default: {
                throw new SeaTunnelJsonFormatException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + format);
            }
        }
    }

    public Boundedness getBoundedness() {
        return this.stopCursor instanceof NeverStopCursor ? Boundedness.UNBOUNDED : Boundedness.BOUNDED;
    }

    public List<CatalogTable> getProducedCatalogTables() {
        return Collections.singletonList(this.catalogTable);
    }

    public SourceReader<SeaTunnelRow, PulsarPartitionSplit> createReader(SourceReader.Context readerContext) throws Exception {
        return new PulsarSourceReader<SeaTunnelRow>(readerContext, this.clientConfig, this.consumerConfig, this.startCursor, this.deserializationSchema, this.pollTimeout, this.pollInterval, this.batchSize);
    }

    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> createEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext) throws Exception {
        return new PulsarSplitEnumerator(enumeratorContext, this.adminConfig, this.partitionDiscoverer, this.partitionDiscoveryIntervalMs, this.startCursor, this.stopCursor, this.consumerConfig.getSubscriptionName());
    }

    public SourceSplitEnumerator<PulsarPartitionSplit, PulsarSplitEnumeratorState> restoreEnumerator(SourceSplitEnumerator.Context<PulsarPartitionSplit> enumeratorContext, PulsarSplitEnumeratorState checkpointState) throws Exception {
        return new PulsarSplitEnumerator(enumeratorContext, this.adminConfig, this.partitionDiscoverer, this.partitionDiscoveryIntervalMs, this.startCursor, this.stopCursor, this.consumerConfig.getSubscriptionName(), checkpointState.getAssignedPartitions());
    }
}

