package org.apache.rocketmq.proxy.service.message;

import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.service.channel.ChannelManager;
import org.apache.rocketmq.proxy.service.channel.InvocationContext;
import org.apache.rocketmq.proxy.service.channel.SimpleChannel;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.body.BatchAck;
import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;

/* loaded from: input_file:org/apache/rocketmq/proxy/service/message/LocalMessageService.class */
/**
 * {@link MessageService} implementation that serves requests by invoking the processors of a
 * {@link BrokerController} in-process.
 *
 * <p>Each supported operation builds a request command, hands it to the matching broker processor
 * through a channel obtained from the {@link ChannelManager}, and converts the response command
 * into the client-side result type. Operations that have no local implementation throw
 * {@link NotImplementedException} synchronously.
 */
public class LocalMessageService implements MessageService {
    private static final Logger log = LoggerFactory.getLogger("RocketmqProxy");

    // Request codes, named after the operation each one is used for in this class.
    // NOTE(review): presumably these mirror the remoting module's request-code constants;
    // confirm and reference those directly if so.
    private static final int REQUEST_SEND_MESSAGE = 10;
    private static final int REQUEST_CONSUMER_SEND_MSG_BACK = 36;
    private static final int REQUEST_END_TRANSACTION = 37;
    private static final int REQUEST_RECALL_MESSAGE = 370;
    private static final int REQUEST_POP_MESSAGE = 200050;
    private static final int REQUEST_ACK_MESSAGE = 200051;
    private static final int REQUEST_CHANGE_INVISIBLE_TIME = 200053;
    private static final int REQUEST_BATCH_ACK_MESSAGE = 200151;

    // Response codes, named after the status each one is mapped to in this class.
    // NOTE(review): same caveat as above - confirm against the remoting module's response codes.
    private static final int RESPONSE_SUCCESS = 0;
    private static final int RESPONSE_FLUSH_DISK_TIMEOUT = 10;
    private static final int RESPONSE_SLAVE_NOT_AVAILABLE = 11;
    private static final int RESPONSE_FLUSH_SLAVE_TIMEOUT = 12;
    private static final int RESPONSE_PULL_NOT_FOUND = 19;
    private static final int RESPONSE_POLLING_FULL = 209;
    private static final int RESPONSE_POLLING_TIMEOUT = 210;

    /** Message property that carries the pop check-point (receipt) information. */
    private static final String PROPERTY_POP_CK = "POP_CK";
    /** Message property that records the pop time the first time a message is popped here. */
    private static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
    /** Separator placed between a check-point string and the message's queue offset. */
    private static final String KEY_SEPARATOR = " ";

    private final BrokerController brokerController;
    private final ChannelManager channelManager;

    /**
     * Creates the service.
     *
     * @param brokerController broker whose processors handle every request
     * @param channelManager   source of the channels handed to the processors
     * @param rPCHook          accepted for signature compatibility; not used by this class
     */
    public LocalMessageService(BrokerController brokerController, ChannelManager channelManager, RPCHook rPCHook) {
        this.brokerController = brokerController;
        this.channelManager = channelManager;
    }

    /** A broker processor call that may throw a checked exception. */
    @FunctionalInterface
    private interface RequestProcessor {
        RemotingCommand process(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
    }

    /**
     * Sends one message, or several messages packed into a single batch, through the broker's
     * send-message processor.
     *
     * @param proxyContext             caller context; supplies the language and the channel
     * @param addressableMessageQueue  not read by this implementation
     * @param list                     messages to send; more than one element switches the
     *                                 request header to batch mode
     * @param sendMessageRequestHeader request header; its batch flag is set for batches
     * @param j                        timeout in the interface contract; not read here
     * @return future holding a single-element list with the send result
     * @throws IndexOutOfBoundsException if {@code list} is empty
     */
    @Override
    public CompletableFuture<List<SendResult>> sendMessage(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, List<Message> list, SendMessageRequestHeader sendMessageRequestHeader, long j) {
        final byte[] body;
        final String messageId;
        if (list.size() > 1) {
            sendMessageRequestHeader.setBatch(true);
            MessageBatch batch = MessageBatch.generateFromList(list);
            MessageClientIDSetter.setUniqID(batch);
            body = batch.encode();
            batch.setBody(body);
            messageId = MessageClientIDSetter.getUniqID(batch);
        } else {
            Message message = list.get(0);
            body = message.getBody();
            messageId = MessageClientIDSetter.getUniqID(message);
        }
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_SEND_MESSAGE, sendMessageRequestHeader, proxyContext.getLanguage());
        request.setBody(body);
        return invokeWithInvocationContext(proxyContext, request, "sendMessage",
            (ctx, cmd) -> this.brokerController.getSendMessageProcessor().processRequest(ctx, cmd))
            .thenApply(response -> Collections.singletonList(toSendResult(response, sendMessageRequestHeader, messageId)));
    }

    /**
     * Forwards a consumer send-back request to the broker's send-message processor.
     *
     * @return future completed with the processor's response, or failed with whatever it threw
     */
    @Override
    public CompletableFuture<RemotingCommand> sendMessageBack(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader, long j) {
        ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_CONSUMER_SEND_MSG_BACK, consumerSendMsgBackRequestHeader, proxyContext.getLanguage());
        return invokeDirectly(ctx, request, "sendMessageBack",
            (c, cmd) -> this.brokerController.getSendMessageProcessor().processRequest(c, cmd));
    }

    /**
     * Hands an end-transaction request to the broker and ignores the processor's response.
     *
     * @return future completed with {@code null} once the processor returns, or failed with the
     *         exception it threw
     */
    @Override
    public CompletableFuture<Void> endTransactionOneway(ProxyContext proxyContext, String str, EndTransactionRequestHeader endTransactionRequestHeader, long j) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
            RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_END_TRANSACTION, endTransactionRequestHeader, proxyContext.getLanguage());
            this.brokerController.getEndTransactionProcessor().processRequest(ctx, request);
            future.complete(null);
        } catch (Exception e) {
            // Logged like every other operation in this class so the failure is not silent.
            log.error("Fail to process endTransaction command", e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /**
     * Pops messages through the broker's pop-message processor and decorates each returned
     * message with its check-point property, first-pop time, broker name and topic.
     *
     * @param proxyContext            caller context; supplies the language and the channel
     * @param addressableMessageQueue queue whose broker name and topic are stamped on the messages
     * @param popMessageRequestHeader request header; its born time is set to the current time
     * @param j                       timeout in the interface contract; not read here
     * @return future holding the pop result; fails with {@link ProxyException} for response codes
     *         this class does not map to a pop status
     */
    @Override
    public CompletableFuture<PopResult> popMessage(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, PopMessageRequestHeader popMessageRequestHeader, long j) {
        popMessageRequestHeader.setBornTime(System.currentTimeMillis());
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_POP_MESSAGE, popMessageRequestHeader, proxyContext.getLanguage());
        return invokeWithInvocationContext(proxyContext, request, "popMessage",
            (ctx, cmd) -> this.brokerController.getPopMessageProcessor().processRequest(ctx, cmd))
            .thenApply(response -> toPopResult(response, addressableMessageQueue, popMessageRequestHeader));
    }

    /**
     * Changes the invisible time of a popped message through the broker's asynchronous
     * change-invisible-time processor.
     *
     * @return future holding an {@link AckResult} whose status is {@code OK} for a success
     *         response and {@code NO_EXIST} otherwise, and whose extra info is a receipt handle
     *         rebuilt from {@code receiptHandle} and the response header
     */
    @Override
    public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, ChangeInvisibleTimeRequestHeader changeInvisibleTimeRequestHeader, long j) {
        ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_CHANGE_INVISIBLE_TIME, changeInvisibleTimeRequestHeader, proxyContext.getLanguage());
        CompletableFuture<RemotingCommand> future;
        try {
            future = this.brokerController.getChangeInvisibleTimeProcessor().processRequestAsync(ctx.channel(), request, true);
        } catch (Exception e) {
            log.error("Fail to process changeInvisibleTime command", e);
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        return future.thenApply(response -> {
            ChangeInvisibleTimeResponseHeader responseHeader = (ChangeInvisibleTimeResponseHeader) response.readCustomHeader();
            AckResult ackResult = toAckResult(response);
            ackResult.setPopTime(responseHeader.getPopTime());
            ackResult.setExtraInfo(ReceiptHandle.builder()
                .startOffset(receiptHandle.getStartOffset())
                .retrieveTime(responseHeader.getPopTime())
                .invisibleTime(responseHeader.getInvisibleTime())
                .reviveQueueId(responseHeader.getReviveQid())
                .topicType(receiptHandle.getTopicType())
                .brokerName(receiptHandle.getBrokerName())
                .queueId(receiptHandle.getQueueId())
                .offset(receiptHandle.getOffset())
                .build()
                .encode());
            return ackResult;
        });
    }

    /**
     * Acknowledges a single message through the broker's ack-message processor.
     *
     * @return future holding {@code OK} for a success response and {@code NO_EXIST} otherwise
     */
    @Override
    public CompletableFuture<AckResult> ackMessage(ProxyContext proxyContext, ReceiptHandle receiptHandle, String str, AckMessageRequestHeader ackMessageRequestHeader, long j) {
        ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_ACK_MESSAGE, ackMessageRequestHeader, proxyContext.getLanguage());
        return invokeDirectly(ctx, request, "ackMessage",
            (c, cmd) -> this.brokerController.getAckMessageProcessor().processRequest(c, cmd))
            .thenApply(LocalMessageService::toAckResult);
    }

    /**
     * Acknowledges several messages in one request. Receipt handles that share the same
     * retry flag, queue id, check-point offset and pop time are merged into one {@link BatchAck};
     * each message is recorded as a bit at (message offset - check-point offset).
     *
     * @param list receipt handles of the messages to acknowledge
     * @param str  consumer group written into every {@link BatchAck}
     * @param str2 topic written into every {@link BatchAck}
     * @return future holding {@code OK} for a success response and {@code NO_EXIST} otherwise
     */
    @Override
    public CompletableFuture<AckResult> batchAckMessage(ProxyContext proxyContext, List<ReceiptHandleMessage> list, String str, String str2, long j) {
        ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_BATCH_ACK_MESSAGE, null);

        Map<String, BatchAck> acksByCheckPoint = new HashMap<>();
        for (ReceiptHandleMessage handleMessage : list) {
            final String[] extraInfo = ExtraInfoUtil.split(handleMessage.getReceiptHandle().getReceiptHandle());
            String key = ExtraInfoUtil.getRetry(extraInfo)
                + "@" + ExtraInfoUtil.getQueueId(extraInfo)
                + "@" + ExtraInfoUtil.getCkQueueOffset(extraInfo)
                + "@" + ExtraInfoUtil.getPopTime(extraInfo);
            BatchAck batchAck = acksByCheckPoint.computeIfAbsent(key, k -> newBatchAck(str, str2, extraInfo));
            batchAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfo) - ExtraInfoUtil.getCkQueueOffset(extraInfo)));
        }

        BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
        requestBody.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        requestBody.setAcks(new ArrayList<>(acksByCheckPoint.values()));
        request.setBody(requestBody.encode());

        return invokeDirectly(ctx, request, "batchAckMessage",
            (c, cmd) -> this.brokerController.getAckMessageProcessor().processRequest(c, cmd))
            .thenApply(LocalMessageService::toAckResult);
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<PullResult> pullMessage(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, PullMessageRequestHeader pullMessageRequestHeader, long j) {
        throw new NotImplementedException("pullMessage is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Long> queryConsumerOffset(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, QueryConsumerOffsetRequestHeader queryConsumerOffsetRequestHeader, long j) {
        throw new NotImplementedException("queryConsumerOffset is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Void> updateConsumerOffset(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader, long j) {
        throw new NotImplementedException("updateConsumerOffset is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, UpdateConsumerOffsetRequestHeader updateConsumerOffsetRequestHeader, long j) {
        throw new NotImplementedException("updateConsumerOffsetAsync is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, LockBatchRequestBody lockBatchRequestBody, long j) {
        throw new NotImplementedException("lockBatchMQ is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Void> unlockBatchMQ(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, UnlockBatchRequestBody unlockBatchRequestBody, long j) {
        throw new NotImplementedException("unlockBatchMQ is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Long> getMaxOffset(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, GetMaxOffsetRequestHeader getMaxOffsetRequestHeader, long j) {
        throw new NotImplementedException("getMaxOffset is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Long> getMinOffset(ProxyContext proxyContext, AddressableMessageQueue addressableMessageQueue, GetMinOffsetRequestHeader getMinOffsetRequestHeader, long j) {
        throw new NotImplementedException("getMinOffset is not implemented in LocalMessageService");
    }

    /**
     * Recalls a message through the broker's recall-message processor.
     *
     * @return future holding the message id from the response header; fails with
     *         {@link ProxyException} carrying the response remark for any non-success code
     */
    @Override
    public CompletableFuture<String> recallMessage(ProxyContext proxyContext, String str, RecallMessageRequestHeader recallMessageRequestHeader, long j) {
        ChannelHandlerContext ctx = this.channelManager.createChannel(proxyContext).getChannelHandlerContext();
        RemotingCommand request = LocalRemotingCommand.createRequestCommand(REQUEST_RECALL_MESSAGE, recallMessageRequestHeader, proxyContext.getLanguage());
        return invokeDirectly(ctx, request, "recallMessage",
            (c, cmd) -> this.brokerController.getRecallMessageProcessor().processRequest(c, cmd))
            .thenApply(response -> {
                if (response.getCode() == RESPONSE_SUCCESS) {
                    return ((RecallMessageResponseHeader) response.readCustomHeader()).getMsgId();
                }
                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, response.getRemark());
            });
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<RemotingCommand> request(ProxyContext proxyContext, String str, RemotingCommand remotingCommand, long j) {
        throw new NotImplementedException("request is not implemented in LocalMessageService");
    }

    /** Not supported locally; always throws {@link NotImplementedException}. */
    @Override
    public CompletableFuture<Void> requestOneway(ProxyContext proxyContext, String str, RemotingCommand remotingCommand, long j) {
        throw new NotImplementedException("requestOneway is not implemented in LocalMessageService");
    }

    /**
     * Runs a processor on an invocation channel. The future is registered on the channel under
     * the request's opaque id before the processor is called; when the processor returns a
     * non-null response it is handed to the invocation context right away and the registration
     * is erased. A null response leaves the registration in place.
     */
    private CompletableFuture<RemotingCommand> invokeWithInvocationContext(ProxyContext proxyContext, RemotingCommand request, String operation, RequestProcessor processor) {
        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
        SimpleChannel channel = this.channelManager.createInvocationChannel(proxyContext);
        InvocationContext invocationContext = new InvocationContext(future);
        channel.registerInvocationContext(request.getOpaque(), invocationContext);
        try {
            RemotingCommand response = processor.process(channel.getChannelHandlerContext(), request);
            if (response != null) {
                invocationContext.handle(response);
                channel.eraseInvocationContext(request.getOpaque());
            }
        } catch (Exception e) {
            future.completeExceptionally(e);
            channel.eraseInvocationContext(request.getOpaque());
            log.error("Failed to process {} command", operation, e);
        }
        return future;
    }

    /** Runs a processor and completes a future with its return value or the exception it threw. */
    private CompletableFuture<RemotingCommand> invokeDirectly(ChannelHandlerContext ctx, RemotingCommand request, String operation, RequestProcessor processor) {
        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
        try {
            future.complete(processor.process(ctx, request));
        } catch (Exception e) {
            log.error("Fail to process {} command", operation, e);
            future.completeExceptionally(e);
        }
        return future;
    }

    /** Maps a response to {@code OK} when its code is success and to {@code NO_EXIST} otherwise. */
    private static AckResult toAckResult(RemotingCommand response) {
        AckResult ackResult = new AckResult();
        ackResult.setStatus(response.getCode() == RESPONSE_SUCCESS ? AckStatus.OK : AckStatus.NO_EXIST);
        return ackResult;
    }

    /** Builds the {@link BatchAck} shared by every receipt handle with the same check-point. */
    private static BatchAck newBatchAck(String consumerGroup, String topic, String[] extraInfo) {
        BatchAck batchAck = new BatchAck();
        batchAck.setConsumerGroup(consumerGroup);
        batchAck.setTopic(topic);
        batchAck.setRetry(ExtraInfoUtil.getRetry(extraInfo));
        batchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
        batchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfo));
        batchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfo));
        batchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
        batchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfo));
        batchAck.setBitSet(new BitSet());
        return batchAck;
    }

    /** Converts a send response into a {@link SendResult}; unmapped codes raise {@link ProxyException}. */
    private SendResult toSendResult(RemotingCommand response, SendMessageRequestHeader requestHeader, String messageId) {
        SendResult sendResult = new SendResult();
        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        final SendStatus sendStatus;
        switch (response.getCode()) {
            case RESPONSE_SUCCESS:
                sendStatus = SendStatus.SEND_OK;
                break;
            case RESPONSE_FLUSH_DISK_TIMEOUT:
                sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
                break;
            case RESPONSE_SLAVE_NOT_AVAILABLE:
                sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
                break;
            case RESPONSE_FLUSH_SLAVE_TIMEOUT:
                sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
                break;
            default:
                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, response.getRemark());
        }
        sendResult.setSendStatus(sendStatus);
        sendResult.setMsgId(messageId);
        sendResult.setMessageQueue(new MessageQueue(requestHeader.getTopic(), this.brokerController.getBrokerConfig().getBrokerName(), requestHeader.getQueueId()));
        sendResult.setQueueOffset(responseHeader.getQueueOffset());
        sendResult.setTransactionId(responseHeader.getTransactionId());
        sendResult.setOffsetMsgId(responseHeader.getMsgId());
        sendResult.setRecallHandle(responseHeader.getRecallHandle());
        return sendResult;
    }

    /**
     * Converts a pop response into a {@link PopResult}. For a success response the body is
     * decoded into messages and every message is decorated; other mapped codes yield an empty
     * result with the matching status; unmapped codes raise {@link ProxyException}.
     */
    private PopResult toPopResult(RemotingCommand response, AddressableMessageQueue messageQueue, PopMessageRequestHeader requestHeader) {
        final PopStatus popStatus;
        List<MessageExt> messages = new ArrayList<>();
        switch (response.getCode()) {
            case RESPONSE_SUCCESS:
                popStatus = PopStatus.FOUND;
                messages = MessageDecoder.decodesBatch(ByteBuffer.wrap(response.getBody()), true, false, true);
                break;
            case RESPONSE_PULL_NOT_FOUND:
            case RESPONSE_POLLING_TIMEOUT:
                popStatus = PopStatus.POLLING_NOT_FOUND;
                break;
            case RESPONSE_POLLING_FULL:
                popStatus = PopStatus.POLLING_FULL;
                break;
            default:
                throw new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, response.getRemark());
        }
        PopResult popResult = new PopResult(popStatus, messages);
        PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
        if (popStatus != PopStatus.FOUND) {
            return popResult;
        }

        popResult.setInvisibleTime(responseHeader.getInvisibleTime());
        popResult.setPopTime(responseHeader.getPopTime());
        Map<String, Long> startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
        Map<String, List<Long>> msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
        Map<String, Integer> orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());

        // Queue offsets of the returned messages, grouped per queue in arrival order. The position
        // of a message inside its group is used below to index into msgOffsetInfo.
        Map<String, List<Long>> offsetsByQueue = new HashMap<>(16);
        for (MessageExt message : messages) {
            String key = ExtraInfoUtil.getStartOffsetInfoMapKey(message.getTopic(), message.getProperty(PROPERTY_POP_CK), message.getQueueId());
            offsetsByQueue.computeIfAbsent(key, k -> new ArrayList<>(4)).add(message.getQueueOffset());
        }

        // Used only when the response carries no start-offset info: one check-point string per
        // topic+queue, built from the first message seen for that queue.
        Map<String, String> checkPointByQueue = new HashMap<>(5);
        for (MessageExt message : messages) {
            if (startOffsetInfo == null) {
                String key = message.getTopic() + message.getQueueId();
                String checkPoint = checkPointByQueue.computeIfAbsent(key, k -> ExtraInfoUtil.buildExtraInfo(
                    message.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
                    responseHeader.getReviveQid(), message.getTopic(), messageQueue.getBrokerName(), message.getQueueId()));
                message.getProperties().put(PROPERTY_POP_CK, checkPoint + KEY_SEPARATOR + message.getQueueOffset());
            } else if (message.getProperty(PROPERTY_POP_CK) == null) {
                String key = ExtraInfoUtil.getStartOffsetInfoMapKey(message.getTopic(), message.getQueueId());
                int index = offsetsByQueue.get(key).indexOf(Long.valueOf(message.getQueueOffset()));
                Long msgQueueOffset = msgOffsetInfo.get(key).get(index);
                if (msgQueueOffset.longValue() != message.getQueueOffset()) {
                    log.warn("Queue offset [{}] of msg is strange, not equal to the stored in msg, {}", msgQueueOffset, message);
                }
                message.getProperties().put(PROPERTY_POP_CK, ExtraInfoUtil.buildExtraInfo(
                    startOffsetInfo.get(key).longValue(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(),
                    responseHeader.getReviveQid(), message.getTopic(), messageQueue.getBrokerName(), message.getQueueId(),
                    msgQueueOffset.longValue()));
                if (requestHeader.isOrder() && orderCountInfo != null) {
                    Integer count = orderCountInfo.get(key);
                    if (count != null && count.intValue() > 0) {
                        message.setReconsumeTimes(count.intValue());
                    }
                }
            }
            message.getProperties().computeIfAbsent(PROPERTY_FIRST_POP_TIME, k -> String.valueOf(responseHeader.getPopTime()));
            // The previous code assigned the message's own broker name back to itself, which is a
            // no-op. The queue's broker name - the same one embedded in POP_CK above - is used.
            message.setBrokerName(messageQueue.getBrokerName());
            message.setTopic(messageQueue.getTopic());
        }
        return popResult;
    }
}
