/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink.applied;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkBuffer;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkCommonParams;
import ru.ivi.opensource.flinkclickhousesink.util.ThreadUtil;

/**
 * Periodically prunes finished futures from a shared list and asks every
 * registered {@link ClickHouseSinkBuffer} to try to enqueue its contents.
 *
 * <p>The work runs on a dedicated single-threaded scheduled executor with a
 * fixed delay of {@code props.getTimeout()} seconds. Both registering a buffer
 * and the periodic task synchronize on this instance, so the internal buffer
 * list is never modified while it is being iterated.
 *
 * <p>NOTE(review): the {@code futures} list is supplied by the caller and is
 * mutated here under this instance's monitor only; whether other writers use
 * a compatible lock cannot be determined from this class - confirm at the
 * call site.
 */
public class ClickHouseSinkScheduledCheckerAndCleaner implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClickHouseSinkScheduledCheckerAndCleaner.class);

    private final ScheduledExecutorService scheduledExecutorService;
    private final List<ClickHouseSinkBuffer> clickHouseSinkBuffers = new ArrayList<>();
    private final List<CompletableFuture<Boolean>> futures;
    private final Predicate<CompletableFuture<Boolean>> filter;

    /**
     * Creates the checker and immediately schedules its periodic task.
     *
     * @param props   common sink parameters; supplies the period (in seconds) and
     *                the flag deciding whether failed futures may be discarded
     * @param futures list of in-flight futures that the periodic task prunes in place
     */
    public ClickHouseSinkScheduledCheckerAndCleaner(ClickHouseSinkCommonParams props, List<CompletableFuture<Boolean>> futures) {
        this.futures = futures;
        this.filter = getFuturesFilter(props.isIgnoringClickHouseSendingExceptionEnabled());
        ThreadFactory factory = ThreadUtil.threadFactory("clickhouse-writer-checker-and-cleaner");
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        // All fields are assigned above, so the task may safely reference them once scheduled.
        this.scheduledExecutorService.scheduleWithFixedDelay(getTask(), props.getTimeout(), props.getTimeout(), TimeUnit.SECONDS);
        logger.info("Build Sink scheduled checker, timeout (sec) = {}", props.getTimeout());
    }

    /**
     * Registers a buffer to be visited by every subsequent run of the periodic task.
     *
     * @param clickHouseSinkBuffer buffer to register
     */
    public void addSinkBuffer(ClickHouseSinkBuffer clickHouseSinkBuffer) {
        synchronized (this) {
            clickHouseSinkBuffers.add(clickHouseSinkBuffer);
        }
        logger.debug("Add sinkBuffer, target table = {}", clickHouseSinkBuffer.getTargetTable());
    }

    /**
     * Builds the periodic task: remove futures matching {@link #filter}, then call
     * {@code tryAddToQueue} on every registered buffer.
     *
     * <p>Runtime exceptions are caught and logged instead of propagated: a task that
     * throws out of {@code scheduleWithFixedDelay} has all of its subsequent
     * executions suppressed, which would silently stop the checker forever.
     */
    private Runnable getTask() {
        return () -> {
            try {
                synchronized (this) {
                    logger.debug("Start checking buffers and cleanup futures: Before cleanup = {}.", futures.size());
                    futures.removeIf(filter);
                    clickHouseSinkBuffers.forEach(ClickHouseSinkBuffer::tryAddToQueue);
                }
            } catch (RuntimeException e) {
                // Swallow after logging so the next scheduled run still happens.
                logger.error("Scheduled check of buffers and futures failed; will retry on the next run.", e);
            }
        };
    }

    /**
     * Chooses which futures the periodic task removes from the list.
     *
     * @param ignoringExceptionEnabled when {@code true}, every completed future is
     *                                 removed; when {@code false}, only futures that
     *                                 completed without an exception are removed, so
     *                                 exceptionally completed ones stay in the list
     * @return predicate matching the futures to remove
     */
    private static Predicate<CompletableFuture<Boolean>> getFuturesFilter(boolean ignoringExceptionEnabled) {
        if (ignoringExceptionEnabled) {
            return CompletableFuture::isDone;
        }
        return f -> f.isDone() && !f.isCompletedExceptionally();
    }

    /**
     * Shuts down the scheduled executor by delegating to
     * {@code ThreadUtil.shutdownExecutorService}.
     *
     * @throws Exception if the shutdown helper fails
     */
    @Override
    public void close() throws Exception {
        logger.info("ClickHouseSinkScheduledCheckerAndCleaner is shutting down.");
        ThreadUtil.shutdownExecutorService(scheduledExecutorService);
        logger.info("ClickHouseSinkScheduledCheckerAndCleaner shutdown complete.");
    }
}

