/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink.applied;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseRequestBlank;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseSinkCommonParams;
import ru.ivi.opensource.flinkclickhousesink.util.ThreadUtil;

public class ClickhouseWriter
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClickhouseWriter.class);
    private ExecutorService service;
    private ExecutorService callbackService;
    private List<WriterTask> tasks;
    private BlockingQueue<ClickhouseRequestBlank> commonQueue;
    private AsyncHttpClient asyncHttpClient;
    private ClickhouseSinkCommonParams sinkParams;

    public ClickhouseWriter(ClickhouseSinkCommonParams sinkParams) {
        this.sinkParams = sinkParams;
        this.initDirAndExecutors();
    }

    private void initDirAndExecutors() {
        try {
            ClickhouseWriter.initDir(this.sinkParams.getFailedRecordsPath());
            this.buildComponents();
        }
        catch (Exception e) {
            logger.error("Error while starting CH writer", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private static void initDir(String pathName) throws IOException {
        Path path = Paths.get(pathName, new String[0]);
        Files.createDirectories(path, new FileAttribute[0]);
    }

    private void buildComponents() {
        this.asyncHttpClient = Dsl.asyncHttpClient();
        int numWriters = this.sinkParams.getNumWriters();
        this.commonQueue = new LinkedBlockingQueue<ClickhouseRequestBlank>(this.sinkParams.getQueueMaxCapacity());
        ThreadFactory threadFactory = ThreadUtil.threadFactory("clickhouse-writer");
        this.service = Executors.newFixedThreadPool(this.sinkParams.getNumWriters(), threadFactory);
        ThreadFactory callbackServiceFactory = ThreadUtil.threadFactory("clickhouse-writer-callback-executor");
        int cores = Runtime.getRuntime().availableProcessors();
        int coreThreadsNum = Math.max(cores / 4, 2);
        this.callbackService = new ThreadPoolExecutor(coreThreadsNum, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), callbackServiceFactory);
        this.tasks = Lists.newArrayList();
        for (int i = 0; i < numWriters; ++i) {
            WriterTask task = new WriterTask(i, this.asyncHttpClient, this.commonQueue, this.sinkParams, this.callbackService);
            this.tasks.add(task);
            this.service.submit(task);
        }
    }

    public void put(ClickhouseRequestBlank params) {
        try {
            this.commonQueue.put(params);
        }
        catch (InterruptedException e) {
            logger.error("Interrupted error while putting data to queue", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    private void stopWriters() {
        if (this.tasks != null && this.tasks.size() > 0) {
            this.tasks.forEach(WriterTask::setStopWorking);
        }
    }

    @Override
    public void close() throws Exception {
        logger.info("Closing clickhouse-writer...");
        this.stopWriters();
        ThreadUtil.shutdownExecutorService(this.service);
        ThreadUtil.shutdownExecutorService(this.callbackService);
        this.asyncHttpClient.close();
        logger.info("{} is closed", (Object)ClickhouseWriter.class.getSimpleName());
    }

    static class WriterTask
    implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(WriterTask.class);
        private static final int HTTP_OK = 200;
        private final BlockingQueue<ClickhouseRequestBlank> queue;
        private final ClickhouseSinkCommonParams sinkSettings;
        private final AsyncHttpClient asyncHttpClient;
        private final ExecutorService callbackService;
        private final int id;
        private volatile boolean isWorking;

        WriterTask(int id, AsyncHttpClient asyncHttpClient, BlockingQueue<ClickhouseRequestBlank> queue, ClickhouseSinkCommonParams settings, ExecutorService callbackService) {
            this.id = id;
            this.sinkSettings = settings;
            this.queue = queue;
            this.callbackService = callbackService;
            this.asyncHttpClient = asyncHttpClient;
        }

        @Override
        public void run() {
            try {
                this.isWorking = true;
                logger.info("Start writer task, id = {}", (Object)this.id);
                while (this.isWorking || this.queue.size() > 0) {
                    ClickhouseRequestBlank blank = this.queue.poll(300L, TimeUnit.MILLISECONDS);
                    if (blank == null) continue;
                    this.send(blank);
                }
            }
            catch (Exception e) {
                logger.error("Error while inserting data", (Throwable)e);
                throw new RuntimeException(e);
            }
            finally {
                logger.info("Task id = {} is finished", (Object)this.id);
            }
        }

        private void send(ClickhouseRequestBlank requestBlank) {
            Request request = this.buildRequest(requestBlank);
            logger.debug("Ready to load data to {}, size = {}", (Object)requestBlank.getTargetTable(), (Object)requestBlank.getValues().size());
            ListenableFuture whenResponse = this.asyncHttpClient.executeRequest(request);
            Runnable callback = this.responseCallback((ListenableFuture<Response>)whenResponse, requestBlank);
            whenResponse.addListener(callback, (Executor)this.callbackService);
        }

        private Request buildRequest(ClickhouseRequestBlank requestBlank) {
            String resultCSV = String.join((CharSequence)" , ", requestBlank.getValues());
            String query = String.format("INSERT INTO %s VALUES %s", requestBlank.getTargetTable(), resultCSV);
            String host = this.sinkSettings.getClickhouseClusterSettings().getRandomHostUrl();
            BoundRequestBuilder builder = (BoundRequestBuilder)((BoundRequestBuilder)this.asyncHttpClient.preparePost(host).setHeader((CharSequence)"Content-Type", "text/plain; charset=utf-8")).setBody(query);
            if (this.sinkSettings.getClickhouseClusterSettings().isAuthorizationRequired()) {
                builder.setHeader((CharSequence)"Authorization", "Basic " + this.sinkSettings.getClickhouseClusterSettings().getCredentials());
            }
            return builder.build();
        }

        private Runnable responseCallback(ListenableFuture<Response> whenResponse, ClickhouseRequestBlank requestBlank) {
            return () -> {
                Response response = null;
                try {
                    response = (Response)whenResponse.get();
                    if (response.getStatusCode() != 200) {
                        this.handleUnsuccessfulResponse(response, requestBlank);
                    } else {
                        logger.info("Successful send data to Clickhouse, batch size = {}, target table = {}, current attempt = {}", new Object[]{requestBlank.getValues().size(), requestBlank.getTargetTable(), requestBlank.getAttemptCounter()});
                    }
                }
                catch (Exception e) {
                    logger.error("Error while executing callback, params = {}", (Object)this.sinkSettings, (Object)e);
                    try {
                        this.handleUnsuccessfulResponse(response, requestBlank);
                    }
                    catch (Exception error) {
                        logger.error("Error while handle unsuccessful response", (Throwable)error);
                    }
                }
            };
        }

        private void handleUnsuccessfulResponse(Response response, ClickhouseRequestBlank requestBlank) throws Exception {
            int currentCounter = requestBlank.getAttemptCounter();
            if (currentCounter > this.sinkSettings.getMaxRetries()) {
                logger.warn("Failed to send data to Clickhouse, cause: limit of attempts is exceeded. Clickhouse response = {}. Ready to flush data on disk", (Object)response);
                this.logFailedRecords(requestBlank);
            } else {
                requestBlank.incrementCounter();
                logger.warn("Next attempt to send data to Clickhouse, table = {}, buffer size = {}, current attempt num = {}, max attempt num = {}, response = {}", new Object[]{requestBlank.getTargetTable(), requestBlank.getValues().size(), requestBlank.getAttemptCounter(), this.sinkSettings.getMaxRetries(), response});
                this.queue.put(requestBlank);
            }
        }

        private void logFailedRecords(ClickhouseRequestBlank requestBlank) throws Exception {
            String filePath = String.format("%s/%s_%s", this.sinkSettings.getFailedRecordsPath(), requestBlank.getTargetTable(), System.currentTimeMillis());
            try (PrintWriter writer = new PrintWriter(filePath);){
                List<String> records = requestBlank.getValues();
                records.forEach(writer::println);
                writer.flush();
            }
            logger.info("Successful send data on disk, path = {}, size = {} ", (Object)filePath, (Object)requestBlank.getValues().size());
        }

        void setStopWorking() {
            this.isWorking = false;
        }
    }
}

