package com.alibaba.schedulerx.worker.pull;

import akka.actor.ActorSelection;
import akka.actor.Address;
import com.alibaba.schedulerx.common.constants.CommonConstants;
import com.alibaba.schedulerx.protocol.Worker;
import com.alibaba.schedulerx.protocol.utils.FutureUtils;
import com.alibaba.schedulerx.worker.SchedulerxWorker;
import com.alibaba.schedulerx.worker.log.LogFactory;
import com.alibaba.schedulerx.worker.log.Logger;
import java.util.List;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:com/alibaba/schedulerx/worker/pull/PullThread.class */
/**
 * Background thread that repeatedly asks the master actor for a page of container start
 * requests belonging to one job instance and hands each of them to a
 * {@link BlockingContainerQueue}.
 *
 * <p>The loop runs until {@link #stopRunning()} is called or the thread is interrupted.
 */
public class PullThread extends Thread {
    private static final Logger LOGGER = LogFactory.getLogger(PullThread.class);

    /** Timeout handed to {@code FutureUtils.awaitResult}; presumably seconds — TODO confirm. */
    private static final long PULL_TIMEOUT = 30L;

    /** Pause, in milliseconds, before pulling again after the master returned no requests. */
    private static final long EMPTY_PULL_SLEEP_MILLIS = 3000L;

    private final long jobInstanceId;
    private final int pageSize;
    private final BlockingContainerQueue queue;
    private final ActorSelection masterActorSelection;
    private final String workerIdAddr;
    private volatile boolean running;

    /**
     * Creates a pull thread named {@code "Schedulerx-PullThread-" + jobInstanceId}.
     *
     * @param jobInstanceId   id of the job instance whose tasks are pulled
     * @param pageSize        page size sent with every pull request
     * @param masterActorPath actor path used to look up the master through the worker's actor system
     * @param queue           queue that receives the pulled container start requests
     */
    public PullThread(long jobInstanceId, int pageSize, String masterActorPath, BlockingContainerQueue queue) {
        super("Schedulerx-PullThread-" + jobInstanceId);
        this.running = true;
        this.jobInstanceId = jobInstanceId;
        this.pageSize = pageSize;
        this.queue = queue;
        this.masterActorSelection = SchedulerxWorker.actorSystem.actorSelection(masterActorPath);
        // Worker identity sent with each request: system@host<separator>port of the local actor system.
        Address defaultAddress = SchedulerxWorker.actorSystem.provider().getDefaultAddress();
        this.workerIdAddr = defaultAddress.system() + "@" + defaultAddress.host().get()
                + CommonConstants.ADDRESS_SEPARATOR + defaultAddress.port().get();
    }

    /**
     * Pull loop. For every iteration:
     * <ul>
     *   <li>a successful, non-empty response has each request offered to the queue, retrying
     *       until the queue accepts it or the thread is asked to stop;</li>
     *   <li>a successful but empty response makes the thread sleep before pulling again;</li>
     *   <li>an unsuccessful response is logged and {@code PullManager.INSTANCE.stop} is invoked;</li>
     *   <li>a timeout is logged and {@code PullManager.INSTANCE.crash} is invoked;</li>
     *   <li>an interrupt restores the interrupt flag and ends the loop;</li>
     *   <li>any other failure is logged and the loop continues.</li>
     * </ul>
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                Worker.PullTaskFromMasterRequest request = Worker.PullTaskFromMasterRequest.newBuilder()
                        .setJobInstanceId(this.jobInstanceId)
                        .setPageSize(this.pageSize)
                        .setWorkerIdAddr(this.workerIdAddr)
                        .build();
                Worker.PullTaskFromMasterResponse response = (Worker.PullTaskFromMasterResponse)
                        FutureUtils.awaitResult(this.masterActorSelection, request, PULL_TIMEOUT);

                if (!response.getSuccess()) {
                    LOGGER.error("pull container error, " + response.getMessage());
                    PullManager.INSTANCE.stop(this.jobInstanceId);
                    continue;
                }

                List<Worker.MasterStartContainerRequest> requests = response.getRequestList();
                if (requests == null || requests.isEmpty()) {
                    // Nothing to do right now; back off before asking the master again.
                    Thread.sleep(EMPTY_PULL_SLEEP_MILLIS);
                    continue;
                }

                for (Worker.MasterStartContainerRequest startRequest : requests) {
                    // put() reports whether the request was accepted; keep retrying until it
                    // is, unless the thread has been told to stop in the meantime.
                    boolean accepted = false;
                    while (this.running && !accepted) {
                        accepted = this.queue.put(startRequest);
                    }
                }
            } catch (TimeoutException e) {
                LOGGER.error("pull task timeout, stop PullManager");
                PullManager.INSTANCE.crash(this.jobInstanceId);
            } catch (InterruptedException e) {
                // An interrupt is a request to terminate: restore the flag for whoever inspects
                // this thread afterwards and leave the loop instead of swallowing the signal.
                LOGGER.error("pull thread interrupted, exit, jobInstanceId=" + this.jobInstanceId);
                this.running = false;
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                LOGGER.error("pull task failed, jobInstanceId=" + this.jobInstanceId, th);
            }
        }
    }

    /**
     * Asks the pull loop to end. The flag is checked at the top of each iteration and while
     * retrying a queue insertion, so an in-flight pull or sleep completes first.
     */
    public void stopRunning() {
        this.running = false;
    }
}
