/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamConsumerResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisException;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration;
import org.apache.flink.streaming.connectors.kinesis.proxy.FullJitterBackoff;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyV2Interface;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Registers and deregisters stream consumers through a {@link KinesisProxyV2Interface}.
 *
 * <p>After issuing a register/deregister request, the registrar polls the consumer description,
 * sleeping between polls via {@link FullJitterBackoff}, until the consumer reaches the awaited
 * state or the timeout taken from {@link FanOutRecordPublisherConfiguration} elapses.
 */
@Internal
public class StreamConsumerRegistrar {
    private static final Logger LOG = LoggerFactory.getLogger(StreamConsumerRegistrar.class);

    private final KinesisProxyV2Interface kinesisProxyV2Interface;
    private final FanOutRecordPublisherConfiguration configuration;
    private final FullJitterBackoff backoff;

    /**
     * Creates a registrar.
     *
     * @param kinesisProxyV2Interface proxy used for all describe/register/deregister calls; must
     *     not be null
     * @param configuration source of the registration type, backoff parameters, timeouts and
     *     per-stream consumer ARNs; must not be null
     * @param backoff used to compute and perform the sleep between attempts; must not be null
     */
    public StreamConsumerRegistrar(KinesisProxyV2Interface kinesisProxyV2Interface, FanOutRecordPublisherConfiguration configuration, FullJitterBackoff backoff) {
        this.kinesisProxyV2Interface = Preconditions.checkNotNull(kinesisProxyV2Interface);
        this.configuration = Preconditions.checkNotNull(configuration);
        this.backoff = Preconditions.checkNotNull(backoff);
    }

    /**
     * Registers the named consumer against the stream unless a description for it already exists,
     * then blocks until the consumer status is {@link ConsumerStatus#ACTIVE}.
     *
     * <p>A {@link ResourceInUseException} raised by the register call is ignored; the method
     * proceeds to wait for the consumer to become active.
     *
     * @param stream the stream name passed to {@code describeStreamSummary}
     * @param streamConsumerName the name of the consumer to look up or register
     * @return the ARN of the active stream consumer
     * @throws ExecutionException if a proxy call fails for a reason other than the ignored ones
     * @throws InterruptedException if interrupted while backing off or waiting on the proxy
     * @throws FlinkKinesisException.FlinkKinesisTimeoutException if the consumer is still not
     *     active once the configured registration timeout has elapsed
     */
    public String registerStreamConsumer(String stream, String streamConsumerName) throws ExecutionException, InterruptedException {
        LOG.debug("Registering stream consumer - {}::{}", stream, streamConsumerName);

        int attempt = 1;
        // With LAZY registration a backoff sleep is performed before the first proxy call, and
        // that sleep counts as the first attempt.
        if (this.configuration.getEfoRegistrationType() == ConsumerConfigConstants.EFORegistrationType.LAZY) {
            this.registrationBackoff(this.configuration, this.backoff, attempt++);
        }

        DescribeStreamSummaryResponse streamSummary = this.kinesisProxyV2Interface.describeStreamSummary(stream);
        String streamArn = streamSummary.streamDescriptionSummary().streamARN();
        LOG.debug("Found stream ARN - {}", streamArn);

        Optional<DescribeStreamConsumerResponse> existingConsumer = this.describeStreamConsumer(streamArn, streamConsumerName);
        if (!existingConsumer.isPresent()) {
            this.invokeIgnoringResourceInUse(() -> this.kinesisProxyV2Interface.registerStreamConsumer(streamArn, streamConsumerName));
        }

        String streamConsumerArn = this.waitForConsumerToBecomeActive(existingConsumer.orElse(null), streamArn, streamConsumerName, attempt);
        LOG.debug("Using stream consumer - {}", streamConsumerArn);
        return streamConsumerArn;
    }

    /**
     * Deregisters the consumer whose ARN the configuration holds for the given stream, then blocks
     * until the consumer is either no longer found or reports {@link ConsumerStatus#DELETING}.
     *
     * <p>One backoff sleep is always performed before the consumer is first described. The
     * deregister call is skipped when the consumer is not found or is already deleting, and a
     * {@link ResourceInUseException} raised by it is ignored.
     *
     * @param stream the stream name used to look up the consumer ARN in the configuration
     * @throws IllegalArgumentException if the configuration holds no consumer ARN for the stream
     * @throws ExecutionException if a proxy call fails for a reason other than the ignored ones
     * @throws InterruptedException if interrupted while backing off or waiting on the proxy
     * @throws FlinkKinesisException.FlinkKinesisTimeoutException if the consumer is still
     *     registered once the configured deregistration timeout has elapsed
     */
    public void deregisterStreamConsumer(String stream) throws InterruptedException, ExecutionException {
        LOG.debug("Deregistering stream consumer - {}", stream);

        int attempt = 1;
        String streamConsumerArn = this.getStreamConsumerArn(stream);

        this.deregistrationBackoff(this.configuration, this.backoff, attempt++);

        Optional<DescribeStreamConsumerResponse> response = this.describeStreamConsumer(streamConsumerArn);
        if (isRegisteredAndNotDeleting(response.orElse(null))) {
            this.invokeIgnoringResourceInUse(() -> this.kinesisProxyV2Interface.deregisterStreamConsumer(streamConsumerArn));
        }

        this.waitForConsumerToDeregister(response.orElse(null), streamConsumerArn, attempt);
        LOG.debug("Deregistered stream consumer - {}", streamConsumerArn);
    }

    /** Closes the underlying {@link KinesisProxyV2Interface}. */
    public void close() {
        this.kinesisProxyV2Interface.close();
    }

    /**
     * Sleeps for the duration computed by {@link FullJitterBackoff#calculateFullJitterBackoff} from
     * the register-stream base, max and exponential-constant settings and the attempt number.
     *
     * @param configuration supplies the register-stream backoff settings
     * @param backoff computes the delay and performs the sleep
     * @param attempt the attempt number fed into the backoff calculation
     * @throws InterruptedException if the sleep is interrupted
     */
    @VisibleForTesting
    void registrationBackoff(FanOutRecordPublisherConfiguration configuration, FullJitterBackoff backoff, int attempt) throws InterruptedException {
        long backoffMillis = backoff.calculateFullJitterBackoff(
                configuration.getRegisterStreamBaseBackoffMillis(),
                configuration.getRegisterStreamMaxBackoffMillis(),
                configuration.getRegisterStreamExpConstant(),
                attempt);
        backoff.sleep(backoffMillis);
    }

    /**
     * Sleeps for the duration computed by {@link FullJitterBackoff#calculateFullJitterBackoff} from
     * the deregister-stream base, max and exponential-constant settings and the attempt number.
     *
     * @param configuration supplies the deregister-stream backoff settings
     * @param backoff computes the delay and performs the sleep
     * @param attempt the attempt number fed into the backoff calculation
     * @throws InterruptedException if the sleep is interrupted
     */
    @VisibleForTesting
    void deregistrationBackoff(FanOutRecordPublisherConfiguration configuration, FullJitterBackoff backoff, int attempt) throws InterruptedException {
        long backoffMillis = backoff.calculateFullJitterBackoff(
                configuration.getDeregisterStreamBaseBackoffMillis(),
                configuration.getDeregisterStreamMaxBackoffMillis(),
                configuration.getDeregisterStreamExpConstant(),
                attempt);
        backoff.sleep(backoffMillis);
    }

    /**
     * Polls the consumer description, backing off before each poll, until its status is ACTIVE.
     *
     * @param describeStreamConsumerResponse the description obtained before waiting, or null when
     *     the consumer was not found
     * @param streamArn ARN of the stream the consumer belongs to
     * @param streamConsumerName name of the consumer being awaited
     * @param initialAttempt attempt number to use for the first backoff
     * @return the ARN of the active consumer
     * @throws FlinkKinesisException.FlinkKinesisTimeoutException if a poll finds the consumer not
     *     yet active after the registration timeout has elapsed
     */
    private String waitForConsumerToBecomeActive(@Nullable DescribeStreamConsumerResponse describeStreamConsumerResponse, String streamArn, String streamConsumerName, int initialAttempt) throws InterruptedException, ExecutionException {
        int attempt = initialAttempt;
        Instant start = Instant.now();
        Duration timeout = this.configuration.getRegisterStreamConsumerTimeout();

        DescribeStreamConsumerResponse response = describeStreamConsumerResponse;
        while (response == null || response.consumerDescription().consumerStatus() != ConsumerStatus.ACTIVE) {
            // A poll that already observed ACTIVE has left the loop by now, so reaching this check
            // means the consumer is still pending; only then is an elapsed timeout an error.
            if (attempt != initialAttempt && hasElapsed(start, timeout)) {
                throw new FlinkKinesisException.FlinkKinesisTimeoutException("Timeout waiting for stream consumer to become active: " + streamConsumerName + " on " + streamArn);
            }

            LOG.debug("Waiting for stream consumer to become active, attempt {} - {} on {}", attempt, streamConsumerName, streamArn);
            this.registrationBackoff(this.configuration, this.backoff, attempt++);

            // NOTE(review): this poll calls the proxy directly, so an ExecutionException caused by
            // ResourceNotFoundException propagates instead of being treated as "not yet present".
            // Confirm that is intended.
            response = this.kinesisProxyV2Interface.describeStreamConsumer(streamArn, streamConsumerName);
        }
        return response.consumerDescription().consumerARN();
    }

    /**
     * Polls the consumer description, backing off before each poll, until the consumer is either
     * not found or reports DELETING.
     *
     * @param describeStreamConsumerResponse the description obtained before waiting, or null when
     *     the consumer was not found
     * @param streamConsumerArn ARN of the consumer being deregistered
     * @param initialAttempt attempt number to use for the first backoff
     * @throws FlinkKinesisException.FlinkKinesisTimeoutException if a poll finds the consumer still
     *     registered after the deregistration timeout has elapsed
     */
    private void waitForConsumerToDeregister(@Nullable DescribeStreamConsumerResponse describeStreamConsumerResponse, String streamConsumerArn, int initialAttempt) throws InterruptedException, ExecutionException {
        int attempt = initialAttempt;
        Instant start = Instant.now();
        Duration timeout = this.configuration.getDeregisterStreamConsumerTimeout();

        DescribeStreamConsumerResponse response = describeStreamConsumerResponse;
        while (isRegisteredAndNotDeleting(response)) {
            LOG.debug("Waiting for stream consumer to deregister, attempt {} - {}", attempt, streamConsumerArn);
            this.deregistrationBackoff(this.configuration, this.backoff, attempt++);
            response = this.describeStreamConsumer(streamConsumerArn).orElse(null);

            // Only report a timeout when the fresh description still shows a registered consumer;
            // a poll that observed completion must not fail just because the deadline passed.
            if (isRegisteredAndNotDeleting(response) && hasElapsed(start, timeout)) {
                throw new FlinkKinesisException.FlinkKinesisTimeoutException("Timeout waiting for stream consumer to deregister: " + streamConsumerArn);
            }
        }
    }

    /** Describes a consumer by stream ARN and consumer name; empty when the resource is not found. */
    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(String streamArn, String streamConsumerName) throws InterruptedException, ExecutionException {
        return this.describeStreamConsumer(() -> this.kinesisProxyV2Interface.describeStreamConsumer(streamArn, streamConsumerName));
    }

    /** Describes a consumer by its ARN; empty when the resource is not found. */
    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(String streamConsumerArn) throws InterruptedException, ExecutionException {
        return this.describeStreamConsumer(() -> this.kinesisProxyV2Interface.describeStreamConsumer(streamConsumerArn));
    }

    /**
     * Runs the supplied describe call, mapping an {@link ExecutionException} caused by
     * {@link ResourceNotFoundException} to an empty result and rethrowing any other failure.
     */
    private Optional<DescribeStreamConsumerResponse> describeStreamConsumer(ResponseSupplier<DescribeStreamConsumerResponse> responseSupplier) throws InterruptedException, ExecutionException {
        try {
            return Optional.ofNullable(responseSupplier.get());
        } catch (ExecutionException ex) {
            if (isResourceNotFound(ex)) {
                return Optional.empty();
            }
            throw ex;
        }
    }

    /**
     * Runs the supplied call and discards its result, ignoring an {@link ExecutionException}
     * caused by {@link ResourceInUseException} and rethrowing any other failure.
     */
    private <T> void invokeIgnoringResourceInUse(ResponseSupplier<T> responseSupplier) throws InterruptedException, ExecutionException {
        try {
            responseSupplier.get();
        } catch (ExecutionException ex) {
            if (!isResourceInUse(ex)) {
                throw ex;
            }
            // Deliberately ignored: callers go on to poll the consumer state afterwards.
        }
    }

    /** Returns true when the description exists and its status is anything other than DELETING. */
    private static boolean isRegisteredAndNotDeleting(@Nullable DescribeStreamConsumerResponse response) {
        return response != null && response.consumerDescription().consumerStatus() != ConsumerStatus.DELETING;
    }

    /** Returns true when strictly more than {@code timeout} has passed since {@code start}. */
    private static boolean hasElapsed(Instant start, Duration timeout) {
        return Duration.between(start, Instant.now()).compareTo(timeout) > 0;
    }

    private static boolean isResourceNotFound(ExecutionException ex) {
        return ex.getCause() instanceof ResourceNotFoundException;
    }

    private static boolean isResourceInUse(ExecutionException ex) {
        return ex.getCause() instanceof ResourceInUseException;
    }

    /**
     * Looks up the consumer ARN configured for the stream.
     *
     * @throws IllegalArgumentException if the configuration has no ARN for the stream
     */
    private String getStreamConsumerArn(String stream) {
        return this.configuration
                .getStreamConsumerArn(stream)
                .orElseThrow(() -> new IllegalArgumentException("Stream consumer ARN not found for stream: " + stream));
    }

    /** A supplier whose {@code get} may throw the checked exceptions raised by proxy calls. */
    @FunctionalInterface
    private interface ResponseSupplier<T> {
        T get() throws ExecutionException, InterruptedException;
    }
}

