/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.pulsar.source.format;

import java.io.IOException;
import java.util.Iterator;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;

public class PulsarCanalDecorator
implements DeserializationSchema<SeaTunnelRow> {
    private static final String MESSAGE = "message";
    private static final String FIELD_DATA = "data";
    private static final String FIELD_OLD = "old";
    public static final String COLUMN_NAME = "columnName";
    public static final String COLUMN_VALUE = "columnValue";
    public static final String COLUMN_INDEX = "index";
    private final CanalJsonDeserializationSchema canalJsonDeserializationSchema;

    public PulsarCanalDecorator(CanalJsonDeserializationSchema canalJsonDeserializationSchema) {
        this.canalJsonDeserializationSchema = canalJsonDeserializationSchema;
    }

    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException();
    }

    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) throws IOException {
        ObjectNode pulsarCanal = JsonUtils.parseObject((byte[])message);
        ArrayNode canalList = JsonUtils.parseArray((String)pulsarCanal.get(MESSAGE).asText());
        Iterator canalIterator = canalList.elements();
        while (canalIterator.hasNext()) {
            JsonNode next = (JsonNode)canalIterator.next();
            ObjectNode root = this.reconvertPulsarData((ObjectNode)next);
            this.canalJsonDeserializationSchema.deserialize(root, out);
        }
    }

    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.canalJsonDeserializationSchema.getProducedType();
    }

    private ObjectNode reconvertPulsarData(ObjectNode root) {
        root.replace(FIELD_DATA, this.reconvert(root.get(FIELD_DATA)));
        root.replace(FIELD_OLD, this.reconvert(root.get(FIELD_OLD)));
        return root;
    }

    private JsonNode reconvert(JsonNode node) {
        if (!(node instanceof ArrayNode) || node.size() <= 0) {
            return node;
        }
        long firstColumn = node.get(0).get(COLUMN_INDEX).asLong();
        ArrayNode arrayNode = JsonUtils.createArrayNode();
        ObjectNode rowMap = JsonUtils.createObjectNode();
        for (int i = 0; i < node.size(); ++i) {
            ObjectNode columnNode = (ObjectNode)node.get(i);
            if (firstColumn == columnNode.get(COLUMN_INDEX).asLong()) {
                arrayNode.add((JsonNode)rowMap);
                rowMap = JsonUtils.createObjectNode();
            }
            rowMap.set(columnNode.get(COLUMN_NAME).asText(), columnNode.get(COLUMN_VALUE));
        }
        arrayNode.add((JsonNode)rowMap);
        arrayNode.remove(0);
        return arrayNode;
    }
}

