/*
 * Decompiled with CFR 0.152.
 */
package com.jxdinfo.hussar.support.mq.redis.producer;

import com.jxdinfo.hussar.support.mq.lifecycle.HussarMQLifecycleManager;
import com.jxdinfo.hussar.support.mq.producer.HussarMQAbstractProducer;
import com.jxdinfo.hussar.support.mq.redis.config.HussarRedisMQProperties;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessage;
import com.jxdinfo.hussar.support.mq.redis.message.HussarRedisMQMessageCodec;
import com.jxdinfo.hussar.support.mq.redis.utils.HussarRedisMQUtils;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.lang.NonNull;

public class HussarRedisMQProducer<T>
extends HussarMQAbstractProducer<T> {
    private static final Logger logger = LoggerFactory.getLogger(HussarRedisMQProducer.class);
    private final HussarRedisMQProperties properties;
    private final RedisTemplate<String, Object> redisTemplate;
    private final HussarRedisMQMessageCodec messageCodec;
    private final String streamName;
    private final byte[] streamNameRaw;

    public HussarRedisMQProducer(HussarMQLifecycleManager lifecycleManager, @NonNull HussarRedisMQProperties properties, @NonNull RedisTemplate<String, Object> redisTemplate, @NonNull HussarRedisMQMessageCodec messageCodec, @NonNull String streamName) {
        super(lifecycleManager);
        this.properties = properties;
        this.redisTemplate = redisTemplate;
        this.messageCodec = messageCodec;
        this.streamName = streamName;
        this.streamNameRaw = messageCodec.toRawKey(streamName);
    }

    protected void doStart() {
        HussarRedisMQUtils.createStreamIfAbsent(this.redisTemplate, this.streamName);
    }

    protected void doStop() {
    }

    public void postMessage(T message) {
        this.assertRunning();
        this.redisTemplate.execute(connection -> {
            this.doCheckLimit(connection, 1);
            this.doPostMessage(connection, message);
            return null;
        });
    }

    public void postMessageList(List<T> messages) {
        this.assertRunning();
        if (messages == null) {
            return;
        }
        this.redisTemplate.execute(connection -> {
            this.doCheckLimit(connection, messages.size());
            for (Object message : messages) {
                this.doPostMessage(connection, message);
            }
            return null;
        });
    }

    private void doCheckLimit(RedisConnection connection, int count) {
        long limit = this.properties.getLimit();
        if (limit <= 0L) {
            return;
        }
        logger.debug("Testing whether redis-mq '{}' limit exceeded or not", (Object)this.streamName);
        Long length = connection.xLen(this.streamNameRaw);
        if (length != null && length > limit - (long)count) {
            throw new IllegalStateException("redis-mq '" + this.streamName + "' limit exceeded " + limit + " (current length: " + length + ")");
        }
    }

    private void doPostMessage(RedisConnection connection, T message) {
        block4: {
            logger.debug("Serialize and post message to {}: {}", (Object)this.streamName, message);
            ByteRecord rawRecord = this.rawRecord(message);
            try {
                RecordId recordId = connection.xAdd((MapRecord)rawRecord);
                logger.debug("Message {} is posted to {}", (Object)recordId, (Object)this.streamName);
            }
            catch (Throwable ex) {
                if (!HussarRedisMQUtils.isRedisNoGroup(ex)) break block4;
                try {
                    logger.warn("Stream '{}' not exists, recreate it", (Object)this.streamName);
                    HussarRedisMQUtils.createStreamIfAbsent(this.redisTemplate, this.streamName);
                }
                catch (Throwable throwable) {
                    logger.error("Failed to create stream '{}'", (Object)this.streamName, (Object)throwable);
                    ex.addSuppressed(throwable);
                    throw ex;
                }
            }
        }
    }

    private ByteRecord rawRecord(T message) {
        return HussarRedisMQMessage.of(message).toByteRecord(this.messageCodec).withStreamKey(this.streamNameRaw);
    }

    public String toString() {
        return "HussarRedisMQProducer<" + this.getLifecycleStateName() + " @" + this.streamName + ">";
    }
}

