package org.jetlinks.core.trace;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.time.Instant;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.function.Consumer3;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/jetlinks/core/trace/TraceFlux.class */
public class TraceFlux<T> extends FluxOperator<T, T> {
    private static final Logger log = LoggerFactory.getLogger(TraceFlux.class);
    private final Function<ContextView, CharSequence> spanName;
    private final Tracer tracer;
    private final Consumer3<ContextView, ReactiveSpan, T> onNext;
    private final Consumer3<ContextView, ReactiveSpan, Long> onComplete;
    private final BiConsumer<ContextView, ReactiveSpanBuilder> onSubscription;
    private final BiConsumer<ContextView, Throwable> onError;
    private final boolean fastSubscribe;
    private final Supplier<Context> defaultContext;

    public static <T> TraceFlux<T> trace(Publisher<T> publisher) {
        return new TraceFlux<>(Flux.from(publisher), null, null, null, null, null, null, true, Context::current);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TraceFlux(Flux<? extends T> flux, Function<ContextView, CharSequence> function, Tracer tracer, Consumer3<ContextView, ReactiveSpan, T> consumer3, Consumer3<ContextView, ReactiveSpan, Long> consumer32, BiConsumer<ContextView, ReactiveSpanBuilder> biConsumer, BiConsumer<ContextView, Throwable> biConsumer2, boolean z, Supplier<Context> supplier) {
        super(flux);
        this.spanName = function == null ? contextView -> {
            return name();
        } : function;
        this.tracer = tracer == null ? TraceHolder.telemetry().getTracer(TraceHolder.appName()) : tracer;
        this.onNext = consumer3;
        this.onSubscription = biConsumer;
        this.onComplete = consumer32;
        this.onError = biConsumer2;
        this.fastSubscribe = z;
        this.defaultContext = supplier;
    }

    public TraceFlux<T> onNext(BiConsumer<ReactiveSpan, T> biConsumer) {
        return onNext((contextView, reactiveSpan, obj) -> {
            biConsumer.accept(reactiveSpan, obj);
        });
    }

    public TraceFlux<T> onNext(Consumer3<ContextView, ReactiveSpan, T> consumer3) {
        Consumer3<ContextView, ReactiveSpan, T> consumer32 = this.onNext;
        return new TraceFlux<>(this.source, this.spanName, this.tracer, consumer32 == null ? consumer3 : (contextView, reactiveSpan, obj) -> {
            consumer32.accept(contextView, reactiveSpan, obj);
            consumer3.accept(contextView, reactiveSpan, obj);
        }, this.onComplete, this.onSubscription, this.onError, this.fastSubscribe, this.defaultContext);
    }

    public TraceFlux<T> onComplete(Consumer3<ContextView, ReactiveSpan, Long> consumer3) {
        Consumer3<ContextView, ReactiveSpan, Long> consumer32 = this.onComplete;
        return new TraceFlux<>(this.source, this.spanName, this.tracer, this.onNext, consumer32 == null ? consumer3 : (contextView, reactiveSpan, l) -> {
            consumer32.accept(contextView, reactiveSpan, l);
            consumer3.accept(contextView, reactiveSpan, l);
        }, this.onSubscription, this.onError, this.fastSubscribe, this.defaultContext);
    }

    public TraceFlux<T> onComplete(BiConsumer<Span, Long> biConsumer) {
        return onComplete((contextView, reactiveSpan, l) -> {
            biConsumer.accept(reactiveSpan, l);
        });
    }

    public TraceFlux<T> spanName(String str) {
        return new TraceFlux<>(this.source, contextView -> {
            return str;
        }, this.tracer, this.onNext, this.onComplete, this.onSubscription, this.onError, this.fastSubscribe, this.defaultContext);
    }

    public TraceFlux<T> scopeName(String str) {
        return new TraceFlux<>(this.source, this.spanName, TraceHolder.telemetry().getTracer(str), this.onNext, this.onComplete, this.onSubscription, this.onError, this.fastSubscribe, this.defaultContext);
    }

    public TraceFlux<T> onSubscription(BiConsumer<ContextView, ReactiveSpanBuilder> biConsumer) {
        if (this.onSubscription != null) {
            biConsumer = this.onSubscription.andThen(biConsumer);
        }
        return new TraceFlux<>(this.source, this.spanName, this.tracer, this.onNext, this.onComplete, biConsumer, this.onError, this.fastSubscribe, this.defaultContext);
    }

    public void subscribe(@Nonnull CoreSubscriber<? super T> coreSubscriber) {
        try {
            ContextView currentContext = coreSubscriber.currentContext();
            CharSequence apply = this.spanName.apply(currentContext);
            if (this.fastSubscribe && TraceHolder.isDisabled(apply)) {
                this.source.subscribe(coreSubscriber);
                return;
            }
            ReactiveSpanBuilderWrapper reactiveSpanBuilderWrapper = new ReactiveSpanBuilderWrapper(this.tracer.spanBuilder(apply.toString()));
            Context with = ((Context) currentContext.getOrEmpty(Context.class).orElseGet(this.defaultContext)).with(TraceHolder.SPAN_NAME, apply);
            if (null != this.onSubscription) {
                this.onSubscription.accept(currentContext, reactiveSpanBuilderWrapper);
            }
            Instant now = Instant.now();
            Span startSpan = reactiveSpanBuilderWrapper.mo135setStartTimestamp(now).mo145setParent(with).startSpan();
            try {
                Scope makeCurrent = startSpan.makeCurrent();
                Throwable th = null;
                try {
                    this.source.subscribe(new TraceSubscriber(now, coreSubscriber, startSpan, this.onNext, this.onComplete, this.onError, with));
                    if (makeCurrent != null) {
                        if (0 != 0) {
                            try {
                                makeCurrent.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            makeCurrent.close();
                        }
                    }
                } catch (Throwable th3) {
                    if (makeCurrent != null) {
                        if (0 != 0) {
                            try {
                                makeCurrent.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            makeCurrent.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                coreSubscriber.onError(th5);
                startSpan.recordException(th5);
                startSpan.end();
            }
        } catch (Throwable th6) {
            coreSubscriber.onError(th6);
        }
    }
}
