package com.taosdata.jdbc.ws.tmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.TSDBErrorNumbers;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.enums.TmqMessageType;
import com.taosdata.jdbc.enums.WSFunction;
import com.taosdata.jdbc.tmq.Assignment;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.OffsetAndMetadata;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.ws.FutureResponse;
import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.FetchBlockResp;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.tmq.entity.AssignmentResp;
import com.taosdata.jdbc.ws.tmq.entity.CommitOffsetResp;
import com.taosdata.jdbc.ws.tmq.entity.CommitResp;
import com.taosdata.jdbc.ws.tmq.entity.CommittedResp;
import com.taosdata.jdbc.ws.tmq.entity.ConsumerParam;
import com.taosdata.jdbc.ws.tmq.entity.ListTopicsResp;
import com.taosdata.jdbc.ws.tmq.entity.PollResp;
import com.taosdata.jdbc.ws.tmq.entity.PositionResp;
import com.taosdata.jdbc.ws.tmq.entity.SeekResp;
import com.taosdata.jdbc.ws.tmq.entity.SubscribeResp;
import com.taosdata.jdbc.ws.tmq.entity.TMQRequestFactory;
import com.taosdata.jdbc.ws.tmq.entity.UnsubscribeResp;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/* loaded from: input_file:com/taosdata/jdbc/ws/tmq/WSConsumer.class */
/**
 * {@link Consumer} implementation that exchanges TMQ requests with the server through a
 * {@link Transport} opened with {@link WSFunction#TMQ}.
 *
 * <p>An instance must be initialised with {@link #create(Properties)} before any other method
 * is called. Apart from {@link #commitSync()}, which is {@code synchronized}, the methods of
 * this class perform no synchronisation of their own.
 *
 * @param <V> type of the values produced by the {@link Deserializer} passed to {@code poll}
 */
public class WSConsumer<V> implements Consumer<V> {
    /** Error code (0x2301) after which {@code poll} reconnects when auto-connect is enabled. */
    private static final int AUTO_RECONNECT_ERROR_CODE = 8961;
    /** Error code (0x231E) after which {@code poll} reconnects when the transport reports a lost connection. */
    private static final int CONNECTION_LOST_ERROR_CODE = 8990;

    private Transport transport;
    private ConsumerParam param;
    private TMQRequestFactory factory;
    /** Wall-clock time (ms) of the last automatic commit issued from {@code doPoll}. */
    private long lastCommitTime = 0;
    /** Id of the last polled data message that has not been committed yet; 0 means "nothing to commit". */
    private long messageId = 0;
    /** Snapshot of the topics passed to the last successful {@code subscribe}; replayed after a reconnect. */
    private Collection<String> topics;

    /**
     * Builds the request factory, parameters and transport, installs the message handlers and
     * waits for the connection to be established.
     *
     * @param properties consumer configuration handed to {@link ConsumerParam}
     * @throws SQLException if the parameters are rejected or the connection check fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void create(Properties properties) throws SQLException {
        this.factory = new TMQRequestFactory();
        this.param = new ConsumerParam(properties);
        InFlightRequest inFlightRequest = new InFlightRequest(
                this.param.getConnectionParam().getRequestTimeout(),
                this.param.getConnectionParam().getMaxRequest());
        this.transport = new Transport(WSFunction.TMQ, this.param.getConnectionParam(), inFlightRequest);

        // Text frames: JSON responses, matched to the pending request by (action, request id).
        this.transport.setTextMessageHandler(message -> {
            JSONObject json = JSON.parseObject(message);
            Response response = (Response) json.toJavaObject(
                    ConsumerAction.of(json.getString("action")).getResponseClazz());
            FutureResponse pending = inFlightRequest.remove(response.getAction(), Long.valueOf(response.getReqId()));
            if (null != pending) {
                pending.getFuture().complete(response);
            }
        });

        // Binary frames: the request id is read as a little-endian long at byte offset 8, and the
        // buffer is handed over positioned at byte 24.
        this.transport.setBinaryMessageHandler(byteBuffer -> {
            byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
            byteBuffer.position(8);
            long reqId = byteBuffer.getLong();
            byteBuffer.position(24);
            FutureResponse pending = inFlightRequest.remove(ConsumerAction.FETCH_BLOCK.getAction(), Long.valueOf(reqId));
            if (null != pending) {
                pending.getFuture().complete(new FetchBlockResp(reqId, byteBuffer));
            }
        });

        this.transport.checkConnection(this.param.getConnectionParam().getConnectTimeout());
    }

    /**
     * Subscribes to the given topics and remembers them so they can be re-subscribed after a
     * reconnect.
     *
     * @param collection topic names to subscribe to
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void subscribe(Collection<String> collection) throws SQLException {
        String[] topicNames = collection.toArray(new String[0]);
        SubscribeResp subscribeResp = (SubscribeResp) this.transport.send(this.factory.generateSubscribe(
                this.param.getConnectionParam().getUser(),
                this.param.getConnectionParam().getPassword(),
                this.param.getConnectionParam().getDatabase(),
                this.param.getGroupId(),
                this.param.getClientId(),
                this.param.getOffsetRest(),
                topicNames,
                String.valueOf(false),
                this.param.getMsgWithTableName()));
        if (Code.SUCCESS.getCode() != subscribeResp.getCode()) {
            throw new SQLException("subscribe topic error, code: (0x" + Integer.toHexString(subscribeResp.getCode()) + "), message: " + subscribeResp.getMessage());
        }
        // Keep a snapshot rather than the caller's collection so that later mutations by the
        // caller cannot change what gets re-subscribed on reconnect.
        this.topics = Arrays.asList(topicNames);
    }

    /**
     * Sends an unsubscribe request.
     *
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void unsubscribe() throws SQLException {
        UnsubscribeResp unsubscribeResp = (UnsubscribeResp) this.transport.send(this.factory.generateUnsubscribe());
        if (Code.SUCCESS.getCode() != unsubscribeResp.getCode()) {
            throw new SQLException("unsubscribe topic error, code: (0x" + Integer.toHexString(unsubscribeResp.getCode()) + "), message: " + unsubscribeResp.getMessage() + ", timing: " + unsubscribeResp.getTiming());
        }
    }

    /**
     * Asks the server for the currently subscribed topics.
     *
     * @return the topic names reported by the server
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Set<String> subscription() throws SQLException {
        ListTopicsResp listTopicsResp = (ListTopicsResp) this.transport.send(this.factory.generateSubscription());
        if (Code.SUCCESS.getCode() != listTopicsResp.getCode()) {
            throw new SQLException("get subscription error, code: (0x" + Integer.toHexString(listTopicsResp.getCode()) + "), message: " + listTopicsResp.getMessage());
        }
        return Arrays.stream(listTopicsResp.getTopics()).collect(Collectors.toSet());
    }

    /**
     * Tries to reconnect to the current node and, on success, re-subscribes to the remembered
     * topics. Closes the transport when the reconnect fails.
     *
     * @return {@code true} if the connection was re-established
     * @throws SQLException if re-subscribing or closing the transport fails
     */
    private boolean handleReconnect() throws SQLException {
        if (this.transport.doReconnectCurNode()) {
            // Nothing to restore when subscribe() has never succeeded on this consumer.
            if (this.topics != null) {
                subscribe(this.topics);
            }
            return true;
        }
        this.transport.close();
        return false;
    }

    /**
     * Performs a single poll round trip, running the periodic auto-commit first when enabled.
     *
     * @param duration     poll timeout forwarded to the server in milliseconds
     * @param deserializer converts each row of the fetched result set into a value
     * @return the polled records, or an empty record set when the response carries no data message
     * @throws SQLException if committing, polling or reading the result set fails
     */
    private ConsumerRecords<V> doPoll(Duration duration, Deserializer<V> deserializer) throws SQLException {
        if (this.param.isAutoCommit() && 0 != this.messageId) {
            long now = System.currentTimeMillis();
            if (now - this.lastCommitTime > this.param.getAutoCommitInterval()) {
                commitSync();
                this.lastCommitTime = now;
            }
        }

        PollResp pollResp = (PollResp) this.transport.send(this.factory.generatePoll(duration.toMillis()));
        if (Code.SUCCESS.getCode() != pollResp.getCode()) {
            throw new SQLException("consumer poll error, code: (0x" + Integer.toHexString(pollResp.getCode()) + "), message: " + pollResp.getMessage());
        }
        if (!pollResp.isHaveMessage() || pollResp.getMessageType() != TmqMessageType.TMQ_RES_DATA.getCode()) {
            return ConsumerRecords.emptyRecord();
        }

        this.messageId = pollResp.getMessageId();
        ConsumerRecords<V> consumerRecords = new ConsumerRecords<>();
        // try-with-resources guarantees the result set is closed on both normal and exceptional exit.
        try (WSConsumerResultSet resultSet = new WSConsumerResultSet(this.transport, this.factory, pollResp.getMessageId(), pollResp.getDatabase())) {
            String topic = pollResp.getTopic();
            String database = pollResp.getDatabase();
            int vgroupId = pollResp.getVgroupId();
            while (resultSet.next()) {
                consumerRecords.put(
                        new TopicPartition(topic, vgroupId),
                        new ConsumerRecord<>(topic, database, vgroupId, pollResp.getOffset(), deserializer.deserialize(resultSet, topic, database)));
            }
        }
        return consumerRecords;
    }

    /**
     * Polls for records. When the poll fails with one of the two recoverable error codes and the
     * reconnect succeeds, the pending message id is dropped and an empty record set is returned
     * instead of propagating the error.
     *
     * @param duration     poll timeout
     * @param deserializer converts each fetched row into a value
     * @return the polled records, possibly empty
     * @throws SQLException if polling fails and the connection could not be recovered
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public ConsumerRecords<V> poll(Duration duration, Deserializer<V> deserializer) throws SQLException {
        try {
            return doPoll(duration, deserializer);
        } catch (SQLException e) {
            if (shouldReconnect(e) && handleReconnect()) {
                // The message polled on the old connection can no longer be committed.
                this.messageId = 0L;
                return ConsumerRecords.emptyRecord();
            }
            throw e;
        }
    }

    /**
     * Decides whether a failed poll qualifies for a reconnect attempt: the transport must not be
     * closed, and the error code must match either the auto-connect case (auto-connect enabled)
     * or the connection-lost case (transport reports the connection as lost).
     */
    private boolean shouldReconnect(SQLException e) {
        int errorCode = e.getErrorCode();
        if (errorCode == AUTO_RECONNECT_ERROR_CODE) {
            return !this.transport.isClosed() && this.param.getConnectionParam().isEnableAutoConnect();
        }
        if (errorCode == CONNECTION_LOST_ERROR_CODE) {
            return !this.transport.isClosed() && this.transport.isConnectionLost();
        }
        return false;
    }

    /**
     * Commits the last polled message, if there is one, and clears the pending message id.
     *
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public synchronized void commitSync() throws SQLException {
        if (0 == this.messageId) {
            return;
        }
        CommitResp commitResp = (CommitResp) this.transport.send(this.factory.generateCommit(this.messageId));
        if (Code.SUCCESS.getCode() != commitResp.getCode()) {
            throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage());
        }
        this.messageId = 0L;
    }

    /**
     * Closes the underlying transport.
     *
     * @throws SQLException if closing the transport fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void close() throws SQLException {
        this.transport.close();
    }

    /**
     * Not implemented: this method does nothing and never invokes the callback.
     *
     * @param offsetCommitCallback ignored
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void commitAsync(OffsetCommitCallback<V> offsetCommitCallback) {
    }

    /**
     * Moves the consumer position of one partition.
     *
     * @param topicPartition partition (topic + vgroup id) to reposition
     * @param j              target offset
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void seek(TopicPartition topicPartition, long j) throws SQLException {
        SeekResp seekResp = (SeekResp) this.transport.send(this.factory.generateSeek(topicPartition.getTopic(), topicPartition.getVGroupId(), j));
        if (Code.SUCCESS.getCode() != seekResp.getCode()) {
            throw new SQLException("consumer seek error, code: (0x" + Integer.toHexString(seekResp.getCode()) + "), message: " + seekResp.getMessage() + ", timing: " + seekResp.getTiming());
        }
    }

    /**
     * Returns the current position of a single partition.
     *
     * @param topicPartition partition to query
     * @return the first position entry of the server response
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public long position(TopicPartition topicPartition) throws SQLException {
        PositionResp positionResp = (PositionResp) this.transport.send(this.factory.generatePosition(new TopicPartition[]{topicPartition}));
        if (Code.SUCCESS.getCode() != positionResp.getCode()) {
            throw new SQLException("consumer position error, code: (0x" + Integer.toHexString(positionResp.getCode()) + "), message: " + positionResp.getMessage() + ", timing: " + positionResp.getTiming());
        }
        return positionResp.getPosition()[0];
    }

    /**
     * Returns the current position of every partition assigned for a topic.
     *
     * @param str topic name
     * @return map from partition to position; the i-th position of the response is paired with
     *         the i-th requested partition
     * @throws SQLException if a request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Map<TopicPartition, Long> position(String str) throws SQLException {
        Assignment[] assignments = getAssignment(str);
        TopicPartition[] partitions = new TopicPartition[assignments.length];
        for (int i = 0; i < assignments.length; i++) {
            partitions[i] = new TopicPartition(str, assignments[i].getVgId());
        }

        PositionResp positionResp = (PositionResp) this.transport.send(this.factory.generatePosition(partitions));
        if (Code.SUCCESS.getCode() != positionResp.getCode()) {
            throw new SQLException("consumer position error, code: (0x" + Integer.toHexString(positionResp.getCode()) + "), message: " + positionResp.getMessage() + ", timing: " + positionResp.getTiming());
        }

        // Pair by index directly instead of searching the array for each element.
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (int i = 0; i < partitions.length; i++) {
            positions.put(partitions[i], Long.valueOf(positionResp.getPosition()[i]));
        }
        return positions;
    }

    /**
     * Returns the first available offset of every partition assigned for a topic.
     *
     * @param str topic name
     * @return map from partition to its beginning offset
     * @throws SQLException if the assignment request fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Map<TopicPartition, Long> beginningOffsets(String str) throws SQLException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Assignment assignment : getAssignment(str)) {
            // NOTE(review): relies on Assignment#getBegin(); confirm against com.taosdata.jdbc.tmq.Assignment.
            offsets.put(new TopicPartition(str, assignment.getVgId()), Long.valueOf(assignment.getBegin()));
        }
        return offsets;
    }

    /**
     * Returns the end offset of every partition assigned for a topic.
     *
     * @param str topic name
     * @return map from partition to its end offset
     * @throws SQLException if the assignment request fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Map<TopicPartition, Long> endOffsets(String str) throws SQLException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Assignment assignment : getAssignment(str)) {
            // NOTE(review): relies on Assignment#getEnd(); confirm against com.taosdata.jdbc.tmq.Assignment.
            offsets.put(new TopicPartition(str, assignment.getVgId()), Long.valueOf(assignment.getEnd()));
        }
        return offsets;
    }

    /**
     * Seeks each given partition to its beginning offset.
     *
     * @param collection partitions to reposition
     * @throws SQLException if looking up offsets or seeking fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void seekToBeginning(Collection<TopicPartition> collection) throws SQLException {
        seekToBoundary(collection, false);
    }

    /**
     * Seeks each given partition to its end offset.
     *
     * @param collection partitions to reposition
     * @throws SQLException if looking up offsets or seeking fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void seekToEnd(Collection<TopicPartition> collection) throws SQLException {
        seekToBoundary(collection, true);
    }

    /**
     * Shared implementation of {@link #seekToBeginning} and {@link #seekToEnd}. Offsets are
     * fetched per topic; entries belonging to other vgroups of that topic are cached so a later
     * partition found in the cache is sought without another lookup.
     *
     * @param partitions partitions to reposition
     * @param toEnd      {@code true} to use end offsets, {@code false} to use beginning offsets
     */
    private void seekToBoundary(Collection<TopicPartition> partitions, boolean toEnd) throws SQLException {
        Map<TopicPartition, Long> cachedOffsets = new HashMap<>();
        for (TopicPartition partition : partitions) {
            if (cachedOffsets.containsKey(partition)) {
                seek(partition, cachedOffsets.get(partition).longValue());
                continue;
            }
            Map<TopicPartition, Long> topicOffsets = toEnd
                    ? endOffsets(partition.getTopic())
                    : beginningOffsets(partition.getTopic());
            for (Map.Entry<TopicPartition, Long> entry : topicOffsets.entrySet()) {
                if (entry.getKey().getVGroupId() == partition.getVGroupId()) {
                    seek(entry.getKey(), entry.getValue().longValue());
                } else {
                    cachedOffsets.put(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    /**
     * Collects the partitions assigned to this consumer across all subscribed topics.
     *
     * @return one {@link TopicPartition} per (topic, vgroup id) pair reported by the server
     * @throws SQLException if listing the subscription or an assignment fails
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Set<TopicPartition> assignment() throws SQLException {
        Set<TopicPartition> partitions = new HashSet<>();
        for (String topic : subscription()) {
            for (Assignment assignment : getAssignment(topic)) {
                partitions.add(new TopicPartition(topic, assignment.getVgId()));
            }
        }
        return partitions;
    }

    /**
     * Returns the committed offset of a single partition.
     *
     * @param topicPartition partition to query
     * @return the first committed entry of the server response, with {@code null} metadata
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public OffsetAndMetadata committed(TopicPartition topicPartition) throws SQLException {
        CommittedResp committedResp = (CommittedResp) this.transport.send(this.factory.generateCommitted(new TopicPartition[]{topicPartition}));
        if (Code.SUCCESS.getCode() != committedResp.getCode()) {
            throw new SQLException("consumer committed error, code: (0x" + Integer.toHexString(committedResp.getCode()) + "), message: " + committedResp.getMessage() + ", timing: " + committedResp.getTiming());
        }
        return new OffsetAndMetadata(committedResp.getCommitted()[0], null);
    }

    /**
     * Returns the committed offsets of several partitions in one request.
     *
     * @param set partitions to query
     * @return map from partition to committed offset; the i-th entry of the response is paired
     *         with the i-th requested partition
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) throws SQLException {
        TopicPartition[] partitions = set.toArray(new TopicPartition[0]);
        CommittedResp committedResp = (CommittedResp) this.transport.send(this.factory.generateCommitted(partitions));
        if (Code.SUCCESS.getCode() != committedResp.getCode()) {
            throw new SQLException("consumer committed error, code: (0x" + Integer.toHexString(committedResp.getCode()) + "), message: " + committedResp.getMessage() + ", timing: " + committedResp.getTiming());
        }
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
        for (int i = 0; i < partitions.length; i++) {
            committedOffsets.put(partitions[i], new OffsetAndMetadata(committedResp.getCommitted()[i], null));
        }
        return committedOffsets;
    }

    /**
     * Commits explicit offsets, one request per partition. Entries with a negative offset are
     * skipped. Stops at the first failure, so earlier entries may already be committed.
     *
     * @param map offsets to commit
     * @throws SQLException if a request fails or the server answers with a non-success code
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) throws SQLException {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            if (entry.getValue().offset() < 0) {
                continue;
            }
            CommitOffsetResp commitOffsetResp = (CommitOffsetResp) this.transport.send(this.factory.generateCommitOffset(entry.getKey(), entry.getValue().offset()));
            if (Code.SUCCESS.getCode() != commitOffsetResp.getCode()) {
                throw new SQLException("consumer commit offset error, code: (0x" + Integer.toHexString(commitOffsetResp.getCode()) + "), message: " + commitOffsetResp.getMessage() + ", timing: " + commitOffsetResp.getTiming());
            }
        }
    }

    /**
     * Unsupported: nothing is committed; the callback is invoked immediately with an
     * "unsupported method" exception.
     *
     * @param map                  offsets passed through to the callback unchanged
     * @param offsetCommitCallback callback notified of the failure
     */
    @Override // com.taosdata.jdbc.common.Consumer
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback<V> offsetCommitCallback) {
        offsetCommitCallback.onComplete(map, TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_METHOD));
    }

    /**
     * Fetches the assignment entries of one topic.
     *
     * @param str topic name
     * @return the assignments reported by the server
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    private Assignment[] getAssignment(String str) throws SQLException {
        AssignmentResp assignmentResp = (AssignmentResp) this.transport.send(this.factory.generateAssignment(str));
        if (Code.SUCCESS.getCode() != assignmentResp.getCode()) {
            throw new SQLException("consumer assignment error, code: (0x" + Integer.toHexString(assignmentResp.getCode()) + "), message: " + assignmentResp.getMessage() + ", timing: " + assignmentResp.getTiming());
        }
        return assignmentResp.getAssignment();
    }
}
