package com.zmops.zeus.server.transfer.core.source.reader;

import com.zmops.zeus.server.transfer.api.Message;
import com.zmops.zeus.server.transfer.api.Reader;
import com.zmops.zeus.server.transfer.api.Validator;
import com.zmops.zeus.server.transfer.conf.JobConstants;
import com.zmops.zeus.server.transfer.conf.JobProfile;
import com.zmops.zeus.server.transfer.core.FileException;
import com.zmops.zeus.server.transfer.core.message.DefaultMessage;
import com.zmops.zeus.server.transfer.core.validator.PatternValidator;
import com.zmops.zeus.server.transfer.metrics.PluginMetric;
import com.zmops.zeus.server.transfer.utils.TransferUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/zmops/zeus/server/transfer/core/source/reader/TextFileReader.class */
public class TextFileReader implements Reader {
    private static final Logger LOGGER = LoggerFactory.getLogger(TextFileReader.class);
    public static final int NEVER_STOP_SIGN = -1;
    private final File file;
    private final int position;
    private final String md5;
    private Iterator<String> iterator;
    private Stream<String> stream;
    private long timeout;
    private long lastTime;
    private final PluginMetric textFileMetric;
    private final List<Validator> validators;

    public TextFileReader(File file, int i) {
        this(file, i, JobConstants.DEFAULT_JOB_LINE_FILTER);
    }

    public TextFileReader(File file, int i, String str) {
        this.lastTime = 0L;
        this.validators = new ArrayList();
        this.file = file;
        this.position = i;
        this.md5 = str;
        this.textFileMetric = new PluginMetric();
        this.textFileMetric.tagName.setName(file.getAbsolutePath());
    }

    public TextFileReader(File file) {
        this(file, 0);
    }

    @Override // com.zmops.zeus.server.transfer.api.Reader
    public Message read() {
        if (this.iterator != null && this.iterator.hasNext()) {
            String next = this.iterator.next();
            if (validateMessage(next)) {
                this.textFileMetric.readNum.incr();
                return new DefaultMessage(next.getBytes(StandardCharsets.UTF_8));
            }
        }
        TransferUtils.silenceSleepInMs(100L);
        return null;
    }

    private boolean validateMessage(String str) {
        if (this.validators.isEmpty()) {
            return true;
        }
        return this.validators.stream().allMatch(validator -> {
            return validator.validate(str);
        });
    }

    @Override // com.zmops.zeus.server.transfer.api.Reader
    public boolean isFinished() {
        if (this.timeout == -1) {
            return false;
        }
        if (this.iterator == null) {
            return true;
        }
        if (this.iterator.hasNext()) {
            this.lastTime = 0L;
            return false;
        }
        if (this.lastTime == 0) {
            this.lastTime = System.currentTimeMillis();
        }
        return System.currentTimeMillis() - this.lastTime > this.timeout;
    }

    @Override // com.zmops.zeus.server.transfer.api.Reader
    public String getReadFile() {
        return this.file.getAbsolutePath();
    }

    @Override // com.zmops.zeus.server.transfer.api.Reader
    public void setReadTimeout(long j) {
        this.timeout = j;
    }

    public void addPatternValidator(String str) {
        if (str.isEmpty()) {
            return;
        }
        this.validators.add(new PatternValidator(str));
    }

    @Override // com.zmops.zeus.server.transfer.api.Stage
    public void init(JobProfile jobProfile) {
        try {
            initReadTimeout(jobProfile);
            String fileMd5 = TransferUtils.getFileMd5(this.file);
            if (StringUtils.isNotBlank(this.md5) && !this.md5.equals(fileMd5)) {
                LOGGER.warn("md5 is differ from origin, origin: {}, new {}", this.md5, fileMd5);
            }
            LOGGER.info("file name for task is {}, md5 is {}", this.file, fileMd5);
            this.stream = Files.newBufferedReader(this.file.toPath()).lines().skip(this.position);
            this.iterator = this.stream.iterator();
        } catch (Exception e) {
            throw new FileException("error init stream for " + this.file.getPath(), e);
        }
    }

    private void initReadTimeout(JobProfile jobProfile) {
        int i = jobProfile.getInt(JobConstants.JOB_FILE_MAX_WAIT, 1);
        if (i == -1) {
            this.timeout = -1L;
        } else {
            this.timeout = TimeUnit.MINUTES.toMillis(i);
        }
    }

    @Override // com.zmops.zeus.server.transfer.api.Stage
    public void destroy() {
        TransferUtils.finallyClose(this.stream);
        LOGGER.info("destroy reader with read {} num {}", this.textFileMetric.tagName.getName(), this.textFileMetric.readNum.snapshot2());
    }
}
