/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.common.remote.client.grpc;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.grpc.auto.BiRequestStreamGrpc;
import com.alibaba.nacos.api.grpc.auto.Payload;
import com.alibaba.nacos.api.grpc.auto.RequestGrpc;
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.request.ServerCheckRequest;
import com.alibaba.nacos.api.remote.response.ErrorResponse;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.response.ServerCheckResponse;
import com.alibaba.nacos.common.remote.ConnectionType;
import com.alibaba.nacos.common.remote.client.Connection;
import com.alibaba.nacos.common.remote.client.RpcClient;
import com.alibaba.nacos.common.remote.client.RpcClientStatus;
import com.alibaba.nacos.common.remote.client.grpc.GrpcConnection;
import com.alibaba.nacos.common.remote.client.grpc.GrpcUtils;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.ThreadFactoryBuilder;
import com.alibaba.nacos.common.utils.ThreadUtils;
import com.alibaba.nacos.common.utils.VersionUtils;
import com.alibaba.nacos.shaded.com.google.common.util.concurrent.ListenableFuture;
import com.alibaba.nacos.shaded.io.grpc.CompressorRegistry;
import com.alibaba.nacos.shaded.io.grpc.DecompressorRegistry;
import com.alibaba.nacos.shaded.io.grpc.ManagedChannel;
import com.alibaba.nacos.shaded.io.grpc.ManagedChannelBuilder;
import com.alibaba.nacos.shaded.io.grpc.stub.StreamObserver;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class GrpcClient
extends RpcClient {
    static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
    protected static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";
    private ThreadPoolExecutor grpcExecutor = null;
    private Integer threadPoolCoreSize;
    private Integer threadPoolMaxSize;
    private static final long DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 0xA00000L;
    private static final long DEFAULT_KEEP_ALIVE_TIME = 360000L;

    @Override
    public ConnectionType getConnectionType() {
        return ConnectionType.GRPC;
    }

    public GrpcClient(String name) {
        super(name);
    }

    public void setThreadPoolCoreSize(Integer threadPoolCoreSize) {
        this.threadPoolCoreSize = threadPoolCoreSize;
    }

    public void setThreadPoolMaxSize(Integer threadPoolMaxSize) {
        this.threadPoolMaxSize = threadPoolMaxSize;
    }

    protected Integer getThreadPoolCoreSize() {
        return this.threadPoolCoreSize != null ? this.threadPoolCoreSize : ThreadUtils.getSuitableThreadCount(2);
    }

    protected Integer getThreadPoolMaxSize() {
        return this.threadPoolMaxSize != null ? this.threadPoolMaxSize : ThreadUtils.getSuitableThreadCount(8);
    }

    protected ThreadPoolExecutor createGrpcExecutor(String serverIp) {
        ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor((int)this.getThreadPoolCoreSize(), (int)this.getThreadPoolMaxSize(), 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10000), new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-grpc-client-executor-" + serverIp + "-%d").build());
        grpcExecutor.allowCoreThreadTimeOut(true);
        return grpcExecutor;
    }

    @Override
    public void shutdown() throws NacosException {
        super.shutdown();
        if (this.grpcExecutor != null) {
            LOGGER.info("Shutdown grpc executor " + this.grpcExecutor);
            this.grpcExecutor.shutdown();
        }
    }

    private RequestGrpc.RequestFutureStub createNewChannelStub(String serverIp, int serverPort) {
        Object o = ((ManagedChannelBuilder)((ManagedChannelBuilder)((ManagedChannelBuilder)((ManagedChannelBuilder)((ManagedChannelBuilder)ManagedChannelBuilder.forAddress(serverIp, serverPort).executor(this.grpcExecutor)).compressorRegistry(CompressorRegistry.getDefaultInstance())).decompressorRegistry(DecompressorRegistry.getDefaultInstance())).maxInboundMessageSize(this.getInboundMessageSize())).keepAliveTime(this.keepAliveTimeMillis(), TimeUnit.MILLISECONDS)).usePlaintext();
        ManagedChannel managedChannelTemp = ((ManagedChannelBuilder)o).build();
        return RequestGrpc.newFutureStub(managedChannelTemp);
    }

    private int getInboundMessageSize() {
        String messageSize = System.getProperty("nacos.remote.client.grpc.maxinbound.message.size", String.valueOf(0xA00000L));
        return Integer.parseInt(messageSize);
    }

    private int keepAliveTimeMillis() {
        String keepAliveTimeMillis = System.getProperty("nacos.remote.grpc.keep.alive.millis", String.valueOf(360000L));
        return Integer.parseInt(keepAliveTimeMillis);
    }

    private void shuntDownChannel(ManagedChannel managedChannel) {
        if (managedChannel != null && !managedChannel.isShutdown()) {
            managedChannel.shutdownNow();
        }
    }

    private Response serverCheck(String ip, int port, RequestGrpc.RequestFutureStub requestBlockingStub) {
        try {
            if (requestBlockingStub == null) {
                return null;
            }
            ServerCheckRequest serverCheckRequest = new ServerCheckRequest();
            Payload grpcRequest = GrpcUtils.convert(serverCheckRequest);
            ListenableFuture<Payload> responseFuture = requestBlockingStub.request(grpcRequest);
            Payload response = (Payload)responseFuture.get(3000L, TimeUnit.MILLISECONDS);
            return (Response)GrpcUtils.parse(response);
        }
        catch (Exception e) {
            LoggerUtils.printIfErrorEnabled(LOGGER, "Server check fail, please check server {} ,port {} is available , error ={}", ip, port, e);
            return null;
        }
    }

    private StreamObserver<Payload> bindRequestStream(BiRequestStreamGrpc.BiRequestStreamStub streamStub, final GrpcConnection grpcConn) {
        return streamStub.requestBiStream(new StreamObserver<Payload>(){

            @Override
            public void onNext(Payload payload) {
                block5: {
                    LoggerUtils.printIfDebugEnabled(LOGGER, "[{}]Stream server request receive, original info: {}", grpcConn.getConnectionId(), payload.toString());
                    try {
                        Object parseBody = GrpcUtils.parse(payload);
                        Request request = (Request)parseBody;
                        if (request == null) break block5;
                        try {
                            Response response = GrpcClient.this.handleServerRequest(request);
                            if (response != null) {
                                response.setRequestId(request.getRequestId());
                                GrpcClient.this.sendResponse(response);
                                break block5;
                            }
                            LOGGER.warn("[{}]Fail to process server request, ackId->{}", (Object)grpcConn.getConnectionId(), (Object)request.getRequestId());
                        }
                        catch (Exception e) {
                            LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Handle server request exception: {}", grpcConn.getConnectionId(), payload.toString(), e.getMessage());
                            Response errResponse = ErrorResponse.build(-500, "Handle server request error");
                            errResponse.setRequestId(request.getRequestId());
                            GrpcClient.this.sendResponse(errResponse);
                        }
                    }
                    catch (Exception e) {
                        LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Error to process server push response: {}", grpcConn.getConnectionId(), payload.getBody().getValue().toStringUtf8());
                    }
                }
            }

            @Override
            public void onError(Throwable throwable) {
                boolean isRunning = GrpcClient.this.isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream error, switch server,error={}", grpcConn.getConnectionId(), throwable);
                    if (GrpcClient.this.rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        GrpcClient.this.switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfWarnEnabled(LOGGER, "[{}]Ignore error event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }

            @Override
            public void onCompleted() {
                boolean isRunning = GrpcClient.this.isRunning();
                boolean isAbandon = grpcConn.isAbandon();
                if (isRunning && !isAbandon) {
                    LoggerUtils.printIfErrorEnabled(LOGGER, "[{}]Request stream onCompleted, switch server", grpcConn.getConnectionId());
                    if (GrpcClient.this.rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                        GrpcClient.this.switchServerAsync();
                    }
                } else {
                    LoggerUtils.printIfInfoEnabled(LOGGER, "[{}]Ignore complete event,isRunning:{},isAbandon={}", grpcConn.getConnectionId(), isRunning, isAbandon);
                }
            }
        });
    }

    private void sendResponse(Response response) {
        try {
            ((GrpcConnection)this.currentConnection).sendResponse(response);
        }
        catch (Exception e) {
            LOGGER.error("[{}]Error to send ack response, ackId->{}", (Object)this.currentConnection.getConnectionId(), (Object)response.getRequestId());
        }
    }

    @Override
    public Connection connectToServer(RpcClient.ServerInfo serverInfo) {
        try {
            if (this.grpcExecutor == null) {
                this.grpcExecutor = this.createGrpcExecutor(serverInfo.getServerIp());
            }
            int port = serverInfo.getServerPort() + this.rpcPortOffset();
            RequestGrpc.RequestFutureStub newChannelStubTemp = this.createNewChannelStub(serverInfo.getServerIp(), port);
            if (newChannelStubTemp != null) {
                Response response = this.serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
                if (response == null || !(response instanceof ServerCheckResponse)) {
                    this.shuntDownChannel((ManagedChannel)newChannelStubTemp.getChannel());
                    return null;
                }
                BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc.newStub(newChannelStubTemp.getChannel());
                GrpcConnection grpcConn = new GrpcConnection(serverInfo, this.grpcExecutor);
                grpcConn.setConnectionId(((ServerCheckResponse)response).getConnectionId());
                StreamObserver<Payload> payloadStreamObserver = this.bindRequestStream(biRequestStreamStub, grpcConn);
                grpcConn.setPayloadStreamObserver(payloadStreamObserver);
                grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
                grpcConn.setChannel((ManagedChannel)newChannelStubTemp.getChannel());
                ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
                conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
                conSetupRequest.setLabels(super.getLabels());
                conSetupRequest.setAbilities(this.clientAbilities);
                conSetupRequest.setTenant(super.getTenant());
                grpcConn.sendRequest(conSetupRequest);
                Thread.sleep(100L);
                return grpcConn;
            }
            return null;
        }
        catch (Exception e) {
            LOGGER.error("[{}]Fail to connect to server!,error={}", (Object)this.getName(), (Object)e);
            return null;
        }
    }
}

