package org.apache.rocketmq.proxy.grpc.v2;

import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.EndTransactionRequest;
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.MessagingServiceGrpc;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.RecallMessageRequest;
import apache.rocketmq.v2.RecallMessageResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.protobuf.GeneratedMessageV3;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.auth.config.AuthConfig;
import org.apache.rocketmq.common.constant.GrpcConstants;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.grpc.pipeline.AuthenticationPipeline;
import org.apache.rocketmq.proxy.grpc.pipeline.AuthorizationPipeline;
import org.apache.rocketmq.proxy.grpc.pipeline.ContextInitPipeline;
import org.apache.rocketmq.proxy.grpc.pipeline.RequestPipeline;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcProxyException;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

/* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.class */
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
    /** Logger obtained under the fixed name "RocketmqProxy" (not the class name). */
    private static final Logger log = LoggerFactory.getLogger("RocketmqProxy");
    /** Delegate that every RPC method in this class forwards to once the request has been scheduled. */
    private final GrpcMessingActivity grpcMessingActivity;
    /** Pipeline executed by {@code addExecutor} for protobuf requests before the task is scheduled. */
    protected final RequestPipeline requestPipeline;
    /** Executor used by {@code queryRoute} and {@code queryAssignment}. */
    protected ThreadPoolExecutor routeThreadPoolExecutor;
    /** Executor used by {@code sendMessage}, {@code forwardMessageToDeadLetterQueue} and {@code recallMessage}. */
    protected ThreadPoolExecutor producerThreadPoolExecutor;
    /** Executor used by {@code receiveMessage}, {@code ackMessage} and {@code changeInvisibleDuration}. */
    protected ThreadPoolExecutor consumerThreadPoolExecutor;
    /** Executor used by {@code heartbeat}, {@code notifyClientTermination} and incoming {@code telemetry} commands. */
    protected ThreadPoolExecutor clientManagerThreadPoolExecutor;
    /** Executor used by {@code endTransaction}. */
    protected ThreadPoolExecutor transactionThreadPoolExecutor;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication$GrpcTask.class */
    /**
     * A {@link Runnable} that carries, next to the work itself, everything needed to answer the
     * caller if the task is never run: the request context, the request, the response observer
     * and a pre-built response to send on rejection.
     *
     * @param <V> request type
     * @param <T> response type
     */
    public static class GrpcTask<V, T> implements Runnable {
        /** The actual work; {@link #run()} simply delegates to it. */
        protected final Runnable runnable;
        /** Context created for the request this task belongs to. */
        protected final ProxyContext context;
        /** The request that triggered this task. */
        protected final V request;
        /** Response that is written to {@link #streamObserver} when the executor rejects this task. */
        protected final T executeRejectResponse;
        /** Observer through which the caller receives its response. */
        protected final StreamObserver<T> streamObserver;

        /**
         * @param runnable       work to perform when the task is executed
         * @param proxyContext   context of the request
         * @param v              the request
         * @param streamObserver observer to answer on
         * @param t              response to send if the task is rejected
         */
        public GrpcTask(Runnable runnable, ProxyContext proxyContext, V v, StreamObserver<T> streamObserver, T t) {
            this.request = v;
            this.context = proxyContext;
            this.runnable = runnable;
            this.executeRejectResponse = t;
            this.streamObserver = streamObserver;
        }

        /** Executes the wrapped {@link #runnable} on the calling thread. */
        @Override // java.lang.Runnable
        public void run() {
            runnable.run();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication$GrpcTaskRejectedExecutionHandler.class */
    public class GrpcTaskRejectedExecutionHandler implements RejectedExecutionHandler {
        public GrpcTaskRejectedExecutionHandler() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            if (runnable instanceof GrpcTask) {
                try {
                    GrpcTask grpcTask = (GrpcTask) runnable;
                    GrpcMessagingApplication.this.writeResponse(grpcTask.context, grpcTask.request, grpcTask.executeRejectResponse, grpcTask.streamObserver, null, null);
                } catch (Throwable th) {
                    GrpcMessagingApplication.log.warn("write rejected error response failed", th);
                }
            }
        }
    }

    /**
     * Stores the collaborators, creates one monitored thread pool per request category from the
     * current {@link ProxyConfig}, and finally calls {@link #init()}.
     *
     * <p>For every pool the same thread-count setting is passed as the first two arguments of
     * {@code ThreadPoolMonitor.createAndMonitor} (presumably core and maximum pool size — confirm
     * against ThreadPoolMonitor), followed by a keep-alive of one minute, the pool name and the
     * configured queue capacity.
     *
     * @param grpcMessingActivity delegate that performs the actual request handling
     * @param requestPipeline     pipeline executed for each request before it is scheduled
     */
    protected GrpcMessagingApplication(GrpcMessingActivity grpcMessingActivity, RequestPipeline requestPipeline) {
        this.grpcMessingActivity = grpcMessingActivity;
        this.requestPipeline = requestPipeline;

        final ProxyConfig config = ConfigurationManager.getProxyConfig();

        this.routeThreadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            config.getGrpcRouteThreadPoolNums(),
            config.getGrpcRouteThreadPoolNums(),
            1L,
            TimeUnit.MINUTES,
            "GrpcRouteThreadPool",
            config.getGrpcRouteThreadQueueCapacity());

        this.producerThreadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            config.getGrpcProducerThreadPoolNums(),
            config.getGrpcProducerThreadPoolNums(),
            1L,
            TimeUnit.MINUTES,
            "GrpcProducerThreadPool",
            config.getGrpcProducerThreadQueueCapacity());

        this.consumerThreadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            config.getGrpcConsumerThreadPoolNums(),
            config.getGrpcConsumerThreadPoolNums(),
            1L,
            TimeUnit.MINUTES,
            "GrpcConsumerThreadPool",
            config.getGrpcConsumerThreadQueueCapacity());

        this.clientManagerThreadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            config.getGrpcClientManagerThreadPoolNums(),
            config.getGrpcClientManagerThreadPoolNums(),
            1L,
            TimeUnit.MINUTES,
            "GrpcClientManagerThreadPool",
            config.getGrpcClientManagerThreadQueueCapacity());

        this.transactionThreadPoolExecutor = ThreadPoolMonitor.createAndMonitor(
            config.getGrpcTransactionThreadPoolNums(),
            config.getGrpcTransactionThreadPoolNums(),
            1L,
            TimeUnit.MINUTES,
            "GrpcTransactionThreadPool",
            config.getGrpcTransactionThreadQueueCapacity());

        // NOTE(review): init() is overridable and runs before subclass constructors finish.
        init();
    }

    /**
     * Installs one shared {@link GrpcTaskRejectedExecutionHandler} on each of the five request
     * thread pools, so that a rejected {@link GrpcTask} is answered with its pre-built rejection
     * response.
     */
    protected void init() {
        GrpcTaskRejectedExecutionHandler rejectedExecutionHandler = new GrpcTaskRejectedExecutionHandler();
        // Each pool is configured exactly once (the route pool used to be configured twice).
        this.routeThreadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
        this.producerThreadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
        this.consumerThreadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
        this.clientManagerThreadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
        this.transactionThreadPoolExecutor.setRejectedExecutionHandler(rejectedExecutionHandler);
    }

    /**
     * Builds a {@link GrpcMessagingApplication} backed by a {@link DefaultGrpcMessingActivity}.
     *
     * <p>The request pipeline starts as a no-op stage. When an {@link AuthConfig} is available,
     * an {@link AuthorizationPipeline} and then an {@link AuthenticationPipeline} are piped onto
     * it; a {@link ContextInitPipeline} is always piped on last.
     * NOTE(review): the order in which the piped stages actually run depends on
     * {@code RequestPipeline.pipe}, which is not visible here — confirm before reordering.
     *
     * @param messagingProcessor processor handed to the activity and to the auth pipelines
     * @return the configured application
     */
    public static GrpcMessagingApplication create(MessagingProcessor messagingProcessor) {
        RequestPipeline pipeline = (context, headers, request) -> {
        };

        final AuthConfig authConfig = ConfigurationManager.getAuthConfig();
        if (authConfig != null) {
            pipeline = pipeline.pipe(new AuthorizationPipeline(authConfig, messagingProcessor));
            pipeline = pipeline.pipe(new AuthenticationPipeline(authConfig, messagingProcessor));
        }

        GrpcMessingActivity activity = new DefaultGrpcMessingActivity(messagingProcessor);
        RequestPipeline completePipeline = pipeline.pipe(new ContextInitPipeline());
        return new GrpcMessagingApplication(activity, completePipeline);
    }

    /**
     * @return a status with code {@link Code#TOO_MANY_REQUESTS} and message "flow limit";
     *         {@code addExecutor} uses it to pre-build the response sent when a task is rejected
     */
    protected Status flowLimitStatus() {
        final ResponseBuilder responseBuilder = ResponseBuilder.getInstance();
        return responseBuilder.buildStatus(Code.TOO_MANY_REQUESTS, "flow limit");
    }

    /**
     * Maps a failure to a response status by delegating to {@code ResponseBuilder.buildStatus}.
     *
     * @param th the failure to convert
     * @return the status produced by the response builder
     */
    protected Status convertExceptionToStatus(Throwable th) {
        final ResponseBuilder responseBuilder = ResponseBuilder.getInstance();
        return responseBuilder.buildStatus(th);
    }

    /**
     * Runs the request pipeline for {@code v} on the calling thread and then schedules
     * {@code runnable} on {@code executorService}, wrapped in a {@link GrpcTask}.
     *
     * <p>For protobuf requests the pipeline is executed with the gRPC metadata of the current
     * {@link Context}, followed by {@link #validateContext(ProxyContext)}; exceptions from either
     * propagate to the caller. Any other request type skips the pipeline and is only logged.
     *
     * <p>The task is handed over with {@code execute} rather than {@code submit}:
     * {@code submit} wraps the runnable in a {@code RunnableFuture}, so a
     * {@link java.util.concurrent.RejectedExecutionHandler} would never see the {@link GrpcTask}
     * and could not answer the caller with the flow-limit response. Because {@code execute} does
     * not capture task exceptions in a future, the runnable is guarded so that a failing task is
     * logged instead of terminating the worker thread.
     *
     * @param executorService executor that should run the task
     * @param proxyContext    context of the request
     * @param v               the request
     * @param runnable        the work to perform
     * @param streamObserver  observer the response is written to
     * @param function        factory turning a {@link Status} into a response of type {@code T}
     */
    protected <V, T> void addExecutor(ExecutorService executorService, ProxyContext proxyContext, V v, Runnable runnable, StreamObserver<T> streamObserver, Function<Status, T> function) {
        if (v instanceof GeneratedMessageV3) {
            this.requestPipeline.execute(proxyContext, (Metadata) GrpcConstants.METADATA.get(Context.current()), (GeneratedMessageV3) v);
            validateContext(proxyContext);
        } else {
            log.error("[BUG]grpc request pipe is not been executed");
        }

        // Built up front so the rejection handler can answer without access to the factory.
        final T rejectResponse = function.apply(flowLimitStatus());

        final Runnable guarded = () -> {
            try {
                runnable.run();
            } catch (Throwable taskError) {
                // submit() used to swallow this silently inside the discarded Future.
                log.error("grpc task execute failed", taskError);
            }
        };

        executorService.execute(new GrpcTask<>(guarded, proxyContext, v, streamObserver, rejectResponse));
    }

    /**
     * Writes a response to {@code streamObserver} through {@code ResponseWriter}.
     *
     * <p>When {@code th} is {@code null} the given response {@code t} is written unchanged and
     * {@code function} is not used (it may be {@code null}). Otherwise the failure is converted
     * with {@link #convertExceptionToStatus(Throwable)} and {@code function} builds the response
     * from that status.
     *
     * @param proxyContext   context of the request (unused here)
     * @param v              the request (unused here)
     * @param t              response to write on success
     * @param streamObserver observer to write to
     * @param th             failure, or {@code null} on success
     * @param function       factory turning a {@link Status} into a response; required when {@code th != null}
     */
    protected <V, T> void writeResponse(ProxyContext proxyContext, V v, T t, StreamObserver<T> streamObserver, Throwable th, Function<Status, T> function) {
        final ResponseWriter writer = ResponseWriter.getInstance();
        if (th == null) {
            writer.write(streamObserver, t);
            return;
        }
        final Status failureStatus = convertExceptionToStatus(th);
        writer.write(streamObserver, function.apply(failureStatus));
    }

    /**
     * @return a new context obtained from {@code ProxyContext.create()}; every RPC method in this
     *         class calls this once per request
     */
    protected ProxyContext createContext() {
        final ProxyContext freshContext = ProxyContext.create();
        return freshContext;
    }

    /**
     * Ensures the context carries a client id.
     *
     * @param proxyContext the context to check
     * @throws GrpcProxyException with {@link Code#CLIENT_ID_REQUIRED} when the client id is blank
     */
    protected void validateContext(ProxyContext proxyContext) {
        final String clientId = proxyContext.getClientID();
        if (!StringUtils.isBlank(clientId)) {
            return;
        }
        throw new GrpcProxyException(Code.CLIENT_ID_REQUIRED, "client id cannot be empty");
    }

    /**
     * Handles the QueryRoute RPC: schedules {@code grpcMessingActivity.queryRoute} on the route
     * thread pool and writes its result, or a status derived from its failure, to the observer.
     * A failure while scheduling (pipeline, validation, executor) is answered the same way.
     *
     * @param queryRouteRequest the request
     * @param streamObserver    observer the response is written to
     */
    public void queryRoute(QueryRouteRequest queryRouteRequest, StreamObserver<QueryRouteResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, QueryRouteResponse> statusResponseCreator =
            status -> QueryRouteResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.routeThreadPoolExecutor, context, queryRouteRequest,
                () -> this.grpcMessingActivity.queryRoute(context, queryRouteRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, queryRouteRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, queryRouteRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the Heartbeat RPC: schedules {@code grpcMessingActivity.heartbeat} on the client
     * manager thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param heartbeatRequest the request
     * @param streamObserver   observer the response is written to
     */
    public void heartbeat(HeartbeatRequest heartbeatRequest, StreamObserver<HeartbeatResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, HeartbeatResponse> statusResponseCreator =
            status -> HeartbeatResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.clientManagerThreadPoolExecutor, context, heartbeatRequest,
                () -> this.grpcMessingActivity.heartbeat(context, heartbeatRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, heartbeatRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, heartbeatRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the SendMessage RPC: schedules {@code grpcMessingActivity.sendMessage} on the
     * producer thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param sendMessageRequest the request
     * @param streamObserver     observer the response is written to
     */
    public void sendMessage(SendMessageRequest sendMessageRequest, StreamObserver<SendMessageResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, SendMessageResponse> statusResponseCreator =
            status -> SendMessageResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.producerThreadPoolExecutor, context, sendMessageRequest,
                () -> this.grpcMessingActivity.sendMessage(context, sendMessageRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, sendMessageRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, sendMessageRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the QueryAssignment RPC: schedules {@code grpcMessingActivity.queryAssignment} on
     * the route thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param queryAssignmentRequest the request
     * @param streamObserver         observer the response is written to
     */
    public void queryAssignment(QueryAssignmentRequest queryAssignmentRequest, StreamObserver<QueryAssignmentResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, QueryAssignmentResponse> statusResponseCreator =
            status -> QueryAssignmentResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.routeThreadPoolExecutor, context, queryAssignmentRequest,
                () -> this.grpcMessingActivity.queryAssignment(context, queryAssignmentRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, queryAssignmentRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, queryAssignmentRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the ReceiveMessage RPC: schedules {@code grpcMessingActivity.receiveMessage} on the
     * consumer thread pool. Unlike the unary RPCs in this class, the observer is handed to the
     * activity, which writes to it itself; this method only writes a response when scheduling
     * fails (pipeline, validation, executor).
     *
     * @param receiveMessageRequest the request
     * @param streamObserver        observer the responses are written to
     */
    public void receiveMessage(ReceiveMessageRequest receiveMessageRequest, StreamObserver<ReceiveMessageResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, ReceiveMessageResponse> statusResponseCreator =
            status -> ReceiveMessageResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.consumerThreadPoolExecutor, context, receiveMessageRequest,
                () -> this.grpcMessingActivity.receiveMessage(context, receiveMessageRequest, streamObserver),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, receiveMessageRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the AckMessage RPC: schedules {@code grpcMessingActivity.ackMessage} on the
     * consumer thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param ackMessageRequest the request
     * @param streamObserver    observer the response is written to
     */
    public void ackMessage(AckMessageRequest ackMessageRequest, StreamObserver<AckMessageResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, AckMessageResponse> statusResponseCreator =
            status -> AckMessageResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.consumerThreadPoolExecutor, context, ackMessageRequest,
                () -> this.grpcMessingActivity.ackMessage(context, ackMessageRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, ackMessageRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, ackMessageRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the ForwardMessageToDeadLetterQueue RPC: schedules
     * {@code grpcMessingActivity.forwardMessageToDeadLetterQueue} on the producer thread pool and
     * writes its result, or a status derived from its failure, to the observer. A failure while
     * scheduling is answered the same way.
     *
     * @param forwardMessageToDeadLetterQueueRequest the request
     * @param streamObserver                         observer the response is written to
     */
    public void forwardMessageToDeadLetterQueue(ForwardMessageToDeadLetterQueueRequest forwardMessageToDeadLetterQueueRequest, StreamObserver<ForwardMessageToDeadLetterQueueResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, ForwardMessageToDeadLetterQueueResponse> statusResponseCreator =
            status -> ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.producerThreadPoolExecutor, context, forwardMessageToDeadLetterQueueRequest,
                () -> this.grpcMessingActivity.forwardMessageToDeadLetterQueue(context, forwardMessageToDeadLetterQueueRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, forwardMessageToDeadLetterQueueRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, forwardMessageToDeadLetterQueueRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the EndTransaction RPC: schedules {@code grpcMessingActivity.endTransaction} on the
     * transaction thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param endTransactionRequest the request
     * @param streamObserver        observer the response is written to
     */
    public void endTransaction(EndTransactionRequest endTransactionRequest, StreamObserver<EndTransactionResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, EndTransactionResponse> statusResponseCreator =
            status -> EndTransactionResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.transactionThreadPoolExecutor, context, endTransactionRequest,
                () -> this.grpcMessingActivity.endTransaction(context, endTransactionRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, endTransactionRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, endTransactionRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the NotifyClientTermination RPC: schedules
     * {@code grpcMessingActivity.notifyClientTermination} on the client manager thread pool and
     * writes its result, or a status derived from its failure, to the observer. A failure while
     * scheduling is answered the same way.
     *
     * @param notifyClientTerminationRequest the request
     * @param streamObserver                 observer the response is written to
     */
    public void notifyClientTermination(NotifyClientTerminationRequest notifyClientTerminationRequest, StreamObserver<NotifyClientTerminationResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, NotifyClientTerminationResponse> statusResponseCreator =
            status -> NotifyClientTerminationResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.clientManagerThreadPoolExecutor, context, notifyClientTerminationRequest,
                () -> this.grpcMessingActivity.notifyClientTermination(context, notifyClientTerminationRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, notifyClientTerminationRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, notifyClientTerminationRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the ChangeInvisibleDuration RPC: schedules
     * {@code grpcMessingActivity.changeInvisibleDuration} on the consumer thread pool and writes
     * its result, or a status derived from its failure, to the observer. A failure while
     * scheduling is answered the same way.
     *
     * @param changeInvisibleDurationRequest the request
     * @param streamObserver                 observer the response is written to
     */
    public void changeInvisibleDuration(ChangeInvisibleDurationRequest changeInvisibleDurationRequest, StreamObserver<ChangeInvisibleDurationResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, ChangeInvisibleDurationResponse> statusResponseCreator =
            status -> ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.consumerThreadPoolExecutor, context, changeInvisibleDurationRequest,
                () -> this.grpcMessingActivity.changeInvisibleDuration(context, changeInvisibleDurationRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, changeInvisibleDurationRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, changeInvisibleDurationRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Handles the RecallMessage RPC: schedules {@code grpcMessingActivity.recallMessage} on the
     * producer thread pool and writes its result, or a status derived from its failure, to the
     * observer. A failure while scheduling is answered the same way.
     *
     * @param recallMessageRequest the request
     * @param streamObserver       observer the response is written to
     */
    public void recallMessage(RecallMessageRequest recallMessageRequest, StreamObserver<RecallMessageResponse> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, RecallMessageResponse> statusResponseCreator =
            status -> RecallMessageResponse.newBuilder().setStatus(status).build();
        final ProxyContext context = createContext();
        try {
            addExecutor(this.producerThreadPoolExecutor, context, recallMessageRequest,
                () -> this.grpcMessingActivity.recallMessage(context, recallMessageRequest)
                    .whenComplete((response, throwable) ->
                        writeResponse(context, recallMessageRequest, response, streamObserver, throwable, statusResponseCreator)),
                streamObserver, statusResponseCreator);
        } catch (Throwable t) {
            writeResponse(context, recallMessageRequest, null, streamObserver, t, statusResponseCreator);
        }
    }

    /**
     * Opens the bidirectional Telemetry stream.
     *
     * <p>A {@link ContextStreamObserver} is obtained once from
     * {@code grpcMessingActivity.telemetry(streamObserver)}. Every incoming command gets its own
     * {@link ProxyContext} and is scheduled on the client manager thread pool, where it is
     * forwarded to that observer. If scheduling fails (pipeline, validation, executor), a
     * {@link TelemetryCommand} carrying the corresponding status is written to
     * {@code streamObserver}. {@code onError} and {@code onCompleted} are forwarded directly on
     * the calling thread.
     *
     * @param streamObserver observer used to send commands back to the client
     * @return observer that receives the client's commands
     */
    public StreamObserver<TelemetryCommand> telemetry(final StreamObserver<TelemetryCommand> streamObserver) {
        // Typed (not raw) so the lambda parameter is a Status and setStatus(...) resolves.
        final Function<Status, TelemetryCommand> statusResponseCreator =
            status -> TelemetryCommand.newBuilder().setStatus(status).build();
        final ContextStreamObserver<TelemetryCommand> delegate = this.grpcMessingActivity.telemetry(streamObserver);
        return new StreamObserver<TelemetryCommand>() { // from class: org.apache.rocketmq.proxy.grpc.v2.GrpcMessagingApplication.1
            public void onNext(TelemetryCommand telemetryCommand) {
                final ProxyContext context = GrpcMessagingApplication.this.createContext();
                try {
                    GrpcMessagingApplication.this.addExecutor(
                        GrpcMessagingApplication.this.clientManagerThreadPoolExecutor,
                        context,
                        telemetryCommand,
                        () -> delegate.onNext(context, telemetryCommand),
                        streamObserver,
                        statusResponseCreator);
                } catch (Throwable t) {
                    GrpcMessagingApplication.this.writeResponse(context, telemetryCommand, null, streamObserver, t, statusResponseCreator);
                }
            }

            public void onError(Throwable th) {
                delegate.onError(th);
            }

            public void onCompleted() {
                delegate.onCompleted();
            }
        };
    }

    /**
     * Shuts down the activity and then initiates an orderly shutdown of all five request thread
     * pools. The pools are shut down even if the activity's shutdown throws, so their threads are
     * not leaked; the exception still propagates to the caller.
     *
     * @throws Exception if {@code grpcMessingActivity.shutdown()} fails
     */
    public void shutdown() throws Exception {
        try {
            this.grpcMessingActivity.shutdown();
        } finally {
            // Each pool is shut down exactly once (the route pool used to be shut down twice).
            this.routeThreadPoolExecutor.shutdown();
            this.producerThreadPoolExecutor.shutdown();
            this.consumerThreadPoolExecutor.shutdown();
            this.clientManagerThreadPoolExecutor.shutdown();
            this.transactionThreadPoolExecutor.shutdown();
        }
    }

    /**
     * Starts the underlying activity. The thread pools need no explicit start; they are created
     * in the constructor.
     *
     * @throws Exception if {@code grpcMessingActivity.start()} fails
     */
    public void start() throws Exception {
        final GrpcMessingActivity activity = this.grpcMessingActivity;
        activity.start();
    }
}
