/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import io.airlift.compress.lzo.LzopCodec;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.splitor.DefaultTextLineSplitor;
import org.apache.seatunnel.format.text.splitor.TextLineSplitor;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read strategy for text files.
 *
 * <p>The input stream is split into records by the configured row delimiter, each record is
 * deserialized with a {@link TextDeserializationSchema}, optionally projected onto the configured
 * read columns and extended with partition values, and finally handed to the collector.
 */
public class TextReadStrategy extends AbstractReadStrategy {
    private static final Logger log = LoggerFactory.getLogger(TextReadStrategy.class);

    /**
     * Field delimiter passed to the deserializer when the user supplied no schema.
     * NOTE(review): presumably chosen because it is unlikely to occur in real data, so the whole
     * line ends up in the single field of the simple text schema - confirm.
     */
    private static final String SCHEMALESS_FIELD_DELIMITER = "\b";

    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
    private String fieldDelimiter = (String)FileBaseSourceOptions.FIELD_DELIMITER.defaultValue();
    private String rowDelimiter = (String)FileBaseSourceOptions.ROW_DELIMITER.defaultValue();
    // The three formatters are parsed from the config in initFormatter() but are not otherwise
    // used inside this class.
    private DateUtils.Formatter dateFormat = (DateUtils.Formatter)FileBaseSourceOptions.DATE_FORMAT_LEGACY.defaultValue();
    private DateTimeUtils.Formatter datetimeFormat = (DateTimeUtils.Formatter)FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.defaultValue();
    private TimeUtils.Formatter timeFormat = (TimeUtils.Formatter)FileBaseSourceOptions.TIME_FORMAT_LEGACY.defaultValue();
    private CompressFormat compressFormat = (CompressFormat)FileBaseSourceOptions.COMPRESS_CODEC.defaultValue();
    private TextLineSplitor textLineSplitor;
    /** Positions of the projected columns inside the user-defined row type. */
    private int[] indexes;
    /** Charset name used to decode the raw file bytes into characters. */
    private String encoding = (String)FileBaseSourceOptions.ENCODING.defaultValue();

    /**
     * Reads one file: extracts the partition values from {@code path} and delegates the stream
     * handling to {@code resolveArchiveCompressedInputStream} with {@link FileFormat#TEXT}.
     *
     * @param path path of the file to read
     * @param tableId table identifier stamped on every emitted row
     * @param output collector receiving the rows
     * @throws FileConnectorException if a record cannot be processed
     * @throws IOException if reading the file fails
     */
    @Override
    public void read(String path, String tableId, Collector<SeaTunnelRow> output) throws FileConnectorException, IOException {
        Map<String, String> partitionsMap = this.parsePartitionsByPath(path);
        this.resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.TEXT);
    }

    /**
     * Decodes {@code inputStream} with the configured encoding, splits it into records using the
     * configured row delimiter (skipping {@code skipHeaderNumber} leading records) and emits one
     * row per non-blank record.
     *
     * @param path path of the file being read (not used by this implementation)
     * @param tableId table identifier stamped on every emitted row
     * @param output collector receiving the rows
     * @param inputStream raw content of the file; closed when this method returns
     * @param partitionsMap partition values appended to each row when partitions are merged
     * @param currentFileName name of the file being read (not used by this implementation)
     * @throws IOException if reading fails, or wrapping a {@link FileConnectorException} raised
     *     while processing a record
     */
    @Override
    public void readProcess(String path, String tableId, Collector<SeaTunnelRow> output, InputStream inputStream, Map<String, String> partitionsMap, String currentFileName) throws IOException {
        InputStream actualInputStream;
        switch (this.compressFormat) {
            case LZO: {
                actualInputStream = new LzopCodec().createInputStream(inputStream);
                break;
            }
            case NONE: {
                actualInputStream = inputStream;
                break;
            }
            default: {
                // Unsupported codecs are read as-is rather than rejected.
                log.warn("Text file does not support this compress type: {}", this.compressFormat.getCompressCodec());
                actualInputStream = inputStream;
                break;
            }
        }
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(actualInputStream, this.encoding))) {
            LineProcessor lineProcessor = line -> {
                try {
                    this.processLineData(line, tableId, output, partitionsMap);
                }
                catch (FileConnectorException e) {
                    // LineProcessor may only throw IOException, so the connector error is wrapped.
                    throw new IOException(e);
                }
            };
            StreamLineSplitter splitter = new StreamLineSplitter(this.rowDelimiter, this.skipHeaderNumber, lineProcessor);
            splitter.processStream(reader);
        }
    }

    /**
     * Deserializes one record, applies column projection and partition merging when enabled, and
     * emits the resulting row.
     *
     * @throws FileConnectorException with {@code DATA_DESERIALIZE_FAILED} if the record cannot be
     *     deserialized
     */
    private void processLineData(String line, String tableId, Collector<SeaTunnelRow> output, Map<String, String> partitionsMap) throws FileConnectorException {
        try {
            SeaTunnelRow seaTunnelRow = this.deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
            if (!this.readColumns.isEmpty()) {
                // Column projection: copy only the requested columns, leaving trailing slots
                // free for the partition values when they are going to be merged in.
                int projectedSize = this.readColumns.size();
                Object[] fields = new Object[this.isMergePartition ? projectedSize + partitionsMap.size() : projectedSize];
                for (int i = 0; i < this.indexes.length; ++i) {
                    fields[i] = seaTunnelRow.getField(this.indexes[i]);
                }
                seaTunnelRow = new SeaTunnelRow(fields);
            }
            if (this.isMergePartition) {
                // Partition values are written right after the regular fields.
                int index = this.seaTunnelRowType.getTotalFields();
                for (String value : partitionsMap.values()) {
                    seaTunnelRow.setField(index++, value);
                }
            }
            seaTunnelRow.setTableId(tableId);
            output.collect(seaTunnelRow);
        }
        catch (IOException e) {
            String errorMsg = String.format("Deserialize this data [%s] failed, please check the origin data", line);
            throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e);
        }
    }

    /**
     * Builds the row type and deserializer for the case where the user supplied no schema: the
     * simple text schema from {@link CatalogTableUtil#buildSimpleTextSchema()} is used.
     *
     * <p>The configured encoding and row delimiter are applied here as well, so that schema-less
     * reads honour the same stream options as reads configured through
     * {@link #setCatalogTable(CatalogTable)}.
     *
     * @param path path of the file (not used; partitions are derived from the first file name)
     * @return the row type reported by {@code getActualSeaTunnelRowTypeInfo()}
     * @throws FileConnectorException if read columns are configured, since projection is not
     *     supported without a user schema
     */
    @Override
    public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
        this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
        this.seaTunnelRowTypeWithPartition = this.mergePartitionTypes((String)this.fileNames.get(0), this.seaTunnelRowType);
        this.initFormatter();
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
            throw new FileConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, "When reading text files, if user has not specified schema information, SeaTunnel will not support column projection");
        }
        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig((Config)this.pluginConfig);
        this.applyStreamOptions(readonlyConfig);
        TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder().delimiter(SCHEMALESS_FIELD_DELIMITER).textLineSplitor(this.textLineSplitor).nullFormat(readonlyConfig.getOptional(FileBaseSourceOptions.NULL_FORMAT).orElse(null));
        this.deserializationSchema = builder.seaTunnelRowType(this.isMergePartition ? this.seaTunnelRowTypeWithPartition : this.seaTunnelRowType).build();
        return this.getActualSeaTunnelRowTypeInfo();
    }

    /**
     * Configures the strategy from a user-defined table schema: reads the delimiters and encoding
     * from the plugin config, builds the deserializer for the full user row type, and, when read
     * columns are configured, computes the projected row type and the column indexes used by the
     * projection.
     *
     * @param catalogTable table whose row type describes the columns of the text file
     */
    @Override
    public void setCatalogTable(CatalogTable catalogTable) {
        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
        SeaTunnelRowType userDefinedRowTypeWithPartition = this.mergePartitionTypes((String)this.fileNames.get(0), rowType);
        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig((Config)this.pluginConfig);
        this.applyStreamOptions(readonlyConfig);
        readonlyConfig.getOptional(FileBaseSourceOptions.FIELD_DELIMITER).ifPresent(value -> this.fieldDelimiter = value);
        this.initFormatter();
        TextDeserializationSchema.Builder builder = TextDeserializationSchema.builder().delimiter(this.fieldDelimiter).textLineSplitor(this.textLineSplitor).nullFormat(readonlyConfig.getOptional(FileBaseSourceOptions.NULL_FORMAT).orElse(null));
        // The deserializer always parses the full user row type; projection happens afterwards.
        this.deserializationSchema = builder.seaTunnelRowType(this.isMergePartition ? userDefinedRowTypeWithPartition : rowType).build();
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.READ_COLUMNS.key())) {
            int projectedSize = this.readColumns.size();
            this.indexes = new int[projectedSize];
            String[] fields = new String[projectedSize];
            SeaTunnelDataType<?>[] types = new SeaTunnelDataType<?>[projectedSize];
            for (int i = 0; i < projectedSize; ++i) {
                this.indexes[i] = rowType.indexOf((String)this.readColumns.get(i));
                fields[i] = rowType.getFieldName(this.indexes[i]);
                types[i] = rowType.getFieldType(this.indexes[i]);
            }
            this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
            this.seaTunnelRowTypeWithPartition = this.mergePartitionTypes((String)this.fileNames.get(0), this.seaTunnelRowType);
        } else {
            this.seaTunnelRowType = rowType;
            this.seaTunnelRowTypeWithPartition = userDefinedRowTypeWithPartition;
        }
    }

    /**
     * Applies the options that control how the byte stream is decoded and split into records.
     * The encoding falls back to UTF-8 when it is not configured; the row delimiter keeps its
     * current value when it is not configured.
     */
    private void applyStreamOptions(ReadonlyConfig readonlyConfig) {
        this.encoding = readonlyConfig.getOptional(FileBaseSourceOptions.ENCODING).orElse(StandardCharsets.UTF_8.name());
        readonlyConfig.getOptional(FileBaseSourceOptions.ROW_DELIMITER).ifPresent(value -> this.rowDelimiter = value);
    }

    /**
     * Reads the date/datetime/time formats and the compress codec from the plugin config when
     * present, and installs the default text line splitor.
     */
    private void initFormatter() {
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key())) {
            this.dateFormat = DateUtils.Formatter.parse(this.pluginConfig.getString(FileBaseSourceOptions.DATE_FORMAT_LEGACY.key()));
        }
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key())) {
            this.datetimeFormat = DateTimeUtils.Formatter.parse(this.pluginConfig.getString(FileBaseSourceOptions.DATETIME_FORMAT_LEGACY.key()));
        }
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key())) {
            this.timeFormat = TimeUtils.Formatter.parse(this.pluginConfig.getString(FileBaseSourceOptions.TIME_FORMAT_LEGACY.key()));
        }
        if (this.pluginConfig.hasPath(FileBaseSourceOptions.COMPRESS_CODEC.key())) {
            String compressCodec = this.pluginConfig.getString(FileBaseSourceOptions.COMPRESS_CODEC.key());
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale
            // (e.g. under a Turkish locale "gzip" would otherwise upper-case to "GZİP").
            this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
        }
        this.textLineSplitor = new DefaultTextLineSplitor();
    }

    /** Callback invoked by {@link StreamLineSplitter} for every record it extracts. */
    @FunctionalInterface
    public static interface LineProcessor {
        /**
         * Handles one record.
         *
         * @param line record content without the row delimiter
         * @throws IOException if the record cannot be processed
         */
        public void processLine(String line) throws IOException;
    }

    /**
     * Splits a character stream into records separated by a row delimiter and forwards every
     * non-blank record that follows the skipped header records to a {@link LineProcessor}.
     *
     * <p>For the delimiters {@code "\n"}, {@code "\r"} and {@code "\r\n"} the splitting is done by
     * {@link BufferedReader#readLine()}; any other delimiter is matched character by character.
     * Instances keep mutable state and are intended for a single stream.
     */
    public static class StreamLineSplitter {
        private final char[] delimiterChars;
        private final StringBuilder lineBuffer;
        /** Number of header records skipped so far on the character-by-character path. */
        private long skipCount;
        private final long skipHeaderNumber;
        private final LineProcessor lineProcessor;
        private final boolean useReadLine;

        /**
         * @param delimiter row delimiter; must not be empty
         * @param skipHeaderNumber number of leading records to drop
         * @param lineProcessor receiver of the extracted records
         * @throws IllegalArgumentException if {@code delimiter} is empty
         */
        public StreamLineSplitter(String delimiter, long skipHeaderNumber, LineProcessor lineProcessor) {
            if (delimiter.isEmpty()) {
                throw new IllegalArgumentException("Row delimiter must not be empty");
            }
            this.delimiterChars = delimiter.toCharArray();
            this.lineBuffer = new StringBuilder();
            this.skipCount = 0L;
            this.skipHeaderNumber = skipHeaderNumber;
            this.lineProcessor = lineProcessor;
            this.useReadLine = this.isDefaultLineDelimiter(delimiter);
        }

        /** Returns whether {@code delimiter} is one of the line terminators handled by readLine. */
        private boolean isDefaultLineDelimiter(String delimiter) {
            return "\n".equals(delimiter) || "\r".equals(delimiter) || "\r\n".equals(delimiter);
        }

        /**
         * Consumes {@code reader} until end of stream, emitting records to the line processor.
         *
         * @throws IOException if reading fails or the line processor throws
         */
        public void processStream(BufferedReader reader) throws IOException {
            if (this.useReadLine) {
                this.processWithReadLine(reader);
            } else {
                this.processWithCharByChar(reader);
            }
        }

        /**
         * Line-terminator path: the first {@code skipHeaderNumber} lines are dropped (blank or
         * not), afterwards every non-blank line is emitted.
         */
        private void processWithReadLine(BufferedReader reader) throws IOException {
            long skipped = 0L;
            String line;
            while ((line = reader.readLine()) != null) {
                if (skipped < this.skipHeaderNumber) {
                    ++skipped;
                    continue;
                }
                if (!line.trim().isEmpty()) {
                    this.lineProcessor.processLine(line);
                }
            }
        }

        /**
         * Custom-delimiter path: feeds every character to {@link #processChar(char)} and flushes
         * the text remaining after the last delimiter as a final record.
         */
        private void processWithCharByChar(BufferedReader reader) throws IOException {
            int ch;
            while ((ch = reader.read()) != -1) {
                this.processChar((char)ch);
            }
            if (this.lineBuffer.length() > 0) {
                // An unterminated tail inside the header region is dropped like any other header.
                if (this.skipCount >= this.skipHeaderNumber) {
                    String line = this.lineBuffer.toString();
                    if (!line.trim().isEmpty()) {
                        this.lineProcessor.processLine(line);
                    }
                }
                this.lineBuffer.setLength(0);
            }
        }

        /**
         * Appends one character and completes the current record as soon as the buffered text
         * ends with the delimiter.
         *
         * <p>Matching against the buffer suffix (instead of tracking a running match index) keeps
         * overlapping prefixes correct, e.g. delimiter {@code "ab"} inside {@code "xaab"}, and
         * keeps a trailing partial delimiter in the buffer so it is not lost at end of stream.
         */
        private void processChar(char currentChar) throws IOException {
            this.lineBuffer.append(currentChar);
            if (!this.endsWithDelimiter()) {
                return;
            }
            this.lineBuffer.setLength(this.lineBuffer.length() - this.delimiterChars.length);
            this.completeRecord();
        }

        /** Returns whether the buffered text currently ends with the full delimiter. */
        private boolean endsWithDelimiter() {
            int offset = this.lineBuffer.length() - this.delimiterChars.length;
            if (offset < 0) {
                return false;
            }
            // Compare from the end: the character just appended mismatches most of the time.
            for (int i = this.delimiterChars.length - 1; i >= 0; --i) {
                if (this.lineBuffer.charAt(offset + i) != this.delimiterChars[i]) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Finishes the buffered record: counts it as a header while headers remain to be skipped,
         * otherwise emits it unless it is blank; the buffer is then cleared.
         */
        private void completeRecord() throws IOException {
            if (this.skipCount >= this.skipHeaderNumber) {
                String line = this.lineBuffer.toString();
                if (!line.trim().isEmpty()) {
                    this.lineProcessor.processLine(line);
                }
            } else {
                ++this.skipCount;
            }
            this.lineBuffer.setLength(0);
        }
    }
}

