/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolFactory;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

public class DeclarativeSlotPoolBridge
extends DeclarativeSlotPoolService
implements SlotPool {
    private final Map<SlotRequestId, PendingRequest> pendingRequests;
    private final Map<SlotRequestId, AllocationID> fulfilledRequests;
    private final Time idleSlotTimeout;
    @Nullable
    private ComponentMainThreadExecutor componentMainThreadExecutor;
    private final Time batchSlotTimeout;
    private boolean isBatchSlotRequestTimeoutCheckDisabled;

    public DeclarativeSlotPoolBridge(JobID jobId, DeclarativeSlotPoolFactory declarativeSlotPoolFactory, Clock clock, Time rpcTimeout, Time idleSlotTimeout, Time batchSlotTimeout) {
        super(jobId, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout);
        this.idleSlotTimeout = idleSlotTimeout;
        this.batchSlotTimeout = (Time)Preconditions.checkNotNull((Object)batchSlotTimeout);
        this.isBatchSlotRequestTimeoutCheckDisabled = false;
        this.pendingRequests = new LinkedHashMap<SlotRequestId, PendingRequest>();
        this.fulfilledRequests = new HashMap<SlotRequestId, AllocationID>();
    }

    @Override
    public <T> Optional<T> castInto(Class<T> clazz) {
        if (clazz.isAssignableFrom(this.getClass())) {
            return Optional.of(clazz.cast(this));
        }
        return Optional.empty();
    }

    @Override
    protected void onStart(ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.componentMainThreadExecutor = componentMainThreadExecutor;
        this.getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable);
        componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, this.idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, this.batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Override
    protected void onClose() {
        FlinkException cause = new FlinkException("Closing slot pool");
        this.cancelPendingRequests(request -> true, cause);
    }

    private void cancelPendingRequests(Predicate<PendingRequest> requestPredicate, FlinkException cancelCause) {
        ResourceCounter decreasedResourceRequirements = ResourceCounter.empty();
        ArrayList<PendingRequest> pendingRequestsToFail = new ArrayList<PendingRequest>(this.pendingRequests.values());
        this.pendingRequests.clear();
        for (PendingRequest pendingRequest : pendingRequestsToFail) {
            if (requestPredicate.test(pendingRequest)) {
                pendingRequest.failRequest((Exception)((Object)cancelCause));
                decreasedResourceRequirements = decreasedResourceRequirements.add(pendingRequest.getResourceProfile(), 1);
                continue;
            }
            this.pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
        }
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(decreasedResourceRequirements);
    }

    @Override
    protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequirement) {
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
    }

    @VisibleForTesting
    void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
        ArrayList matchingsToFulfill = new ArrayList();
        for (PhysicalSlot physicalSlot : newSlots) {
            Optional<PendingRequest> matchingPendingRequest = this.findMatchingPendingRequest(physicalSlot);
            matchingPendingRequest.ifPresent(pendingRequest -> {
                Preconditions.checkNotNull((Object)this.pendingRequests.remove((Object)pendingRequest.getSlotRequestId()), (String)"Cannot fulfill a non existing pending slot request.");
                this.reserveFreeSlot(pendingRequest.getSlotRequestId(), newSlot.getAllocationId(), ((PendingRequest)pendingRequest).resourceProfile);
                matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, newSlot));
            });
        }
        for (PendingRequestSlotMatching pendingRequestSlotMatching : matchingsToFulfill) {
            pendingRequestSlotMatching.fulfillPendingRequest();
        }
    }

    private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID allocationId, ResourceProfile resourceProfile) {
        this.log.debug("Reserve slot {} for slot request id {}", (Object)allocationId, (Object)slotRequestId);
        this.getDeclarativeSlotPool().reserveFreeSlot(allocationId, resourceProfile);
        this.fulfilledRequests.put(slotRequestId, allocationId);
    }

    private Optional<PendingRequest> findMatchingPendingRequest(PhysicalSlot slot) {
        ResourceProfile resourceProfile = slot.getResourceProfile();
        for (PendingRequest pendingRequest : this.pendingRequests.values()) {
            if (!resourceProfile.isMatching(pendingRequest.getResourceProfile())) continue;
            this.log.debug("Matched slot {} to pending request {}.", (Object)slot, (Object)pendingRequest);
            return Optional.of(pendingRequest);
        }
        this.log.debug("Could not match slot {} to any pending request.", (Object)slot);
        return Optional.empty();
    }

    @Override
    public Optional<PhysicalSlot> allocateAvailableSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull AllocationID allocationID, @Nonnull ResourceProfile requirementProfile) {
        this.assertRunningInMainThread();
        Preconditions.checkNotNull((Object)requirementProfile, (String)"The requiredSlotProfile must not be null.");
        this.log.debug("Reserving free slot {} for slot request id {} and profile {}.", new Object[]{allocationID, slotRequestId, requirementProfile});
        return Optional.of(this.reserveFreeSlotForResource(slotRequestId, allocationID, requirementProfile));
    }

    private PhysicalSlot reserveFreeSlotForResource(SlotRequestId slotRequestId, AllocationID allocationId, ResourceProfile requiredSlotProfile) {
        this.getDeclarativeSlotPool().increaseResourceRequirementsBy(ResourceCounter.withResource(requiredSlotProfile, 1));
        PhysicalSlot physicalSlot = this.getDeclarativeSlotPool().reserveFreeSlot(allocationId, requiredSlotProfile);
        this.fulfilledRequests.put(slotRequestId, allocationId);
        return physicalSlot;
    }

    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile, @Nullable Time timeout) {
        this.assertRunningInMainThread();
        this.log.debug("Request new allocated slot with slot request id {} and resource profile {}", (Object)slotRequestId, (Object)resourceProfile);
        PendingRequest pendingRequest = PendingRequest.createNormalRequest(slotRequestId, resourceProfile);
        return this.internalRequestNewSlot(pendingRequest, timeout);
    }

    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
        this.assertRunningInMainThread();
        this.log.debug("Request new allocated batch slot with slot request id {} and resource profile {}", (Object)slotRequestId, (Object)resourceProfile);
        PendingRequest pendingRequest = PendingRequest.createBatchRequest(slotRequestId, resourceProfile);
        return this.internalRequestNewSlot(pendingRequest, null);
    }

    private CompletableFuture<PhysicalSlot> internalRequestNewSlot(PendingRequest pendingRequest, @Nullable Time timeout) {
        this.internalRequestNewAllocatedSlot(pendingRequest);
        if (timeout == null) {
            return pendingRequest.getSlotFuture();
        }
        return FutureUtils.orTimeout(pendingRequest.getSlotFuture(), timeout.toMilliseconds(), TimeUnit.MILLISECONDS, this.componentMainThreadExecutor).whenComplete((physicalSlot, throwable) -> {
            if (throwable instanceof TimeoutException) {
                this.timeoutPendingSlotRequest(pendingRequest.getSlotRequestId());
            }
        });
    }

    private void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        this.releaseSlot(slotRequestId, new TimeoutException("Pending slot request timed out in slot pool."));
    }

    private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
        this.pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
        this.getDeclarativeSlotPool().increaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
    }

    @Override
    public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception cause) {
        throw new UnsupportedOperationException("Please call failAllocation(ResourceID, AllocationID, Exception)");
    }

    @Override
    protected void onFailAllocation(ResourceCounter previouslyFulfilledRequirements) {
        this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirements);
    }

    @Override
    public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable cause) {
        this.log.debug("Release slot with slot request id {}", (Object)slotRequestId);
        this.assertRunningInMainThread();
        PendingRequest pendingRequest = this.pendingRequests.remove((Object)slotRequestId);
        if (pendingRequest != null) {
            this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
            pendingRequest.failRequest((Exception)((Object)new FlinkException(String.format("Pending slot request with %s has been released.", new Object[]{pendingRequest.getSlotRequestId()}), cause)));
        } else {
            AllocationID allocationId = this.fulfilledRequests.remove((Object)slotRequestId);
            if (allocationId != null) {
                ResourceCounter previouslyFulfilledRequirement = this.getDeclarativeSlotPool().freeReservedSlot(allocationId, cause, this.getRelativeTimeMillis());
                this.getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
            } else {
                this.log.debug("Could not find slot which has fulfilled slot request {}. Ignoring the release operation.", (Object)slotRequestId);
            }
        }
    }

    @Override
    public void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> acquiredResources) {
        this.assertRunningInMainThread();
        this.failPendingRequests(acquiredResources);
    }

    private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
        Predicate<PendingRequest> predicate = request -> !this.isBatchSlotRequestTimeoutCheckDisabled || !request.isBatchRequest();
        if (this.pendingRequests.values().stream().anyMatch(predicate)) {
            this.log.warn("Could not acquire the minimum required resources, failing slot requests. Acquired: {}. Current slot pool status: {}", acquiredResources, (Object)this.getSlotServiceStatus());
            this.cancelPendingRequests(predicate, new NoResourceAvailableException("Could not acquire the minimum required resources."));
        }
    }

    @Override
    public Collection<SlotInfo> getAllocatedSlotsInformation() {
        this.assertRunningInMainThread();
        Collection<? extends SlotInfo> allSlotsInformation = this.getDeclarativeSlotPool().getAllSlotsInformation();
        Set freeSlots = this.getDeclarativeSlotPool().getFreeSlotsInformation().stream().map(SlotInfoWithUtilization::getAllocationId).collect(Collectors.toSet());
        return allSlotsInformation.stream().filter(slotInfo -> !freeSlots.contains((Object)slotInfo.getAllocationId())).collect(Collectors.toList());
    }

    @Override
    @Nonnull
    public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
        this.assertRunningInMainThread();
        return this.getDeclarativeSlotPool().getFreeSlotsInformation();
    }

    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        this.isBatchSlotRequestTimeoutCheckDisabled = true;
    }

    private void assertRunningInMainThread() {
        if (this.componentMainThreadExecutor == null) {
            throw new IllegalStateException("The FutureSlotPool has not been started yet.");
        }
        this.componentMainThreadExecutor.assertRunningInMainThread();
    }

    private void checkIdleSlotTimeout() {
        this.getDeclarativeSlotPool().releaseIdleSlots(this.getRelativeTimeMillis());
        if (this.componentMainThreadExecutor != null) {
            this.componentMainThreadExecutor.schedule(this::checkIdleSlotTimeout, this.idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    private void checkBatchSlotTimeout() {
        if (this.isBatchSlotRequestTimeoutCheckDisabled) {
            return;
        }
        Collection<PendingRequest> pendingBatchRequests = this.getPendingBatchRequests();
        if (!pendingBatchRequests.isEmpty()) {
            Set<ResourceProfile> allResourceProfiles = this.getResourceProfilesFromAllSlots();
            Map<Boolean, List<PendingRequest>> fulfillableAndUnfulfillableRequests = pendingBatchRequests.stream().collect(Collectors.partitioningBy(DeclarativeSlotPoolBridge.canBeFulfilledWithAnySlot(allResourceProfiles)));
            List<PendingRequest> fulfillableRequests = fulfillableAndUnfulfillableRequests.get(true);
            List<PendingRequest> unfulfillableRequests = fulfillableAndUnfulfillableRequests.get(false);
            long currentTimestamp = this.getRelativeTimeMillis();
            for (PendingRequest fulfillableRequest : fulfillableRequests) {
                fulfillableRequest.markFulfillable();
            }
            for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
                unfulfillableRequest.markUnfulfillable(currentTimestamp);
                if (unfulfillableRequest.getUnfulfillableSince() + this.batchSlotTimeout.toMilliseconds() > currentTimestamp) continue;
                this.timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
            }
        }
        if (this.componentMainThreadExecutor != null) {
            this.componentMainThreadExecutor.schedule(this::checkBatchSlotTimeout, this.batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
        return Stream.concat(this.getAvailableSlotsInformation().stream(), this.getAllocatedSlotsInformation().stream()).map(SlotInfo::getResourceProfile).collect(Collectors.toSet());
    }

    private Collection<PendingRequest> getPendingBatchRequests() {
        return this.pendingRequests.values().stream().filter(PendingRequest::isBatchRequest).collect(Collectors.toList());
    }

    private static Predicate<PendingRequest> canBeFulfilledWithAnySlot(Set<ResourceProfile> allocatedResourceProfiles) {
        return pendingRequest -> {
            for (ResourceProfile allocatedResourceProfile : allocatedResourceProfiles) {
                if (!allocatedResourceProfile.isMatching(pendingRequest.getResourceProfile())) continue;
                return true;
            }
            return false;
        };
    }

    private static final class PendingRequestSlotMatching {
        private final PendingRequest pendingRequest;
        private final PhysicalSlot matchedSlot;

        private PendingRequestSlotMatching(PendingRequest pendingRequest, PhysicalSlot matchedSlot) {
            this.pendingRequest = pendingRequest;
            this.matchedSlot = matchedSlot;
        }

        public static PendingRequestSlotMatching createFor(PendingRequest pendingRequest, PhysicalSlot newSlot) {
            return new PendingRequestSlotMatching(pendingRequest, newSlot);
        }

        public void fulfillPendingRequest() {
            Preconditions.checkState((boolean)this.pendingRequest.fulfill(this.matchedSlot), (Object)"Pending requests must be fulfillable.");
        }
    }

    private static final class PendingRequest {
        private final SlotRequestId slotRequestId;
        private final ResourceProfile resourceProfile;
        private final CompletableFuture<PhysicalSlot> slotFuture;
        private final boolean isBatchRequest;
        private long unfulfillableSince;

        private PendingRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest) {
            this.slotRequestId = slotRequestId;
            this.resourceProfile = resourceProfile;
            this.isBatchRequest = isBatchRequest;
            this.slotFuture = new CompletableFuture();
            this.unfulfillableSince = Long.MAX_VALUE;
        }

        static PendingRequest createBatchRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, true);
        }

        static PendingRequest createNormalRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, false);
        }

        SlotRequestId getSlotRequestId() {
            return this.slotRequestId;
        }

        ResourceProfile getResourceProfile() {
            return this.resourceProfile;
        }

        CompletableFuture<PhysicalSlot> getSlotFuture() {
            return this.slotFuture;
        }

        void failRequest(Exception cause) {
            this.slotFuture.completeExceptionally(cause);
        }

        public boolean isBatchRequest() {
            return this.isBatchRequest;
        }

        public void markFulfillable() {
            this.unfulfillableSince = Long.MAX_VALUE;
        }

        public void markUnfulfillable(long currentTimestamp) {
            this.unfulfillableSince = currentTimestamp;
        }

        public long getUnfulfillableSince() {
            return this.unfulfillableSince;
        }

        public boolean fulfill(PhysicalSlot slot) {
            return this.slotFuture.complete(slot);
        }

        public String toString() {
            return "PendingRequest{slotRequestId=" + (Object)((Object)this.slotRequestId) + ", resourceProfile=" + this.resourceProfile + ", isBatchRequest=" + this.isBatchRequest + ", unfulfillableSince=" + this.unfulfillableSince + '}';
        }
    }
}

