/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink;

import java.util.Map;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickhouseSinkBuffer;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickhouseSinkManager;

public class ClickhouseSink
extends RichSinkFunction<String> {
    private static final Logger logger = LoggerFactory.getLogger(ClickhouseSink.class);
    private static final Object DUMMY_LOCK = new Object();
    private final Properties localProperties;
    private static volatile transient ClickhouseSinkManager sinkManager;
    private transient ClickhouseSinkBuffer clickhouseSinkBuffer;

    public ClickhouseSink(Properties properties) {
        this.localProperties = properties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(Configuration config) {
        if (sinkManager == null) {
            Object object = DUMMY_LOCK;
            synchronized (object) {
                if (sinkManager == null) {
                    Map params = this.getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
                    sinkManager = new ClickhouseSinkManager(params);
                }
            }
        }
        this.clickhouseSinkBuffer = sinkManager.buildBuffer(this.localProperties);
    }

    public void invoke(String recordAsCSV) {
        try {
            this.clickhouseSinkBuffer.put(recordAsCSV);
        }
        catch (Exception e) {
            logger.error("Error while sending data to Clickhouse, record = {}", (Object)recordAsCSV, (Object)e);
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws Exception {
        this.clickhouseSinkBuffer.close();
        if (!sinkManager.isClosed()) {
            Object object = DUMMY_LOCK;
            synchronized (object) {
                if (!sinkManager.isClosed()) {
                    sinkManager.close();
                }
            }
        }
        super.close();
    }
}

