package io.questdb.cutlass.line.tcp;

import io.questdb.cutlass.line.tcp.LineTcpMeasurementScheduler;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.network.IOContext;
import io.questdb.network.IODispatcher;
import io.questdb.network.NetworkFacade;
import io.questdb.std.Mutable;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.datetime.millitime.MillisecondClock;
import io.questdb.std.str.DirectByteCharSequence;
import io.questdb.std.str.FloatingDirectCharSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpConnectionContext.class */
public class LineTcpConnectionContext implements IOContext, Mutable {
    private static final Log LOG;
    private static final long QUEUE_FULL_LOG_HYSTERESIS_IN_MS = 10000;
    protected final NetworkFacade nf;
    private final LineTcpMeasurementScheduler scheduler;
    private final MillisecondClock milliClock;
    protected long fd;
    protected IODispatcher<LineTcpConnectionContext> dispatcher;
    protected long recvBufStart;
    protected long recvBufEnd;
    protected long recvBufPos;
    protected boolean peerDisconnected;
    private boolean goodMeasurement;
    protected long recvBufStartOfMeasurement;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final DirectByteCharSequence byteCharSequence = new DirectByteCharSequence();
    private long lastQueueFullLogMillis = 0;
    private final NewLineProtoParser protoParser = new NewLineProtoParser();
    private final FloatingDirectCharSink charSink = new FloatingDirectCharSink();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/questdb/cutlass/line/tcp/LineTcpConnectionContext$IOContextResult.class */
    public enum IOContextResult {
        NEEDS_READ,
        NEEDS_WRITE,
        QUEUE_FULL,
        NEEDS_DISCONNECT
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LineTcpConnectionContext(LineTcpReceiverConfiguration lineTcpReceiverConfiguration, LineTcpMeasurementScheduler lineTcpMeasurementScheduler) {
        this.nf = lineTcpReceiverConfiguration.getNetworkFacade();
        this.scheduler = lineTcpMeasurementScheduler;
        this.milliClock = lineTcpReceiverConfiguration.getMillisecondClock();
        this.recvBufStart = Unsafe.malloc(lineTcpReceiverConfiguration.getNetMsgBufferSize());
        this.recvBufEnd = this.recvBufStart + lineTcpReceiverConfiguration.getNetMsgBufferSize();
        clear();
    }

    public void clear() {
        this.recvBufPos = this.recvBufStart;
        this.peerDisconnected = false;
        resetParser();
    }

    private void resetParser() {
        this.protoParser.of(this.recvBufStart);
        this.goodMeasurement = true;
        this.recvBufStartOfMeasurement = this.recvBufStart;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v0, types: [io.questdb.cutlass.line.tcp.LineTcpConnectionContext] */
    @Override // io.questdb.network.IOContext, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.fd = -1L;
        Unsafe.free(this.recvBufStart, this.recvBufEnd - this.recvBufStart);
        ?? r3 = 0;
        this.recvBufPos = 0L;
        this.recvBufEnd = 0L;
        r3.recvBufStart = this;
        this.protoParser.close();
        this.charSink.close();
    }

    @Override // io.questdb.network.IOContext
    public long getFd() {
        return this.fd;
    }

    @Override // io.questdb.network.IOContext
    public boolean invalid() {
        return this.fd == -1;
    }

    @Override // io.questdb.network.IOContext
    public IODispatcher<LineTcpConnectionContext> getDispatcher() {
        return this.dispatcher;
    }

    private boolean checkQueueFullLogHysteresis() {
        long ticks = this.milliClock.getTicks();
        if (ticks - this.lastQueueFullLogMillis < QUEUE_FULL_LOG_HYSTERESIS_IN_MS) {
            return false;
        }
        this.lastQueueFullLogMillis = ticks;
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean compactBuffer(long j) {
        if (!$assertionsDisabled && j > this.recvBufPos) {
            throw new AssertionError();
        }
        if (j <= this.recvBufStart) {
            return false;
        }
        int i = (int) (this.recvBufPos - j);
        if (i > 0) {
            Vect.memcpy(j, this.recvBufStart, i);
        }
        this.recvBufPos = this.recvBufStart + i;
        return true;
    }

    private void doHandleDisconnectEvent() {
        if (this.protoParser.getBufferAddress() == this.recvBufEnd) {
            LOG.error().$('[').$(this.fd).$((CharSequence) "] buffer overflow [msgBufferSize=").$(this.recvBufEnd - this.recvBufStart).$(']').$();
        } else if (this.peerDisconnected) {
            if (this.recvBufPos != this.recvBufStart) {
                LOG.info().$('[').$(this.fd).$((CharSequence) "] peer disconnected with partial measurement, ").$(this.recvBufPos - this.recvBufStart).$((CharSequence) " unprocessed bytes").$();
            } else {
                LOG.info().$('[').$(this.fd).$((CharSequence) "] peer disconnected").$();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public IOContextResult handleIO(LineTcpMeasurementScheduler.NetworkIOJob networkIOJob) {
        read();
        return parseMeasurements(networkIOJob);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final IOContextResult parseMeasurements(LineTcpMeasurementScheduler.NetworkIOJob networkIOJob) {
        while (true) {
            try {
                switch (this.goodMeasurement ? this.protoParser.parseMeasurement(this.recvBufPos) : this.protoParser.skipMeasurement(this.recvBufPos)) {
                    case MEASUREMENT_COMPLETE:
                        if (!this.goodMeasurement) {
                            LOG.error().$('[').$(this.fd).$((CharSequence) "] could not parse measurement, code ").$(this.protoParser.getErrorCode()).$((CharSequence) " at ").$((int) (this.protoParser.getBufferAddress() - this.recvBufStartOfMeasurement)).$((CharSequence) " line (may be mangled due to partial parsing) is ").$((CharSequence) this.byteCharSequence.of(this.recvBufStartOfMeasurement, this.protoParser.getBufferAddress())).$();
                            this.goodMeasurement = true;
                        } else if (this.scheduler.tryButCouldNotCommit(networkIOJob, this.protoParser, this.charSink)) {
                            if (checkQueueFullLogHysteresis()) {
                                LOG.debug().$('[').$(this.fd).$((CharSequence) "] queue full").$();
                            }
                            return IOContextResult.QUEUE_FULL;
                        }
                        this.protoParser.startNextMeasurement();
                        this.recvBufStartOfMeasurement = this.protoParser.getBufferAddress();
                        if (this.recvBufStartOfMeasurement == this.recvBufPos) {
                            this.recvBufPos = this.recvBufStart;
                            this.protoParser.of(this.recvBufStart);
                        }
                        break;
                    case ERROR:
                        this.goodMeasurement = false;
                        break;
                    case BUFFER_UNDERFLOW:
                        if (this.recvBufPos == this.recvBufEnd) {
                            if (!compactBuffer(this.recvBufStartOfMeasurement)) {
                                doHandleDisconnectEvent();
                                return IOContextResult.NEEDS_DISCONNECT;
                            }
                            resetParser();
                        }
                        if (!read()) {
                            return this.peerDisconnected ? IOContextResult.NEEDS_DISCONNECT : IOContextResult.NEEDS_READ;
                        }
                        break;
                }
            } catch (RuntimeException e) {
                LOG.error().$('[').$(this.fd).$((CharSequence) "] could not process line data").$((Throwable) e).$();
                return IOContextResult.NEEDS_DISCONNECT;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LineTcpConnectionContext of(long j, IODispatcher<LineTcpConnectionContext> iODispatcher) {
        this.fd = j;
        this.dispatcher = iODispatcher;
        clear();
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean read() {
        int i = (int) (this.recvBufEnd - this.recvBufPos);
        if (i <= 0 || this.peerDisconnected) {
            return !this.peerDisconnected;
        }
        int recv = this.nf.recv(this.fd, this.recvBufPos, i);
        if (recv > 0) {
            this.recvBufPos += recv;
            i -= recv;
        } else {
            this.peerDisconnected = recv < 0;
        }
        return i < i;
    }

    static {
        $assertionsDisabled = !LineTcpConnectionContext.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(LineTcpConnectionContext.class);
    }
}
