/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.BasePulsarConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSinkOptions;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;

/**
 * Static factory helpers that turn the connector's configuration objects into Pulsar
 * admin, client, consumer, producer and transaction objects.
 *
 * <p>The class only has static members and cannot be instantiated.
 */
public class PulsarConfigUtil {
    private PulsarConfigUtil() {
    }

    /**
     * Builds a {@link PulsarAdmin} pointing at the admin URL of the given configuration,
     * using the authentication derived from the same configuration.
     *
     * @param config admin configuration supplying the admin URL and authentication settings
     * @return the built admin client
     * @throws PulsarConnectorException with {@code OPEN_PULSAR_ADMIN_FAILED} if the builder
     *     fails, or with {@code PULSAR_AUTHENTICATION_FAILED} if authentication cannot be created
     */
    public static PulsarAdmin createAdmin(PulsarAdminConfig config) {
        PulsarAdminBuilder builder = PulsarAdmin.builder();
        builder.serviceHttpUrl(config.getAdminUrl());
        builder.authentication(PulsarConfigUtil.createAuthentication(config));
        try {
            return builder.build();
        }
        catch (PulsarClientException e) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, e);
        }
    }

    /**
     * Builds a {@link PulsarClient} for the service URL of the given configuration.
     * Transactions are enabled on the client only when {@code pulsarSemantics} is
     * {@link PulsarSemantics#EXACTLY_ONCE}.
     *
     * @param config client configuration supplying the service URL and authentication settings
     * @param pulsarSemantics delivery semantics requested by the caller
     * @return the built client
     * @throws PulsarConnectorException with {@code OPEN_PULSAR_CLIENT_FAILED} if the builder
     *     fails, or with {@code PULSAR_AUTHENTICATION_FAILED} if authentication cannot be created
     */
    public static PulsarClient createClient(PulsarClientConfig config, PulsarSemantics pulsarSemantics) {
        ClientBuilder builder = PulsarClient.builder();
        builder.serviceUrl(config.getServiceUrl());
        builder.authentication(PulsarConfigUtil.createAuthentication(config));
        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
            builder.enableTransaction(true);
        }
        try {
            return builder.build();
        }
        catch (PulsarClientException e) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)PulsarConnectorErrorCode.OPEN_PULSAR_CLIENT_FAILED, e);
        }
    }

    /**
     * Starts a byte-array consumer builder on the given client and sets its subscription name.
     * No topic is set here; the caller is expected to finish configuring the builder.
     *
     * @param client client used to create the consumer builder
     * @param config consumer configuration supplying the subscription name
     * @return the partially configured consumer builder
     */
    public static ConsumerBuilder<byte[]> createConsumerBuilder(PulsarClient client, PulsarConsumerConfig config) {
        ConsumerBuilder<byte[]> builder = client.newConsumer(Schema.BYTES);
        builder.subscriptionName(config.getSubscriptionName());
        return builder;
    }

    /**
     * Resolves the {@link Authentication} to use for the given configuration.
     *
     * <ul>
     *   <li>No plug-in class name configured: authentication is disabled.</li>
     *   <li>Plug-in class name configured but no parameters: rejected, because the plug-in
     *       would be created without the credentials it needs.</li>
     *   <li>Both configured: the plug-in is created through {@link AuthenticationFactory}.</li>
     * </ul>
     *
     * @param config configuration supplying the plug-in class name and its parameters
     * @return the authentication instance to hand to a client or admin builder
     * @throws PulsarConnectorException with {@code PULSAR_AUTHENTICATION_FAILED} if the
     *     parameters are missing or the plug-in is not supported
     */
    private static Authentication createAuthentication(BasePulsarConfig config) {
        String authPluginClassName = config.getAuthPluginClassName();
        if (StringUtils.isBlank(authPluginClassName)) {
            return AuthenticationDisabled.INSTANCE;
        }
        // A plug-in was requested, so its parameters are mandatory. The previous code
        // re-tested the plug-in class name here, which made this check unreachable.
        String authParams = config.getAuthParams();
        if (StringUtils.isBlank(authParams)) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, "Authentication parameters are required when using authentication plug-in.");
        }
        try {
            return AuthenticationFactory.create(authPluginClassName, authParams);
        }
        catch (PulsarClientException.UnsupportedAuthenticationException e) {
            throw new PulsarConnectorException((SeaTunnelErrorCode)PulsarConnectorErrorCode.PULSAR_AUTHENTICATION_FAILED, e);
        }
    }

    /**
     * Returns the transaction coordinator client held by the given Pulsar client.
     *
     * @param pulsarClient client to inspect; it is cast to {@link PulsarClientImpl}, so any
     *     other implementation results in a {@link ClassCastException}
     * @return the coordinator client, never {@code null}
     * @throws IllegalArgumentException if the client has no transaction coordinator client
     */
    public static TransactionCoordinatorClient getTcClient(PulsarClient pulsarClient) {
        TransactionCoordinatorClientImpl coordinatorClient = ((PulsarClientImpl)pulsarClient).getTcClient();
        if (coordinatorClient == null) {
            throw new IllegalArgumentException("You haven't enable transaction in Pulsar client.");
        }
        return coordinatorClient;
    }

    /**
     * Opens a new transaction on the given client and blocks until it has been created.
     *
     * @param pulsarClient client used to create the transaction
     * @param timeout transaction timeout, in seconds
     * @return the newly created transaction
     * @throws PulsarClientException if the transaction builder cannot be obtained
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the asynchronous transaction creation fails
     */
    public static Transaction getTransaction(PulsarClient pulsarClient, int timeout) throws PulsarClientException, InterruptedException, ExecutionException {
        return pulsarClient.newTransaction().withTransactionTimeout(timeout, TimeUnit.SECONDS).build().get();
    }

    /**
     * Creates a byte-array producer for the given topic.
     *
     * <p>The producer blocks when its queue is full. If the plugin configuration carries
     * {@link PulsarSinkOptions#PULSAR_CONFIG}, its entries are copied and passed to
     * {@link ProducerBuilder#properties}. For {@link PulsarSemantics#EXACTLY_ONCE} the send
     * timeout is set to 0.
     *
     * @param pulsarClient client used to create the producer
     * @param topic topic the producer publishes to
     * @param pulsarSemantics delivery semantics requested by the caller
     * @param pluginConfig sink configuration that may carry extra producer properties
     * @param messageRoutingMode routing mode applied to the producer
     * @return the created producer
     * @throws PulsarClientException if the producer cannot be created
     */
    public static Producer<byte[]> createProducer(PulsarClient pulsarClient, String topic, PulsarSemantics pulsarSemantics, ReadonlyConfig pluginConfig, MessageRoutingMode messageRoutingMode) throws PulsarClientException {
        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer(Schema.BYTES);
        producerBuilder.topic(topic);
        producerBuilder.messageRoutingMode(messageRoutingMode);
        producerBuilder.blockIfQueueFull(true);
        // Read the option once and copy it, so the builder never holds the config's own map.
        Map<?, ?> configuredProperties = (Map<?, ?>)pluginConfig.get(PulsarSinkOptions.PULSAR_CONFIG);
        if (configuredProperties != null) {
            Map<String, String> pulsarProperties = new HashMap<>();
            configuredProperties.forEach((key, value) -> pulsarProperties.put((String)key, (String)value));
            producerBuilder.properties(pulsarProperties);
        }
        if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
            // NOTE(review): 0 presumably disables the send timeout, as Pulsar requires for
            // transactional producers - confirm against the Pulsar client documentation.
            producerBuilder.sendTimeout(0, TimeUnit.SECONDS);
        }
        return producerBuilder.create();
    }

    /**
     * Creates a message builder for the given producer that is bound to the given transaction.
     *
     * @param producer producer the message will be sent with; it is cast to
     *     {@link ProducerBase}, so any other implementation results in a
     *     {@link ClassCastException}
     * @param transaction transaction passed to the message builder
     * @return a new byte-array message builder
     * @throws PulsarClientException declared for callers; nothing in this method throws it
     */
    public static TypedMessageBuilder<byte[]> createTypedMessageBuilder(Producer<byte[]> producer, TransactionImpl transaction) throws PulsarClientException {
        ProducerBase<byte[]> producerBase = (ProducerBase<byte[]>)producer;
        return new TypedMessageBuilderImpl<byte[]>(producerBase, Schema.BYTES, transaction);
    }
}

