/*
 * Decompiled with CFR 0.152.
 */
package com.theokanning.openai.service.assistant_stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.theokanning.openai.OpenAiError;
import com.theokanning.openai.OpenAiHttpException;
import com.theokanning.openai.assistants.StreamEvent;
import com.theokanning.openai.assistants.message.Message;
import com.theokanning.openai.assistants.message.content.MessageDelta;
import com.theokanning.openai.assistants.run.Run;
import com.theokanning.openai.assistants.run.ToolCall;
import com.theokanning.openai.assistants.run.ToolCallFunction;
import com.theokanning.openai.assistants.run_step.RunStep;
import com.theokanning.openai.assistants.run_step.RunStepDelta;
import com.theokanning.openai.service.assistant_stream.AssistantEventHandler;
import com.theokanning.openai.service.assistant_stream.AssistantSSE;
import com.theokanning.openai.service.assistant_stream.DeltaUtil;
import com.theokanning.openai.utils.JsonUtil;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumes a {@link Flowable} of {@link AssistantSSE} events, keeps track of the most recent
 * {@link Run}, {@link RunStep} and {@link Message} carried by those events, accumulates message
 * and run-step deltas, and forwards every event to an {@link AssistantEventHandler}.
 *
 * <p>Instances are created through the static {@code start(...)} / {@code syncStart(...)}
 * factories. {@code start} uses {@link Flowable#subscribe} and keeps the resulting
 * {@link Disposable} so the subscription can be cancelled with {@link #shutDown()};
 * {@code syncStart} uses {@link Flowable#blockingSubscribe} and therefore only returns once the
 * stream has terminated.
 *
 * <p>The {@code current*} / {@code accumulated*} fields are plain (non-volatile) fields written
 * while events are handled; only {@link #isCompleted()} is backed by a volatile flag.
 */
public class AssistantStreamManager {
    private static final Logger log = LoggerFactory.getLogger(AssistantStreamManager.class);

    /** Receiver of all callbacks; never {@code null}. */
    private final AssistantEventHandler eventHandler;
    /** Every message delta received so far, in arrival order. */
    private final List<MessageDelta> msgDeltas;
    /** Every run-step delta received so far, in arrival order. */
    private final List<RunStepDelta> runStepDeltas;
    /** Every raw event received so far, in arrival order. */
    private final List<AssistantSSE> eventMsgsHolder;
    private final ObjectMapper mapper = JsonUtil.getInstance();
    /** Result of folding all message deltas through {@code DeltaUtil.accumulatMessageDelta}. */
    private MessageDelta accumulatedMessageDelta;
    /** Result of folding all run-step deltas through {@code DeltaUtil.accumulatRunStepDelta}. */
    private RunStepDelta accumulatedRsd;
    private Run currentRun;
    private Message currentMessage;
    private RunStep currentRunStep;
    /** Set once the stream has terminated (normally, with an error, or via a DONE/ERROR event). */
    private volatile boolean completed;
    private final Flowable<AssistantSSE> stream;
    /** Subscription handle; only set by the non-blocking {@link #start()} path. */
    private Disposable disposable;

    /**
     * @param stream       the event stream to consume
     * @param eventHandler the callback receiver
     * @throws NullPointerException if {@code eventHandler} is {@code null}
     */
    private AssistantStreamManager(Flowable<AssistantSSE> stream, AssistantEventHandler eventHandler) {
        if (eventHandler == null) {
            // Fail fast here; callbacks are now invoked lazily, so a null handler would
            // otherwise only surface once the first event or error arrives.
            throw new NullPointerException("eventHandler");
        }
        this.eventHandler = eventHandler;
        this.msgDeltas = Collections.synchronizedList(new ArrayList<>());
        this.runStepDeltas = Collections.synchronizedList(new ArrayList<>());
        this.eventMsgsHolder = Collections.synchronizedList(new ArrayList<>());
        this.stream = stream;
    }

    /** Creates a manager whose handler is an empty anonymous {@link AssistantEventHandler}. */
    private AssistantStreamManager(Flowable<AssistantSSE> stream) {
        this(stream, new AssistantEventHandler(){});
    }

    /**
     * Creates a manager and subscribes it to {@code stream} via {@link Flowable#subscribe}.
     *
     * @param stream       the event stream to consume
     * @param eventHandler the callback receiver
     * @return the started manager
     */
    public static AssistantStreamManager start(Flowable<AssistantSSE> stream, AssistantEventHandler eventHandler) {
        AssistantStreamManager manager = new AssistantStreamManager(stream, eventHandler);
        manager.start();
        return manager;
    }

    /**
     * Same as {@link #start(Flowable, AssistantEventHandler)} with an empty handler.
     *
     * @param stream the event stream to consume
     * @return the started manager
     */
    public static AssistantStreamManager start(Flowable<AssistantSSE> stream) {
        AssistantStreamManager manager = new AssistantStreamManager(stream);
        manager.start();
        return manager;
    }

    /**
     * Creates a manager and consumes {@code stream} via {@link Flowable#blockingSubscribe},
     * returning only after the stream has terminated.
     *
     * @param stream       the event stream to consume
     * @param eventHandler the callback receiver
     * @return the manager, holding the state gathered from the stream
     */
    public static AssistantStreamManager syncStart(Flowable<AssistantSSE> stream, AssistantEventHandler eventHandler) {
        AssistantStreamManager manager = new AssistantStreamManager(stream, eventHandler);
        manager.syncStart();
        return manager;
    }

    /**
     * Same as {@link #syncStart(Flowable, AssistantEventHandler)} with an empty handler.
     *
     * @param stream the event stream to consume
     * @return the manager, holding the state gathered from the stream
     */
    public static AssistantStreamManager syncStart(Flowable<AssistantSSE> stream) {
        AssistantStreamManager manager = new AssistantStreamManager(stream);
        manager.syncStart();
        return manager;
    }

    /** @return the accumulated message delta, or empty if no message delta has been received */
    public Optional<MessageDelta> getAccumulatedMsg() {
        return Optional.ofNullable(this.accumulatedMessageDelta);
    }

    /** @return the accumulated run-step delta, or empty if no run-step delta has been received */
    public Optional<RunStepDelta> getAccumulatedRsd() {
        return Optional.ofNullable(this.accumulatedRsd);
    }

    /** @return the run carried by the latest run event, or empty if none has been received */
    public Optional<Run> getCurrentRun() {
        return Optional.ofNullable(this.currentRun);
    }

    /** @return the message carried by the latest message event, or empty if none has been received */
    public Optional<Message> getCurrentMessage() {
        return Optional.ofNullable(this.currentMessage);
    }

    /** @return the run step carried by the latest run-step event, or empty if none has been received */
    public Optional<RunStep> getCurrentRunStep() {
        return Optional.ofNullable(this.currentRunStep);
    }

    /** Subscribes to the stream and remembers the subscription for {@link #shutDown()}. */
    private void start() {
        this.disposable = this.stream.subscribe(this::handleEvent, this::handleStreamError, this::markCompleted);
    }

    /** Disposes the subscription created by {@code start(...)}; a no-op if there is none. */
    public void shutDown() {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    /** Consumes the stream on the calling thread until it terminates. */
    private void syncStart() {
        this.stream.blockingSubscribe(this::handleEvent, this::handleStreamError, this::markCompleted);
    }

    /** Terminal callback for normal stream completion. */
    private void markCompleted() {
        this.completed = true;
    }

    /**
     * Terminal callback for a stream error: notifies the handler and marks the manager as
     * completed so that {@link #waitForCompletion()} does not block forever after a failure.
     */
    private void handleStreamError(Throwable error) {
        try {
            this.eventHandler.onError(error);
        } finally {
            // Set even if the handler itself throws; the stream is over either way.
            this.completed = true;
        }
    }

    /**
     * Records the event, passes it to {@code onEvent}, updates the tracked state according to
     * the event type and then invokes the matching type-specific handler callback.
     *
     * @param sse the event to process
     */
    private void handleEvent(AssistantSSE sse) {
        StreamEvent eventType = sse.getEvent();
        this.eventMsgsHolder.add(sse);
        this.eventHandler.onEvent(sse);
        switch (eventType) {
            case THREAD_RUN_CREATED: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunCreated(this.currentRun);
                break;
            }
            case THREAD_RUN_QUEUED: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunQueued(this.currentRun);
                break;
            }
            case THREAD_RUN_IN_PROGRESS: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunInProgress(this.currentRun);
                break;
            }
            case THREAD_RUN_REQUIRES_ACTION: {
                this.updateCurrentRun(sse);
                // Turn the accumulated textual tool-call arguments into JSON trees before
                // the handler gets to see the run.
                this.translationRunStepDelta();
                this.eventHandler.onRunRequiresAction(this.currentRun);
                break;
            }
            case THREAD_RUN_COMPLETED: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunCompleted(this.currentRun);
                break;
            }
            case THREAD_RUN_FAILED: {
                this.updateCurrentRun(sse);
                log.warn("run:{} failed at:{}", this.currentRun.getId(), this.currentRun.getFailedAt());
                this.eventHandler.onRunFailed(this.currentRun);
                break;
            }
            case THREAD_RUN_CANCELLING: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunCancelling(this.currentRun);
                break;
            }
            case THREAD_RUN_CANCELLED: {
                this.updateCurrentRun(sse);
                this.eventHandler.onRunCancelled(this.currentRun);
                break;
            }
            case THREAD_RUN_EXPIRED: {
                this.updateCurrentRun(sse);
                log.warn("run:{} expired at:{}", this.currentRun.getId(), this.currentRun.getExpiresAt());
                this.eventHandler.onRunExpired(this.currentRun);
                break;
            }
            case THREAD_RUN_STEP_CREATED: {
                this.updateCurrentRunStep(sse);
                this.eventHandler.onRunStepCreated(this.currentRunStep);
                break;
            }
            case THREAD_RUN_STEP_IN_PROGRESS: {
                this.updateCurrentRunStep(sse);
                this.eventHandler.onRunStepInProgress(this.currentRunStep);
                break;
            }
            case THREAD_RUN_STEP_DELTA: {
                this.eventHandler.onRunStepDelta(this.accumulateRunStepDeltaAndSave(sse));
                break;
            }
            case THREAD_RUN_STEP_COMPLETED: {
                this.updateCurrentRunStep(sse);
                this.eventHandler.onRunStepCompleted(this.currentRunStep);
                break;
            }
            case THREAD_RUN_STEP_FAILED: {
                this.updateCurrentRunStep(sse);
                log.warn("runid:{} ,RunStepId:{} failed at:{}", this.currentRunIdOrNull(), this.currentRunStep.getId(), this.currentRunStep.getFailedAt());
                this.eventHandler.onRunStepFailed(this.currentRunStep);
                break;
            }
            case THREAD_RUN_STEP_CANCELLED: {
                this.updateCurrentRunStep(sse);
                this.eventHandler.onRunStepCancelled(this.currentRunStep);
                break;
            }
            case THREAD_RUN_STEP_EXPIRED: {
                this.updateCurrentRunStep(sse);
                log.warn("runid:{} ,RunStepId:{} expired at: {}", this.currentRunIdOrNull(), this.currentRunStep.getId(), this.currentRunStep.getExpiredAt());
                this.eventHandler.onRunStepExpired(this.currentRunStep);
                break;
            }
            case THREAD_MESSAGE_CREATED: {
                this.updateCurrentMessage(sse);
                this.eventHandler.onMessageCreated(this.currentMessage);
                break;
            }
            case THREAD_MESSAGE_IN_PROGRESS: {
                this.updateCurrentMessage(sse);
                this.eventHandler.onMessageInProgress(this.currentMessage);
                break;
            }
            case THREAD_MESSAGE_DELTA: {
                this.eventHandler.onMessageDelta(this.accumulateMessageDeltaAndSave(sse));
                break;
            }
            case THREAD_MESSAGE_COMPLETED: {
                this.updateCurrentMessage(sse);
                this.eventHandler.onMessageCompleted(this.currentMessage);
                break;
            }
            case THREAD_MESSAGE_INCOMPLETE: {
                this.updateCurrentMessage(sse);
                log.warn("Message:{} incomplete", this.currentMessage.getId());
                this.eventHandler.onMessageInComplete(this.currentMessage);
                break;
            }
            case DONE: {
                this.completed = true;
                this.eventHandler.onEnd();
                break;
            }
            case ERROR: {
                log.error("Stream error,the final message is:{},Run is {} ", this.currentMessage, this.currentRun);
                this.completed = true;
                this.eventHandler.onError((Throwable)new OpenAiHttpException((OpenAiError)sse.getPojo(), null, 200));
                break;
            }
            default: {
                // Other event types only reach the generic onEvent callback above.
                break;
            }
        }
    }

    /**
     * Id of the run tracked so far, or {@code null} when no run event has been received yet.
     * Used for log output only, so that logging never fails on a missing run.
     */
    private Object currentRunIdOrNull() {
        Run run = this.currentRun;
        return run == null ? null : run.getId();
    }

    /**
     * Blocks the calling thread, polling every 200 ms, until the stream has completed or the
     * subscription has been disposed.
     *
     * <p>If the waiting thread is interrupted, the subscription is shut down, the thread's
     * interrupt flag is restored and the method returns.
     */
    public void waitForCompletion() {
        while (!this.completed) {
            Disposable subscription = this.disposable;
            // Re-checked on every iteration so that a shutDown() issued while waiting
            // releases the waiter instead of leaving it polling forever.
            if (subscription != null && subscription.isDisposed()) {
                return;
            }
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                log.error("InterruptedException", e);
                this.shutDown();
                // Preserve the interrupt for the caller and stop waiting; after dispose()
                // the completed flag would never be set by the stream.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** @return the type of the most recently received event, or empty if none has been received */
    public Optional<StreamEvent> getCurrentEvent() {
        if (this.eventMsgsHolder.isEmpty()) {
            return Optional.empty();
        }
        AssistantSSE latest = this.eventMsgsHolder.get(this.eventMsgsHolder.size() - 1);
        return Optional.ofNullable(latest.getEvent());
    }

    /** @return a copy of all events received so far */
    public List<AssistantSSE> getEventMsgsHolder() {
        return new ArrayList<>(this.eventMsgsHolder);
    }

    /** @return a copy of all message deltas received so far */
    public List<MessageDelta> getMsgDeltas() {
        return new ArrayList<>(this.msgDeltas);
    }

    /** @return a copy of all run-step deltas received so far */
    public List<RunStepDelta> getRunStepDeltas() {
        return new ArrayList<>(this.runStepDeltas);
    }

    /**
     * Re-parses the arguments of every tool call in the accumulated run-step delta: the current
     * arguments are read as text, parsed as JSON and written back as the resulting tree.
     *
     * <p>Does nothing when no run-step delta (or no tool-call list) has been accumulated; tool
     * calls without a function or without arguments are skipped.
     *
     * @throws RuntimeException if the argument text is not valid JSON
     */
    private void translationRunStepDelta() {
        RunStepDelta accumulated = this.accumulatedRsd;
        if (accumulated == null
                || accumulated.getDelta() == null
                || accumulated.getDelta().getStepDetails() == null
                || accumulated.getDelta().getStepDetails().getToolCalls() == null) {
            return;
        }
        for (ToolCall toolCall : accumulated.getDelta().getStepDetails().getToolCalls()) {
            if (toolCall == null) {
                continue;
            }
            ToolCallFunction function = toolCall.getFunction();
            if (function == null || function.getArguments() == null) {
                continue;
            }
            try {
                function.setArguments(this.mapper.readTree(function.getArguments().asText()));
            } catch (JsonProcessingException e) {
                throw new RuntimeException("Failed to parse tool call arguments as JSON", e);
            }
        }
    }

    /**
     * Verifies that the event's declared data class is exactly {@code expected}.
     *
     * @throws IllegalArgumentException if the event carries a different data class
     */
    private static void requireDataClass(AssistantSSE sse, Class<?> expected) {
        if (!sse.getEvent().dataClass.equals(expected)) {
            throw new IllegalArgumentException("Event data is not a " + expected.getSimpleName() + ",raw data is: " + sse.getData() + "event is:" + sse.getEvent().name());
        }
    }

    /** Replaces the tracked run step with the one carried by {@code sse}. */
    private void updateCurrentRunStep(AssistantSSE sse) {
        requireDataClass(sse, RunStep.class);
        this.currentRunStep = (RunStep)sse.getPojo();
    }

    /** Replaces the tracked run with the one carried by {@code sse}. */
    private void updateCurrentRun(AssistantSSE sse) {
        requireDataClass(sse, Run.class);
        this.currentRun = (Run)sse.getPojo();
    }

    /** Replaces the tracked message with the one carried by {@code sse}. */
    private void updateCurrentMessage(AssistantSSE sse) {
        requireDataClass(sse, Message.class);
        this.currentMessage = (Message)sse.getPojo();
    }

    /**
     * Stores the run-step delta carried by {@code sse} and folds it into the accumulated delta.
     *
     * @return the delta that was just stored
     */
    private RunStepDelta accumulateRunStepDeltaAndSave(AssistantSSE sse) {
        requireDataClass(sse, RunStepDelta.class);
        RunStepDelta stepDelta = (RunStepDelta)sse.getPojo();
        this.runStepDeltas.add(stepDelta);
        this.accumulatedRsd = DeltaUtil.accumulatRunStepDelta(this.accumulatedRsd, stepDelta);
        return stepDelta;
    }

    /**
     * Stores the message delta carried by {@code sse} and folds it into the accumulated delta.
     * If the accumulated delta has no role yet, the role of the tracked message (when present)
     * is copied onto it.
     *
     * @return the delta that was just stored
     */
    private MessageDelta accumulateMessageDeltaAndSave(AssistantSSE sse) {
        requireDataClass(sse, MessageDelta.class);
        MessageDelta msgDelta = (MessageDelta)sse.getPojo();
        this.msgDeltas.add(msgDelta);
        this.accumulatedMessageDelta = DeltaUtil.accumulatMessageDelta(this.accumulatedMessageDelta, msgDelta);
        if (this.accumulatedMessageDelta.getDelta().getRole() == null || this.accumulatedMessageDelta.getDelta().getRole().isEmpty()) {
            this.getCurrentMessage().ifPresent(message -> this.accumulatedMessageDelta.getDelta().setRole(message.getRole()));
        }
        return msgDelta;
    }

    /** @return {@code true} once the stream has terminated (normally, by error, or via DONE/ERROR event) */
    public boolean isCompleted() {
        return this.completed;
    }
}

