/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.consistency.persistent.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ByteUtils;
import com.alibaba.nacos.common.utils.TypeUtils;
import com.alibaba.nacos.consistency.DataOperation;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.exception.KvStorageException;
import com.alibaba.nacos.core.storage.kv.KvStorage;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ValueChangeEvent;
import com.alibaba.nacos.naming.consistency.persistent.ClusterVersionJudgement;
import com.alibaba.nacos.naming.consistency.persistent.PersistentConsistencyService;
import com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchReadResponse;
import com.alibaba.nacos.naming.consistency.persistent.impl.BatchWriteRequest;
import com.alibaba.nacos.naming.consistency.persistent.impl.NamingKvStorage;
import com.alibaba.nacos.naming.consistency.persistent.impl.NamingSnapshotOperation;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.pojo.Record;
import com.google.protobuf.ByteString;
import java.lang.reflect.Type;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public abstract class BasePersistentServiceProcessor
extends RequestProcessor4CP
implements PersistentConsistencyService {
    protected final KvStorage kvStorage;
    protected final Serializer serializer;
    protected volatile boolean hasError = false;
    protected volatile String jRaftErrorMsg;
    protected volatile boolean startNotify = false;
    protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    protected final ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
    protected final ClusterVersionJudgement versionJudgement;
    protected final PersistentNotifier notifier;
    protected final int queueMaxSize = 16384;
    protected final int priority = 10;

    public BasePersistentServiceProcessor(ClusterVersionJudgement judgement) throws Exception {
        this.versionJudgement = judgement;
        this.kvStorage = new NamingKvStorage(Paths.get(UtilsAndCommons.DATA_BASE_DIR, "data").toString());
        this.serializer = SerializeFactory.getSerializer((String)"JSON");
        this.notifier = new PersistentNotifier(key -> {
            try {
                byte[] data = this.kvStorage.get(ByteUtils.toBytes((String)key));
                Datum datum = (Datum)this.serializer.deserialize(data, this.getDatumTypeFromKey((String)key));
                return null != datum ? (Record)datum.value : null;
            }
            catch (KvStorageException ex) {
                throw new NacosRuntimeException(ex.getErrCode(), ex.getErrMsg());
            }
        });
    }

    public void afterConstruct() {
        NotifyCenter.registerToPublisher(ValueChangeEvent.class, (int)16384);
        this.listenOldRaftClose();
    }

    private void listenOldRaftClose() {
        this.versionJudgement.registerObserver(isNewVersion -> {
            if (isNewVersion.booleanValue()) {
                NotifyCenter.registerSubscriber((Subscriber)this.notifier);
                this.startNotify = true;
            }
        }, 10);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Response onRequest(ReadRequest request) {
        List keys = (List)this.serializer.deserialize(request.getData().toByteArray(), (Type)TypeUtils.parameterize(List.class, (Type[])new Type[]{byte[].class}));
        ReentrantReadWriteLock.ReadLock lock = this.readLock;
        lock.lock();
        try {
            Map result = this.kvStorage.batchGet(keys);
            BatchReadResponse response = new BatchReadResponse();
            result.forEach(response::append);
            Response response2 = Response.newBuilder().setSuccess(true).setData(ByteString.copyFrom((byte[])this.serializer.serialize((Object)response))).build();
            return response2;
        }
        catch (KvStorageException e) {
            Response response = Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
            return response;
        }
        finally {
            lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Response onApply(WriteRequest request) {
        byte[] data = request.getData().toByteArray();
        BatchWriteRequest bwRequest = (BatchWriteRequest)this.serializer.deserialize(data, BatchWriteRequest.class);
        Op op = Op.valueOf(request.getOperation());
        ReentrantReadWriteLock.ReadLock lock = this.readLock;
        lock.lock();
        try {
            switch (op) {
                case Write: {
                    this.kvStorage.batchPut(bwRequest.getKeys(), bwRequest.getValues());
                    break;
                }
                case Delete: {
                    this.kvStorage.batchDelete(bwRequest.getKeys());
                    break;
                }
                default: {
                    Response response = Response.newBuilder().setSuccess(false).setErrMsg("unsupport operation : " + (Object)((Object)op)).build();
                    return response;
                }
            }
            this.publishValueChangeEvent(op, bwRequest);
            Response response = Response.newBuilder().setSuccess(true).build();
            return response;
        }
        catch (KvStorageException e) {
            Response response = Response.newBuilder().setSuccess(false).setErrMsg(e.getErrMsg()).build();
            return response;
        }
        finally {
            lock.unlock();
        }
    }

    private void publishValueChangeEvent(Op op, BatchWriteRequest request) {
        List<byte[]> keys = request.getKeys();
        List<byte[]> values = request.getValues();
        for (int i = 0; i < keys.size(); ++i) {
            String key = new String(keys.get(i));
            Datum datum = (Datum)this.serializer.deserialize(values.get(i), this.getDatumTypeFromKey(key));
            Record value = null != datum ? (Record)datum.value : null;
            ValueChangeEvent event = ValueChangeEvent.builder().key(key).value(value).action(Op.Delete.equals((Object)op) ? DataOperation.DELETE : DataOperation.CHANGE).build();
            NotifyCenter.publishEvent((Event)event);
        }
    }

    public String group() {
        return "naming_persistent_service";
    }

    public List<SnapshotOperation> loadSnapshotOperate() {
        return Collections.singletonList(new NamingSnapshotOperation(this.kvStorage, this.lock));
    }

    public void onError(Throwable error) {
        super.onError(error);
        this.hasError = true;
        this.jRaftErrorMsg = error.getMessage();
    }

    protected Type getDatumTypeFromKey(String key) {
        return TypeUtils.parameterize(Datum.class, (Type[])new Type[]{this.getClassOfRecordFromKey(key)});
    }

    protected Class<? extends Record> getClassOfRecordFromKey(String key) {
        if (KeyBuilder.matchSwitchKey(key)) {
            return SwitchDomain.class;
        }
        if (KeyBuilder.matchServiceMetaKey(key)) {
            return Service.class;
        }
        if (KeyBuilder.matchInstanceListKey(key)) {
            return Instances.class;
        }
        return Record.class;
    }

    protected void notifierDatumIfAbsent(String key, RecordListener listener) throws NacosException {
        if ("com.alibaba.nacos.naming.domains.meta.".equals(key)) {
            this.notifierAllServiceMeta(listener);
        } else {
            Datum datum = this.get(key);
            if (null != datum) {
                this.notifierDatum(key, datum, listener);
            }
        }
    }

    private void notifierAllServiceMeta(RecordListener listener) throws NacosException {
        for (byte[] each : this.kvStorage.allKeys()) {
            Datum datum;
            String key = new String(each);
            if (!listener.interests(key) || null == (datum = this.get(key))) continue;
            this.notifierDatum(key, datum, listener);
        }
    }

    private void notifierDatum(String key, Datum datum, RecordListener listener) {
        try {
            listener.onChange(key, datum.value);
        }
        catch (Exception e) {
            Loggers.RAFT.error("NACOS-RAFT failed to notify listener", (Throwable)e);
        }
    }

    static enum Op {
        Write("Write"),
        Read("Read"),
        Delete("Delete");

        protected final String desc;

        private Op(String desc) {
            this.desc = desc;
        }
    }
}

