/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.JulianFields;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.NonNull;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy;

public class ParquetWriteStrategy
extends AbstractWriteStrategy<ParquetWriter<GenericRecord>> {
    private final LinkedHashMap<String, ParquetWriter<GenericRecord>> beingWrittenWriter = new LinkedHashMap();
    private AvroSchemaConverter schemaConverter;
    private Schema schema;
    private Set<String> writePathsAsInt96;
    public static final int[] PRECISION_TO_BYTE_COUNT = new int[38];

    public ParquetWriteStrategy(FileSinkConfig fileSinkConfig) {
        super(fileSinkConfig);
    }

    @Override
    public void init(HadoopConf conf, String jobId, String uuidPrefix, int subTaskIndex) {
        super.init(conf, jobId, uuidPrefix, subTaskIndex);
        Configuration configuration = this.getConfiguration(this.hadoopConf);
        this.writePathsAsInt96 = new HashSet<String>(this.fileSinkConfig.getParquetAvroWriteFixedAsInt96());
        if (this.fileSinkConfig.getParquetWriteTimestampAsInt96().booleanValue()) {
            ArrayList<String> timestampFields = new ArrayList<String>();
            for (int i = 0; i < this.seaTunnelRowType.getTotalFields(); ++i) {
                if (!SqlType.TIMESTAMP.equals((Object)this.seaTunnelRowType.getFieldType(i).getSqlType())) continue;
                timestampFields.add(this.seaTunnelRowType.getFieldName(i));
            }
            this.writePathsAsInt96.addAll(timestampFields);
        }
        if (!this.writePathsAsInt96.isEmpty()) {
            configuration.set("parquet.avro.writeFixedAsInt96", String.join((CharSequence)",", this.writePathsAsInt96));
        }
        this.schemaConverter = new AvroSchemaConverter(configuration);
    }

    @Override
    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
        if (seaTunnelRow == null) {
            throw new NullPointerException("seaTunnelRow is marked non-null but is null");
        }
        super.write(seaTunnelRow);
        String filePath = this.getOrCreateFilePathBeingWritten(seaTunnelRow);
        Object writer = this.getOrCreateOutputStream(filePath);
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(this.schema);
        for (Integer integer : this.sinkColumnsIndexInRow) {
            String fieldName = this.seaTunnelRowType.getFieldName(integer.intValue());
            Object field = seaTunnelRow.getField(integer.intValue());
            recordBuilder.set(fieldName.toLowerCase(), this.resolveObject(fieldName, field, this.seaTunnelRowType.getFieldType(integer.intValue())));
        }
        GenericData.Record record = recordBuilder.build();
        try {
            ((ParquetWriter)writer).write(record);
        }
        catch (IOException e) {
            throw CommonError.fileOperationFailed((String)"ParquetFile", (String)"write", (String)filePath, (Throwable)e);
        }
    }

    @Override
    public void finishAndCloseFile() {
        this.beingWrittenWriter.forEach((k, v) -> {
            try {
                v.close();
            }
            catch (IOException e) {
                String errorMsg = String.format("Close file [%s] parquet writer failed, error msg: [%s]", k, e.getMessage());
                throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
            }
            this.needMoveFiles.put(k, this.getTargetLocation((String)k));
        });
        this.beingWrittenWriter.clear();
    }

    @Override
    public ParquetWriter<GenericRecord> getOrCreateOutputStream(@NonNull String filePath) {
        if (filePath == null) {
            throw new NullPointerException("filePath is marked non-null but is null");
        }
        if (this.schema == null) {
            this.schema = this.buildAvroSchemaWithRowType(this.seaTunnelRowType, this.sinkColumnsIndexInRow);
        }
        ParquetWriter<GenericRecord> writer = this.beingWrittenWriter.get(filePath);
        GenericData dataModel = new GenericData();
        dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
        dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
        if (writer == null) {
            Path path = new Path(filePath);
            return this.hadoopFileSystemProxy.doWithHadoopAuth((configuration, userGroupInformation) -> {
                try {
                    if (!this.writePathsAsInt96.isEmpty()) {
                        configuration.set("parquet.avro.writeFixedAsInt96", String.join((CharSequence)",", this.writePathsAsInt96));
                    }
                    HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, this.getConfiguration(this.hadoopConf));
                    ParquetWriter newWriter = ((AvroParquetWriter.Builder)((AvroParquetWriter.Builder)((AvroParquetWriter.Builder)((AvroParquetWriter.Builder)AvroParquetWriter.builder(outputFile).withWriteMode(ParquetFileWriter.Mode.OVERWRITE)).withDataModel(dataModel).withConf(configuration)).withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)).withCompressionCodec(this.compressFormat.getParquetCompression())).withSchema(this.schema).build();
                    this.beingWrittenWriter.put(filePath, newWriter);
                    return newWriter;
                }
                catch (IOException e) {
                    String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
                    throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
                }
            });
        }
        return writer;
    }

    private Object resolveObject(String name, Object data, SeaTunnelDataType<?> seaTunnelDataType) {
        if (data == null) {
            return null;
        }
        switch (seaTunnelDataType.getSqlType()) {
            case ARRAY: {
                SeaTunnelDataType elementType = ((ArrayType)seaTunnelDataType).getElementType();
                ArrayList<Object> records = new ArrayList<Object>(((Object[])data).length);
                for (Object object : (Object[])data) {
                    Object resolvedObject = this.resolveObject(name, object, elementType);
                    records.add(resolvedObject);
                }
                return records;
            }
            case MAP: 
            case STRING: 
            case BOOLEAN: 
            case TINYINT: 
            case SMALLINT: 
            case INT: 
            case BIGINT: 
            case FLOAT: 
            case DOUBLE: 
            case NULL: 
            case DECIMAL: 
            case DATE: {
                return data;
            }
            case TIMESTAMP: {
                if (this.writePathsAsInt96.contains(name)) {
                    LocalDateTime localDateTime = (LocalDateTime)data;
                    Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
                    calendar.setTime(Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant()));
                    int julianDays = (int)JulianFields.JULIAN_DAY.getFrom(LocalDate.of(calendar.get(1), calendar.get(2) + 1, calendar.get(5)));
                    long timeOfDayNanos = TimeUnit.HOURS.toNanos(calendar.get(11)) + TimeUnit.MINUTES.toNanos(calendar.get(12)) + TimeUnit.SECONDS.toNanos(calendar.get(13)) + TimeUnit.MILLISECONDS.toNanos(calendar.get(14));
                    NanoTime nanoTime = new NanoTime(julianDays, timeOfDayNanos);
                    return new GenericData.Fixed(this.schema.getField(name).schema(), nanoTime.toBinary().getBytes());
                }
                return ((LocalDateTime)data).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
            }
            case BYTES: {
                if (this.writePathsAsInt96.contains(name)) {
                    return new GenericData.Fixed(this.schema.getField(name).schema(), (byte[])data);
                }
                return ByteBuffer.wrap((byte[])data);
            }
            case ROW: {
                SeaTunnelRow seaTunnelRow = (SeaTunnelRow)data;
                SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType)seaTunnelDataType).getFieldTypes();
                String[] fieldNames = ((SeaTunnelRowType)seaTunnelDataType).getFieldNames();
                List<Integer> sinkColumnsIndex = IntStream.rangeClosed(0, fieldNames.length - 1).boxed().collect(Collectors.toList());
                Schema recordSchema = this.buildAvroSchemaWithRowType((SeaTunnelRowType)seaTunnelDataType, sinkColumnsIndex);
                GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
                for (int i = 0; i < fieldNames.length; ++i) {
                    recordBuilder.set(fieldNames[i].toLowerCase(), this.resolveObject(fieldNames[i], seaTunnelRow.getField(i), fieldTypes[i]));
                }
                return recordBuilder.build();
            }
        }
        String errorMsg = String.format("SeaTunnel file connector is not supported for this data type [%s]", seaTunnelDataType.getSqlType());
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
    }

    public Type seaTunnelDataType2ParquetDataType(String fieldName, SeaTunnelDataType<?> seaTunnelDataType) {
        switch (seaTunnelDataType.getSqlType()) {
            case ARRAY: {
                SeaTunnelDataType elementType = ((ArrayType)seaTunnelDataType).getElementType();
                return (Type)((Types.GroupBuilder)((Types.GroupBuilder)Types.optionalGroup().as(OriginalType.LIST)).addField((Type)((Types.GroupBuilder)Types.repeatedGroup().addField(this.seaTunnelDataType2ParquetDataType("array_element", elementType))).named("bag"))).named(fieldName);
            }
            case MAP: {
                SeaTunnelDataType keyType = ((MapType)seaTunnelDataType).getKeyType();
                SeaTunnelDataType valueType = ((MapType)seaTunnelDataType).getValueType();
                return ConversionPatterns.mapType(Type.Repetition.OPTIONAL, fieldName, this.seaTunnelDataType2ParquetDataType("key", keyType), this.seaTunnelDataType2ParquetDataType("value", valueType));
            }
            case STRING: {
                return (Type)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.stringType())).named(fieldName);
            }
            case BOOLEAN: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case TINYINT: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.intType(8, true))).as(OriginalType.INT_8)).named(fieldName);
            }
            case SMALLINT: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.intType(16, true))).as(OriginalType.INT_16)).named(fieldName);
            }
            case INT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DATE: {
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.OPTIONAL).as(LogicalTypeAnnotation.dateType())).as(OriginalType.DATE)).named(fieldName);
            }
            case BIGINT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case TIMESTAMP: {
                if (this.writePathsAsInt96.contains(fieldName)) {
                    return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, Type.Repetition.OPTIONAL).named(fieldName);
                }
                return (Type)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL).as(OriginalType.TIMESTAMP_MILLIS)).named(fieldName);
            }
            case FLOAT: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DOUBLE: {
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case DECIMAL: {
                int precision = ((DecimalType)seaTunnelDataType).getPrecision();
                int scale = ((DecimalType)seaTunnelDataType).getScale();
                return (Type)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)((Types.PrimitiveBuilder)Types.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(PRECISION_TO_BYTE_COUNT[precision - 1])).as(OriginalType.DECIMAL)).precision(precision)).scale(scale)).named(fieldName);
            }
            case BYTES: {
                if (this.writePathsAsInt96.contains(fieldName)) {
                    return (Type)((Types.PrimitiveBuilder)Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Type.Repetition.OPTIONAL).length(12)).named(fieldName);
                }
                return (Type)Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL).named(fieldName);
            }
            case ROW: {
                SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType)seaTunnelDataType).getFieldTypes();
                String[] fieldNames = ((SeaTunnelRowType)seaTunnelDataType).getFieldNames();
                Type[] types = new Type[fieldTypes.length];
                for (int i = 0; i < fieldNames.length; ++i) {
                    Type type;
                    types[i] = type = this.seaTunnelDataType2ParquetDataType(fieldNames[i], fieldTypes[i]);
                }
                return (Type)((Types.GroupBuilder)Types.optionalGroup().addFields(types)).named(fieldName);
            }
        }
        String errorMsg = String.format("SeaTunnel file connector is not supported for this data type [%s]", seaTunnelDataType.getSqlType());
        throw new FileConnectorException((SeaTunnelErrorCode)CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
    }

    private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, List<Integer> sinkColumnsIndex) {
        ArrayList types = new ArrayList();
        SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes();
        String[] fieldNames = seaTunnelRowType.getFieldNames();
        sinkColumnsIndex.forEach(index -> {
            Type type = this.seaTunnelDataType2ParquetDataType(fieldNames[index].toLowerCase(), fieldTypes[index]);
            types.add(type);
        });
        MessageType seaTunnelRow = (MessageType)((Types.GroupBuilder)Types.buildMessage().addFields(types.toArray(new Type[0]))).named("SeaTunnelRecord");
        return this.schemaConverter.convert(seaTunnelRow);
    }

    static {
        for (int prec = 1; prec <= 38; ++prec) {
            ParquetWriteStrategy.PRECISION_TO_BYTE_COUNT[prec - 1] = (int)Math.ceil((Math.log(Math.pow(10.0, prec) - 1.0) / Math.log(2.0) + 1.0) / 8.0);
        }
    }
}

