/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.client.kafka;

import com.alibaba.fastjson2.JSON;
import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
import com.alibaba.otter.canal.client.kafka.protocol.KafkaFlatMessage;
import com.alibaba.otter.canal.client.kafka.protocol.KafkaMessage;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class KafkaOffsetCanalConnector
extends KafkaCanalConnector {
    public KafkaOffsetCanalConnector(String servers, String topic, Integer partition, String groupId, boolean flatMessage) {
        super(servers, topic, partition, groupId, 100, flatMessage);
        this.properties.put("auto.offset.reset", "earliest");
    }

    public List<KafkaMessage> getListWithoutAck(Long timeout, TimeUnit unit, long offset) throws CanalClientException {
        ConsumerRecords records;
        this.waitClientRunning();
        if (!this.running) {
            return new ArrayList<KafkaMessage>();
        }
        if (offset > -1L) {
            TopicPartition tp = new TopicPartition(this.topic, this.partition == null ? 0 : this.partition);
            this.kafkaConsumer.seek(tp, offset);
        }
        if (!(records = this.kafkaConsumer.poll(unit.toMillis(timeout))).isEmpty()) {
            ArrayList<KafkaMessage> messages = new ArrayList<KafkaMessage>();
            for (ConsumerRecord record : records) {
                KafkaMessage message = new KafkaMessage((Message)record.value(), record.offset());
                messages.add(message);
            }
            return messages;
        }
        return new ArrayList<KafkaMessage>();
    }

    public List<KafkaFlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit, long offset) throws CanalClientException {
        ConsumerRecords records;
        this.waitClientRunning();
        if (!this.running) {
            return new ArrayList<KafkaFlatMessage>();
        }
        if (offset > -1L) {
            TopicPartition tp = new TopicPartition(this.topic, this.partition == null ? 0 : this.partition);
            this.kafkaConsumer2.seek(tp, offset);
        }
        if (!(records = this.kafkaConsumer2.poll(unit.toMillis(timeout))).isEmpty()) {
            ArrayList<KafkaFlatMessage> flatMessages = new ArrayList<KafkaFlatMessage>();
            for (ConsumerRecord record : records) {
                String flatMessageJson = (String)record.value();
                FlatMessage flatMessage = (FlatMessage)JSON.parseObject((String)flatMessageJson, FlatMessage.class);
                KafkaFlatMessage message = new KafkaFlatMessage(flatMessage, record.offset());
                flatMessages.add(message);
            }
            return flatMessages;
        }
        return new ArrayList<KafkaFlatMessage>();
    }

    public void setAutoOffsetReset(String value) {
        if (StringUtils.isNotBlank((CharSequence)value)) {
            this.properties.put("auto.offset.reset", value);
        }
    }
}

