/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.plugins.connector.grpc;

import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.StringValue;
import com.google.protobuf.TextFormat;
import com.google.protobuf.UInt32Value;
import com.tencent.polaris.api.config.consumer.DiscoveryConfig;
import com.tencent.polaris.api.config.global.ClusterType;
import com.tencent.polaris.api.config.global.ServerConnectorConfig;
import com.tencent.polaris.api.config.provider.RegisterConfig;
import com.tencent.polaris.api.exception.ErrorCode;
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.exception.RetriableException;
import com.tencent.polaris.api.plugin.PluginType;
import com.tencent.polaris.api.plugin.common.InitContext;
import com.tencent.polaris.api.plugin.common.PluginTypes;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.server.CommonProviderRequest;
import com.tencent.polaris.api.plugin.server.CommonProviderResponse;
import com.tencent.polaris.api.plugin.server.CommonServiceContractRequest;
import com.tencent.polaris.api.plugin.server.InterfaceDescriptor;
import com.tencent.polaris.api.plugin.server.ReportClientRequest;
import com.tencent.polaris.api.plugin.server.ReportClientResponse;
import com.tencent.polaris.api.plugin.server.ReportServiceContractRequest;
import com.tencent.polaris.api.plugin.server.ReportServiceContractResponse;
import com.tencent.polaris.api.plugin.server.ServerEvent;
import com.tencent.polaris.api.plugin.server.ServiceEventHandler;
import com.tencent.polaris.api.plugin.server.TargetServer;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.api.utils.ThreadPoolUtils;
import com.tencent.polaris.client.pojo.ServiceRuleByProto;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.factory.config.global.ServerConnectorConfigImpl;
import com.tencent.polaris.logging.LoggerFactory;
import com.tencent.polaris.plugins.connector.common.DestroyableServerConnector;
import com.tencent.polaris.plugins.connector.common.ServiceUpdateTask;
import com.tencent.polaris.plugins.connector.common.constant.ServiceUpdateTaskConstant;
import com.tencent.polaris.plugins.connector.grpc.Connection;
import com.tencent.polaris.plugins.connector.grpc.ConnectionManager;
import com.tencent.polaris.plugins.connector.grpc.GrpcServiceUpdateTask;
import com.tencent.polaris.plugins.connector.grpc.GrpcUtil;
import com.tencent.polaris.plugins.connector.grpc.SpecStreamClient;
import com.tencent.polaris.specification.api.v1.model.CodeProto;
import com.tencent.polaris.specification.api.v1.model.ModelProto;
import com.tencent.polaris.specification.api.v1.service.manage.ClientProto;
import com.tencent.polaris.specification.api.v1.service.manage.PolarisGRPCGrpc;
import com.tencent.polaris.specification.api.v1.service.manage.PolarisServiceContractGRPCGrpc;
import com.tencent.polaris.specification.api.v1.service.manage.RequestProto;
import com.tencent.polaris.specification.api.v1.service.manage.ResponseProto;
import com.tencent.polaris.specification.api.v1.service.manage.ServiceContractProto;
import com.tencent.polaris.specification.api.v1.service.manage.ServiceProto;
import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;

public class GrpcConnector
extends DestroyableServerConnector {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcConnector.class);
    private final Map<ClusterType, AtomicReference<SpecStreamClient>> streamClients = new HashMap<ClusterType, AtomicReference<SpecStreamClient>>();
    private long messageTimeoutMs;
    private ConnectionManager connectionManager;
    private long connectionIdleTimeoutMs;
    private boolean initialized = false;
    private boolean standalone = true;
    private String id;
    private boolean isRegisterEnable = true;
    private boolean isDiscoveryEnable = true;
    private String clientInstanceId;
    private boolean isReportServiceContractEnable = true;
    private ServerConnectorConfigImpl connectorConfig;
    private ScheduledThreadPoolExecutor sendDiscoverExecutor;
    private ScheduledThreadPoolExecutor buildInExecutor;
    private ScheduledThreadPoolExecutor updateServiceExecutor;
    private CompletableFuture<String> readyFuture;
    private final Object lock = new Object();
    private final Map<ServiceEventKey.EventType, Boolean> supportedResourcesType = new ConcurrentHashMap<ServiceEventKey.EventType, Boolean>();

    private static TargetServer connectionToTargetNode(Connection connection) {
        Connection.ConnID connID = connection.getConnID();
        return new TargetServer(connID.getServiceKey(), connID.getHost(), connID.getPort(), connID.getProtocol());
    }

    public void init(InitContext ctx) throws PolarisException {
        if (!this.initialized) {
            this.supportedResourcesType.put(ServiceEventKey.EventType.INSTANCE, true);
            this.supportedResourcesType.put(ServiceEventKey.EventType.ROUTING, true);
            this.supportedResourcesType.put(ServiceEventKey.EventType.SERVICE, true);
            this.supportedResourcesType.put(ServiceEventKey.EventType.RATE_LIMITING, true);
            if (this.getName().equals(ctx.getValueContext().getServerConnectorProtocol())) {
                this.standalone = true;
                this.initActually(ctx, ctx.getConfig().getGlobal().getServerConnector());
            } else {
                this.standalone = false;
                ServerConnectorConfig serverConnectorConfig = null;
                for (ServerConnectorConfig c : ctx.getConfig().getGlobal().getServerConnectors()) {
                    if (!"grpc".equals(c.getProtocol())) continue;
                    serverConnectorConfig = c;
                }
                if (serverConnectorConfig != null) {
                    this.initActually(ctx, serverConnectorConfig);
                }
            }
        }
    }

    private void initActually(InitContext ctx, ServerConnectorConfig connectorConfig) {
        this.connectorConfig = (ServerConnectorConfigImpl)connectorConfig;
        this.readyFuture = new CompletableFuture();
        HashMap<ClusterType, CompletableFuture<String>> futures = new HashMap<ClusterType, CompletableFuture<String>>();
        futures.put(ClusterType.SERVICE_DISCOVER_CLUSTER, this.readyFuture);
        this.id = connectorConfig.getId();
        if (ctx.getConfig().getProvider().getRegisterConfigMap().containsKey(this.id)) {
            this.isRegisterEnable = ((RegisterConfig)ctx.getConfig().getProvider().getRegisterConfigMap().get(this.id)).isEnable();
            this.isReportServiceContractEnable = ((RegisterConfig)ctx.getConfig().getProvider().getRegisterConfigMap().get(this.id)).isReportServiceContractEnable();
        }
        if (ctx.getConfig().getConsumer().getDiscoveryConfigMap().containsKey(this.id)) {
            this.isDiscoveryEnable = ((DiscoveryConfig)ctx.getConfig().getConsumer().getDiscoveryConfigMap().get(this.id)).isEnable();
        }
        this.connectionManager = new ConnectionManager(ctx, connectorConfig, futures);
        this.connectionIdleTimeoutMs = connectorConfig.getConnectionIdleTimeout();
        this.messageTimeoutMs = connectorConfig.getMessageTimeout();
        this.sendDiscoverExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory(this.getName() + "-send-discovery"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.sendDiscoverExecutor.setMaximumPoolSize(1);
        this.buildInExecutor = new ScheduledThreadPoolExecutor(0, (ThreadFactory)new NamedThreadFactory(this.getName() + "-builtin-discovery"), new ThreadPoolExecutor.CallerRunsPolicy());
        this.buildInExecutor.setMaximumPoolSize(1);
        this.streamClients.put(ClusterType.BUILTIN_CLUSTER, new AtomicReference());
        this.streamClients.put(ClusterType.SERVICE_DISCOVER_CLUSTER, new AtomicReference());
        this.updateServiceExecutor = new ScheduledThreadPoolExecutor(1, (ThreadFactory)new NamedThreadFactory(this.getName() + "-update-service"));
        this.updateServiceExecutor.setMaximumPoolSize(1);
        this.clientInstanceId = UUID.randomUUID().toString();
        this.initialized = true;
    }

    private void waitDiscoverReady() {
        try {
            this.readyFuture.get(this.messageTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        catch (TimeoutException e) {
            throw new RetriableException(ErrorCode.API_TIMEOUT, "discover service not ready");
        }
    }

    public void registerServiceHandler(ServiceEventHandler handler) {
        this.checkDestroyed();
        ServiceEventKey serviceEventKey = handler.getServiceEventKey();
        if (!this.checkEventSupported(serviceEventKey.getEventType())) {
            LOG.info("[ServerConnector] not supported event type for {}", (Object)serviceEventKey);
            handler.getEventHandler().onEventUpdate(new ServerEvent(serviceEventKey, (Object)this.buildEmptyResponse(serviceEventKey), null));
            return;
        }
        GrpcServiceUpdateTask serviceUpdateTask = new GrpcServiceUpdateTask(handler, this);
        this.submitServiceHandler(serviceUpdateTask, 0L);
    }

    private ResponseProto.DiscoverResponse buildEmptyResponse(ServiceEventKey serviceEventKey) {
        ResponseProto.DiscoverResponse.Builder builder = ResponseProto.DiscoverResponse.newBuilder();
        builder.setService(ServiceProto.Service.newBuilder().setName(StringValue.newBuilder().setValue(serviceEventKey.getService()).build()).setNamespace(StringValue.newBuilder().setValue(serviceEventKey.getNamespace()).build()));
        builder.setCode(UInt32Value.newBuilder().setValue(CodeProto.Code.ExecuteSuccess.getNumber()).build());
        builder.setType(GrpcUtil.buildDiscoverResponseType(serviceEventKey.getEventType()));
        return builder.build();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private boolean checkEventSupported(final ServiceEventKey.EventType eventType) {
        Boolean aBoolean = this.supportedResourcesType.get(eventType);
        if (null != aBoolean) {
            return aBoolean;
        }
        Object object = this.lock;
        synchronized (object) {
            aBoolean = this.supportedResourcesType.get(eventType);
            if (null != aBoolean) {
                return aBoolean;
            }
            LOG.info("[ServerConnector] start to check compatible for event type {}", (Object)eventType);
            Connection connection = null;
            try {
                connection = this.connectionManager.getConnection("CheckCompatible", ClusterType.BUILTIN_CLUSTER);
                String reqId = GrpcUtil.nextGetInstanceReqId();
                PolarisGRPCGrpc.PolarisGRPCStub namingStub = PolarisGRPCGrpc.newStub((Channel)connection.getChannel());
                namingStub = GrpcUtil.attachRequestHeader(namingStub, reqId);
                namingStub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), namingStub);
                final CountDownLatch countDownLatch = new CountDownLatch(1);
                StreamObserver discoverClient = namingStub.discover((StreamObserver)new StreamObserver<ResponseProto.DiscoverResponse>(){

                    public void onNext(ResponseProto.DiscoverResponse value) {
                        int code = value.getCode().getValue();
                        boolean supported = true;
                        if (code == CodeProto.Code.InvalidDiscoverResource.getNumber()) {
                            supported = false;
                        }
                        GrpcConnector.this.supportedResourcesType.put(eventType, supported);
                        LOG.info("[ServerConnector] success to check compatible for event type {}, result {}", (Object)eventType, (Object)supported);
                        countDownLatch.countDown();
                    }

                    public void onError(Throwable t) {
                        countDownLatch.countDown();
                        LOG.warn("[ServerConnector] fail to acquire check event type {}, cause: {}", (Object)eventType, (Object)t.getMessage());
                    }

                    public void onCompleted() {
                        countDownLatch.countDown();
                    }
                });
                RequestProto.DiscoverRequest.Builder req = RequestProto.DiscoverRequest.newBuilder();
                req.setType(GrpcUtil.buildDiscoverRequestType(eventType));
                discoverClient.onNext((Object)req.build());
                try {
                    countDownLatch.await();
                }
                catch (InterruptedException e) {
                    LOG.error("[ServerConnector] fail to wait check event type {}", (Object)eventType, (Object)e);
                }
                aBoolean = this.supportedResourcesType.get(eventType);
                if (null != aBoolean) {
                    boolean bl = aBoolean;
                    return bl;
                }
                LOG.error("[ServerConnector] timeout to wait check event type {}", (Object)eventType);
                throw new PolarisException(ErrorCode.API_TIMEOUT, "[ServerConnector] timeout to check compatible for event type " + eventType);
            }
            finally {
                if (null != connection) {
                    connection.release("CheckCompatible");
                }
            }
        }
    }

    protected void submitServiceHandler(ServiceUpdateTask updateTask, long delayMs) {
        ClusterType targetCluster = updateTask.getTargetClusterType();
        if (updateTask.setStatus(ServiceUpdateTaskConstant.Status.READY, ServiceUpdateTaskConstant.Status.RUNNING)) {
            if (targetCluster == ClusterType.BUILTIN_CLUSTER) {
                LOG.info("[ServerConnector]task for service {} has been scheduled builtin", (Object)updateTask);
                this.buildInExecutor.schedule((Runnable)updateTask, delayMs, TimeUnit.MILLISECONDS);
            } else {
                LOG.debug("[ServerConnector]task for service {} has been scheduled discover", (Object)updateTask);
                this.sendDiscoverExecutor.schedule((Runnable)updateTask, delayMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    public void deRegisterServiceHandler(ServiceEventKey eventKey) throws PolarisException {
        this.checkDestroyed();
        ServiceUpdateTask serviceUpdateTask = (ServiceUpdateTask)this.updateTaskSet.get(eventKey);
        if (null != serviceUpdateTask) {
            boolean result = serviceUpdateTask.setType(ServiceUpdateTaskConstant.Type.LONG_RUNNING, ServiceUpdateTaskConstant.Type.TERMINATED);
            LOG.info("[ServerConnector]success to deRegister updateServiceTask {}, result is {}", (Object)eventKey, (Object)result);
        }
    }

    public CommonProviderResponse registerInstance(CommonProviderRequest req, Map<String, String> customHeader) throws PolarisException {
        if (!this.isRegisterEnable()) {
            return null;
        }
        this.checkDestroyed();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        try {
            this.waitDiscoverReady();
            connection = this.connectionManager.getConnection("RegisterInstance", ClusterType.SERVICE_DISCOVER_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextInstanceRegisterReqId());
            stub = GrpcUtil.attachRequestHeader(stub, customHeader);
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            ResponseProto.Response registerInstanceResponse = stub.registerInstance(this.buildRegisterInstanceRequest(req));
            GrpcUtil.checkResponse(registerInstanceResponse);
            if (!registerInstanceResponse.hasInstance()) {
                throw new PolarisException(ErrorCode.SERVER_USER_ERROR, "invalid register response: missing instance");
            }
            CommonProviderResponse resp = new CommonProviderResponse();
            resp.setInstanceID(registerInstanceResponse.getInstance().getId().getValue());
            resp.setExists(registerInstanceResponse.getCode().getValue() == 400201);
            CommonProviderResponse commonProviderResponse = resp;
            return commonProviderResponse;
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to register host %s:%d service %s", req.getHost(), req.getPort(), serviceKey), t);
        }
        finally {
            if (null != connection) {
                connection.release("RegisterInstance");
            }
        }
    }

    private ServiceProto.Instance buildRegisterInstanceRequest(CommonProviderRequest req) {
        ServiceProto.Instance.Builder instanceBuilder = ServiceProto.Instance.newBuilder();
        instanceBuilder.setService(StringValue.newBuilder().setValue(req.getService()).build());
        instanceBuilder.setNamespace(StringValue.newBuilder().setValue(req.getNamespace()).build());
        if (StringUtils.isNotBlank((String)req.getToken())) {
            instanceBuilder.setServiceToken(StringValue.newBuilder().setValue(req.getToken()).build());
        }
        instanceBuilder.setHost(StringValue.newBuilder().setValue(req.getHost()).build());
        instanceBuilder.setPort(UInt32Value.newBuilder().setValue(req.getPort()).build());
        if (StringUtils.isNotBlank((String)req.getProtocol())) {
            instanceBuilder.setProtocol(StringValue.newBuilder().setValue(req.getProtocol()).build());
        }
        if (StringUtils.isNotBlank((String)req.getVersion())) {
            instanceBuilder.setVersion(StringValue.newBuilder().setValue(req.getVersion()).build());
        }
        if (null != req.getWeight()) {
            instanceBuilder.setWeight(UInt32Value.newBuilder().setValue(req.getWeight().intValue()).build());
        }
        if (null != req.getPriority()) {
            instanceBuilder.setPriority(UInt32Value.newBuilder().setValue(req.getPriority().intValue()).build());
        }
        if (null != req.getMetadata()) {
            for (Map.Entry entry : req.getMetadata().entrySet()) {
                if (!StringUtils.isBlank((String)((String)entry.getValue()))) continue;
                entry.setValue("");
            }
            instanceBuilder.putAllMetadata(req.getMetadata());
        }
        if (null != req.getTtl()) {
            ServiceProto.HealthCheck.Builder healthCheckBuilder = ServiceProto.HealthCheck.newBuilder();
            healthCheckBuilder.setType(ServiceProto.HealthCheck.HealthCheckType.HEARTBEAT);
            healthCheckBuilder.setHeartbeat(ServiceProto.HeartbeatHealthCheck.newBuilder().setTtl(UInt32Value.newBuilder().setValue(req.getTtl().intValue()).build()).build());
            instanceBuilder.setHealthCheck(healthCheckBuilder.build());
        }
        ModelProto.Location.Builder locationBuilder = ModelProto.Location.newBuilder();
        if (StringUtils.isNotBlank((String)req.getRegion())) {
            locationBuilder.setRegion(StringValue.newBuilder().setValue(req.getRegion()));
        }
        if (StringUtils.isNotBlank((String)req.getZone())) {
            locationBuilder.setZone(StringValue.newBuilder().setValue(req.getZone()));
        }
        if (StringUtils.isNotBlank((String)req.getCampus())) {
            locationBuilder.setCampus(StringValue.newBuilder().setValue(req.getCampus()));
        }
        ModelProto.Location location = locationBuilder.build();
        instanceBuilder.setLocation(location);
        if (StringUtils.isNotEmpty((String)req.getInstanceID())) {
            instanceBuilder.setId(StringValue.newBuilder().setValue(req.getInstanceID()));
        }
        return instanceBuilder.build();
    }

    private ServiceProto.Instance buildHeartbeatRequest(CommonProviderRequest req) {
        ServiceProto.Instance.Builder instanceBuilder = ServiceProto.Instance.newBuilder();
        if (StringUtils.isNotBlank((String)req.getInstanceID())) {
            instanceBuilder.setId(StringValue.newBuilder().setValue(req.getInstanceID()).build());
        }
        if (StringUtils.isNotBlank((String)req.getService())) {
            instanceBuilder.setService(StringValue.newBuilder().setValue(req.getService()).build());
        }
        if (StringUtils.isNotBlank((String)req.getHost())) {
            instanceBuilder.setHost(StringValue.newBuilder().setValue(req.getHost()).build());
        }
        if (StringUtils.isNotBlank((String)req.getNamespace())) {
            instanceBuilder.setNamespace(StringValue.newBuilder().setValue(req.getNamespace()).build());
        }
        if (req.getPort() > 0) {
            instanceBuilder.setPort(UInt32Value.of((int)req.getPort()));
        }
        if (StringUtils.isNotBlank((String)req.getToken())) {
            instanceBuilder.setServiceToken(StringValue.newBuilder().setValue(req.getToken()).build());
        }
        return instanceBuilder.build();
    }

    private ClientProto.Client buildReportRequest(ReportClientRequest req) {
        ClientProto.Client.Builder builder = ClientProto.Client.newBuilder().setHost(StringValue.newBuilder().setValue(req.getClientHost())).setVersion(StringValue.newBuilder().setValue(req.getVersion()));
        Optional.ofNullable(req.getReporterMetaInfos()).ifPresent(reporterMetaInfos -> reporterMetaInfos.forEach(reporterMetaInfo -> builder.addStat(ClientProto.StatInfo.newBuilder().setTarget(StringValue.newBuilder().setValue(reporterMetaInfo.getTarget()).build()).setPort(UInt32Value.newBuilder().setValue(reporterMetaInfo.getPort().intValue()).build()).setPath(StringValue.newBuilder().setValue(reporterMetaInfo.getPath()).build()).setProtocol(StringValue.newBuilder().setValue(reporterMetaInfo.getProtocol()).build()).build())));
        builder.setId(StringValue.newBuilder().setValue(this.clientInstanceId).build());
        return builder.build();
    }

    private ServiceProto.Instance buildDeregisterInstanceRequest(CommonProviderRequest req) {
        ServiceProto.Instance.Builder instanceBuilder = ServiceProto.Instance.newBuilder();
        if (StringUtils.isNotBlank((String)req.getInstanceID())) {
            instanceBuilder.setId(StringValue.newBuilder().setValue(req.getInstanceID()).build());
        }
        if (StringUtils.isNotBlank((String)req.getNamespace())) {
            instanceBuilder.setNamespace(StringValue.newBuilder().setValue(req.getNamespace()).build());
        }
        if (StringUtils.isNotBlank((String)req.getService())) {
            instanceBuilder.setService(StringValue.newBuilder().setValue(req.getService()).build());
        }
        if (StringUtils.isNotBlank((String)req.getHost())) {
            instanceBuilder.setHost(StringValue.newBuilder().setValue(req.getHost()).build());
        }
        if (req.getPort() > 0) {
            instanceBuilder.setPort(UInt32Value.of((int)req.getPort()));
        }
        if (StringUtils.isNotBlank((String)req.getToken())) {
            instanceBuilder.setServiceToken(StringValue.newBuilder().setValue(req.getToken()).build());
        }
        return instanceBuilder.build();
    }

    public void deregisterInstance(CommonProviderRequest req) throws PolarisException {
        if (!this.isRegisterEnable()) {
            return;
        }
        this.checkDestroyed();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        try {
            this.waitDiscoverReady();
            connection = this.connectionManager.getConnection("DeregisterInstance", ClusterType.SERVICE_DISCOVER_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextInstanceDeRegisterReqId());
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            ResponseProto.Response deregisterInstanceResponse = stub.deregisterInstance(this.buildDeregisterInstanceRequest(req));
            GrpcUtil.checkResponse(deregisterInstanceResponse);
            LOG.debug("received deregister response {}", (Object)deregisterInstanceResponse);
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to deregister id %s, host %s:%d service %s", req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
        }
        finally {
            if (null != connection) {
                connection.release("DeregisterInstance");
            }
        }
    }

    public void heartbeat(CommonProviderRequest req) throws PolarisException {
        if (!this.isRegisterEnable()) {
            return;
        }
        this.checkDestroyed();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        long startTimestamp = 0L;
        try {
            this.waitDiscoverReady();
            connection = this.connectionManager.getConnection("InstanceHeartbeat", ClusterType.HEALTH_CHECK_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            startTimestamp = System.currentTimeMillis();
            LOG.debug("start heartbeat at {} ms.", (Object)startTimestamp);
            ResponseProto.Response heartbeatResponse = ((PolarisGRPCGrpc.PolarisGRPCBlockingStub)stub.withDeadlineAfter(req.getTimeoutMs(), TimeUnit.MILLISECONDS)).heartbeat(this.buildHeartbeatRequest(req));
            GrpcUtil.checkResponse(heartbeatResponse);
            LOG.debug("received heartbeat response {}", (Object)heartbeatResponse);
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to heartbeat id %s, host %s:%d service %s", req.getInstanceID(), req.getHost(), req.getPort(), serviceKey), t);
        }
        finally {
            long endTimestamp = System.currentTimeMillis();
            LOG.debug("end heartbeat at {} ms. Diff {} ms", (Object)endTimestamp, (Object)(endTimestamp - startTimestamp));
            if (null != connection) {
                connection.release("InstanceHeartbeat");
            }
        }
    }

    public ReportClientResponse reportClient(ReportClientRequest req) throws PolarisException {
        this.checkDestroyed();
        this.waitDiscoverReady();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        try {
            connection = this.connectionManager.getConnection("ReportClient", ClusterType.SERVICE_DISCOVER_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextHeartbeatReqId());
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            ClientProto.Client request = this.buildReportRequest(req);
            ResponseProto.Response response = stub.reportClient(request);
            LOG.debug("reportClient req:{}, rsp:{}", (Object)req, (Object)TextFormat.shortDebugString((MessageOrBuilder)response));
            GrpcUtil.checkResponse(response);
            ReportClientResponse rsp = new ReportClientResponse();
            if (null == response.getClient().getLocation()) {
                throw new IllegalStateException(String.format("unexpected null response from clientReport api, response:%s", TextFormat.shortDebugString((MessageOrBuilder)response)));
            }
            rsp.setCampus(response.getClient().getLocation().getCampus().getValue());
            rsp.setZone(response.getClient().getLocation().getZone().getValue());
            rsp.setRegion(response.getClient().getLocation().getRegion().getValue());
            ReportClientResponse reportClientResponse = rsp;
            return reportClientResponse;
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to report client host %s, version %s service %s", req.getClientHost(), req.getVersion(), serviceKey), t);
        }
        finally {
            if (null != connection) {
                connection.release("ReportClient");
            }
        }
    }

    private ServiceContractProto.ServiceContract buildReportServiceContractRequest(ReportServiceContractRequest req) {
        ServiceContractProto.ServiceContract.Builder serviceContractBuilder = ServiceContractProto.ServiceContract.newBuilder();
        serviceContractBuilder.setName(StringUtils.defaultString((String)req.getName()));
        serviceContractBuilder.setService(StringUtils.defaultString((String)req.getService()));
        serviceContractBuilder.setNamespace(StringUtils.defaultString((String)req.getNamespace()));
        serviceContractBuilder.setProtocol(StringUtils.defaultString((String)req.getProtocol()));
        serviceContractBuilder.setVersion(StringUtils.defaultString((String)req.getVersion()));
        serviceContractBuilder.setContent(StringUtils.defaultString((String)req.getContent()));
        serviceContractBuilder.setRevision(StringUtils.defaultString((String)req.getRevision()));
        ArrayList<ServiceContractProto.InterfaceDescriptor> interfaceDescriptorList = new ArrayList<ServiceContractProto.InterfaceDescriptor>();
        for (InterfaceDescriptor i : req.getInterfaceDescriptors()) {
            ServiceContractProto.InterfaceDescriptor.Builder interfaceDescriptorBuilder = ServiceContractProto.InterfaceDescriptor.newBuilder();
            interfaceDescriptorBuilder.setName(StringUtils.defaultString((String)i.getName()));
            interfaceDescriptorBuilder.setMethod(StringUtils.defaultString((String)i.getMethod()));
            interfaceDescriptorBuilder.setPath(StringUtils.defaultString((String)i.getPath()));
            interfaceDescriptorBuilder.setContent(StringUtils.defaultString((String)i.getContent()));
            interfaceDescriptorList.add(interfaceDescriptorBuilder.build());
        }
        serviceContractBuilder.addAllInterfaces(interfaceDescriptorList);
        return serviceContractBuilder.build();
    }

    public ReportServiceContractResponse reportServiceContract(ReportServiceContractRequest req) throws PolarisException {
        if (!this.isReportServiceContractEnable()) {
            return null;
        }
        this.checkDestroyed();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        try {
            this.waitDiscoverReady();
            connection = this.connectionManager.getConnection("ReportServiceContract", ClusterType.SERVICE_DISCOVER_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisServiceContractGRPCGrpc.PolarisServiceContractGRPCBlockingStub stub = PolarisServiceContractGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextReportServiceContractReqId());
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            ResponseProto.Response reportServiceContractResponse = stub.reportServiceContract(this.buildReportServiceContractRequest(req));
            GrpcUtil.checkResponse(reportServiceContractResponse);
            ReportServiceContractResponse reportServiceContractResponse2 = new ReportServiceContractResponse();
            return reportServiceContractResponse2;
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to report service contract, service %s", serviceKey), t);
        }
        finally {
            if (null != connection) {
                connection.release("ReportServiceContract");
            }
        }
    }

    public ServiceRuleByProto getServiceContract(CommonServiceContractRequest req) throws PolarisException {
        if (!this.isReportServiceContractEnable()) {
            return null;
        }
        this.checkDestroyed();
        Connection connection = null;
        ServiceKey serviceKey = new ServiceKey(req.getNamespace(), req.getService());
        try {
            this.waitDiscoverReady();
            connection = this.connectionManager.getConnection("ReportServiceContract", ClusterType.SERVICE_DISCOVER_CLUSTER);
            req.setTargetServer(GrpcConnector.connectionToTargetNode(connection));
            PolarisServiceContractGRPCGrpc.PolarisServiceContractGRPCBlockingStub stub = PolarisServiceContractGRPCGrpc.newBlockingStub((Channel)connection.getChannel());
            stub = GrpcUtil.attachRequestHeader(stub, GrpcUtil.nextReportServiceContractReqId());
            stub = GrpcUtil.attachAccessToken(this.connectorConfig.getToken(), stub);
            ResponseProto.Response response = stub.getServiceContract(req.toQuerySpec());
            GrpcUtil.checkResponse(response);
            ServiceContractProto.ServiceContract remoteVal = response.getServiceContract();
            ServiceRuleByProto serviceRuleByProto = new ServiceRuleByProto((Message)remoteVal, remoteVal.getRevision(), false, ServiceEventKey.EventType.SERVICE_CONTRACT);
            return serviceRuleByProto;
        }
        catch (Throwable t) {
            if (t instanceof PolarisException) {
                throw t;
            }
            if (null != connection) {
                connection.reportFail(ErrorCode.NETWORK_ERROR);
            }
            GrpcUtil.checkGrpcException(t);
            throw new RetriableException(ErrorCode.NETWORK_ERROR, String.format("fail to report service contract, service %s", serviceKey), t);
        }
        finally {
            if (null != connection) {
                connection.release("ReportServiceContract");
            }
        }
    }

    public void updateServers(ServiceEventKey svcEventKey) {
        this.connectionManager.makeReady(svcEventKey);
    }

    public boolean isInitialized() {
        return this.initialized;
    }

    public String getName() {
        return "grpc";
    }

    public String getId() {
        return this.id;
    }

    public boolean isRegisterEnable() {
        return this.isRegisterEnable;
    }

    public boolean isDiscoveryEnable() {
        return this.isDiscoveryEnable;
    }

    public boolean isReportServiceContractEnable() {
        return this.isReportServiceContractEnable;
    }

    public PluginType getType() {
        return PluginTypes.SERVER_CONNECTOR.getBaseType();
    }

    public void postContextInit(Extensions extensions) throws PolarisException {
        if (this.initialized) {
            this.connectionManager.setExtensions(extensions);
            this.updateServiceExecutor.scheduleWithFixedDelay(new ClearIdleStreamClientTask(), this.connectionIdleTimeoutMs, this.connectionIdleTimeoutMs, TimeUnit.MILLISECONDS);
            if (this.standalone) {
                this.updateServiceExecutor.scheduleWithFixedDelay((Runnable)new DestroyableServerConnector.UpdateServiceTask((DestroyableServerConnector)this), 500L, 500L, TimeUnit.MILLISECONDS);
            }
        }
    }

    public void doDestroy() {
        if (this.initialized) {
            LOG.info("start to destroy connector {}", (Object)this.getName());
            ThreadPoolUtils.waitAndStopThreadPools((ExecutorService[])new ExecutorService[]{this.sendDiscoverExecutor, this.buildInExecutor, this.updateServiceExecutor});
            this.destroyStreamClient();
            if (null != this.connectionManager) {
                this.connectionManager.destroy();
            }
        }
    }

    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public AtomicReference<SpecStreamClient> getStreamClient(ClusterType clusterType) {
        return this.streamClients.get(clusterType);
    }

    public long getConnectionIdleTimeoutMs() {
        return this.connectionIdleTimeoutMs;
    }

    private void destroyStreamClient() {
        for (AtomicReference<SpecStreamClient> streamClientRef : this.streamClients.values()) {
            SpecStreamClient streamClient = streamClientRef.get();
            if (null == streamClient) continue;
            streamClient.closeStream(true);
        }
    }

    public ServerConnectorConfigImpl getConnectorConfig() {
        return this.connectorConfig;
    }

    private class ClearIdleStreamClientTask
    implements Runnable {
        private ClearIdleStreamClientTask() {
        }

        @Override
        public void run() {
            for (AtomicReference streamClientRef : GrpcConnector.this.streamClients.values()) {
                SpecStreamClient streamClient = (SpecStreamClient)streamClientRef.get();
                if (null == streamClient) continue;
                streamClient.syncCloseExpireStream();
            }
        }
    }
}

