package org.hswebframework.web.cache.supports;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.hswebframework.web.cache.ReactiveCache;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.publisher.Sinks;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/hswebframework/web/cache/supports/AbstractReactiveCache.class */
public abstract class AbstractReactiveCache<E> implements ReactiveCache<E> {
    private static final Logger log = LoggerFactory.getLogger(AbstractReactiveCache.class);
    static final Sinks.EmitFailureHandler emitFailureHandler = Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(30));
    private final Map<Object, CacheLoader> cacheLoading = new ConcurrentHashMap();

    /* loaded from: input_file:org/hswebframework/web/cache/supports/AbstractReactiveCache$CacheLoader.class */
    protected static class CacheLoader extends MonoOperator<Object, Object> {
        private final AbstractReactiveCache<?> parent;
        private final Object key;
        private Mono<? extends Object> defaultValue;
        private final Sinks.One<Object> holder;
        private volatile Disposable loading;

        protected CacheLoader(AbstractReactiveCache<?> abstractReactiveCache, Object obj, Mono<? extends Object> mono) {
            super(mono.cache());
            this.holder = Sinks.one();
            this.parent = abstractReactiveCache;
            this.key = obj;
        }

        protected void defaultValue(Mono<? extends Object> mono, ContextView contextView) {
            if (this.defaultValue != null) {
                return;
            }
            this.defaultValue = mono;
            tryLoad(contextView);
        }

        private void tryLoad(ContextView contextView) {
            if (this.holder.currentSubscriberCount() == 1 && this.loading == null) {
                Mono mono = this.source;
                if (this.defaultValue != null) {
                    mono = mono.switchIfEmpty(this.defaultValue.flatMap(obj -> {
                        return this.parent.putNow(this.key, obj).thenReturn(obj);
                    }));
                }
                this.loading = mono.subscribe(obj2 -> {
                    complete();
                    this.holder.emitValue(obj2, AbstractReactiveCache.emitFailureHandler);
                }, th -> {
                    complete();
                    this.holder.emitError(th, AbstractReactiveCache.emitFailureHandler);
                }, () -> {
                    complete();
                    this.holder.emitEmpty(AbstractReactiveCache.emitFailureHandler);
                }, Context.of(contextView));
            }
        }

        public void subscribe(CoreSubscriber<? super Object> coreSubscriber) {
            this.holder.asMono().subscribe(coreSubscriber);
            tryLoad(coreSubscriber.currentContext());
        }

        private void complete() {
            ((AbstractReactiveCache) this.parent).cacheLoading.remove(this.key, this);
        }
    }

    protected abstract Mono<Object> getNow(Object obj);

    public abstract Mono<Void> putNow(Object obj, Object obj2);

    @Override // org.hswebframework.web.cache.ReactiveCache
    public final Mono<E> getMono(Object obj) {
        return this.cacheLoading.computeIfAbsent(obj, obj2 -> {
            return new CacheLoader(this, obj2, getNow(obj2));
        }).onErrorResume(th -> {
            return handleLoaderError(obj, th);
        });
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public final Mono<E> getMono(Object obj, Supplier<Mono<E>> supplier) {
        return Mono.deferContextual(contextView -> {
            return this.cacheLoading.compute(obj, (obj2, cacheLoader) -> {
                CacheLoader cacheLoader = new CacheLoader(this, obj2, getNow(obj2));
                cacheLoader.defaultValue((Mono) supplier.get(), contextView);
                return cacheLoader;
            });
        }).onErrorResume(th -> {
            return handleLoaderError(obj, th);
        });
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public final Flux<E> getFlux(Object obj) {
        return this.cacheLoading.computeIfAbsent(obj, obj2 -> {
            return new CacheLoader(this, obj2, getNow(obj2));
        }).flatMapIterable(obj3 -> {
            return (List) obj3;
        }).onErrorResume(th -> {
            return handleLoaderError(obj, th);
        });
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public final Flux<E> getFlux(Object obj, Supplier<Flux<E>> supplier) {
        return Flux.deferContextual(contextView -> {
            return this.cacheLoading.compute(obj, (obj2, cacheLoader) -> {
                CacheLoader cacheLoader = new CacheLoader(this, obj2, getNow(obj2));
                cacheLoader.defaultValue(((Flux) supplier.get()).collectList(), contextView);
                return cacheLoader;
            }).flatMapIterable(obj3 -> {
                return (List) obj3;
            });
        }).onErrorResume(th -> {
            return handleLoaderError(obj, th);
        });
    }

    protected Mono<E> handleLoaderError(Object obj, Throwable th) {
        log.warn("load cache error,key:{},evict it.", obj, th);
        return evict(obj).then(Mono.empty());
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public final Mono<Void> put(Object obj, Publisher<E> publisher) {
        return publisher instanceof Mono ? Mono.from(publisher).flatMap(obj2 -> {
            return putNow(obj, obj2);
        }) : Flux.from(publisher).collectList().flatMap(list -> {
            return putNow(obj, list);
        });
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public abstract Mono<Void> evict(Object obj);

    @Override // org.hswebframework.web.cache.ReactiveCache
    public Flux<E> getAll(Object... objArr) {
        return Flux.just(objArr).flatMap(this::getMono);
    }

    @Override // org.hswebframework.web.cache.ReactiveCache
    public abstract Mono<Void> evictAll(Iterable<?> iterable);

    @Override // org.hswebframework.web.cache.ReactiveCache
    public abstract Mono<Void> clear();
}
