/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.kinesis.shaded.org.apache.commons.io.Charsets;
import org.apache.flink.kinesis.shaded.org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads a text stream line by line and forwards its content to SLF4J.
 *
 * <p>Two kinds of input are recognised:
 * <ul>
 *   <li>Framed records: a line starting with {@code ++++} opens a record, a line starting
 *       with {@code ----} closes it. The lines in between are joined with {@code \n} and
 *       emitted at the level named by the first {@code [level]} tag found in the message
 *       (trace, debug, info, warn/warning, error, fatal; case-insensitive).</li>
 *   <li>Unframed lines: any line read outside a record is passed straight to the
 *       {@link DefaultLoggingFunction} supplied at construction time.</li>
 * </ul>
 *
 * <p>The reader is meant to be run on its own thread. {@link #prepareForShutdown()} and
 * {@link #shutdown()} may be called from other threads; all record state is touched only
 * by the thread executing {@link #run()}.
 */
public class LogInputStreamReader
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(LogInputStreamReader.class);
    /** Matches a bracketed level tag such as {@code [info]} anywhere in a (possibly multi-line) message. */
    private static final Pattern LEVEL_REGEX = Pattern.compile("\\[(?<level>trace|debug|info|warn(?:ing)?|error|fatal)\\]", Pattern.CASE_INSENSITIVE | Pattern.MULTILINE);
    /** Lower-case level name to the logging call used for it. */
    private static final Map<String, LoggingFunction> EMITTERS = LogInputStreamReader.makeEmitters();
    private final String streamType;
    private final BufferedReader reader;
    private final DefaultLoggingFunction logFunction;
    private volatile boolean running = true;
    private volatile boolean shuttingDown = false;
    private boolean isReadingRecord = false;
    private final LinkedList<String> messageData = new LinkedList<String>();

    /**
     * Builds the immutable table that maps a level name to the matching SLF4J call.
     * {@code warning} is an alias for {@code warn}; {@code fatal} is logged at error level.
     */
    private static Map<String, LoggingFunction> makeEmitters() {
        HashMap<String, LoggingFunction> emitters = new HashMap<String, LoggingFunction>();
        emitters.put("trace", new LoggingFunction(){

            @Override
            public void apply(String message) {
                log.trace(message);
            }
        });
        emitters.put("debug", new LoggingFunction(){

            @Override
            public void apply(String message) {
                log.debug(message);
            }
        });
        emitters.put("info", new LoggingFunction(){

            @Override
            public void apply(String message) {
                log.info(message);
            }
        });
        LoggingFunction warn = new LoggingFunction(){

            @Override
            public void apply(String message) {
                log.warn(message);
            }
        };
        emitters.put("warn", warn);
        emitters.put("warning", warn);
        LoggingFunction error = new LoggingFunction(){

            @Override
            public void apply(String message) {
                log.error(message);
            }
        };
        emitters.put("error", error);
        emitters.put("fatal", error);
        return Collections.unmodifiableMap(emitters);
    }

    /**
     * @param is          stream to read log lines from; decoded as US-ASCII
     * @param streamType  label used as a prefix in this reader's own warnings
     * @param logFunction sink for lines that are not part of a framed record, and for
     *                    records whose level cannot be determined
     */
    public LogInputStreamReader(InputStream is, String streamType, DefaultLoggingFunction logFunction) {
        this.streamType = streamType;
        this.reader = new BufferedReader(new InputStreamReader(is, Charsets.US_ASCII));
        this.logFunction = logFunction;
    }

    /**
     * Reads lines until {@link #shutdown()} is called or the stream reaches its end, then
     * flushes any partially read record through the default logging function.
     */
    @Override
    public void run() {
        while (this.running) {
            try {
                String logLine = this.reader.readLine();
                if (logLine == null) {
                    // End of stream. Every further readLine() would return null immediately,
                    // so looping here would spin the CPU until shutdown() is called.
                    break;
                }
                if (logLine.startsWith("++++")) {
                    this.startRead();
                } else if (logLine.startsWith("----")) {
                    this.finishRead();
                } else if (this.isReadingRecord) {
                    this.messageData.add(logLine);
                } else {
                    this.logFunction.apply(log, logLine);
                }
            }
            catch (IOException ioex) {
                if (!this.shuttingDown) {
                    log.error("Caught IO Exception while reading log line", (Throwable)ioex);
                    continue;
                }
                String detail = ioex.getMessage();
                // A closed stream is the expected outcome during shutdown; stay quiet about it.
                if (detail == null || !detail.contains("Stream closed")) {
                    log.info("Received IO Exception during shutdown.  This can happen, but should indicate that the stream has been closed: {}", (Object)detail);
                }
            }
        }
        if (!this.messageData.isEmpty()) {
            this.logFunction.apply(log, this.makeMessage());
        }
    }

    /**
     * Handles a record terminator: emits the collected lines at the level found in the
     * message and resets the record state. Warns when no record was open or it was empty.
     */
    private void finishRead() {
        if (!this.isReadingRecord) {
            log.warn("{}: Terminator encountered, but wasn't reading record.", (Object)this.streamType);
        }
        this.isReadingRecord = false;
        if (!this.messageData.isEmpty()) {
            String message = this.makeMessage();
            this.getLevelOrDefault(message).apply(message);
        } else {
            log.warn("{}: Finished reading record, but didn't find any message data.", (Object)this.streamType);
        }
        this.messageData.clear();
    }

    /**
     * Handles a record start marker. Lines left over from an unterminated previous record
     * are reported in a warning and discarded.
     */
    private void startRead() {
        this.isReadingRecord = true;
        if (!this.messageData.isEmpty()) {
            log.warn("{}: New log record started, but message data has existing data: {}", (Object)this.streamType, (Object)this.makeMessage());
            this.messageData.clear();
        }
    }

    /**
     * Picks the emitter for the first {@code [level]} tag in {@code message}.
     *
     * @return the matching emitter, or a fallback that sends the message, prefixed with a
     *         failure note, to the default logging function when no known level is found
     */
    private LoggingFunction getLevelOrDefault(String message) {
        Matcher matcher = LEVEL_REGEX.matcher(message);
        if (matcher.find()) {
            String level = matcher.group("level");
            if (level != null) {
                // Locale.ROOT: the regex is case-insensitive, and a default-locale lower-casing
                // (e.g. Turkish "I" -> dotless "ı") would miss the ASCII keys in EMITTERS.
                LoggingFunction res = EMITTERS.get(level.toLowerCase(Locale.ROOT));
                if (res != null) {
                    return res;
                }
            }
        }
        return new LoggingFunction(){

            @Override
            public void apply(String message) {
                LogInputStreamReader.this.logFunction.apply(log, "!!Failed to extract level!! - " + message);
            }
        };
    }

    /** Joins the lines collected for the current record with newline separators. */
    private String makeMessage() {
        return StringUtils.join(this.messageData, "\n");
    }

    /** Asks the read loop to stop; takes effect the next time the loop condition is checked. */
    public void shutdown() {
        this.running = false;
    }

    /** Marks that shutdown has begun, so I/O errors from the closing stream are not logged as errors. */
    public void prepareForShutdown() {
        this.shuttingDown = true;
    }

    /** Sink for lines that carry no level of their own; the implementation chooses the level. */
    static interface DefaultLoggingFunction {
        public void apply(Logger var1, String var2);
    }

    /** Emits a message at one fixed log level. */
    static interface LoggingFunction {
        public void apply(String var1);
    }
}

