/*
 * Decompiled with CFR 0.152.
 */
package azkaban.executor;

import azkaban.db.DatabaseOperator;
import azkaban.db.EncodingType;
import azkaban.db.SQLTransaction;
import azkaban.executor.ExecutableFlow;
import azkaban.executor.ExecutionReference;
import azkaban.executor.ExecutorManagerException;
import azkaban.executor.Status;
import azkaban.utils.GZIPUtils;
import azkaban.utils.JSONUtils;
import azkaban.utils.Pair;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.dbutils.ResultSetHandler;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class ExecutionFlowDao {
    private static final Logger logger = LoggerFactory.getLogger(ExecutionFlowDao.class);
    private final DatabaseOperator dbOperator;

    @Inject
    public ExecutionFlowDao(DatabaseOperator dbOperator) {
        this.dbOperator = dbOperator;
    }

    public synchronized void uploadExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        String useExecutorParam = flow.getExecutionOptions().getFlowParameters().get("useExecutor");
        String executorId = StringUtils.isNotEmpty((String)useExecutorParam) ? useExecutorParam : null;
        String INSERT_EXECUTABLE_FLOW = "INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time, flow_type, use_executor, repeat_id) values (?,?,?,?,?,?,?,?,?, ?)";
        long submitTime = System.currentTimeMillis();
        flow.setStatus(Status.PREPARING);
        flow.setSubmitTime(submitTime);
        SQLTransaction insertAndGetLastID = transOperator -> {
            transOperator.update("INSERT INTO execution_flows (project_id, flow_id, version, status, submit_time, submit_user, update_time, flow_type, use_executor, repeat_id) values (?,?,?,?,?,?,?,?,?, ?)", new Object[]{flow.getProjectId(), flow.getFlowId(), flow.getVersion(), Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime, flow.getFlowType(), executorId, flow.getRepeatId()});
            transOperator.getConnection().commit();
            return transOperator.getLastInsertId();
        };
        try {
            long id = (Long)this.dbOperator.transaction(insertAndGetLastID);
            logger.info("Flow given " + flow.getFlowId() + " given id " + id);
            flow.setExecutionId((int)id);
            this.updateExecutableFlow(flow);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error creating execution.", e);
        }
    }

    List<ExecutableFlow> fetchFlowHistory(int skip, int num) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_ALL_EXECUTABLE_FLOW_HISTORY, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{skip, num});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow History", e);
        }
    }

    List<ExecutableFlow> fetchMaintainedFlowHistory(String username, List<Integer> projectIds, int skip, int num) throws ExecutorManagerException {
        try {
            String projectIdsStr = projectIds.stream().map(Objects::toString).collect(Collectors.joining(",", "(", ")"));
            String querySQL = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE ef.project_id IN " + projectIdsStr + " ORDER BY exec_id DESC LIMIT ?, ?";
            return (List)this.dbOperator.query(querySQL, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{skip, num});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow History", e);
        }
    }

    List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_HISTORY, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId, skip, num});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow history", e);
        }
    }

    public List<Pair<ExecutionReference, ExecutableFlow>> fetchQueuedFlows() throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchQueuedExecutableFlows.FETCH_QUEUED_EXECUTABLE_FLOW, (ResultSetHandler)new FetchQueuedExecutableFlows());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    public List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, long startTime) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_START_TIME, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId, startTime});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching historic flows", e);
        }
    }

    List<ExecutableFlow> fetchFlowHistory(int projectId, String flowId, int skip, int num, Status status) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_STATUS, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId, status.getNumVal(), skip, num});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> fetchRecentlyFinishedFlows(Duration maxAge) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query("SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE end_time > ? AND status IN (?, ?, ?)", (ResultSetHandler)new FetchRecentlyFinishedFlows(), new Object[]{System.currentTimeMillis() - maxAge.toMillis(), Status.SUCCEEDED.getNumVal(), Status.KILLED.getNumVal(), Status.FAILED.getNumVal()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching recently finished flows", e);
        }
    }

    List<ExecutableFlow> fetchFlowHistory(String projContain, String flowContains, String execIdContain, String userNameContains, String status, long startTime, long endTime, int skip, int num, int flowType) throws ExecutorManagerException {
        String[] statusArray;
        StringBuilder querySql = new StringBuilder("SELECT exec_id, ef.enc_type, flow_data FROM execution_flows ef JOIN projects p ON ef.project_id = p.id");
        ArrayList<Object> params = new ArrayList<Object>();
        boolean first = true;
        if (StringUtils.isNotBlank((String)projContain)) {
            first = this.wrapperSqlParam(first, projContain, querySql, "name", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)flowContains)) {
            first = this.wrapperSqlParam(first, flowContains, querySql, "flow_id", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)execIdContain)) {
            first = this.wrapperSqlParam(first, execIdContain, querySql, "exec_id", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)userNameContains)) {
            first = this.wrapperSqlParam(first, userNameContains, querySql, "submit_user", "LIKE", params);
        }
        if (!"0".equals((statusArray = status.split(","))[0])) {
            first = this.wrapperMultipleStatusSql(first, statusArray, querySql, "status", "in");
        }
        if (startTime > 0L) {
            first = this.wrapperSqlParam(first, "" + startTime, querySql, "start_time", ">", params);
        }
        if (endTime > 0L) {
            first = this.wrapperSqlParam(first, "" + endTime, querySql, "end_time", "<", params);
        }
        if (flowType != -1) {
            this.wrapperSqlParam(first, "" + flowType, querySql, "flow_type", "=", params);
        }
        if (skip > -1 && num > 0) {
            querySql.append(" ORDER BY exec_id DESC LIMIT ?, ?");
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(querySql.toString(), (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    public boolean wrapperMultipleStatusSql(boolean firstParam, String[] statusArray, StringBuilder querySql, String dbColumnName, String action) {
        if (firstParam) {
            querySql.append(" WHERE ");
            firstParam = false;
        } else {
            querySql.append(" AND ");
        }
        StringBuilder statusBuilder = new StringBuilder();
        String joinStatus = String.join((CharSequence)",", statusArray);
        statusBuilder.append(dbColumnName).append(" ").append(action).append("(").append(joinStatus).append(") ");
        querySql.append((CharSequence)statusBuilder);
        return firstParam;
    }

    List<ExecutableFlow> fetchMaintainedFlowHistory(String projContain, String flowContains, String execIdContain, String userNameContains, String status, long startTime, long endTime, int skip, int num, int flowType, String username, List<Integer> projectIds) throws ExecutorManagerException {
        String[] statusArray;
        String projectIdsStr = projectIds.stream().map(Object::toString).collect(Collectors.joining(",", "(", ")"));
        StringBuilder querySql = new StringBuilder("SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef, projects p WHERE ef.project_id = p.id AND ef.project_id IN " + projectIdsStr + " ");
        ArrayList<Object> params = new ArrayList<Object>();
        boolean first = false;
        if (StringUtils.isNotBlank((String)projContain)) {
            first = this.wrapperSqlParam(first, projContain, querySql, "p.name", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)flowContains)) {
            first = this.wrapperSqlParam(first, flowContains, querySql, "flow_id", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)execIdContain)) {
            first = this.wrapperSqlParam(first, execIdContain, querySql, "exec_id", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)userNameContains)) {
            first = this.wrapperSqlParam(first, userNameContains, querySql, "submit_user", "LIKE", params);
        }
        if (!"0".equals((statusArray = status.split(","))[0])) {
            first = this.wrapperMultipleStatusSql(first, statusArray, querySql, "status", "in");
        }
        if (startTime > 0L) {
            first = this.wrapperSqlParam(first, "" + startTime, querySql, "start_time", ">", params);
        }
        if (endTime > 0L) {
            first = this.wrapperSqlParam(first, "" + endTime, querySql, "end_time", "<", params);
        }
        if (flowType != -1) {
            this.wrapperSqlParam(first, "" + flowType, querySql, "flow_type", "=", params);
        }
        if (skip > -1 && num > 0) {
            querySql.append(" ORDER BY exec_id DESC LIMIT ?, ?");
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(querySql.toString(), (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    private boolean wrapperSqlParam(boolean firstParam, String param, StringBuilder querySql, String dbColumnName, String action, List<Object> params) {
        if (firstParam) {
            querySql.append(" WHERE ");
            firstParam = false;
        } else {
            querySql.append(" AND ");
        }
        querySql.append(" ").append(dbColumnName).append(" ").append(action).append(" ?");
        if (action.equalsIgnoreCase("like")) {
            params.add('%' + param + '%');
        } else {
            params.add(param);
        }
        return firstParam;
    }

    List<ExecutableFlow> fetchFlowHistoryQuickSearch(String flowContains, String userNameContains, int skip, int num) throws ExecutorManagerException {
        String query;
        ArrayList<Object> params = new ArrayList<Object>();
        if (null != userNameContains) {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef, projects p, project_permissions pp WHERE ef.project_id = p.id AND ef.project_id = pp.project_id AND pp.name=? ";
            params.add(userNameContains);
        } else {
            query = "SELECT exec_id, ef.enc_type, flow_data FROM execution_flows ef JOIN projects p ON ef.project_id = p.id";
        }
        if (flowContains != null && !flowContains.isEmpty()) {
            query = query + " AND ";
            query = query + " (ef.exec_id LIKE ? OR ef.flow_id LIKE ? OR p.name LIKE ?) ";
            params.add('%' + flowContains + '%');
            params.add('%' + flowContains + '%');
            params.add('%' + flowContains + '%');
        }
        if (skip > -1 && num > 0) {
            query = query + " ORDER BY exec_id DESC LIMIT ?, ? ";
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> fetchFlowHistoryQuickSearch(String flowContains, String username, int skip, int num, List<Integer> projectIds) throws ExecutorManagerException {
        ArrayList<Object> params = new ArrayList<Object>();
        String projectIdsStr = projectIds.stream().map(Objects::toString).collect(Collectors.joining(",", "(", ")"));
        String query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data, ef.project_id as project_id FROM execution_flows ef, projects p WHERE ef.project_id = p.id AND ef.project_id IN " + projectIdsStr + " ";
        if (flowContains != null && !flowContains.isEmpty()) {
            query = query + " AND ";
            query = query + " (ef.exec_id LIKE ? OR ef.flow_id LIKE ? OR p.name LIKE ?) ";
            params.add('%' + flowContains + '%');
            params.add('%' + flowContains + '%');
            params.add('%' + flowContains + '%');
        }
        if (skip > -1 && num > 0) {
            query = query + " ORDER BY exec_id DESC LIMIT ?, ? ";
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> fetchFlowAllHistory(int projectId, String flowId, String user) throws ExecutorManagerException {
        String querySql = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ";
        ArrayList<Object> params = new ArrayList<Object>();
        try {
            params.add(projectId);
            params.add(flowId);
            if (user != null && !user.isEmpty()) {
                querySql = querySql + "AND ";
                querySql = querySql + "submit_user = ? ";
                params.add(user);
            }
            querySql = querySql + "ORDER BY exec_id ";
            return (List)this.dbOperator.query(querySql, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow history", e);
        }
    }

    void updateExecutableFlow(ExecutableFlow flow) throws ExecutorManagerException {
        logger.info("current flow status is {}.", (Object)flow.getStatus());
        this.updateExecutableFlow(flow, EncodingType.GZIP);
    }

    private void updateExecutableFlow(ExecutableFlow flow, EncodingType encType) throws ExecutorManagerException {
        String UPDATE_EXECUTABLE_FLOW_DATA = "UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=?,flow_type=? WHERE exec_id=?";
        String json = JSONUtils.toJSON(flow.toObject());
        byte[] data = null;
        try {
            byte[] stringData;
            data = stringData = json.getBytes("UTF-8");
            if (encType == EncodingType.GZIP) {
                data = GZIPUtils.gzipBytes(stringData);
            }
        }
        catch (IOException e) {
            throw new ExecutorManagerException("Error encoding the execution flow.");
        }
        try {
            this.dbOperator.update("UPDATE execution_flows SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=?,flow_type=? WHERE exec_id=?", new Object[]{flow.getStatus().getNumVal(), flow.getUpdateTime(), flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getFlowType(), flow.getExecutionId()});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error updating flow.", e);
        }
    }

    public ExecutableFlow fetchExecutableFlow(int execId) throws ExecutorManagerException {
        FetchExecutableFlows flowHandler = new FetchExecutableFlows();
        try {
            List properties = (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW, (ResultSetHandler)flowHandler, new Object[]{execId});
            if (properties.isEmpty()) {
                return null;
            }
            return (ExecutableFlow)properties.get(0);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow id " + execId, e);
        }
    }

    public List<ExecutableFlow> fetchExecutableFlowByRepeatId(int repeatId) throws ExecutorManagerException {
        FetchExecutableFlows flowHandler = new FetchExecutableFlows();
        try {
            List executableFlows = (List)this.dbOperator.query(FetchExecutableFlows.FETCH_EXECUTABLE_FLOW_BY_REPEAT_ID, (ResultSetHandler)flowHandler, new Object[]{repeatId});
            return executableFlows;
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow by repeatId: " + repeatId, e);
        }
    }

    public void unsetExecutorIdForExecution(int executionId) throws ExecutorManagerException {
        String UNSET_EXECUTOR = "UPDATE execution_flows SET executor_id = null where exec_id = ?";
        SQLTransaction unsetExecutor = transOperator -> transOperator.update("UPDATE execution_flows SET executor_id = null where exec_id = ?", new Object[]{executionId});
        try {
            this.dbOperator.transaction(unsetExecutor);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error unsetting executor id for execution " + executionId, e);
        }
    }

    public int selectAndUpdateExecution(int executorId, boolean isActive) throws ExecutorManagerException {
        String UPDATE_EXECUTION = "UPDATE execution_flows SET executor_id = ? where exec_id = ?";
        String selectExecutionForUpdate = isActive ? SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_ACTIVE : SelectFromExecutionFlows.SELECT_EXECUTION_FOR_UPDATE_INACTIVE;
        SQLTransaction selectAndUpdateExecution = transOperator -> {
            List execIds = (List)transOperator.query(selectExecutionForUpdate, (ResultSetHandler)new SelectFromExecutionFlows(), new Object[]{executorId});
            int execId = -1;
            if (!execIds.isEmpty()) {
                execId = (Integer)execIds.get(0);
                transOperator.update("UPDATE execution_flows SET executor_id = ? where exec_id = ?", new Object[]{executorId, execId});
            }
            transOperator.getConnection().commit();
            return execId;
        };
        try {
            return (Integer)this.dbOperator.transaction(selectAndUpdateExecution);
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error selecting and updating execution with executor " + executorId, e);
        }
    }

    List<ExecutableFlow> fetchUserFlowHistory(int skip, int num, String user) throws ExecutorManagerException {
        String querySQL = "select exec_id,enc_type, flow_data from execution_flows where exec_id in ( select exec_id from ( SELECT exec_id FROM execution_flows ef  left join project_permissions pp on ef.project_id = pp.project_id  WHERE pp.name=? ORDER BY exec_id DESC LIMIT ?, ? ) a ) order by exec_id DESC";
        try {
            return (List)this.dbOperator.query(querySQL, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{user, skip, num});
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u67e5\u627e\u7528\u6237:" + user + " Flow \u5386\u53f2\u5931\u8d25 fetchUserFlowHistory Dao");
            throw new ExecutorManagerException("Error fetching User: " + user + "  flow History", e);
        }
    }

    List<ExecutableFlow> fetchUserFlowHistoryByAdvanceFilter(String projContain, String flowContains, String execIdContain, String userNameContains, String status, long startTime, long endTime, int skip, int num, int flowType) throws ExecutorManagerException {
        String[] statusArray;
        StringBuilder baseQuerySql = new StringBuilder("SELECT exec_id, ef.enc_type, flow_data FROM execution_flows ef JOIN projects p ON ef.project_id = p.id");
        ArrayList<Object> params = new ArrayList<Object>();
        boolean first = true;
        if (StringUtils.isNotBlank((String)projContain)) {
            first = this.wrapperSqlParam(first, projContain, baseQuerySql, "name", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)flowContains)) {
            first = this.wrapperSqlParam(first, flowContains, baseQuerySql, "flow_id", "LIKE", params);
        }
        if (StringUtils.isNotBlank((String)execIdContain)) {
            first = this.wrapperSqlParam(first, execIdContain, baseQuerySql, "exec_id", "LIKE", params);
        }
        if (!"0".equals((statusArray = status.split(","))[0])) {
            first = this.wrapperMultipleStatusSql(first, statusArray, baseQuerySql, "status", "in");
        }
        if (startTime > 0L) {
            first = this.wrapperSqlParam(first, "" + startTime, baseQuerySql, "start_time", ">", params);
        }
        if (endTime > 0L) {
            first = this.wrapperSqlParam(first, "" + endTime, baseQuerySql, "end_time", "<", params);
        }
        first = this.wrapperSqlParam(first, userNameContains, baseQuerySql, "submit_user", "=", params);
        if (flowType != -1) {
            this.wrapperSqlParam(first, "" + flowType, baseQuerySql, "flow_type", "=", params);
        }
        if (skip > -1 && num > 0) {
            baseQuerySql.append("  ORDER BY exec_id DESC LIMIT ?, ?");
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(baseQuerySql.toString(), (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u6839\u636e\u6761\u4ef6\u67e5\u627e\u7528\u6237:" + userNameContains + " Flow \u5386\u53f2\u5931\u8d25 fetchUserFlowHistory Dao");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> fetchHistoryRecoverFlows(String userNameContains) throws ExecutorManagerException {
        String query = "SELECT exec_id, enc_type, flow_data FROM execution_flows a,(select max(exec_id) as bexec_id from execution_flows WHERE flow_type = 2 or flow_type = 3 or flow_type = 4  or flow_type = 5 Group by repeat_id) b where a.exec_id = b.bexec_id";
        ArrayList<String> params = new ArrayList<String>();
        boolean first = true;
        if (userNameContains != null && !userNameContains.isEmpty()) {
            query = query + " AND ";
            query = query + " submit_user = ?";
            params.add(userNameContains);
        }
        query = query + " ORDER BY start_time DESC";
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching history recover flows", e);
        }
    }

    List<ExecutableFlow> fetchHistoryRecoverFlowByRepeatId(String repeatId) throws ExecutorManagerException {
        String query = "select exec_id, enc_type, flow_data from execution_flows a,(SELECT max(exec_id) as bexec_id FROM execution_flows WHERE repeat_id = ?)b where a.exec_id = b.bexec_id ";
        ArrayList<String> params = new ArrayList<String>();
        params.add(repeatId);
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching history recover flow by repeatId", e);
        }
    }

    List<ExecutableFlow> fetchHistoryRecoverFlowByFlowId(String flowId, String projectId) throws ExecutorManagerException {
        String query = "select exec_id, enc_type, flow_data from execution_flows where flow_id = ? and project_id = ? and repeat_id != '' order by start_time DESC limit 1 ";
        ArrayList<String> params = new ArrayList<String>();
        params.add(flowId);
        params.add(projectId);
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching history recover flow by flowId", e);
        }
    }

    List<ExecutableFlow> fetchHistoryRecoverFlows(Map paramMap, int skip, int num) throws ExecutorManagerException {
        String userName;
        String query = "SELECT exec_id, enc_type, flow_data FROM execution_flows a,(select max(exec_id) as bexec_id from execution_flows WHERE flow_type = 2 or flow_type = 3 or flow_type = 4  or flow_type = 5 Group by repeat_id) b where a.exec_id = b.bexec_id";
        ArrayList<Object> params = new ArrayList<Object>();
        boolean first = true;
        if (!paramMap.isEmpty() && (userName = String.valueOf(paramMap.get("userName"))) != null && !userName.isEmpty()) {
            query = query + " AND submit_user = ?";
            params.add(userName);
        }
        if (skip > -1 && num > 0) {
            query = query + "  ORDER BY start_time DESC LIMIT ?, ?";
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching history recover flows", e);
        }
    }

    List<ExecutableFlow> getProjectLastExecutableFlow(int projectId, String flowId) throws ExecutorManagerException {
        try {
            String query = "select exec_id, enc_type, flow_data from execution_flows where project_id = ? and flow_id = ? order by start_time DESC limit 1 ";
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId});
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u67e5\u627e\u9879\u76ee:" + projectId + " \u6700\u540e\u4e00\u6b21\u6267\u884c\u5de5\u4f5c\u6d41\u8bb0\u5f55\u5931\u8d25 getProjectLastExecutableFlow Dao");
            throw new ExecutorManagerException("Error fetching project: " + projectId + " last flow History", e);
        }
    }

    List<ExecutableFlow> fetchUserFlowHistoryByProjectIdAndFlowId(int projectId, String flowId, int skip, int num, String userName) throws ExecutorManagerException {
        try {
            return (List)this.dbOperator.query(FetchExecutableFlows.FETCH_USER_EXECUTABLE_FLOW_HISTORY_BY_PROJECT_AND_FLOW, (ResultSetHandler)new FetchExecutableFlows(), new Object[]{projectId, flowId, userName, skip, num});
        }
        catch (SQLException e) {
            throw new ExecutorManagerException("Error fetching flow history", e);
        }
    }

    List<ExecutableFlow> fetchUserFlowHistory(String loginUser, String projContain, String flowContains, String execIdContain, String userNameContains, String status, long startTime, long endTime, int skip, int num, int flowType) throws ExecutorManagerException {
        String[] statusArray;
        StringBuilder querySql = new StringBuilder("SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef, projects p, project_permissions pp WHERE ef.project_id = p.id AND ef.project_id = pp.project_id AND pp.name=? ");
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(loginUser);
        boolean first = false;
        if (projContain != null && !projContain.isEmpty()) {
            first = this.wrapperSqlParam(first, projContain, querySql, "p.name", "like", params);
        }
        if (flowContains != null && !flowContains.isEmpty()) {
            first = this.wrapperSqlParam(first, flowContains, querySql, "flow_id", "like", params);
        }
        if (execIdContain != null && !execIdContain.isEmpty()) {
            first = this.wrapperSqlParam(first, execIdContain, querySql, "exec_id", "like", params);
        }
        if (!"0".equals((statusArray = status.split(","))[0])) {
            first = this.wrapperMultipleStatusSql(first, statusArray, querySql, "status", "in");
        }
        if (startTime > 0L) {
            first = this.wrapperSqlParam(first, "" + startTime, querySql, "start_time", ">", params);
        }
        if (endTime > 0L) {
            first = this.wrapperSqlParam(first, "" + endTime, querySql, "end_time", "<", params);
        }
        if (userNameContains != null && !userNameContains.isEmpty()) {
            first = this.wrapperSqlParam(first, userNameContains, querySql, "submit_user", "like", params);
        }
        if (flowType != -1) {
            first = this.wrapperSqlParam(first, "" + flowType, querySql, "flow_type", "=", params);
        }
        if (skip > -1 && num > 0) {
            querySql.append("  ORDER BY exec_id DESC LIMIT ?, ?");
            params.add(skip);
            params.add(num);
        }
        try {
            return (List)this.dbOperator.query(querySql.toString(), (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u6839\u636e\u6761\u4ef6\u67e5\u627e\u7528\u6237:" + userNameContains + " Flow \u5386\u53f2\u5931\u8d25 fetchUserFlowHistory Dao");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> getTodayExecutableFlowData(String userName) throws ExecutorManagerException {
        String query = "";
        ArrayList<Object> params = new ArrayList<Object>();
        Calendar calendar = Calendar.getInstance();
        calendar.set(11, 0);
        calendar.set(12, 0);
        calendar.set(13, 0);
        calendar.set(14, 1);
        params.add(calendar.getTimeInMillis());
        calendar.set(11, 23);
        calendar.set(12, 59);
        calendar.set(13, 59);
        params.add(calendar.getTimeInMillis());
        if (null != userName) {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE (ef.submit_time >= ? AND ef.submit_time <= ?) AND ef.flow_type=3 AND ef.`project_id` IN (SELECT pp.`project_id` FROM project_permissions pp WHERE pp.`name` = ?) ;";
            params.add(userName);
        } else {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE (ef.submit_time >= ? AND ef.submit_time <= ?) AND ef.flow_type=3 ";
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u6839\u636e\u6761\u4ef6\u67e5\u627e\u7528\u6237:" + userName + " \u5f53\u5929Flow\u6267\u884c\u8bb0\u5f55\u5931\u8d25 getTodayExecutableFlowData Dao");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    public Integer getTodayFlowRunTimesByFlowId(String projectId, String flowId, String usename) throws ExecutorManagerException {
        String COUNT = usename == null ? "SELECT count(exec_id) FROM execution_flows  WHERE project_id = ? AND flow_id = ? AND submit_time >= ? AND submit_time <= ? AND flow_type = 3" : "SELECT count(ef.exec_id) FROM execution_flows ef, project_permissions pp  WHERE ef.project_id = pp.project_id AND ef.project_id = ? AND ef.flow_id = ? AND ef.submit_time >= ? AND ef.submit_time <= ? AND ef.flow_type = 3";
        ArrayList<Object> params = new ArrayList<Object>();
        params.add(projectId);
        params.add(flowId);
        Calendar calendar = Calendar.getInstance();
        calendar.set(11, 0);
        calendar.set(12, 0);
        calendar.set(13, 0);
        calendar.set(14, 1);
        params.add(calendar.getTimeInMillis());
        calendar.set(11, 23);
        calendar.set(12, 59);
        calendar.set(13, 59);
        params.add(calendar.getTimeInMillis());
        if (usename != null) {
            logger.info("\u975eadmin\u7528\u6237");
            COUNT = COUNT + " AND pp.name = ?";
            params.add(usename);
        }
        try {
            return (Integer)this.dbOperator.query(COUNT, (ResultSetHandler)new FetchFlowRunTimes(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u83b7\u53d6\u5b9a\u65f6\u8c03\u5ea6\u4efb\u52a1\u4eca\u5929\u8fd0\u884c\u6b21\u6570\u5931\u8d25");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> getTodayExecutableFlowDataNew(String userName) throws ExecutorManagerException {
        String query = "";
        ArrayList<Object> params = new ArrayList<Object>();
        Calendar calendar = Calendar.getInstance();
        calendar.set(11, 0);
        calendar.set(12, 0);
        calendar.set(13, 0);
        calendar.set(14, 1);
        params.add(calendar.getTimeInMillis());
        calendar.set(11, 23);
        calendar.set(12, 59);
        calendar.set(13, 59);
        params.add(calendar.getTimeInMillis());
        if (null != userName) {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE ef.exec_id IN  (SELECT MAX(exec_id) FROM execution_flows WHERE  submit_time >= ? AND submit_time <= ? AND flow_type=3  GROUP BY project_id, flow_id) AND ef.`project_id` IN (SELECT pp.`project_id` FROM project_permissions pp WHERE pp.`name` = ?);";
            params.add(userName);
        } else {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE ef.flow_type=3 AND ef.exec_id IN (SELECT MAX(exec_id) FROM execution_flows WHERE  submit_time >= ? AND submit_time <= ? AND flow_type=3  GROUP BY project_id, flow_id);";
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u6839\u636e\u6761\u4ef6\u67e5\u627e\u7528\u6237:" + userName + " \u5f53\u5929Flow\u6267\u884c\u8bb0\u5f55\u5931\u8d25 getTodayExecutableFlowData Dao");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    List<ExecutableFlow> getRealTimeExecFlowDataDao(String userName) throws ExecutorManagerException {
        String query = "";
        ArrayList<String> params = new ArrayList<String>();
        if (null != userName) {
            query = "select  exec_id,enc_type,flow_data  from execution_flows ef where ef.status in(60,70,80) and exec_id in (  select exec_id from (  SELECT  ef.exec_id  FROM  execution_flows ef,  project_permissions pp   WHERE  ef.project_id = pp.project_id  AND pp. NAME = ?  ORDER BY  ef.start_time DESC  LIMIT 10  ) a  )";
            params.add(userName);
        } else {
            query = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef  LEFT JOIN projects p ON ef.project_id = p.id  WHERE ef.status in(60,70,80)  ORDER BY ef.start_time DESC LIMIT 10 ;";
        }
        try {
            return (List)this.dbOperator.query(query, (ResultSetHandler)new FetchExecutableFlows(), params.toArray());
        }
        catch (SQLException e) {
            logger.error("\u6267\u884c\u6839\u636e\u6761\u4ef6\u67e5\u627e\u7528\u6237:" + userName + " Flow \u5386\u53f2\u5931\u8d25 fetchUserFlowHistory Dao");
            throw new ExecutorManagerException("Error fetching active flows", e);
        }
    }

    private static class FetchRecentlyFinishedFlows
    implements ResultSetHandler<List<ExecutableFlow>> {
        private static final String FETCH_RECENTLY_FINISHED_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE end_time > ? AND status IN (?, ?, ?)";

        private FetchRecentlyFinishedFlows() {
        }

        public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            ArrayList<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                if (data == null) continue;
                EncodingType encType = EncodingType.fromInteger((int)encodingType);
                try {
                    ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType));
                    execFlows.add(exFlow);
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }

    private static class FetchQueuedExecutableFlows
    implements ResultSetHandler<List<Pair<ExecutionReference, ExecutableFlow>>> {
        private static final String FETCH_QUEUED_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows Where executor_id is NULL AND status = " + Status.PREPARING.getNumVal();

        private FetchQueuedExecutableFlows() {
        }

        public List<Pair<ExecutionReference, ExecutableFlow>> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            ArrayList<Pair<ExecutionReference, ExecutableFlow>> execFlows = new ArrayList<Pair<ExecutionReference, ExecutableFlow>>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                if (data == null) {
                    logger.error("Found a flow with empty data blob exec_id: " + id);
                    continue;
                }
                EncodingType encType = EncodingType.fromInteger((int)encodingType);
                try {
                    ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType));
                    ExecutionReference ref = new ExecutionReference(id);
                    execFlows.add(new Pair<ExecutionReference, ExecutableFlow>(ref, exFlow));
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }

    public static class FetchExecutableFlows
    implements ResultSetHandler<List<ExecutableFlow>> {
        static String FETCH_EXECUTABLE_FLOW_BY_REPEAT_ID = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef  WHERE ef.`repeat_id` = ? AND ef.status IN (20, 30, 80);";
        static String FETCH_EXECUTABLE_FLOW_BY_START_TIME = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef WHERE project_id=? AND flow_id=? AND start_time >= ? ORDER BY start_time DESC";
        static String FETCH_BASE_EXECUTABLE_FLOW_QUERY = "SELECT ef.exec_id, ef.enc_type, ef.flow_data FROM execution_flows ef";
        static String FETCH_EXECUTABLE_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE exec_id=?";
        static String FETCH_ALL_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows ORDER BY exec_id DESC LIMIT ?, ?";
        static String FETCH_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id DESC LIMIT ?, ?";
        static String FETCH_EXECUTABLE_FLOW_BY_STATUS = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND status=? ORDER BY exec_id DESC LIMIT ?, ?";
        static String FETCH_USER_EXECUTABLE_FLOW_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE submit_user=? ORDER BY exec_id DESC LIMIT ?, ?";
        static String FETCH_USER_EXECUTABLE_FLOW_HISTORY_BY_PROJECT_AND_FLOW = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? AND submit_user=? ORDER BY exec_id DESC LIMIT ?, ?";
        static String FETCH_EXECUTABLE_FLOW_ALL_HISTORY = "SELECT exec_id, enc_type, flow_data FROM execution_flows WHERE project_id=? AND flow_id=? ORDER BY exec_id ";

        public List<ExecutableFlow> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            ArrayList<ExecutableFlow> execFlows = new ArrayList<ExecutableFlow>();
            do {
                int id = rs.getInt(1);
                int encodingType = rs.getInt(2);
                byte[] data = rs.getBytes(3);
                if (data == null) continue;
                EncodingType encType = EncodingType.fromInteger((int)encodingType);
                try {
                    ExecutableFlow exFlow = ExecutableFlow.createExecutableFlowFromObject(GZIPUtils.transformBytesToObject(data, encType));
                    execFlows.add(exFlow);
                }
                catch (IOException e) {
                    throw new SQLException("Error retrieving flow data " + id, e);
                }
            } while (rs.next());
            return execFlows;
        }
    }

    public static class FetchFlowRunTimes
    implements ResultSetHandler<Integer> {
        public Integer handle(ResultSet rs) throws SQLException {
            int times = 0;
            while (rs.next()) {
                times = rs.getInt(1);
            }
            return times;
        }
    }

    public static class SelectFromExecutionFlows
    implements ResultSetHandler<List<Integer>> {
        private static final String SELECT_EXECUTION_FOR_UPDATE_FORMAT = "SELECT exec_id from execution_flows WHERE status = " + Status.PREPARING.getNumVal() + " and executor_id is NULL and flow_data is NOT NULL and %s ORDER BY submit_time ASC LIMIT 1 FOR UPDATE";
        public static final String SELECT_EXECUTION_FOR_UPDATE_ACTIVE = String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT, "(use_executor is NULL or use_executor = ?)");
        public static final String SELECT_EXECUTION_FOR_UPDATE_INACTIVE = String.format(SELECT_EXECUTION_FOR_UPDATE_FORMAT, "use_executor = ?");

        public List<Integer> handle(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return Collections.emptyList();
            }
            ArrayList<Integer> execIds = new ArrayList<Integer>();
            do {
                int execId = rs.getInt(1);
                execIds.add(execId);
            } while (rs.next());
            return execIds;
        }
    }
}

