/*
 * Decompiled with CFR 0.152.
 */
package ru.ivi.opensource.flinkclickhousesink;

import java.util.Map;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSinkConverter;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkManager;
import ru.ivi.opensource.flinkclickhousesink.applied.Sink;

public class ClickHouseSink<T>
extends RichSinkFunction<T> {
    private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);
    private static final Object DUMMY_LOCK = new Object();
    private final Properties localProperties;
    private final ClickHouseSinkConverter<T> clickHouseSinkConverter;
    private static volatile transient ClickHouseSinkManager sinkManager;
    private transient Sink sink;

    public ClickHouseSink(Properties properties, ClickHouseSinkConverter<T> clickHouseSinkConverter) {
        this.localProperties = properties;
        this.clickHouseSinkConverter = clickHouseSinkConverter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(Configuration config) {
        if (sinkManager == null) {
            Object object = DUMMY_LOCK;
            synchronized (object) {
                if (sinkManager == null) {
                    Map params = this.getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
                    sinkManager = new ClickHouseSinkManager(params);
                }
            }
        }
        this.sink = sinkManager.buildSink(this.localProperties);
    }

    public void invoke(T record, SinkFunction.Context context) {
        try {
            String recordAsCSV = this.clickHouseSinkConverter.convert(record);
            this.sink.put(recordAsCSV);
        }
        catch (Exception e) {
            logger.error("Error while sending data to ClickHouse, record = {}", record, (Object)e);
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws Exception {
        if (this.sink != null) {
            this.sink.close();
        }
        if (sinkManager != null && !sinkManager.isClosed()) {
            Object object = DUMMY_LOCK;
            synchronized (object) {
                if (sinkManager != null && !sinkManager.isClosed()) {
                    sinkManager.close();
                    sinkManager = null;
                }
            }
        }
        super.close();
    }
}

