/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink.applied;

import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseRequestBlank;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkCommonParams;
import ru.ivi.opensource.flinkclickhousesink.util.ThreadUtil;

/**
 * Asynchronous writer that drains {@link ClickHouseRequestBlank} batches from a bounded queue and
 * sends them to ClickHouse over HTTP.
 *
 * <p>The constructor creates the failed-records directory and starts
 * {@code sinkParams.getNumWriters()} {@link WriterTask}s on a fixed thread pool. Every batch handed
 * to {@link #put(ClickHouseRequestBlank)} is counted in an "unprocessed requests" counter that is
 * decremented when the HTTP callback for that batch finishes. {@link #close()} waits until the
 * counter reaches zero and every tracked future has completed, then stops the writers and shuts
 * the executors and the HTTP client down.
 *
 * <p>NOTE(review): the {@code futures} list is mutated from writer threads and from the thread
 * calling {@link #close()}; this class assumes the caller supplies a thread-safe list — confirm
 * at the call site.
 */
public class ClickHouseWriter implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClickHouseWriter.class);

    private ExecutorService service;
    private ExecutorService callbackService;
    private List<WriterTask> tasks;
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    private final AtomicLong unprocessedRequestsCounter = new AtomicLong();
    private final AsyncHttpClient asyncHttpClient;
    private final List<CompletableFuture<Boolean>> futures;
    private final ClickHouseSinkCommonParams sinkParams;

    /**
     * Creates a writer backed by a default {@link AsyncHttpClient}.
     *
     * @param sinkParams sink configuration (queue capacity, number of writers, retry limit, paths)
     * @param futures    shared list in which the writer tasks register one future per send attempt
     */
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, List<CompletableFuture<Boolean>> futures) {
        this(sinkParams, futures, Dsl.asyncHttpClient());
    }

    /**
     * Creates a writer that uses the supplied HTTP client. The client is closed by {@link #close()}.
     *
     * @param sinkParams      sink configuration
     * @param futures         shared list in which the writer tasks register one future per send attempt
     * @param asyncHttpClient HTTP client used for all requests
     * @throws RuntimeException if the failed-records directory or the executors cannot be set up
     */
    public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, List<CompletableFuture<Boolean>> futures, AsyncHttpClient asyncHttpClient) {
        this.sinkParams = sinkParams;
        this.futures = futures;
        this.commonQueue = new LinkedBlockingQueue<>(sinkParams.getQueueMaxCapacity());
        this.asyncHttpClient = asyncHttpClient;
        initDirAndExecutors();
    }

    /** Prepares the failed-records directory and starts the writer tasks; any failure is rethrown unchecked. */
    private void initDirAndExecutors() {
        try {
            initDir(sinkParams.getFailedRecordsPath());
            buildComponents();
        } catch (Exception e) {
            logger.error("Error while starting CH writer", e);
            throw new RuntimeException(e);
        }
    }

    /** Creates {@code pathName} (and any missing parents) if it does not exist yet. */
    private static void initDir(String pathName) throws IOException {
        Path path = Paths.get(pathName);
        Files.createDirectories(path);
    }

    /** Creates the writer and callback executors and submits one {@link WriterTask} per configured writer. */
    private void buildComponents() {
        logger.info("Building components");

        int numWriters = sinkParams.getNumWriters();

        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        service = Executors.newFixedThreadPool(numWriters, threadFactory);

        ThreadFactory callbackServiceFactory = ThreadUtil.threadFactory("clickhouse-writer-callback-executor");
        callbackService = Executors.newCachedThreadPool(callbackServiceFactory);

        tasks = Lists.newArrayListWithCapacity(numWriters);
        for (int i = 0; i < numWriters; ++i) {
            WriterTask task = new WriterTask(i, asyncHttpClient, commonQueue, sinkParams, callbackService, futures, unprocessedRequestsCounter);
            tasks.add(task);
            service.submit(task);
        }
    }

    /**
     * Enqueues a batch for sending, blocking while the queue is full.
     *
     * @param params batch to send
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting for queue space (the interrupt flag is restored)
     */
    public void put(ClickHouseRequestBlank params) {
        // Count the request before it becomes visible to the writers: a writer could otherwise
        // dequeue it and its callback decrement the counter before this thread increments it.
        unprocessedRequestsCounter.incrementAndGet();
        boolean enqueued = false;
        try {
            commonQueue.put(params);
            enqueued = true;
        } catch (InterruptedException e) {
            logger.error("Interrupted error while putting data to queue", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            if (!enqueued) {
                // The batch never reached the queue, so no callback will ever decrement the
                // counter for it; roll back or shutdown would wait forever.
                unprocessedRequestsCounter.decrementAndGet();
            }
        }
    }

    /**
     * Blocks until there are no unprocessed requests and no tracked futures left, then stops the
     * writer tasks and clears the futures list.
     *
     * <p>Successfully completed futures are dropped from the list on each pass. If any tracked
     * future completed exceptionally (or the wait is interrupted) the failure is rethrown as a
     * {@link RuntimeException}; the writers are still stopped and the list cleared.
     *
     * <p>NOTE(review): this body was reconstructed after the decompiler failed on the original
     * method; the removal predicate is the one preserved in the compiled lambda, the surrounding
     * loop should be confirmed against the original sources.
     */
    private void waitUntilAllFuturesDone() {
        logger.info("Wait until all futures are done or completed exceptionally. Futures size: {}", futures.size());
        try {
            while (unprocessedRequestsCounter.get() > 0 || !futures.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Futures size: {}.", futures.size());
                }
                // Snapshot the list: writer threads may append to it while we wait.
                CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
                try {
                    all.get();
                    futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Futures size after removing: {}", futures.size());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            stopWriters();
            futures.clear();
        }
    }

    /** Asks every writer task to leave its polling loop once the queue is empty. */
    private void stopWriters() {
        logger.info("Stopping writers.");
        if (tasks != null && !tasks.isEmpty()) {
            tasks.forEach(WriterTask::setStopWorking);
        }
        logger.info("Writers stopped.");
    }

    /**
     * Waits for outstanding work, then shuts down both executors and closes the HTTP client.
     * The shutdown steps run even when waiting fails.
     *
     * @throws Exception if waiting for outstanding requests fails or the HTTP client cannot be closed
     */
    @Override
    public void close() throws Exception {
        logger.info("ClickHouseWriter is shutting down.");
        try {
            waitUntilAllFuturesDone();
        } finally {
            ThreadUtil.shutdownExecutorService(service);
            ThreadUtil.shutdownExecutorService(callbackService);
            asyncHttpClient.close();
            logger.info("{} shutdown complete.", ClickHouseWriter.class.getSimpleName());
        }
    }

    /**
     * Worker that polls the shared queue and sends each batch to ClickHouse as an HTTP
     * {@code INSERT}. Failed batches are re-queued until the retry limit is reached, after which
     * they are written to a file under the failed-records path.
     */
    static class WriterTask implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(WriterTask.class);
        private static final int HTTP_OK = 200;

        private final BlockingQueue<ClickHouseRequestBlank> queue;
        private final AtomicLong queueCounter;
        private final ClickHouseSinkCommonParams sinkSettings;
        private final AsyncHttpClient asyncHttpClient;
        private final ExecutorService callbackService;
        private final List<CompletableFuture<Boolean>> futures;
        private final int id;
        private volatile boolean isWorking;

        /**
         * @param id              task number, used in log messages only
         * @param asyncHttpClient HTTP client used to execute requests
         * @param queue           queue the task polls and re-queues retries into
         * @param settings        sink configuration
         * @param callbackService executor on which response callbacks run
         * @param futures         shared list that receives one future per send attempt
         * @param queueCounter    counter of unprocessed requests, decremented after every callback
         */
        WriterTask(int id, AsyncHttpClient asyncHttpClient, BlockingQueue<ClickHouseRequestBlank> queue, ClickHouseSinkCommonParams settings, ExecutorService callbackService, List<CompletableFuture<Boolean>> futures, AtomicLong queueCounter) {
            this.id = id;
            this.sinkSettings = settings;
            this.queue = queue;
            this.callbackService = callbackService;
            this.asyncHttpClient = asyncHttpClient;
            this.futures = futures;
            this.queueCounter = queueCounter;
        }

        /**
         * Polls the queue until {@link #setStopWorking()} has been called and the queue is empty,
         * registering a future for and sending every batch it receives.
         *
         * @throws RuntimeException wrapping any failure raised while polling or sending
         */
        @Override
        public void run() {
            try {
                isWorking = true;
                logger.info("Start writer task, id = {}", id);
                while (isWorking || queue.size() > 0) {
                    // Bounded wait so the stop flag is re-checked regularly.
                    ClickHouseRequestBlank blank = queue.poll(300L, TimeUnit.MILLISECONDS);
                    if (blank == null) {
                        continue;
                    }
                    CompletableFuture<Boolean> future = new CompletableFuture<>();
                    futures.add(future);
                    send(blank, future);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
                logger.error("Error while inserting data", e);
                throw new RuntimeException(e);
            } catch (Exception e) {
                logger.error("Error while inserting data", e);
                throw new RuntimeException(e);
            } finally {
                logger.info("Task id = {} is finished", id);
            }
        }

        /** Fires the HTTP request for a batch and registers the response callback on the callback executor. */
        private void send(ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) {
            Request request = buildRequest(requestBlank);

            logger.info("Ready to load data to {}, size = {}", requestBlank.getTargetTable(), requestBlank.getValues().size());
            ListenableFuture<Response> whenResponse = asyncHttpClient.executeRequest(request);
            Runnable callback = responseCallback(whenResponse, requestBlank, future);
            whenResponse.addListener(callback, callbackService);
        }

        /**
         * Builds a POST request whose body is {@code INSERT INTO <table> VALUES <v1> , <v2> ...},
         * addressed to a host chosen by the cluster settings, with a Basic authorization header
         * when the cluster settings require it.
         */
        private Request buildRequest(ClickHouseRequestBlank requestBlank) {
            String resultCSV = String.join(" , ", requestBlank.getValues());
            String query = String.format("INSERT INTO %s VALUES %s", requestBlank.getTargetTable(), resultCSV);
            String host = sinkSettings.getClickHouseClusterSettings().getRandomHostUrl();

            BoundRequestBuilder builder = asyncHttpClient
                    .preparePost(host)
                    .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
                    .setBody(query);

            if (sinkSettings.getClickHouseClusterSettings().isAuthorizationRequired()) {
                builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + sinkSettings.getClickHouseClusterSettings().getCredentials());
            }
            return builder.build();
        }

        /**
         * Returns the callback that inspects the HTTP response for a batch.
         *
         * <p>A 200 response completes {@code future} with {@code true}; anything else (including
         * an exception while obtaining the response) is routed to
         * {@link #handleUnsuccessfulResponse}. The unprocessed-request counter is decremented in
         * every case.
         */
        private Runnable responseCallback(ListenableFuture<Response> whenResponse, ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) {
            return () -> {
                Response response = null;
                try {
                    response = whenResponse.get();

                    if (response.getStatusCode() != HTTP_OK) {
                        handleUnsuccessfulResponse(response, requestBlank, future);
                    } else {
                        logger.info("Successful send data to ClickHouse, batch size = {}, target table = {}, current attempt = {}",
                                requestBlank.getValues().size(), requestBlank.getTargetTable(), requestBlank.getAttemptCounter());
                        future.complete(true);
                    }
                } catch (Exception e) {
                    logger.error("Error while executing callback, params = {}", sinkSettings, e);
                    requestBlank.setException(e);
                    try {
                        handleUnsuccessfulResponse(response, requestBlank, future);
                    } catch (Exception error) {
                        logger.error("Error while handle unsuccessful response", error);
                        future.completeExceptionally(error);
                    }
                } finally {
                    queueCounter.decrementAndGet();
                }
            };
        }

        /**
         * Retries or gives up on a batch that was not accepted.
         *
         * <p>Below the retry limit the batch's attempt counter is incremented, the batch is put
         * back on the queue (and counted as unprocessed again) and {@code future} completes with
         * {@code false}. At the limit the batch is written to disk and {@code future} completes
         * exceptionally.
         *
         * @param response the HTTP response, or {@code null} if none was obtained
         * @throws Exception if writing the failed records or re-queueing fails
         */
        private void handleUnsuccessfulResponse(Response response, ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) throws Exception {
            int currentCounter = requestBlank.getAttemptCounter();
            if (currentCounter >= sinkSettings.getMaxRetries()) {
                logger.warn("Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response = {}. Ready to flush data on disk.", response, requestBlank.getException());
                logFailedRecords(requestBlank);
                future.completeExceptionally(new RuntimeException(String.format(
                        "Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response: %s. Cause: %s",
                        response != null ? response.getResponseBody() : null,
                        requestBlank.getException())));
            } else {
                requestBlank.incrementCounter();
                logger.warn("Next attempt to send data to ClickHouse, table = {}, buffer size = {}, current attempt num = {}, max attempt num = {}, response = {}",
                        requestBlank.getTargetTable(), requestBlank.getValues().size(), requestBlank.getAttemptCounter(), sinkSettings.getMaxRetries(), response);
                // Count the retry before re-queueing; the enclosing callback decrements once for
                // the attempt that just failed.
                queueCounter.incrementAndGet();
                queue.put(requestBlank);
                future.complete(false);
            }
        }

        /**
         * Writes the batch's values, one per line, to
         * {@code <failedRecordsPath>/<targetTable>_<currentTimeMillis>}.
         */
        private void logFailedRecords(ClickHouseRequestBlank requestBlank) throws Exception {
            String filePath = String.format("%s/%s_%s", sinkSettings.getFailedRecordsPath(), requestBlank.getTargetTable(), System.currentTimeMillis());

            try (PrintWriter writer = new PrintWriter(filePath)) {
                List<String> records = requestBlank.getValues();
                records.forEach(writer::println);
                writer.flush();
            }
            logger.info("Successful send data on disk, path = {}, size = {} ", filePath, requestBlank.getValues().size());
        }

        /** Signals the task to exit its loop once the queue has been drained. */
        void setStopWorking() {
            isWorking = false;
        }
    }
}

