package org.jetlinks.core.utils;

import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;

/* loaded from: input_file:org/jetlinks/core/utils/DistinctDurationFlux.class */
public class DistinctDurationFlux<T> extends FluxOperator<T, T> {
    private final Function<T, ?> keySelector;
    private final Duration duration;

    /* loaded from: input_file:org/jetlinks/core/utils/DistinctDurationFlux$DistinctDurationSubscriber.class */
    static class DistinctDurationSubscriber<T> extends ConcurrentHashMap<Object, Long> implements CoreSubscriber<T>, Subscription, Runnable, Scannable {
        private final CoreSubscriber<? super T> actual;
        private final Function<T, ?> keySelector;
        private final long expires;
        private final Disposable disposable;
        private volatile Subscription subscription;
        static final AtomicReferenceFieldUpdater<DistinctDurationSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(DistinctDurationSubscriber.class, Subscription.class, "subscription");

        DistinctDurationSubscriber(CoreSubscriber<? super T> coreSubscriber, Function<T, ?> function, long j) {
            this.actual = coreSubscriber;
            this.keySelector = function;
            this.expires = j;
            int i = (int) (j * 1.1d);
            this.disposable = Schedulers.parallel().schedulePeriodically(this, i, i, TimeUnit.MILLISECONDS);
        }

        @Nonnull
        public Context currentContext() {
            return this.actual.currentContext();
        }

        @Override // java.lang.Runnable
        public void run() {
            cleanup();
        }

        private void cleanup() {
            long currentTimeMillis = System.currentTimeMillis();
            forEach((obj, l) -> {
                if (currentTimeMillis - l.longValue() > this.expires) {
                    remove(obj);
                }
            });
        }

        public void request(long j) {
            Subscription subscription;
            if (!Operators.validate(j) || (subscription = this.subscription) == null) {
                return;
            }
            subscription.request(j);
        }

        public void cancel() {
            if (Operators.terminate(S, this)) {
                complete();
            }
        }

        public void onSubscribe(@Nonnull Subscription subscription) {
            if (Operators.setOnce(S, this, subscription)) {
                this.actual.onSubscribe(this);
            }
        }

        public void onNext(T t) {
            try {
                long currentTimeMillis = System.currentTimeMillis();
                Object apply = this.keySelector.apply(t);
                if (apply == null) {
                    this.actual.onNext(t);
                    return;
                }
                Long putIfAbsent = putIfAbsent(apply, Long.valueOf(currentTimeMillis));
                if (putIfAbsent == null || currentTimeMillis - putIfAbsent.longValue() > this.expires) {
                    this.actual.onNext(t);
                } else {
                    Operators.onDiscard(t, this.actual.currentContext());
                    request(1L);
                }
            } catch (Throwable th) {
                onError(th);
            }
        }

        public void onError(Throwable th) {
            try {
                if (S.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) {
                    Operators.onErrorDropped(th, currentContext());
                } else {
                    this.actual.onError(th);
                }
            } catch (Throwable th2) {
                Operators.onErrorDropped(Exceptions.addSuppressed(th, th2), currentContext());
            } finally {
                complete();
            }
        }

        public void onComplete() {
            try {
            } catch (Throwable th) {
                Operators.onErrorDropped(th, currentContext());
            } finally {
                complete();
            }
            if (S.getAndSet(this, Operators.cancelledSubscription()) != Operators.cancelledSubscription()) {
                this.actual.onComplete();
            }
        }

        private void complete() {
            clear();
            this.disposable.dispose();
        }

        public Object scanUnsafe(@Nonnull Scannable.Attr attr) {
            if (attr == Scannable.Attr.RUN_STYLE) {
                return Scannable.Attr.RunStyle.SYNC;
            }
            if (attr == Scannable.Attr.ACTUAL) {
                return this.actual;
            }
            return null;
        }
    }

    protected DistinctDurationFlux(Flux<? extends T> flux, Function<T, ?> function, Duration duration) {
        super(flux);
        this.keySelector = function;
        this.duration = duration;
    }

    public static <T> Flux<T> create(Flux<? extends T> flux, Function<T, ?> function, Duration duration) {
        return new DistinctDurationFlux(flux, function, duration);
    }

    public void subscribe(@Nonnull CoreSubscriber<? super T> coreSubscriber) {
        this.source.subscribe(new DistinctDurationSubscriber(coreSubscriber, this.keySelector, this.duration.toMillis()));
    }
}
