/*
 * Decompiled with CFR 0.152.
 */
package io.seata.rm.datasource;

import com.google.common.collect.Lists;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.BranchStatus;
import io.seata.rm.datasource.DataSourceManager;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.rm.datasource.undo.UndoLogManager;
import io.seata.rm.datasource.undo.UndoLogManagerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncWorker {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);
    private static final int DEFAULT_RESOURCE_SIZE = 16;
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;
    private static final int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt("client.rm.asyncCommitBufferLimit", 10000);
    private final DataSourceManager dataSourceManager;
    private final BlockingQueue<Phase2Context> commitQueue;
    private final ScheduledExecutorService scheduledExecutor;

    public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;
        LOGGER.info("Async Commit Buffer Limit: {}", (Object)ASYNC_COMMIT_BUFFER_LIMIT);
        this.commitQueue = new LinkedBlockingQueue<Phase2Context>(ASYNC_COMMIT_BUFFER_LIMIT);
        NamedThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, (ThreadFactory)threadFactory);
        this.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);
    }

    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        this.addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

    private void addToCommitQueue(Phase2Context context) {
        if (this.commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, this.scheduledExecutor).thenRun(() -> this.addToCommitQueue(context));
    }

    void doBranchCommitSafely() {
        try {
            this.doBranchCommit();
        }
        catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }

    private void doBranchCommit() {
        if (this.commitQueue.isEmpty()) {
            return;
        }
        LinkedList<Phase2Context> allContexts = new LinkedList<Phase2Context>();
        this.commitQueue.drainTo(allContexts);
        Map<String, List<Phase2Context>> groupedContexts = this.groupedByResourceId(allContexts);
        groupedContexts.forEach(this::dealWithGroupedContexts);
    }

    Map<String, List<Phase2Context>> groupedByResourceId(List<Phase2Context> contexts) {
        HashMap<String, List<Phase2Context>> groupedContexts = new HashMap<String, List<Phase2Context>>(16);
        contexts.forEach(context -> {
            List group = groupedContexts.computeIfAbsent(context.resourceId, key -> new LinkedList());
            group.add(context);
        });
        return groupedContexts;
    }

    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        Connection conn;
        DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("Failed to find resource for {}", (Object)resourceId);
            return;
        }
        try {
            conn = dataSourceProxy.getPlainConnection();
        }
        catch (SQLException sqle) {
            LOGGER.error("Failed to get connection for async committing on {}", (Object)resourceId, (Object)sqle);
            return;
        }
        UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
        List splitByLimit = Lists.partition(contexts, (int)1000);
        splitByLimit.forEach(partition -> this.deleteUndoLog(conn, undoLogManager, (List<Phase2Context>)partition));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deleteUndoLog(Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
        LinkedHashSet<String> xids = new LinkedHashSet<String>(contexts.size());
        LinkedHashSet<Long> branchIds = new LinkedHashSet<Long>(contexts.size());
        contexts.forEach(context -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });
        try {
            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        }
        catch (SQLException e) {
            LOGGER.error("Failed to batch delete undo log", (Throwable)e);
            try {
                conn.rollback();
            }
            catch (SQLException rollbackEx) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", (Throwable)rollbackEx);
            }
        }
        finally {
            try {
                conn.close();
            }
            catch (SQLException closeEx) {
                LOGGER.error("Failed to close JDBC resource after deleting undo log", (Throwable)closeEx);
            }
        }
    }

    static class Phase2Context {
        String xid;
        long branchId;
        String resourceId;

        public Phase2Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }
}

