package org.apache.rocketmq.proxy.remoting.activity;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import java.util.Iterator;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupEvent;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ProducerChangeListener;
import org.apache.rocketmq.broker.client.ProducerGroupEvent;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannel;
import org.apache.rocketmq.proxy.remoting.channel.RemotingChannelManager;
import org.apache.rocketmq.proxy.remoting.pipeline.RequestPipeline;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.AttributeKeys;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.ProducerData;

/* loaded from: input_file:org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity.class */
public class ClientManagerActivity extends AbstractRemotingActivity {
    private final RemotingChannelManager remotingChannelManager;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity$ConsumerIdsChangeListenerImpl.class */
    public class ConsumerIdsChangeListenerImpl implements ConsumerIdsChangeListener {
        protected ConsumerIdsChangeListenerImpl() {
        }

        public void handle(ConsumerGroupEvent consumerGroupEvent, String str, Object... objArr) {
            if (consumerGroupEvent != ConsumerGroupEvent.CLIENT_UNREGISTER || objArr == null || objArr.length < 1 || !(objArr[0] instanceof ClientChannelInfo)) {
                return;
            }
            ClientChannelInfo clientChannelInfo = (ClientChannelInfo) objArr[0];
            ClientManagerActivity.this.remotingChannelManager.removeConsumerChannel(ProxyContext.createForInner(getClass()), str, clientChannelInfo.getChannel());
            AbstractRemotingActivity.log.info("remove remoting channel when client unregister. clientChannelInfo:{}", clientChannelInfo);
        }

        public void shutdown() {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/rocketmq/proxy/remoting/activity/ClientManagerActivity$ProducerChangeListenerImpl.class */
    public class ProducerChangeListenerImpl implements ProducerChangeListener {
        protected ProducerChangeListenerImpl() {
        }

        public void handle(ProducerGroupEvent producerGroupEvent, String str, ClientChannelInfo clientChannelInfo) {
            if (producerGroupEvent == ProducerGroupEvent.CLIENT_UNREGISTER) {
                ClientManagerActivity.this.remotingChannelManager.removeProducerChannel(ProxyContext.createForInner(getClass()), str, clientChannelInfo.getChannel());
            }
        }
    }

    public ClientManagerActivity(RequestPipeline requestPipeline, MessagingProcessor messagingProcessor, RemotingChannelManager remotingChannelManager) {
        super(requestPipeline, messagingProcessor);
        this.remotingChannelManager = remotingChannelManager;
        init();
    }

    protected void init() {
        this.messagingProcessor.registerConsumerListener(new ConsumerIdsChangeListenerImpl());
        this.messagingProcessor.registerProducerListener(new ProducerChangeListenerImpl());
    }

    @Override // org.apache.rocketmq.proxy.remoting.activity.AbstractRemotingActivity
    protected RemotingCommand processRequest0(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws Exception {
        switch (remotingCommand.getCode()) {
            case 34:
                return heartBeat(channelHandlerContext, remotingCommand, proxyContext);
            case 35:
                return unregisterClient(channelHandlerContext, remotingCommand, proxyContext);
            case 46:
                return checkClientConfig(channelHandlerContext, remotingCommand, proxyContext);
            default:
                return null;
        }
    }

    protected RemotingCommand heartBeat(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) {
        HeartbeatData heartbeatData = (HeartbeatData) HeartbeatData.decode(remotingCommand.getBody(), HeartbeatData.class);
        String clientID = heartbeatData.getClientID();
        for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(this.remotingChannelManager.createProducerChannel(proxyContext, channelHandlerContext.channel(), producerData.getGroupName(), clientID), clientID, remotingCommand.getLanguage(), remotingCommand.getVersion());
            setClientPropertiesToChannelAttr(clientChannelInfo);
            this.messagingProcessor.registerProducer(proxyContext, producerData.getGroupName(), clientChannelInfo);
        }
        for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
            ClientChannelInfo clientChannelInfo2 = new ClientChannelInfo(this.remotingChannelManager.createConsumerChannel(proxyContext, channelHandlerContext.channel(), consumerData.getGroupName(), clientID, consumerData.getSubscriptionDataSet()), clientID, remotingCommand.getLanguage(), remotingCommand.getVersion());
            setClientPropertiesToChannelAttr(clientChannelInfo2);
            this.messagingProcessor.registerConsumer(proxyContext, consumerData.getGroupName(), clientChannelInfo2, consumerData.getConsumeType(), consumerData.getMessageModel(), consumerData.getConsumeFromWhere(), consumerData.getSubscriptionDataSet(), true);
        }
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark("");
        return createResponseCommand;
    }

    private void setClientPropertiesToChannelAttr(ClientChannelInfo clientChannelInfo) {
        RemotingChannel channel = clientChannelInfo.getChannel();
        if (channel instanceof RemotingChannel) {
            Channel parent = channel.parent();
            RemotingHelper.setPropertyToAttr(parent, AttributeKeys.CLIENT_ID_KEY, clientChannelInfo.getClientId());
            RemotingHelper.setPropertyToAttr(parent, AttributeKeys.LANGUAGE_CODE_KEY, clientChannelInfo.getLanguage());
            RemotingHelper.setPropertyToAttr(parent, AttributeKeys.VERSION_KEY, Integer.valueOf(clientChannelInfo.getVersion()));
        }
    }

    protected RemotingCommand unregisterClient(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) throws RemotingCommandException {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
        UnregisterClientRequestHeader decodeCommandCustomHeader = remotingCommand.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
        String producerGroup = decodeCommandCustomHeader.getProducerGroup();
        if (producerGroup != null) {
            RemotingChannel removeProducerChannel = this.remotingChannelManager.removeProducerChannel(proxyContext, producerGroup, channelHandlerContext.channel());
            if (removeProducerChannel != null) {
                this.messagingProcessor.unRegisterProducer(proxyContext, producerGroup, new ClientChannelInfo(removeProducerChannel, decodeCommandCustomHeader.getClientID(), remotingCommand.getLanguage(), remotingCommand.getVersion()));
            } else {
                log.warn("unregister producer failed, channel not exist, may has been removed, producerGroup={}, channel={}", producerGroup, channelHandlerContext.channel());
            }
        }
        String consumerGroup = decodeCommandCustomHeader.getConsumerGroup();
        if (consumerGroup != null) {
            RemotingChannel removeConsumerChannel = this.remotingChannelManager.removeConsumerChannel(proxyContext, consumerGroup, channelHandlerContext.channel());
            if (removeConsumerChannel != null) {
                this.messagingProcessor.unRegisterConsumer(proxyContext, consumerGroup, new ClientChannelInfo(removeConsumerChannel, decodeCommandCustomHeader.getClientID(), remotingCommand.getLanguage(), remotingCommand.getVersion()));
            } else {
                log.warn("unregister consumer failed, channel not exist, may has been removed, consumerGroup={}, channel={}", consumerGroup, channelHandlerContext.channel());
            }
        }
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark("");
        return createResponseCommand;
    }

    protected RemotingCommand checkClientConfig(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand, ProxyContext proxyContext) {
        RemotingCommand createResponseCommand = RemotingCommand.createResponseCommand((Class) null);
        createResponseCommand.setCode(0);
        createResponseCommand.setRemark("");
        return createResponseCommand;
    }

    public void doChannelCloseEvent(String str, Channel channel) {
        Iterator<RemotingChannel> it = this.remotingChannelManager.removeChannel(channel).iterator();
        while (it.hasNext()) {
            this.messagingProcessor.doChannelCloseEvent(str, (RemotingChannel) it.next());
        }
    }
}
