package org.jetlinks.core.device;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/core/device/StandaloneDeviceMessageBroker.class */
public class StandaloneDeviceMessageBroker implements DeviceOperationBroker, MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(StandaloneDeviceMessageBroker.class);
    private final Sinks.Many<Message> messageEmitterProcessor;
    private final Map<String, Sinks.Many<DeviceMessageReply>> replyProcessor;
    private final Map<String, AtomicInteger> partCache;
    private final List<Function<Message, Mono<Void>>> handlers;
    private ReplyFailureHandler replyFailureHandler;
    private Function<Publisher<String>, Flux<DeviceStateInfo>> stateHandler;

    public StandaloneDeviceMessageBroker() {
        this(Sinks.many().multicast().onBackpressureBuffer());
    }

    public StandaloneDeviceMessageBroker(Sinks.Many<Message> many) {
        this.replyProcessor = new ConcurrentHashMap();
        this.partCache = new ConcurrentHashMap();
        this.handlers = new CopyOnWriteArrayList();
        this.replyFailureHandler = (th, deviceMessageReply) -> {
            log.info("unhandled reply message:{}", deviceMessageReply, th);
        };
        this.messageEmitterProcessor = many;
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Flux<Message> handleSendToDeviceMessage(String str) {
        return this.messageEmitterProcessor.asFlux();
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Disposable handleGetDeviceState(String str, Function<Publisher<String>, Flux<DeviceStateInfo>> function) {
        this.stateHandler = function;
        return () -> {
            this.stateHandler = null;
        };
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Disposable handleSendToDeviceMessage(String str, Function<Message, Mono<Void>> function) {
        this.handlers.add(function);
        return () -> {
            this.handlers.remove(function);
        };
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Flux<DeviceStateInfo> getDeviceState(String str, Collection<String> collection) {
        return this.stateHandler != null ? this.stateHandler.apply(Flux.fromIterable(collection)) : Flux.empty();
    }

    @Override // org.jetlinks.core.server.MessageHandler
    public Mono<Boolean> reply(DeviceMessageReply deviceMessageReply) {
        return Mono.defer(() -> {
            String messageId = deviceMessageReply.getMessageId();
            if (StringUtils.isEmpty(messageId)) {
                log.warn("reply message messageId is empty: {}", deviceMessageReply);
                return Mono.just(false);
            }
            String str = (String) deviceMessageReply.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (str == null) {
                Sinks.Many<DeviceMessageReply> many = this.replyProcessor.get(messageId);
                Sinks.EmitResult tryEmitNext = many.tryEmitNext(deviceMessageReply);
                if (!tryEmitNext.isFailure()) {
                    many.tryEmitComplete();
                    return Mono.just(true);
                }
                this.replyProcessor.remove(messageId);
                this.replyFailureHandler.handle(new DeviceOperationException.NoStackTrace(ErrorCode.SYSTEM_ERROR, "no reply handler " + tryEmitNext.name()), deviceMessageReply);
                return Mono.just(false);
            }
            Sinks.Many<DeviceMessageReply> orDefault = this.replyProcessor.getOrDefault(str, this.replyProcessor.get(messageId));
            if (orDefault == null || orDefault.currentSubscriberCount() == 0) {
                this.replyFailureHandler.handle(new NullPointerException("no reply handler"), deviceMessageReply);
                this.replyProcessor.remove(str);
                return Mono.just(false);
            }
            int intValue = ((Integer) deviceMessageReply.getHeader(Headers.fragmentNumber).orElse(1)).intValue();
            AtomicInteger computeIfAbsent = this.partCache.computeIfAbsent(str, str2 -> {
                return new AtomicInteger(intValue);
            });
            orDefault.emitNext(deviceMessageReply, Sinks.EmitFailureHandler.FAIL_FAST);
            if (computeIfAbsent.decrementAndGet() <= 0) {
                orDefault.tryEmitComplete();
                this.replyProcessor.remove(str);
            }
            return Mono.just(true);
        }).doOnError(th -> {
            this.replyFailureHandler.handle(th, deviceMessageReply);
        });
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Flux<DeviceMessageReply> handleReply(String str, String str2, Duration duration) {
        return ((Flux) this.replyProcessor.computeIfAbsent(str2, str3 -> {
            return Sinks.many().multicast().onBackpressureBuffer();
        }).asFlux().as(flux -> {
            return duration.isZero() ? flux : flux.timeout(duration, Mono.error(() -> {
                return new DeviceOperationException(ErrorCode.TIME_OUT);
            }));
        })).doFinally(signalType -> {
            this.replyProcessor.remove(str2);
        });
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Mono<Integer> send(String str, Publisher<? extends Message> publisher) {
        return (this.messageEmitterProcessor.currentSubscriberCount() == 0 && this.handlers.isEmpty()) ? Reactors.ALWAYS_ZERO : Flux.from(publisher).flatMap(this::send).then(Reactors.ALWAYS_ONE);
    }

    private Mono<Void> send(Message message) {
        if (this.messageEmitterProcessor.currentSubscriberCount() > 0) {
            this.messageEmitterProcessor.emitNext(message, Reactors.emitFailureHandler());
        }
        int size = this.handlers.size();
        return size == 0 ? Mono.empty() : size == 1 ? this.handlers.get(0).apply(message) : Flux.fromIterable(this.handlers).flatMap(function -> {
            return (Mono) function.apply(message);
        }).then();
    }

    @Override // org.jetlinks.core.device.DeviceOperationBroker
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> publisher) {
        return Mono.just(0);
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}
