/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.connector.ConnectRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Values;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.Transformation;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.NonEmptyListValidator;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.Requirements;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.transforms.util.SimpleConfig;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public abstract class MaskField<R extends ConnectRecord<R>>
implements Transformation<R> {
    public static final String OVERVIEW_DOC = "Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on).<p/>For numeric and string fields, an optional replacement value can be specified that is converted to the correct type.<p/>Use the concrete transformation type designed for the record key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
    private static final String FIELDS_CONFIG = "fields";
    private static final String REPLACEMENT_CONFIG = "replacement";
    public static final ConfigDef CONFIG_DEF = new ConfigDef().define("fields", ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask.").define("replacement", ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.LOW, "Custom value replacement, that will be applied to all 'fields' values (numeric or non-empty string values only).");
    private static final String PURPOSE = "mask fields";
    private static final Map<Class<?>, Function<String, ?>> REPLACEMENT_MAPPING_FUNC = new HashMap();
    private static final Map<Class<?>, Object> PRIMITIVE_VALUE_MAPPING = new HashMap();
    private Set<String> maskedFields;
    private String replacement;

    @Override
    public void configure(Map<String, ?> props) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        this.maskedFields = new HashSet<String>(config.getList(FIELDS_CONFIG));
        this.replacement = config.getString(REPLACEMENT_CONFIG);
    }

    @Override
    public R apply(R record) {
        if (this.operatingSchema(record) == null) {
            return this.applySchemaless(record);
        }
        return this.applyWithSchema(record);
    }

    private R applySchemaless(R record) {
        Map<String, Object> value = Requirements.requireMap(this.operatingValue(record), PURPOSE);
        HashMap<String, Object> updatedValue = new HashMap<String, Object>(value);
        for (String field : this.maskedFields) {
            updatedValue.put(field, this.masked(value.get(field)));
        }
        return this.newRecord(record, updatedValue);
    }

    private R applyWithSchema(R record) {
        Struct value = Requirements.requireStruct(this.operatingValue(record), PURPOSE);
        Struct updatedValue = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object origFieldValue = value.get(field);
            updatedValue.put(field, this.maskedFields.contains(field.name()) ? this.masked(origFieldValue) : origFieldValue);
        }
        return this.newRecord(record, updatedValue);
    }

    private Object masked(Object value) {
        if (value == null) {
            return null;
        }
        return this.replacement == null ? MaskField.maskWithNullValue(value) : MaskField.maskWithCustomReplacement(value, this.replacement);
    }

    private static Object maskWithCustomReplacement(Object value, String replacement) {
        Function<String, ?> replacementMapper = REPLACEMENT_MAPPING_FUNC.get(value.getClass());
        if (replacementMapper == null) {
            throw new DataException("Cannot mask value of type " + value.getClass() + " with custom replacement.");
        }
        try {
            return replacementMapper.apply(replacement);
        }
        catch (NumberFormatException ex) {
            throw new DataException("Unable to convert " + replacement + " (" + replacement.getClass() + ") to number", ex);
        }
    }

    private static Object maskWithNullValue(Object value) {
        List maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass());
        if (maskedValue == null) {
            if (value instanceof List) {
                maskedValue = Collections.emptyList();
            } else if (value instanceof Map) {
                maskedValue = Collections.emptyMap();
            } else {
                throw new DataException("Cannot mask value of type: " + value.getClass());
            }
        }
        return maskedValue;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    protected abstract Schema operatingSchema(R var1);

    protected abstract Object operatingValue(R var1);

    protected abstract R newRecord(R var1, Object var2);

    static {
        PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE);
        PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte)0);
        PRIMITIVE_VALUE_MAPPING.put(Short.class, (short)0);
        PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0);
        PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L);
        PRIMITIVE_VALUE_MAPPING.put(Float.class, Float.valueOf(0.0f));
        PRIMITIVE_VALUE_MAPPING.put(Double.class, 0.0);
        PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO);
        PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO);
        PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0L));
        PRIMITIVE_VALUE_MAPPING.put(String.class, "");
        REPLACEMENT_MAPPING_FUNC.put(Byte.class, v2 -> Values.convertToByte(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(Short.class, v2 -> Values.convertToShort(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(Integer.class, v2 -> Values.convertToInteger(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(Long.class, v2 -> Values.convertToLong(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(Float.class, v2 -> Values.convertToFloat(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(Double.class, v2 -> Values.convertToDouble(null, v2));
        REPLACEMENT_MAPPING_FUNC.put(String.class, Function.identity());
        REPLACEMENT_MAPPING_FUNC.put(BigDecimal.class, BigDecimal::new);
        REPLACEMENT_MAPPING_FUNC.put(BigInteger.class, BigInteger::new);
    }

    public static final class Value<R extends ConnectRecord<R>>
    extends MaskField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return ((ConnectRecord)record).valueSchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return ((ConnectRecord)record).value();
        }

        @Override
        protected R newRecord(R record, Object updatedValue) {
            return ((ConnectRecord)record).newRecord(((ConnectRecord)record).topic(), ((ConnectRecord)record).kafkaPartition(), ((ConnectRecord)record).keySchema(), ((ConnectRecord)record).key(), ((ConnectRecord)record).valueSchema(), updatedValue, ((ConnectRecord)record).timestamp());
        }
    }

    public static final class Key<R extends ConnectRecord<R>>
    extends MaskField<R> {
        @Override
        protected Schema operatingSchema(R record) {
            return ((ConnectRecord)record).keySchema();
        }

        @Override
        protected Object operatingValue(R record) {
            return ((ConnectRecord)record).key();
        }

        @Override
        protected R newRecord(R record, Object updatedValue) {
            return ((ConnectRecord)record).newRecord(((ConnectRecord)record).topic(), ((ConnectRecord)record).kafkaPartition(), ((ConnectRecord)record).keySchema(), updatedValue, ((ConnectRecord)record).valueSchema(), ((ConnectRecord)record).value(), ((ConnectRecord)record).timestamp());
        }
    }
}

