/*
 * Decompiled with CFR 0.152.
 */
package com.alicloud.openservices.tablestore.tunnel.pipeline;

import com.alicloud.openservices.tablestore.tunnel.pipeline.AbstractStage;
import com.alicloud.openservices.tablestore.tunnel.pipeline.PipelineContext;
import com.alicloud.openservices.tablestore.tunnel.pipeline.Stage;
import com.alicloud.openservices.tablestore.tunnel.pipeline.StageException;
import com.alicloud.openservices.tablestore.tunnel.pipeline.ThreadPoolStageDecorator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A composite stage that chains a list of stages together and feeds its input
 * to the first stage in the chain.
 *
 * <p>Stages are registered through {@link #addExecutorForStage}, each wrapped in a
 * {@link ThreadPoolStageDecorator}. {@link #init(PipelineContext)} links them in
 * registration order and initializes them on the helper executor.
 *
 * @param <INPUT>  type of the value accepted by {@link #process(Object)}
 * @param <OUTPUT> output type declared for the {@link AbstractStage} contract
 */
public class Pipeline<INPUT, OUTPUT> extends AbstractStage<INPUT, OUTPUT> {
    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);

    /** Registered stages, in the order they were added. */
    private final List<Stage<?, ?>> stages = new LinkedList<Stage<?, ?>>();

    /** Executor on which the stage initialization task is run. */
    private final ExecutorService helperExecutor;

    /**
     * Creates an empty pipeline.
     *
     * @param helperExecutor executor used by {@link #init(PipelineContext)} to run
     *                       stage initialization, and halted by {@code shutdown(true)}
     */
    public Pipeline(ExecutorService helperExecutor) {
        this.helperExecutor = helperExecutor;
    }

    /**
     * Links every registered stage to its successor (the pipeline itself is linked
     * to the first stage) and then submits a {@link PipelineInitTask} to the helper
     * executor, so the stages' own {@code init} calls happen on that executor.
     *
     * @param context context handed to each stage's {@code init}
     */
    @Override
    public void init(PipelineContext context) {
        // Heterogeneous stage types: only the wildcard view is type-correct here.
        Stage<?, ?> preStage = this;
        for (Stage<?, ?> stage : this.stages) {
            preStage.setNextStage(stage);
            preStage = stage;
        }
        this.helperExecutor.submit(new PipelineInitTask(context, this.stages));
    }

    /**
     * Appends a stage to the end of the pipeline, wrapped in a
     * {@link ThreadPoolStageDecorator} built from the given executor.
     *
     * @param stage           stage to append
     * @param executorService executor passed to the decorator
     * @param <I>             input type of the stage
     * @param <O>             output type of the stage
     */
    public <I, O> void addExecutorForStage(Stage<I, O> stage, ExecutorService executorService) {
        this.stages.add(new ThreadPoolStageDecorator<I, O>(stage, executorService));
    }

    /**
     * Forwards the input to the first registered stage. Does nothing when no stage
     * has been registered.
     *
     * @param input value to hand to the first stage
     */
    @Override
    public void process(INPUT input) {
        if (this.stages.isEmpty()) {
            return;
        }
        // The list is heterogeneous, so the element type is erased to wildcards;
        // the first stage is expected to accept this pipeline's INPUT type.
        @SuppressWarnings("unchecked")
        Stage<INPUT, ?> firstStage = (Stage<INPUT, ?>) this.stages.get(0);
        firstStage.process(input);
    }

    /**
     * Not used by the pipeline itself: {@link #process(Object)} delegates straight
     * to the first stage, so this always returns {@code null}.
     *
     * @param input ignored
     * @return always {@code null}
     * @throws StageException never thrown by this implementation
     */
    @Override
    public OUTPUT doProcess(INPUT input) throws StageException {
        return null;
    }

    /**
     * Shuts down all registered stages without halting the helper executor.
     * Equivalent to {@code shutdown(false)}.
     */
    @Override
    public void shutdown() {
        this.shutdown(false);
    }

    /**
     * Shuts down every registered stage in registration order and, if requested,
     * also halts the helper executor via {@link ExecutorService#shutdownNow()}.
     *
     * @param isHalt {@code true} to also shut down the helper executor
     */
    public void shutdown(boolean isHalt) {
        for (Stage<?, ?> stage : this.stages) {
            stage.shutdown();
        }
        if (isHalt) {
            LOG.info("shutdown pipeline helper executor.");
            this.helperExecutor.shutdownNow();
        }
    }

    /**
     * Task that initializes a list of stages in order with a shared context.
     */
    static class PipelineInitTask implements Runnable {
        final List<Stage<?, ?>> stages;
        final PipelineContext context;

        /**
         * @param context context passed to every stage's {@code init}
         * @param stages  stages to initialize; the list is referenced, not copied
         */
        public PipelineInitTask(PipelineContext context, List<Stage<?, ?>> stages) {
            this.context = context;
            this.stages = stages;
        }

        /**
         * Calls {@code init} on each stage in list order. The first exception is
         * logged and ends the loop, so later stages are not initialized.
         */
        @Override
        public void run() {
            try {
                for (Stage<?, ?> stage : this.stages) {
                    stage.init(this.context);
                }
            } catch (Exception e) {
                LOG.error("Pipeline Init Error", e);
            }
        }
    }
}

