package org.jetlinks.core.server.session;

import io.netty.util.internal.ThreadLocalRandom;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import lombok.Generated;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.ClientConnection;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/core/server/session/MultiConnectionDeviceSession.class */
public abstract class MultiConnectionDeviceSession<C extends ClientConnection> extends CopyOnWriteArrayList<C> implements DeviceSession {
    private final Disposable.Composite disposable = Disposables.composite();

    @Generated
    private final String id;

    @Generated
    private final transient DeviceOperator operator;
    protected final transient org.jetlinks.core.device.session.DeviceSessionManager sessionManager;

    @Override // org.jetlinks.core.server.session.DeviceSession
    public String getDeviceId() {
        return this.id;
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public abstract long lastPingTime();

    @Override // org.jetlinks.core.server.session.DeviceSession
    public abstract long connectTime();

    public void registerConnection(C c) {
        if (c.isAlive() && addIfAbsent(c)) {
            c.onDisconnect(() -> {
                handleDisconnect(c);
            });
        }
    }

    private void handleDisconnect(C c) {
        unregisterConnection(c);
        this.sessionManager.getSession(getDeviceId(), true).subscribe();
    }

    protected void unregisterConnection(C c) {
        synchronized (this) {
            remove(c);
        }
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public Mono<Boolean> send(EncodedMessage encodedMessage) {
        C takeConnection = takeConnection();
        return takeConnection == null ? Mono.error(new DeviceOperationException.NoStackTrace(ErrorCode.CONNECTION_LOST)) : Mono.defer(() -> {
            return takeConnection.sendMessage(encodedMessage);
        }).thenReturn(true);
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public abstract Transport getTransport();

    @Override // org.jetlinks.core.server.session.DeviceSession
    public void close() {
        this.disposable.dispose();
        synchronized (this) {
            Iterator<C> it = iterator();
            while (it.hasNext()) {
                ((ClientConnection) it.next()).disconnect();
            }
            clear();
        }
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public abstract void ping();

    @Override // org.jetlinks.core.server.session.DeviceSession
    public boolean isAlive() {
        if (this.disposable.isDisposed()) {
            return false;
        }
        boolean z = false;
        Iterator<C> it = iterator();
        while (it.hasNext()) {
            z |= ((ClientConnection) it.next()).isAlive();
        }
        return z;
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public void onClose(Runnable runnable) {
        Disposable.Composite composite = this.disposable;
        runnable.getClass();
        composite.add(runnable::run);
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public Optional<InetSocketAddress> getClientAddress() {
        return Optional.ofNullable(takeConnection()).map((v0) -> {
            return v0.address();
        });
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public abstract void setKeepAliveTimeout(Duration duration);

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.jetlinks.core.server.ClientConnection] */
    /* JADX WARN: Type inference failed for: r0v19, types: [org.jetlinks.core.server.ClientConnection] */
    protected final C takeConnection() {
        while (true) {
            synchronized (this) {
                int size = size();
                if (size == 0) {
                    return null;
                }
                C c = size == 1 ? (ClientConnection) get(0) : (ClientConnection) get(ThreadLocalRandom.current().nextInt(size));
                if (c.isAlive()) {
                    return c;
                }
                c.disconnect();
                handleDisconnect(c);
            }
        }
    }

    public MultiConnectionDeviceSession(String str, DeviceOperator deviceOperator, org.jetlinks.core.device.session.DeviceSessionManager deviceSessionManager) {
        this.id = str;
        this.operator = deviceOperator;
        this.sessionManager = deviceSessionManager;
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public String getId() {
        return this.id;
    }

    @Override // org.jetlinks.core.server.session.DeviceSession
    public DeviceOperator getOperator() {
        return this.operator;
    }
}
