package org.terracotta.async;

import com.terracotta.toolkit.async.ClusterInfoUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.terracotta.async.exceptions.ExistingRunningThreadException;
import org.terracotta.cluster.ClusterEvent;
import org.terracotta.cluster.ClusterInfo;
import org.terracotta.cluster.ClusterListener;
import org.terracotta.cluster.ClusterNode;
import org.terracotta.collections.ConcurrentDistributedMap;
import org.terracotta.collections.ConcurrentDistributedSet;
import org.terracotta.locking.LockType;
import org.terracotta.locking.TerracottaReadWriteLock;
import org.terracotta.locking.strategy.HashcodeLockStrategy;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:TIMs/terracotta-toolkit-1.5-4.6.0.jar:org/terracotta/async/ProcessingBucketGroup.class */
/**
 * Groups the {@link ProcessingBucket}s of an asynchronous processing setup, keyed by the id of
 * the cluster node that started them.
 *
 * <p>The group registers itself as a {@link ClusterListener}; when a node leaves, the buckets
 * recorded under that node's id are moved into a "dead buckets" collection. Mutations of the
 * bucket map are performed while holding the group's write lock; stealing iterates the map
 * under the read lock.
 *
 * @param <I> the type of item handled by the buckets in this group
 */
public class ProcessingBucketGroup<I> implements ClusterListener {
    private static final Logger LOGGER = Logger.getLogger(ProcessingBucketGroup.class.getName());

    /** Node id used as the bucket-map key when no cluster instance is available. */
    private static final String NO_CLUSTER_NODE_ID = "Client[1]";

    private final transient ThreadGroup threadGroup;
    private volatile transient ClusterInfo cluster;

    /** Buckets that have been started, keyed by the id of the node that started them. */
    private final Map<String, Set<ProcessingBucket<I>>> buckets;
    private final TerracottaReadWriteLock groupLock;
    private final Lock groupWriteLock;
    private final Lock groupReadLock;
    private final StealPolicy<I> policy;
    private final AsyncConfig config;

    /** Buckets taken over from departed nodes; created lazily under the write lock. */
    private Collection<ProcessingBucket<I>> deadBuckets;

    // Not called from within this class. NOTE(review): presumably invoked by the Terracotta
    // on-load hook to re-resolve the transient cluster reference -- confirm against the config.
    private void onLoad() {
        this.cluster = ClusterInfoUtil.determineDsoClusterInstance(this.cluster);
    }

    /**
     * Creates a group without an explicit cluster instance; the instance is resolved through
     * {@link ClusterInfoUtil#determineDsoClusterInstance}.
     *
     * @param asyncConfig configuration of the group
     * @param stealPolicy policy handed to buckets when work is stolen from them
     */
    public ProcessingBucketGroup(AsyncConfig asyncConfig, StealPolicy<I> stealPolicy) {
        this(null, asyncConfig, stealPolicy);
    }

    /**
     * Creates a group.
     *
     * @param clusterInfo cluster instance to use, passed through
     *        {@link ClusterInfoUtil#determineDsoClusterInstance}; may be {@code null}
     * @param asyncConfig configuration of the group; its synchronous-write flag configures the
     *        group lock
     * @param stealPolicy policy handed to buckets when work is stolen from them
     */
    public ProcessingBucketGroup(ClusterInfo clusterInfo, AsyncConfig asyncConfig, StealPolicy<I> stealPolicy) {
        this.threadGroup = new ThreadGroup("ProcessingBucketGroup - thread group");
        this.deadBuckets = null;
        this.cluster = ClusterInfoUtil.determineDsoClusterInstance(clusterInfo);
        this.config = asyncConfig;
        this.policy = stealPolicy;
        this.buckets = newMap();
        this.groupLock = new TerracottaReadWriteLock(asyncConfig.isSynchronousWrite());
        this.groupWriteLock = this.groupLock.writeLock();
        this.groupReadLock = this.groupLock.readLock();
    }

    /** Creates the distributed map backing {@link #buckets}. */
    private static <K, V> Map<K, V> newMap() {
        return new ConcurrentDistributedMap<K, V>(LockType.WRITE, new HashcodeLockStrategy());
    }

    /** @return the configuration this group was created with */
    public AsyncConfig getConfig() {
        return this.config;
    }

    /**
     * Creates a bucket bound to this group without an error handler. The bucket is not added
     * to the group until {@link #start(ProcessingBucket)} is called.
     *
     * @param itemProcessor processor for the bucket's items
     * @return the new bucket
     */
    public ProcessingBucket<I> createNewBucket(ItemProcessor<I> itemProcessor) {
        return createNewBucket(itemProcessor, null);
    }

    /**
     * Creates a bucket bound to this group. The bucket is not added to the group until
     * {@link #start(ProcessingBucket)} is called.
     *
     * @param itemProcessor processor for the bucket's items
     * @param asyncErrorHandler error handler passed to the bucket; may be {@code null}
     * @return the new bucket
     */
    public ProcessingBucket<I> createNewBucket(ItemProcessor<I> itemProcessor, AsyncErrorHandler asyncErrorHandler) {
        return new ProcessingBucket<>(this, this.cluster, itemProcessor, asyncErrorHandler);
    }

    /**
     * Registers the bucket under the current node's id and starts it.
     *
     * @param processingBucket the bucket to add and start
     * @throws IllegalArgumentException if the bucket is {@code null} or is already registered
     *         under the current node's id
     * @throws ExistingRunningThreadException declared by the bucket's start operation
     */
    public void start(ProcessingBucket<I> processingBucket) throws ExistingRunningThreadException {
        if (null == processingBucket) {
            throw new IllegalArgumentException("bucket can't be null");
        }
        AsyncCoordinator.UNFLUSHABLE_STATE.add(this);
        this.groupWriteLock.lock();
        try {
            if (this.cluster != null) {
                this.cluster.addClusterListener(this);
            }
            // Resolve the node id once; it is needed for both the lookup and the insert.
            String nodeId = getCurrentNodeId();
            Set<ProcessingBucket<I>> nodeBuckets = this.buckets.get(nodeId);
            if (null == nodeBuckets) {
                nodeBuckets = new ConcurrentDistributedSet<ProcessingBucket<I>>();
                this.buckets.put(nodeId, nodeBuckets);
            } else if (nodeBuckets.contains(processingBucket)) {
                throw new IllegalArgumentException("bucket already part of the group");
            }
            nodeBuckets.add(processingBucket);
            processingBucket.start("ProcessingBucket " + this.buckets.size());
        } finally {
            this.groupWriteLock.unlock();
        }
    }

    /**
     * Returns the id under which this node's buckets are stored: the cluster node's id once it
     * has joined the cluster, or {@link #NO_CLUSTER_NODE_ID} when there is no cluster instance.
     */
    private String getCurrentNodeId() {
        if (null == this.cluster) {
            return NO_CLUSTER_NODE_ID;
        }
        return this.cluster.waitUntilNodeJoinsCluster().getId();
    }

    /**
     * Stops every bucket recorded in the group (for all node ids) and unregisters the group
     * from cluster events.
     */
    public void stop() {
        this.groupWriteLock.lock();
        try {
            for (Set<ProcessingBucket<I>> nodeBuckets : this.buckets.values()) {
                for (ProcessingBucket<I> bucket : nodeBuckets) {
                    bucket.stop();
                }
            }
            if (this.cluster != null) {
                this.cluster.removeClusterListener(this);
            }
        } finally {
            this.groupWriteLock.unlock();
        }
    }

    /**
     * Moves the buckets recorded under the given node id into the dead-bucket collection and
     * removes the node's entry from the bucket map.
     *
     * @param str id of the node whose buckets should be taken over
     * @return the number of buckets that were recorded for that node, or 0 if there were none
     */
    int recoverDeadBuckets(String str) {
        this.groupWriteLock.lock();
        try {
            Set<ProcessingBucket<I>> nodeBuckets = this.buckets.get(str);
            if (nodeBuckets == null || nodeBuckets.isEmpty()) {
                // Nothing to recover; still drop a possibly empty entry for the node.
                this.buckets.remove(str);
                return 0;
            }
            if (null == this.deadBuckets) {
                this.deadBuckets = new ConcurrentDistributedSet<ProcessingBucket<I>>();
            }
            if (LOGGER.isLoggable(Level.FINE)) {
                for (ProcessingBucket<I> bucket : nodeBuckets) {
                    // Route through the logger that guards this block instead of stdout.
                    LOGGER.fine("Registering dead bucket with " + bucket.getWaitCount() + " waiting items and " + bucket.getQuarantinedCount() + " quarantined items");
                }
            }
            this.deadBuckets.addAll(nodeBuckets);
            this.buckets.remove(str);
            return nodeBuckets.size();
        } finally {
            this.groupWriteLock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the thread group created for this bucket group */
    public ThreadGroup getThreadGroup() {
        return this.threadGroup;
    }

    /**
     * Determines which node ids still have buckets recorded although the node is no longer
     * part of the cluster topology.
     *
     * @return the ids of such nodes, or {@code null} when there are none or when no cluster
     *         instance is available
     */
    Set<String> determineDeadNodes() {
        // Read the volatile field once so the null check and the use see the same instance.
        ClusterInfo currentCluster = this.cluster;
        if (currentCluster == null) {
            return null;
        }
        Set<String> liveNodeIds = new HashSet<String>();
        for (ClusterNode node : currentCluster.getClusterTopology().getNodes()) {
            liveNodeIds.add(node.getId());
        }
        // Allocated lazily: callers receive null, not an empty set, when nothing is dead.
        Set<String> deadNodeIds = null;
        for (String nodeId : this.buckets.keySet()) {
            if (!liveNodeIds.contains(nodeId)) {
                if (null == deadNodeIds) {
                    deadNodeIds = new HashSet<String>();
                }
                deadNodeIds.add(nodeId);
            }
        }
        return deadNodeIds;
    }

    // NOTE(review): the decompiler could not recover this method's body (see the JADX notes
    // below). As it stands every call throws UnsupportedOperationException; the real
    // implementation has to be restored from the original bytecode or sources.
    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Code restructure failed: missing block: B:8:0x0024, code lost:
    
        if (0 == r0.size()) goto L9;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public int quarantineItemsFromDeadBuckets(org.terracotta.async.ProcessingBucket<I> r5) {
        /*
            Method dump skipped, instructions count: 426
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.terracotta.async.ProcessingBucketGroup.quarantineItemsFromDeadBuckets(org.terracotta.async.ProcessingBucket):int");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Lets the given bucket try to steal work from the other buckets of the group.
     *
     * <p>Buckets are asked in iteration order via {@code tryToStealFromMe}; the first positive
     * result is returned immediately. If stealing is disabled in the configuration nothing is
     * attempted. When the read lock cannot be acquired without blocking, or no other bucket
     * answered with exactly 0, the thief's {@code fireStealingDelayed()} is invoked.
     *
     * @param processingBucket the bucket that wants to steal (the thief)
     * @return the positive value reported by the first bucket that gave work away
     *         (presumably the number of stolen items -- confirm against ProcessingBucket),
     *         otherwise 0
     * @throws IllegalArgumentException if the thief is {@code null}
     */
    public int tryToStealFromOthers(ProcessingBucket<I> processingBucket) {
        if (null == processingBucket) {
            throw new IllegalArgumentException("thief can't be null");
        }
        if (!this.config.isStealingEnabled()) {
            return 0;
        }
        boolean anyZeroResult = false;
        if (this.groupReadLock.tryLock()) {
            try {
                for (Set<ProcessingBucket<I>> nodeBuckets : this.buckets.values()) {
                    for (ProcessingBucket<I> victim : nodeBuckets) {
                        if (victim == processingBucket) {
                            continue;
                        }
                        if (LOGGER.isLoggable(Level.FINER)) {
                            LOGGER.finer("tryToStealFromOthers() : " + processingBucket + " trying to steal from " + victim);
                        }
                        int stolen = victim.tryToStealFromMe(this.policy, processingBucket);
                        if (stolen > 0) {
                            // The finally block releases the read lock; unlocking here as
                            // well would release it twice.
                            return stolen;
                        }
                        if (0 == stolen) {
                            anyZeroResult = true;
                        }
                    }
                }
            } finally {
                this.groupReadLock.unlock();
            }
        }
        if (!anyZeroResult) {
            processingBucket.fireStealingDelayed();
        }
        return 0;
    }

    /** No action is taken when a node joins. */
    @Override // org.terracotta.cluster.ClusterListener
    public void nodeJoined(ClusterEvent clusterEvent) {
    }

    /**
     * Takes over the buckets of the node that left the cluster.
     *
     * @param clusterEvent event carrying the departed node
     */
    @Override // org.terracotta.cluster.ClusterListener
    public void nodeLeft(ClusterEvent clusterEvent) {
        // recoverDeadBuckets() acquires the same write lock again while it is held here.
        // NOTE(review): this relies on the Terracotta write lock being reentrant -- confirm.
        this.groupWriteLock.lock();
        try {
            int recovered = recoverDeadBuckets(clusterEvent.getNode().getId());
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("Node '" + clusterEvent.getNode() + "' left and " + recovered + " dead buckets are picked up");
            }
        } finally {
            this.groupWriteLock.unlock();
        }
    }

    /** No action is taken when cluster operations are disabled. */
    @Override // org.terracotta.cluster.ClusterListener
    public void operationsDisabled(ClusterEvent clusterEvent) {
    }

    /** No action is taken when cluster operations are enabled. */
    @Override // org.terracotta.cluster.ClusterListener
    public void operationsEnabled(ClusterEvent clusterEvent) {
    }
}
