/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

public class JobVertexBackPressureHandler
extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {
    public JobVertexBackPressureHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders) {
        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
    }

    @Override
    protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
        JobID jobId = (JobID)request.getPathParameter(JobIDPathParameter.class);
        JobVertexID jobVertexId = (JobVertexID)request.getPathParameter(JobVertexIdPathParameter.class);
        return gateway.requestOperatorBackPressureStats(jobId, jobVertexId).thenApply(operatorBackPressureStats -> operatorBackPressureStats.getOperatorBackPressureStats().map(JobVertexBackPressureHandler::createJobVertexBackPressureInfo).orElse(JobVertexBackPressureInfo.deprecated()));
    }

    private static JobVertexBackPressureInfo createJobVertexBackPressureInfo(OperatorBackPressureStats operatorBackPressureStats) {
        return new JobVertexBackPressureInfo(JobVertexBackPressureInfo.VertexBackPressureStatus.OK, JobVertexBackPressureHandler.getBackPressureLevel(operatorBackPressureStats.getMaxBackPressureRatio()), operatorBackPressureStats.getEndTimestamp(), IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks()).mapToObj(subtask -> {
            double backPressureRatio = operatorBackPressureStats.getBackPressureRatio(subtask);
            return new JobVertexBackPressureInfo.SubtaskBackPressureInfo(subtask, JobVertexBackPressureHandler.getBackPressureLevel(backPressureRatio), backPressureRatio);
        }).collect(Collectors.toList()));
    }

    private static JobVertexBackPressureInfo.VertexBackPressureLevel getBackPressureLevel(double backPressureRatio) {
        if (backPressureRatio <= 0.1) {
            return JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
        }
        if (backPressureRatio <= 0.5) {
            return JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
        }
        return JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
    }
}

