/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink.applied;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickhouseSinkBuffer;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseSinkCommonParams;
import ru.ivi.opensource.flinkclickhousesink.util.ThreadUtil;

public class ClickhouseSinkScheduledChecker
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClickhouseSinkScheduledChecker.class);
    private final ScheduledExecutorService scheduledExecutorService;
    private final List<ClickhouseSinkBuffer> clickhouseSinkBuffers = new ArrayList<ClickhouseSinkBuffer>();
    private final ClickhouseSinkCommonParams params;

    public ClickhouseSinkScheduledChecker(ClickhouseSinkCommonParams props) {
        this.params = props;
        ThreadFactory factory = ThreadUtil.threadFactory("clickhouse-writer-checker");
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(factory);
        this.scheduledExecutorService.scheduleWithFixedDelay(this.getTask(), this.params.getTimeout(), this.params.getTimeout(), TimeUnit.SECONDS);
        logger.info("Build Sink scheduled checker, timeout (sec) = {}", (Object)this.params.getTimeout());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addSinkBuffer(ClickhouseSinkBuffer clickhouseSinkBuffer) {
        ClickhouseSinkScheduledChecker clickhouseSinkScheduledChecker = this;
        synchronized (clickhouseSinkScheduledChecker) {
            this.clickhouseSinkBuffers.add(clickhouseSinkBuffer);
        }
        logger.debug("Add sinkBuffer, target table = {}", (Object)clickhouseSinkBuffer.getTargetTable());
    }

    private Runnable getTask() {
        return () -> {
            ClickhouseSinkScheduledChecker clickhouseSinkScheduledChecker = this;
            synchronized (clickhouseSinkScheduledChecker) {
                logger.debug("Start checking buffers. Current count of buffers = {}", (Object)this.clickhouseSinkBuffers.size());
                this.clickhouseSinkBuffers.forEach(ClickhouseSinkBuffer::tryAddToQueue);
            }
        };
    }

    @Override
    public void close() throws Exception {
        ThreadUtil.shutdownExecutorService(this.scheduledExecutorService);
    }
}

