package org.jetlinks.core.server.session;

import io.opentelemetry.api.common.AttributeKey;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.command.Command;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.message.codec.TraceDeviceSession;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/core/server/session/DeviceSession.class */
/**
 * A session bound to a single device, through which encoded messages can be
 * sent to that device and the state of the session can be inspected.
 *
 * <p>Most members are abstract; the {@code default} methods provide fallback
 * behaviour that implementations may override.
 */
public interface DeviceSession {
    /**
     * @return the identifier of this session
     */
    String getId();

    /**
     * @return the identifier of the device this session belongs to
     */
    String getDeviceId();

    /**
     * @return the operator of the device, or {@code null} when none is available
     */
    @Nullable
    DeviceOperator getOperator();

    /**
     * @return the time of the last ping as a {@code long}
     *         (NOTE(review): presumably epoch milliseconds - confirm with implementations)
     */
    long lastPingTime();

    /**
     * @return the time the session was connected as a {@code long}
     *         (NOTE(review): presumably epoch milliseconds - confirm with implementations)
     */
    long connectTime();

    /**
     * Sends an already encoded message through this session.
     *
     * @param encodedMessage the message to send
     * @return a {@link Mono} emitting a {@link Boolean}
     *         (NOTE(review): presumably {@code true} when the send succeeded - confirm)
     */
    Mono<Boolean> send(EncodedMessage encodedMessage);

    /**
     * Executes a command against this session.
     *
     * <p>The default implementation does not support any command and returns a
     * {@link Mono} that terminates with an {@link UnsupportedOperationException}.
     *
     * @param command the command to execute
     * @param <V>     the result type of the command
     * @return the command result
     */
    @Nonnull
    @Deprecated
    default <V> Mono<V> execute(@Nonnull Command<V> command) {
        return Mono.error(UnsupportedOperationException::new);
    }

    /**
     * @return the transport used by this session
     */
    Transport getTransport();

    /**
     * Closes this session.
     */
    void close();

    /**
     * Pings this session. {@link #keepAlive()} delegates to this method by default.
     */
    @Deprecated
    void ping();

    /**
     * @return whether this session is alive
     */
    boolean isAlive();

    /**
     * Registers a callback for the close of this session.
     * NOTE(review): presumably invoked once when the session is closed - confirm with implementations.
     *
     * @param runnable the callback to register
     */
    void onClose(Runnable runnable);

    /**
     * @return the server id of this session; the default implementation returns an empty {@link Optional}
     */
    @Deprecated
    default Optional<String> getServerId() {
        return Optional.empty();
    }

    /**
     * @return the address of the client; the default implementation returns an empty {@link Optional}
     */
    default Optional<InetSocketAddress> getClientAddress() {
        return Optional.empty();
    }

    /**
     * Keeps this session alive. The default implementation simply calls {@link #ping()}.
     */
    default void keepAlive() {
        ping();
    }

    /**
     * Sets the keep-alive timeout. The default implementation ignores the value.
     *
     * @param duration the timeout to apply
     */
    default void setKeepAliveTimeout(Duration duration) {
    }

    /**
     * @return the keep-alive timeout; the default implementation returns {@link Duration#ZERO}
     */
    default Duration getKeepAliveTimeout() {
        return Duration.ZERO;
    }

    /**
     * Tests whether this session can be viewed as the given type.
     *
     * @param cls the type to test against
     * @return {@code true} when this session is an instance of {@code cls}
     */
    default boolean isWrapFrom(Class<?> cls) {
        return cls.isInstance(this);
    }

    /**
     * Views this session as the given type.
     *
     * @param cls the target type
     * @param <T> the target type
     * @return this session cast to {@code cls}
     * @throws ClassCastException when this session is not an instance of {@code cls}
     */
    default <T extends DeviceSession> T unwrap(Class<T> cls) {
        return cls.cast(this);
    }

    /**
     * Reactive variant of {@link #isAlive()}; the check runs when the returned
     * {@link Mono} is subscribed to, not when this method is called.
     *
     * @return a {@link Mono} emitting the result of {@link #isAlive()}
     */
    default Mono<Boolean> isAliveAsync() {
        return Mono.fromSupplier(this::isAlive);
    }

    /**
     * Tests whether the given session differs from this one.
     *
     * @param deviceSession the session to compare with
     * @return {@code true} when this session is not {@link Object#equals(Object) equal} to {@code deviceSession}
     */
    default boolean isChanged(DeviceSession deviceSession) {
        return !equals(deviceSession);
    }

    /**
     * Encodes the message carried by the given context with the codec that the
     * device's protocol provides for this session's transport, then hands the
     * encoded messages back to the context for delivery.
     *
     * <p>Each encoded message is recorded on a trace span named after the device id.
     *
     * @param toDeviceMessageContext the context describing the message to send
     * @return the result of {@code toDeviceMessageContext.sendToDevice(...)}, or
     *         {@link Reactors#ALWAYS_FALSE} when the context carries no device
     */
    default Mono<Boolean> send(ToDeviceMessageContext toDeviceMessageContext) {
        DeviceOperator device = toDeviceMessageContext.getDevice();
        if (device == null) {
            return Reactors.ALWAYS_FALSE;
        }
        // Resolve protocol -> codec for this transport -> encoded message stream.
        // NOTE(review): "encode" restores the codec method the decompiler had aliased
        // as "mo60encode"; confirm against DeviceMessageCodec.
        Flux<EncodedMessage> encoded = device
            .getProtocol()
            .flatMap(protocolSupport -> protocolSupport.getMessageCodec(getTransport()))
            .flatMapMany(deviceMessageCodec -> deviceMessageCodec.encode(toDeviceMessageContext));

        return encoded
            // Attach the textual form of every encoded message to the trace span.
            .as(FluxTracer.create(
                DeviceTracer.SpanName.encode0(device.getDeviceId()),
                (reactiveSpan, encodedMessage) ->
                    reactiveSpan.setAttribute(DeviceTracer.SpanKey.message, encodedMessage.toString())))
            // Delivery itself is delegated to the context.
            .as(toDeviceMessageContext::sendToDevice);
    }

    /**
     * Wraps the given session for tracing.
     *
     * @param deviceSession the session to wrap
     * @return {@code deviceSession} itself when tracing is disabled, otherwise the
     *         result of {@link TraceDeviceSession#of(DeviceSession)}
     */
    static DeviceSession trace(DeviceSession deviceSession) {
        return TraceHolder.isDisabled() ? deviceSession : TraceDeviceSession.of(deviceSession);
    }
}
