package org.apache.rocketmq.proxy.remoting.activity;

import io.netty.channel.ChannelHandlerContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.Connection;
import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.remoting.protocol.header.GetConsumerListByGroupResponseHeader;

/* loaded from: input_file:org/apache/rocketmq/proxy/remoting/activity/ConsumerManagerActivity.class */
public class ConsumerManagerActivity extends AbstractRemotingActivity {
    public ConsumerManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor) {
        super(requestPipeline, messagingProcessor);
    }

    @Override // org.apache.rocketmq.proxy.remoting.activity.AbstractRemotingActivity
    protected RemotingCommand processRequest0(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        switch (remotingCommand.getCode()) {
            case 14:
            case 15:
            case 29:
            case 30:
            case 31:
            case ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST /* 32 */:
                return request(channelHandlerContext, remotingCommand, proxyContext, Duration.ofSeconds(3L).toMillis());
            case 38:
                return getConsumerListByGroup(channelHandlerContext, remotingCommand, proxyContext);
            case 41:
                return lockBatchMQ(channelHandlerContext, remotingCommand, proxyContext);
            case 42:
                return unlockBatchMQ(channelHandlerContext, remotingCommand, proxyContext);
            case 203:
                return getConsumerConnectionList(channelHandlerContext, remotingCommand, proxyContext);
            default:
                return null;
        }
    }

    protected RemotingCommand getConsumerListByGroup(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
        List allClientId = this.messagingProcessor.getConsumerGroupInfo(proxyContext, remotingCommand.decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class).getConsumerGroup()).getAllClientId();
        GetConsumerListByGroupResponseBody getConsumerListByGroupResponseBody = new GetConsumerListByGroupResponseBody();
        getConsumerListByGroupResponseBody.setConsumerIdList(allClientId);
        createResponseCommand.setBody(getConsumerListByGroupResponseBody.encode());
        createResponseCommand.setCode(0);
        return createResponseCommand;
    }

    protected RemotingCommand getConsumerConnectionList(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(GetConsumerConnectionListRequestHeader.class);
        GetConsumerConnectionListRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
        ConsumerGroupInfo consumerGroupInfo = this.messagingProcessor.getConsumerGroupInfo(proxyContext, decodeCommandCustomHeader.getConsumerGroup());
        if (consumerGroupInfo == null) {
            createResponseCommand.setCode(206);
            createResponseCommand.setRemark("the consumer group[" + decodeCommandCustomHeader.getConsumerGroup() + "] not online");
            return createResponseCommand;
        }
        ConsumerConnection consumerConnection = new ConsumerConnection();
        consumerConnection.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
        consumerConnection.setConsumeType(consumerGroupInfo.getConsumeType());
        consumerConnection.setMessageModel(consumerGroupInfo.getMessageModel());
        consumerConnection.getSubscriptionTable().putAll(consumerGroupInfo.getSubscriptionTable());
        Iterator it = consumerGroupInfo.getChannelInfoTable().entrySet().iterator();
        while (it.hasNext()) {
            ClientChannelInfo clientChannelInfo = (ClientChannelInfo) ((Map.Entry) it.next()).getValue();
            Connection connection = new Connection();
            connection.setClientId(clientChannelInfo.getClientId());
            connection.setLanguage(clientChannelInfo.getLanguage());
            connection.setVersion(clientChannelInfo.getVersion());
            connection.setClientAddr(RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()));
            consumerConnection.getConnectionSet().add(connection);
        }
        createResponseCommand.setBody(consumerConnection.encode());
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark((String) null);
        return createResponseCommand;
    }

    protected RemotingCommand lockBatchMQ(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        LockBatchRequestBody lockBatchRequestBody = (LockBatchRequestBody) LockBatchRequestBody.decode(remotingCommand.getBody(), LockBatchRequestBody.class);
        Set mqSet = lockBatchRequestBody.getMqSet();
        if (mqSet.isEmpty()) {
            createResponseCommand.setBody(lockBatchRequestBody.encode());
            createResponseCommand.setRemark("MessageQueue set is empty");
            return createResponseCommand;
        }
        this.messagingProcessor.request(proxyContext, ((MessageQueue) new ArrayList(mqSet).get(0)).getBrokerName(), remotingCommand, Duration.ofSeconds(3L).toMillis()).thenAccept(remotingCommand2 -> {
            writeResponse(channelHandlerContext, proxyContext, remotingCommand, remotingCommand2);
        }).exceptionally(th -> {
            writeErrResponse(channelHandlerContext, proxyContext, remotingCommand, th);
            return null;
        });
        return null;
    }

    protected RemotingCommand unlockBatchMQ(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        UnlockBatchRequestBody unlockBatchRequestBody = (UnlockBatchRequestBody) UnlockBatchRequestBody.decode(remotingCommand.getBody(), UnlockBatchRequestBody.class);
        Set mqSet = unlockBatchRequestBody.getMqSet();
        if (mqSet.isEmpty()) {
            createResponseCommand.setBody(unlockBatchRequestBody.encode());
            createResponseCommand.setRemark("MessageQueue set is empty");
            return createResponseCommand;
        }
        this.messagingProcessor.request(proxyContext, ((MessageQueue) new ArrayList(mqSet).get(0)).getBrokerName(), remotingCommand, Duration.ofSeconds(3L).toMillis()).thenAccept(remotingCommand2 -> {
            writeResponse(channelHandlerContext, proxyContext, remotingCommand, remotingCommand2);
        }).exceptionally(th -> {
            writeErrResponse(channelHandlerContext, proxyContext, remotingCommand, th);
            return null;
        });
        return null;
    }
}
