/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.ds.process.work.joinfield;

import com.alibaba.fastjson.JSONObject;
import com.jxdinfo.hussar.ds.process.component.dto.DataSetFieldDto;
import com.jxdinfo.hussar.ds.process.component.dto.JoinFieldDto;
import com.jxdinfo.hussar.ds.process.component.model.DataSetInputConfigModel;
import com.jxdinfo.hussar.ds.process.component.model.JoinFieldConfigModel;
import com.jxdinfo.hussar.ds.process.dto.widget.CanvasDetailConfigDTO;
import com.jxdinfo.hussar.ds.process.enums.JoinTypeEnum;
import com.jxdinfo.hussar.ds.process.enums.ModuleTypeEnum;
import com.jxdinfo.hussar.ds.process.util.DataTypeUtil;
import com.jxdinfo.hussar.ds.process.work.factory.DataSetExecuteFactory;
import com.jxdinfo.hussar.ds.process.work.service.DataStreamProcessWorker;
import com.jxdinfo.hussar.platform.core.utils.IdUtil;
import com.jxdinfo.hussar.platform.core.utils.JsonUtil;
import com.jxdinfo.hussar.support.exception.HussarException;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.duckdb.DuckDBConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class JoinFieldTaskWorker
extends DataStreamProcessWorker {
    private static final Logger log = LoggerFactory.getLogger(JoinFieldTaskWorker.class);

    @Override
    public ModuleTypeEnum getTaskType() {
        return ModuleTypeEnum.JOIN_FIELD;
    }

    @Override
    public TaskResult execute(Task task, CanvasDetailConfigDTO detailConfig, DuckDBConnection connection) {
        JoinFieldConfigModel config = (JoinFieldConfigModel)JsonUtil.parse((String)detailConfig.getConfig(), JoinFieldConfigModel.class);
        Set dataSetIdSet = config.getJoinFieldList().stream().map(JoinFieldDto::getDataSetId).collect(Collectors.toSet());
        try (Statement statement = connection.createStatement();){
            String uuid = IdUtil.simpleUUID();
            dataSetIdSet.parallelStream().forEach(dataSetId -> {
                try {
                    List allFieldDefNameList = config.getAllFiledDefNameList(dataSetId, new String[0]);
                    List allFieldList = config.getAllFiledNameList(dataSetId);
                    statement.execute(String.format("CREATE TABLE _%s_%s (%s);", dataSetId, uuid, String.join((CharSequence)",", allFieldDefNameList)));
                    DataSetInputConfigModel dataSetConfig = config.getDataSetByDataSetId(dataSetId);
                    List<JSONObject> dataSetData = DataSetExecuteFactory.get(dataSetConfig.getDataSetType()).getDataSetData(dataSetConfig);
                    String sql = String.format("INSERT INTO  _%s_%s (%s) VALUES (%s);", dataSetId, uuid, String.join((CharSequence)",", allFieldList), allFieldList.stream().map(a -> "?").collect(Collectors.joining(",")));
                    try (PreparedStatement stmt = connection.prepareStatement(sql);){
                        List fieldList = config.getFieldListByDataSetId(dataSetId);
                        long batchSize = 0L;
                        for (JSONObject jsonObject : dataSetData) {
                            ++batchSize;
                            for (int i = 0; i < allFieldList.size(); ++i) {
                                DataSetFieldDto dataSetFieldDto = (DataSetFieldDto)fieldList.get(i);
                                Object data = DataTypeUtil.getDataByKeyAndType((JSONObject)jsonObject, (String)dataSetFieldDto.getDataType(), (String)dataSetFieldDto.getFieldId());
                                stmt.setObject(i + 1, data);
                            }
                            stmt.addBatch();
                            if (batchSize % 1000L != 0L) continue;
                            stmt.executeBatch();
                            stmt.clearBatch();
                        }
                        if (batchSize % 1000L > 0L) {
                            stmt.executeBatch();
                            stmt.clearBatch();
                        }
                    }
                }
                catch (SQLException e) {
                    log.error("JoinFieldTaskWorker\u6570\u636e\u96c6\u6267\u884c\u9519\u8bef", (Throwable)e);
                    throw new HussarException("JoinFieldTaskWorker\u6570\u636e\u96c6\u6267\u884c\u9519\u8bef", (Throwable)e);
                }
            });
            HashSet<String> addField = new HashSet<String>();
            for (DataSetInputConfigModel dataSetInputConfigModel : config.getDataSetList()) {
                for (DataSetFieldDto dataSetFieldDto : dataSetInputConfigModel.getFieldList()) {
                    addField.add(String.format("_%s_%s.%s", dataSetInputConfigModel.getDataSetId(), uuid, dataSetFieldDto.getFieldId()));
                }
            }
            Map<String, String> joinInfo = config.getJoinFieldList().stream().collect(Collectors.groupingBy(action -> String.format(" %s _%s_%s on ", JoinTypeEnum.getKeyByName((String)action.getJoinType()), action.getDataSetId(), uuid), Collectors.mapping(action -> String.format(" _%s_%s.%s = _main.%s", action.getDataSetId(), uuid, action.getSubFieldId(), action.getMainFieldIdValue()), Collectors.joining(" and "))));
            StringBuffer updateSql = new StringBuffer("CREATE OR REPLACE TABLE _main AS select _main.* , ");
            updateSql.append(String.join((CharSequence)",", addField));
            updateSql.append(" from _main ");
            joinInfo.forEach((k, v) -> updateSql.append((String)k).append((String)v));
            log.info("\u6267\u884c\u8054\u8868\u67e5\u8be2\u5bf9\u5e94\u7684sql\u8bed\u53e5\uff1a{}", (Object)updateSql.toString());
            statement.execute(updateSql.toString());
            this.threadPoolTaskExecutor.execute(() -> {
                try (DuckDBConnection connection2 = this.getDuckDbConnection(task);
                     Statement statement2 = connection2.createStatement();){
                    for (String dataSetId : dataSetIdSet) {
                        statement2.execute(String.format("drop table _%s_%s", dataSetId, uuid));
                    }
                }
                catch (SQLException e) {
                    log.error("\u8054\u8868\u7ec4\u4ef6\u8bed\u53e5\u6267\u884c\u5f02\u5e38:", (Throwable)e);
                }
            });
        }
        catch (SQLException e) {
            log.error("\u8054\u8868\u7ec4\u4ef6\u8bed\u53e5\u6267\u884c\u5f02\u5e38:", (Throwable)e);
            throw new HussarException((Throwable)e);
        }
        return TaskResult.newTaskResult((TaskResult.Status)TaskResult.Status.COMPLETED);
    }
}

