/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.rabbit;

import com.rabbitmq.stream.Message;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.Lifecycle;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

/**
 * Message handler that sends Spring messages through a {@link RabbitStreamOperations}.
 *
 * <p>A request whose payload is already a {@link Message com.rabbitmq.stream.Message} is sent
 * unchanged. Any other payload is first converted to a Spring AMQP message (payload via the
 * operations' message converter, headers via the configured {@link AmqpHeaderMapper}) and then
 * to a stream message via the operations' stream message converter.
 *
 * <p>The outcome of each send is reported to the configured success/failure callbacks. When
 * {@code sync} is enabled, the handler additionally blocks for up to {@code confirmTimeout}
 * milliseconds waiting for the send future and raises a {@link MessageHandlingException} if it
 * fails, times out, or the waiting thread is interrupted.
 */
public class RabbitStreamMessageHandler extends AbstractMessageHandler implements Lifecycle {

    /** Default time, in milliseconds, to wait for the send future when {@code sync} is enabled. */
    private static final int DEFAULT_CONFIRM_TIMEOUT = 10000;

    private final RabbitStreamOperations streamOperations;

    private boolean sync;

    private long confirmTimeout = DEFAULT_CONFIRM_TIMEOUT;

    /** No-op until replaced through {@link #setSuccessCallback(SuccessCallback)}. */
    private SuccessCallback<org.springframework.messaging.Message<?>> successCallback = msg -> {};

    /** No-op until replaced through {@link #setFailureCallback(FailureCallback)}. */
    private FailureCallback failureCallback = (msg, ex) -> {};

    private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.outboundMapper();

    private boolean headersMappedLast;

    /**
     * Creates a handler that sends through the given operations.
     *
     * @param streamOperations the stream operations used for conversion and sending; must not
     *        be {@code null}
     * @throws IllegalArgumentException if {@code streamOperations} is {@code null}
     */
    public RabbitStreamMessageHandler(RabbitStreamOperations streamOperations) {
        Assert.notNull(streamOperations, "'streamOperations' cannot be null");
        this.streamOperations = streamOperations;
    }

    /**
     * Sets the callback invoked with the request message when its send future completes
     * successfully.
     *
     * @param successCallback the callback; must not be {@code null}
     */
    public void setSuccessCallback(SuccessCallback<org.springframework.messaging.Message<?>> successCallback) {
        Assert.notNull(successCallback, "'successCallback' cannot be null");
        this.successCallback = successCallback;
    }

    /**
     * Sets the callback invoked with the request message and the cause when its send future
     * completes with a failure.
     *
     * @param failureCallback the callback; must not be {@code null}
     */
    public void setFailureCallback(FailureCallback failureCallback) {
        Assert.notNull(failureCallback, "'failureCallback' cannot be null");
        this.failureCallback = failureCallback;
    }

    /**
     * Chooses whether the handler blocks on the send future before returning.
     *
     * @param sync {@code true} to wait (up to the confirm timeout) for each send to complete
     */
    public void setSync(boolean sync) {
        this.sync = sync;
    }

    /**
     * Sets how long to wait for the send future when {@code sync} is enabled.
     *
     * @param confirmTimeout the timeout in milliseconds (default {@code 10000})
     */
    public void setConfirmTimeout(long confirmTimeout) {
        this.confirmTimeout = confirmTimeout;
    }

    /**
     * Replaces the header mapper used to copy message headers onto the AMQP message properties.
     *
     * @param headerMapper the mapper; must not be {@code null}
     */
    public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
        Assert.notNull(headerMapper, "headerMapper must not be null");
        this.headerMapper = headerMapper;
    }

    /**
     * Chooses when headers are mapped relative to payload conversion.
     *
     * @param headersMappedLast {@code true} to map headers onto the message properties after the
     *        payload has been converted; {@code false} (the default) to map them before
     */
    public void setHeadersMappedLast(boolean headersMappedLast) {
        this.headersMappedLast = headersMappedLast;
    }

    /**
     * Returns the stream operations this handler sends through.
     *
     * @return the operations supplied at construction time
     */
    public RabbitStreamOperations getStreamOperations() {
        return this.streamOperations;
    }

    /**
     * Builds the stream message for the request, sends it, and processes the resulting future.
     *
     * @param requestMessage the message to send
     * @throws MessageHandlingException in sync mode, if waiting for the send fails, times out,
     *         or is interrupted
     */
    protected void handleMessageInternal(org.springframework.messaging.Message<?> requestMessage) {
        Object payload = requestMessage.getPayload();
        Message streamMessage;
        if (payload instanceof Message) {
            // Already a native stream message: send as-is, no conversion or header mapping.
            streamMessage = (Message) payload;
        } else {
            MessageConverter converter = this.streamOperations.messageConverter();
            org.springframework.amqp.core.Message amqpMessage =
                    mapMessage(requestMessage, converter, this.headerMapper, this.headersMappedLast);
            streamMessage = this.streamOperations.streamMessageConverter().fromMessage(amqpMessage);
        }
        ListenableFuture<Boolean> future = this.streamOperations.send(streamMessage);
        handleConfirms(requestMessage, future);
    }

    /**
     * Wires the user callbacks to the send future and, in sync mode, waits for it.
     *
     * <p>The callbacks are registered before any waiting, so in sync mode a failed send both
     * triggers the failure callback and surfaces as an exception. The {@code Boolean} result of
     * the future is not inspected.
     */
    private void handleConfirms(org.springframework.messaging.Message<?> message, ListenableFuture<Boolean> future) {
        future.addCallback(
                result -> this.successCallback.onSuccess(message),
                cause -> this.failureCallback.failure(message, cause));
        if (!this.sync) {
            return;
        }
        try {
            future.get(this.confirmTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new MessageHandlingException(message, ex);
        } catch (ExecutionException | TimeoutException ex) {
            throw new MessageHandlingException(message, ex);
        }
    }

    /**
     * Converts a Spring message into a Spring AMQP message backed by
     * {@link StreamMessageProperties}.
     *
     * <p>Headers are mapped either before conversion or after it, depending on
     * {@code headersMappedLast}. When they are mapped last and the converter is a
     * {@link ContentTypeDelegatingMessageConverter}, the content type header is copied onto the
     * properties up front so that it is already present when the converter runs.
     *
     * @throws IllegalArgumentException if the content type header has to be read and is neither
     *         a {@link MimeType} nor a {@link String}
     */
    private static org.springframework.amqp.core.Message mapMessage(org.springframework.messaging.Message<?> message, MessageConverter converter, AmqpHeaderMapper headerMapper, boolean headersMappedLast) {
        MessageHeaders headers = message.getHeaders();
        StreamMessageProperties amqpMessageProperties = new StreamMessageProperties();
        if (!headersMappedLast) {
            mapHeaders(headers, amqpMessageProperties, headerMapper);
        }
        if (headersMappedLast && converter instanceof ContentTypeDelegatingMessageConverter) {
            String contentType = contentTypeAsString(headers);
            if (contentType != null) {
                amqpMessageProperties.setContentType(contentType);
            }
        }
        org.springframework.amqp.core.Message amqpMessage =
                converter.toMessage(message.getPayload(), amqpMessageProperties);
        if (headersMappedLast) {
            // Mapped onto the same properties instance that was handed to the converter.
            mapHeaders(headers, amqpMessageProperties, headerMapper);
        }
        return amqpMessage;
    }

    /** Copies the message headers onto the AMQP properties using the given mapper. */
    private static void mapHeaders(MessageHeaders messageHeaders, MessageProperties amqpMessageProperties, AmqpHeaderMapper headerMapper) {
        headerMapper.fromHeadersToRequest(messageHeaders, amqpMessageProperties);
    }

    /**
     * Reads the {@code contentType} header as a string.
     *
     * @return the header value as a string, or {@code null} when the header is absent
     * @throws IllegalArgumentException if the header is present but is neither a
     *         {@link MimeType} nor a {@link String}
     */
    private static String contentTypeAsString(MessageHeaders headers) {
        Object contentType = headers.get(MessageHeaders.CONTENT_TYPE);
        if (contentType == null) {
            return null;
        }
        if (contentType instanceof MimeType) {
            return contentType.toString();
        }
        if (contentType instanceof String) {
            return (String) contentType;
        }
        throw new IllegalArgumentException("contentType header must be a MimeType or String, found: " + contentType.getClass().getName());
    }

    /** {@link Lifecycle} start callback; intentionally does nothing. */
    public void start() {
    }

    /** {@link Lifecycle} stop callback; closes the underlying stream operations. */
    public void stop() {
        this.streamOperations.close();
    }

    /**
     * Always reports the handler as running, regardless of {@link #start()} or {@link #stop()}.
     *
     * <p>NOTE(review): presumably this is deliberate so that a lifecycle processor always calls
     * {@link #stop()} (and thereby closes the stream operations) even though {@link #start()} is
     * a no-op - confirm before introducing a real running flag.
     *
     * @return {@code true}
     */
    public boolean isRunning() {
        return true;
    }

    /**
     * Callback notified when sending a message fails.
     */
    @FunctionalInterface
    public static interface FailureCallback {

        /**
         * Handles a failed send.
         *
         * @param message the request message whose send failed
         * @param cause the failure reported by the send future
         */
        public void failure(org.springframework.messaging.Message<?> message, Throwable cause);
    }
}

