国产av日韩一区二区三区精品,成人性爱视频在线观看,国产,欧美,日韩,一区,www.成色av久久成人,2222eeee成人天堂

目次
測(cè)試 " >測(cè)試
ホームページ Java &#&面接の質(zhì)問 アリババのインタビュアー: RPC フレームワークを手書きで書いてください。

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。

Aug 17, 2023 pm 04:24 PM
RPC フレームワーク

面接官がよく尋ねる質(zhì)問:

  • 登録センターをどのように設(shè)計(jì)するか?
  • メッセージ キューを設(shè)計(jì)するにはどうすればよいですか?
  • 永続化フレームワークを設(shè)計(jì)するにはどうすればよいですか?
  • RPC フレームワークを設(shè)計(jì)するにはどうすればよいですか?
  • ......

今日は、「RPC 実裝原則」について話しましょう (その他の関連トピック: XX の設(shè)計(jì)方法)シリーズ、Knowledge Planet に掲載されています)それでは、まず質(zhì)問を明確にしましょう: RPC とは何ですか?

RPCとはRemote Procedure Callの略で、遠(yuǎn)隔手続き呼び出しのことです。

#RPC はコンピュータ通信プロトコルです。このプロトコルを使用すると、開発者がこの対話を追加でプログラムすることなく、あるコンピュータ上で実行されているプログラムが別のコンピュータ上のサブルーチンを呼び出すことができます。

2 つ以上のアプリケーションが異なるサーバーに分散されており、アプリケーション間の呼び出しはローカル メソッド呼び出しに似ていることに注意してください。次に、RPC 呼び出しで何が起こるかを分析してみましょう。

RPC 呼び出しの基本プロセス

提供されているものなど、業(yè)界で最も人気のある RPC フレームワークのいくつかby Dubbo インターフェイスベースのリモート メソッド呼び出しは、クライアントがリモート サービスを呼び出すためにインターフェイスの定義を知る必要があるだけであることを意味します。 Java では、インターフェイスはインスタンス メソッドを直接呼び出すことができません。この操作は実裝クラス オブジェクトを通じて実行する必要があります。つまり、クライアントはこれらのインターフェイスのプロキシ オブジェクトを生成する必要があります。この目的のために、Java は動(dòng)的プロキシを生成するための Proxy および InvocationHandler のサポートを提供します。プロキシ オブジェクトがある場(chǎng)合、それぞれの特定のメソッドはどのように呼び出されますか? JDK ダイナミック プロキシによって生成されたプロキシ オブジェクトが指定されたメソッドを呼び出すと、InvocationHandler で定義された #invoke メソッドが実際に実行され、リモート メソッドの呼び出しが完了して結(jié)果が取得されます。

クライアントのことはさておき、振り返ってみると、RPC は 2 臺(tái)のコンピュータ間の呼び出しです。本質(zhì)的には 2 臺(tái)のホスト間のネットワーク通信です。ネットワーク通信に関しては、シリアル化、逆シリアル化、エンコードとデコードなどが必要です。考慮する必要がある問題と同時(shí)に、実際、ほとんどのシステムはクラスター內(nèi)にデプロイされています。複數(shù)のホスト/コンテナーが同じサービスを外部に提供します。クラスター內(nèi)のノードの數(shù)が多い場(chǎng)合、サービス アドレスの管理も難しくなります。一般的には、各サービスノードがそのアドレスと提供するサービスリストを登録センターに登録し、登録センターがサービスリストを一元管理することで、いくつかの問題を解決し、クライアントに新しい機(jī)能を追加します。ジョブ、つまりサービス ディスカバリは、一般的に言えば、登録センターからリモート方式に対応するサービス リストを検索し、そこから特定の戦略に従ってサービス アドレスを選択して、ネットワーク通信を完了することです。

クライアントと登録センターについて説明した後、もう 1 つの重要な役割は當(dāng)然サーバーです。サーバーの最も重要なタスクは、サービス インターフェイスの実際の実裝を提供し、特定のポート上のネットワーク リクエストを監(jiān)視することです。リクエストの後、対応するパラメータ (サービス インターフェイス、メソッド、リクエスト パラメータなど) がネットワーク リクエストから取得され、これらのパラメータに基づいて、リフレクションを通じてインターフェイスの実際の実裝が呼び出され、結(jié)果が取得されます。対応する応答ストリームに書き込まれます。

要約すると、基本的な RPC 呼び出しプロセスは大まかに次のとおりです。

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。

# #基本的な実裝

サーバー (プロデューサー)

サービス インターフェイス:

RPC では、プロダクションの作成者とコンシューマー共通のサービス インターフェイス API を持っています。以下のようにHelloServiceインターフェースを定義します。

/**
 * @Descrption  服務(wù)接口
 ***/
public interface HelloService {
    String sayHello(String somebody);
}

サービス実裝:

プロデューサは、サービス インターフェイスの実裝を提供し、HelloServiceImpl 実裝クラスを作成する必要があります。

/**
 * @Descrption 服務(wù)實(shí)現(xiàn)
 ***/
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String somebody) {
        return "hello " + somebody + "!";
    }
}

サービス登録:

この例では、Spring を使用して Bean を管理し、カスタム XML とパーサーを使用してサービス実裝クラスをコンテナにロードします (もちろん、カスタム アノテーションも使用できます) (ここでは説明しません) サービス インターフェイス情報(bào)を登録センターに登録します。

最初に XSD をカスタマイズします:

<xsd:element name="service">
    <xsd:complexType>
        <xsd:complexContent>
            <xsd:extension base="beans:identifiedType">
                <xsd:attribute name="interface" type="xsd:string" use="required"/>
                <xsd:attribute name="timeout" type="xsd:int" use="required"/>
                <xsd:attribute name="serverPort" type="xsd:int" use="required"/>
                <xsd:attribute name="ref" type="xsd:string" use="required"/>
                <xsd:attribute name="weight" type="xsd:int" use="optional"/>
                <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
                <xsd:attribute name="appKey" type="xsd:string" use="required"/>
                <xsd:attribute name="groupName" type="xsd:string" use="optional"/>
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>
</xsd:element>

スキーマと XSD、スキーマと対応するハンドラーのマッピングをそれぞれ指定します。

スキーマ:

http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd

ハンドラー:

http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler

書き込んだファイルをクラスパスの META-INF ディレクトリに置きます:

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。
圖片

在 Spring 配置文件中配置服務(wù)類:

<!-- 發(fā)布遠(yuǎn)程服務(wù) -->
 <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
 <storm:service id="helloServiceRegister"
                     interface="com.hsunfkqm.storm.framework.test.HelloService"
                     ref="helloService"
                     groupName="default"
                     weight="2"
                     appKey="ares"
                     workerThreads="100"
                     serverPort="8081"
                     timeout="600"/>

編寫對(duì)應(yīng)的 Handler 和 Parser:

StormServiceNamespaceHandler:

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

/**
 * @author 孫浩
 * @Descrption 服務(wù)發(fā)布自定義標(biāo)簽
 ***/
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
    }
}

ProviderFactoryBeanDefinitionParser:

protected Class getBeanClass(Element element) {
        return ProviderFactoryBean.class;
    }

    protected void doParse(Element element, BeanDefinitionBuilder bean) {

        try {
            String serviceItf = element.getAttribute("interface");
            String serverPort = element.getAttribute("serverPort");
            String ref = element.getAttribute("ref");
            // ....
            bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
            bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
            bean.addPropertyReference("serviceObject", ref);
            //...
            if (NumberUtils.isNumber(weight)) {
                bean.addPropertyValue("weight", Integer.parseInt(weight));
            }
            //...
       } catch (Exception e) {
            // ...        
      }
    }

ProviderFactoryBean:

/**
 * @Descrption 服務(wù)發(fā)布
 ***/
public class ProviderFactoryBean implements FactoryBean, InitializingBean {

    //服務(wù)接口
    private Class<?> serviceItf;
    //服務(wù)實(shí)現(xiàn)
    private Object serviceObject;
    //服務(wù)端口
    private String serverPort;
    //服務(wù)超時(shí)時(shí)間
    private long timeout;
    //服務(wù)代理對(duì)象,暫時(shí)沒有用到
    private Object serviceProxyObject;
    //服務(wù)提供者唯一標(biāo)識(shí)
    private String appKey;
    //服務(wù)分組組名
    private String groupName = "default";
    //服務(wù)提供者權(quán)重,默認(rèn)為 1 , 范圍為 [1-100]
    private int weight = 1;
    //服務(wù)端線程數(shù),默認(rèn) 10 個(gè)線程
    private int workerThreads = 10;

    @Override
    public Object getObject() throws Exception {
        return serviceProxyObject;
    }

    @Override
    public Class<?> getObjectType() {
        return serviceItf;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //啟動(dòng) Netty 服務(wù)端
        NettyServer.singleton().start(Integer.parseInt(serverPort));
        //注冊(cè)到 zk, 元數(shù)據(jù)注冊(cè)中心
        List<ProviderService> providerServiceList = buildProviderServiceInfos();
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        registerCenter4Provider.registerProvider(providerServiceList);
    }
}

//================RegisterCenter#registerProvider======================
@Override
public void registerProvider(final List<ProviderService> serviceMetaData) {
    if (CollectionUtils.isEmpty(serviceMetaData)) {
        return;
    }

    //連接 zk, 注冊(cè)服務(wù)
    synchronized (RegisterCenter.class) {
        for (ProviderService provider : serviceMetaData) {
            String serviceItfKey = provider.getServiceItf().getName();

            List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
            if (providers == null) {
                providers = Lists.newArrayList();
            }
            providers.add(provider);
            providerServiceMap.put(serviceItfKey, providers);
        }

        if (zkClient == null) {
            zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
        }

        //創(chuàng)建 ZK 命名空間/當(dāng)前部署應(yīng)用 APP 命名空間/
        String APP_KEY = serviceMetaData.get(0).getAppKey();
        String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
        boolean exist = zkClient.exists(ZK_PATH);
        if (!exist) {
            zkClient.createPersistent(ZK_PATH, true);
        }

        for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
            //服務(wù)分組
            String groupName = entry.getValue().get(0).getGroupName();
            //創(chuàng)建服務(wù)提供者
            String serviceNode = entry.getKey();
            String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
            exist = zkClient.exists(servicePath);
            if (!exist) {
                zkClient.createPersistent(servicePath, true);
            }

            //創(chuàng)建當(dāng)前服務(wù)器節(jié)點(diǎn)
            int serverPort = entry.getValue().get(0).getServerPort();//服務(wù)端口
            int weight = entry.getValue().get(0).getWeight();//服務(wù)權(quán)重
            int workerThreads = entry.getValue().get(0).getWorkerThreads();//服務(wù)工作線程
            String localIp = IPHelper.localIp();
            String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
            exist = zkClient.exists(currentServiceIpNode);
            if (!exist) {
                //注意,這里創(chuàng)建的是臨時(shí)節(jié)點(diǎn)
                zkClient.createEphemeral(currentServiceIpNode);
            }
            //監(jiān)聽注冊(cè)服務(wù)的變化,同時(shí)更新數(shù)據(jù)到本地緩存
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = Lists.newArrayList();
                    }
                    //存活的服務(wù) IP 列表
                    List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
                        @Override
                        public String apply(String input) {
                            return StringUtils.split(input, "|")[0];
                        }
                    }));
                    refreshActivityService(activityServiceIpList);
                }
            });

        }
    }
}

至此服務(wù)實(shí)現(xiàn)類已被載入 Spring 容器中,且服務(wù)接口信息也注冊(cè)到了注冊(cè)中心。

網(wǎng)絡(luò)通信:

作為生產(chǎn)者對(duì)外提供 RPC 服務(wù),必須有一個(gè)網(wǎng)絡(luò)程序來來監(jiān)聽請(qǐng)求和做出響應(yīng)。在 Java 領(lǐng)域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 來實(shí)現(xiàn)的,本例中也采用它當(dāng)做通信服務(wù)器。

構(gòu)建并啟動(dòng) Netty 服務(wù)監(jiān)聽指定端口:

public void start(final int port) {
        synchronized (NettyServer.class) {
            if (bossGroup != null || workerGroup != null) {
                return;
            }

            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //注冊(cè)解碼器 NettyDecoderHandler
                            ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
                            //注冊(cè)編碼器 NettyEncoderHandler
                            ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
                            //注冊(cè)服務(wù)端業(yè)務(wù)邏輯處理器 NettyServerInvokeHandler
                            ch.pipeline().addLast(new NettyServerInvokeHandler());
                        }
                    });
            try {
                channel = serverBootstrap.bind(port).sync().channel();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

上面的代碼中向 Netty 服務(wù)的 Pipeline 中添加了編解碼和業(yè)務(wù)處理器,當(dāng)接收到請(qǐng)求時(shí),經(jīng)過編解碼后,真正處理業(yè)務(wù)的是業(yè)務(wù)處理器,即 NettyServerInvokeHandler,該處理器繼承自 SimpleChannelInboundHandler,當(dāng)數(shù)據(jù)讀取完成將觸發(fā)一個(gè)事件,并調(diào)用 NettyServerInvokeHandler#channelRead0 方法來處理請(qǐng)求。

@Override
protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
    if (ctx.channel().isWritable()) {
        //從服務(wù)調(diào)用對(duì)象里獲取服務(wù)提供者信息
        ProviderService metaDataModel = request.getProviderService();
        long consumeTimeOut = request.getInvokeTimeout();
        final String methodName = request.getInvokedMethodName();

        //根據(jù)方法名稱定位到具體某一個(gè)服務(wù)提供者
        String serviceKey = metaDataModel.getServiceItf().getName();
        //獲取限流工具類
        int workerThread = metaDataModel.getWorkerThreads();
        Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
        if (semaphore == null) {
            synchronized (serviceKeySemaphoreMap) {
                semaphore = serviceKeySemaphoreMap.get(serviceKey);
                if (semaphore == null) {
                    semaphore = new Semaphore(workerThread);
                    serviceKeySemaphoreMap.put(serviceKey, semaphore);
                }
            }
        }

        //獲取注冊(cè)中心服務(wù)
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

        Object result = null;
        boolean acquire = false;

        try {
            ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
                @Override
                public boolean apply(ProviderService input) {
                    return StringUtils.equals(input.getServiceMethod().getName(), methodName);
                }
            }).iterator().next();
            Object serviceObject = localProviderCache.getServiceObject();

            //利用反射發(fā)起服務(wù)調(diào)用
            Method method = localProviderCache.getServiceMethod();
            //利用 semaphore 實(shí)現(xiàn)限流
            acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
            if (acquire) {
                result = method.invoke(serviceObject, request.getArgs());
                //System.out.println("---------------"+result);
            }
        } catch (Exception e) {
            System.out.println(JSON.toJSONString(localProviderCaches) + "  " + methodName+" "+e.getMessage());
            result = e;
        } finally {
            if (acquire) {
                semaphore.release();
            }
        }
        //根據(jù)服務(wù)調(diào)用結(jié)果組裝調(diào)用返回對(duì)象
        StormResponse response = new StormResponse();
        response.setInvokeTimeout(consumeTimeOut);
        response.setUniqueKey(request.getUniqueKey());
        response.setResult(result);
        //將服務(wù)調(diào)用返回對(duì)象回寫到消費(fèi)端
        ctx.writeAndFlush(response);
    } else {
        logger.error("------------channel closed!---------------");
    }
}

此處還有部分細(xì)節(jié)如自定義的編解碼器等,篇幅所限不在此詳述,繼承 MessageToByteEncoder 和 ByteToMessageDecoder 覆寫對(duì)應(yīng)的 encode 和 decode 方法即可自定義編解碼器,使用到的序列化工具如 Hessian/Proto 等可參考對(duì)應(yīng)的官方文檔。

請(qǐng)求和響應(yīng)包裝:

為便于封裝請(qǐng)求和響應(yīng),定義兩個(gè) bean 來表示請(qǐng)求和響應(yīng)。

請(qǐng)求:

/**
 * @author 孫浩
 * @Descrption
 ***/
public class StormRequest implements Serializable {

    private static final long serialVersionUID = -5196465012408804755L;
    //UUID,唯一標(biāo)識(shí)一次返回值
    private String uniqueKey;
    //服務(wù)提供者信息
    private ProviderService providerService;
    //調(diào)用的方法名稱
    private String invokedMethodName;
    //傳遞參數(shù)
    private Object[] args;
    //消費(fèi)端應(yīng)用名
    private String appName;
    //消費(fèi)請(qǐng)求超時(shí)時(shí)長(zhǎng)
    private long invokeTimeout;
    // getter/setter
}

響應(yīng):

/**
 * @Descrption
 ***/
public class StormResponse implements Serializable {
    private static final long serialVersionUID = 5785265307118147202L;
    //UUID, 唯一標(biāo)識(shí)一次返回值
    private String uniqueKey;
    //客戶端指定的服務(wù)超時(shí)時(shí)間
    private long invokeTimeout;
    //接口調(diào)用返回的結(jié)果對(duì)象
    private Object result;
    //getter/setter
}

客戶端(消費(fèi)者)

客戶端(消費(fèi)者)在 RPC 調(diào)用中主要是生成服務(wù)接口的代理對(duì)象,并從注冊(cè)中心獲取對(duì)應(yīng)的服務(wù)列表發(fā)起網(wǎng)絡(luò)請(qǐng)求。

客戶端和服務(wù)端一樣采用 Spring 來管理 bean 解析 XML 配置等不再贅述,重點(diǎn)看下以下幾點(diǎn):

1、通過 JDK 動(dòng)態(tài)代理來生成引入服務(wù)接口的代理對(duì)象

public Object getProxy() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);
}

2、從注冊(cè)中心獲取服務(wù)列表并依據(jù)某種策略選取其中一個(gè)服務(wù)節(jié)點(diǎn)

//服務(wù)接口名稱
String serviceKey = targetInterface.getName();
//獲取某個(gè)接口的服務(wù)提供者列表
IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
//根據(jù)軟負(fù)載策略,從服務(wù)提供者列表選取本次調(diào)用的服務(wù)提供者
ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
ProviderService providerService = clusterStrategyService.select(providerServices);

3、通過 Netty 建立連接,發(fā)起網(wǎng)絡(luò)請(qǐng)求

/**
 * @author 孫浩
 * @Descrption Netty 消費(fèi)端 bean 代理工廠
 ***/
public class RevokerProxyBeanFactory implements InvocationHandler {
    private ExecutorService fixedThreadPool = null;
    //服務(wù)接口
    private Class<?> targetInterface;
    //超時(shí)時(shí)間
    private int consumeTimeout;
    //調(diào)用者線程數(shù)
    private static int threadWorkerNumber = 10;
    //負(fù)載均衡策略
    private String clusterStrategy;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        ...

        //復(fù)制一份服務(wù)提供者信息
        ProviderService newProvider = providerService.copy();
        //設(shè)置本次調(diào)用服務(wù)的方法以及接口
        newProvider.setServiceMethod(method);
        newProvider.setServiceItf(targetInterface);

        //聲明調(diào)用 AresRequest 對(duì)象,AresRequest 表示發(fā)起一次調(diào)用所包含的信息
        final StormRequest request = new StormRequest();
        //設(shè)置本次調(diào)用的唯一標(biāo)識(shí)
        request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());
        //設(shè)置本次調(diào)用的服務(wù)提供者信息
        request.setProviderService(newProvider);
        //設(shè)置本次調(diào)用的方法名稱
        request.setInvokedMethodName(method.getName());
        //設(shè)置本次調(diào)用的方法參數(shù)信息
        request.setArgs(args);

        try {
            //構(gòu)建用來發(fā)起調(diào)用的線程池
            if (fixedThreadPool == null) {
                synchronized (RevokerProxyBeanFactory.class) {
                    if (null == fixedThreadPool) {
                        fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
                    }
                }
            }
            //根據(jù)服務(wù)提供者的 ip,port, 構(gòu)建 InetSocketAddress 對(duì)象,標(biāo)識(shí)服務(wù)提供者地址
            String serverIp = request.getProviderService().getServerIp();
            int serverPort = request.getProviderService().getServerPort();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            //提交本次調(diào)用信息到線程池 fixedThreadPool, 發(fā)起調(diào)用
            Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
            //獲取調(diào)用的返回結(jié)果
            StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }
    //  ...
}

Netty 的響應(yīng)是異步的,為了在方法調(diào)用返回前獲取到響應(yīng)結(jié)果,需要將異步的結(jié)果同步化。

4、Netty 異步返回的結(jié)果存入阻塞隊(duì)列

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
    //將 Netty 異步返回的結(jié)果存入阻塞隊(duì)列,以便調(diào)用端同步獲取
    RevokerResponseHolder.putResultValue(response);
}

5、請(qǐng)求發(fā)出后同步獲取結(jié)果

//提交本次調(diào)用信息到線程池 fixedThreadPool, 發(fā)起調(diào)用
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
//獲取調(diào)用的返回結(jié)果
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
    return response.getResult();
}

//===================================================
//從返回結(jié)果容器中獲取返回結(jié)果,同時(shí)設(shè)置等待超時(shí)時(shí)間為 invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);

測(cè)試

Server:

/**
 * @Descrption
 ***/
public class MainServer {
    public static void main(String[] args) throws Exception {
        //發(fā)布服務(wù)
        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
        System.out.println(" 服務(wù)發(fā)布完成");
    }
}

Client:

public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {

        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
        final HelloService helloService = (HelloService) context.getBean("helloService");
        String result = helloService.sayHello("World");
        System.out.println(result);
        for (;;) {

        }
    }
}

結(jié)果

生產(chǎn)者:

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。
圖片

消費(fèi)者:

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。
圖片

注冊(cè)中心:

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。
圖片

總結(jié)

本文簡(jiǎn)單介紹了 RPC 的整個(gè)流程,并實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的 RPC 調(diào)用。希望閱讀完本文之后,能加深你對(duì) RPC 的一些認(rèn)識(shí)。

生產(chǎn)者端流程:

  • サービス インターフェイスとキャッシュをロードします
  • サービス登録、サービス インターフェイスとサービス ホスト情報(bào)を登録センターに書き込みます (この例では ZooKeeper を使用します)
  • ネットワーク サーバーを起動(dòng)し、
  • Reflection をリッスンし、ローカルで呼び出します

コンシューマ側(cè)プロセス:

  • プロキシ サービス インターフェイスはプロキシ オブジェクトを生成します
  • サービス検出 (ZooKeeper に接続し、サービス アドレス リストを取得し、適切なサービスを取得します)クライアント ロード ポリシー アドレス)
  • #リモート メソッド呼び出し (この例では、Netty 経由でメッセージを送信し、応答結(jié)果を取得します)
#

以上がアリババのインタビュアー: RPC フレームワークを手書きで書いてください。の詳細(xì)內(nèi)容です。詳細(xì)については、PHP 中國語 Web サイトの他の関連記事を參照してください。

このウェブサイトの聲明
この記事の內(nèi)容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰屬します。このサイトは、それに相當(dāng)する法的責(zé)任を負(fù)いません。盜作または侵害の疑いのあるコンテンツを見つけた場(chǎng)合は、admin@php.cn までご連絡(luò)ください。

ホットAIツール

Undress AI Tool

Undress AI Tool

脫衣畫像を無料で

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード寫真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

寫真から衣服を削除するオンライン AI ツール。

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡(jiǎn)単に交換できます。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中國語版

SublimeText3 中國語版

中國語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強(qiáng)力な PHP 統(tǒng)合開発環(huán)境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

手書きの RPC フレームワークは、実際には 13 歳のふりをするためだけのものではありません。 手書きの RPC フレームワークは、実際には 13 歳のふりをするためだけのものではありません。 Aug 16, 2023 pm 05:01 PM

あなたは質(zhì)問されていないかもしれません、おそらくあなたは幸運(yùn)であるかもしれません、あるいはあなたはまだこのレベルに達(dá)していないかもしれません。通常、月給は20,000以上で、基本的にデザインに関する質(zhì)問をいくつか受けます。面接官の観點(diǎn)から: この種の質(zhì)問をすることは、多くの技術(shù)的なポイントを含む 8 部構(gòu)成のエッセイを書くよりも効果的です。例: デザイン パターン、通信プロトコル、動(dòng)的エージェント、仮想化、スレッド プールなどに関する知識(shí)。

Go 言語の RPC フレームワークの原理と応用 Go 言語の RPC フレームワークの原理と応用 Jun 01, 2023 pm 03:01 PM

1. RPC フレームワークの概念 分散システムでは、異なるサーバーとクライアントの間でデータを転送する必要があることが多く、RPC (RemoteProcedureCall) フレームワークは一般的に使用される技術(shù)手段です。 RPC フレームワークを使用すると、アプリケーションはリモート メッセージングを通じて別の実行環(huán)境の関數(shù)やメソッドを呼び出すことができるため、プログラムを別のコンピュータで実行できるようになります?,F(xiàn)在、Google の gRPC、Thrift、Hessian など、多くの RPC フレームワークが市場(chǎng)に出回っています。この記事では主に

アリババのインタビュアー: RPC フレームワークを手書きで書いてください。 アリババのインタビュアー: RPC フレームワークを手書きで書いてください。 Aug 17, 2023 pm 04:24 PM

RPC はコンピュータ通信プロトコルです。このプロトコルを使用すると、開発者がこの対話を追加でプログラムすることなく、あるコンピュータ上で実行されているプログラムが別のコンピュータ上のサブルーチンを呼び出すことができます。

どのような RPC フレームワークがありますか? どのような RPC フレームワークがありますか? Aug 03, 2023 am 10:17 AM

RPC フレームワークには次のものが含まれます: 1. Google によって開発された高性能のオープンソース RPC フレームワークである gRPC; 2. Facebook によって開発されオープンソース化されているクロス言語 RPC フレームワークである Apache Thrift; 3. 高パフォーマンスのオープンソース RPC フレームワークである Apache Dubboパフォーマンス、軽量な RPC フレームワーク、大規(guī)模な分散システムに適しています; 4. Apache Axis2、Web サービス標(biāo)準(zhǔn)に基づく RPC フレームワーク; 5. Spring Cloud、分散システムを構(gòu)築するためのオープンソース フレームワーク。

PHP で RPC フレームワークを開発するにはどうすればよいですか? PHP で RPC フレームワークを開発するにはどうすればよいですか? May 13, 2023 pm 03:22 PM

RPC (RemoteProcedureCall) は、異なるプロセスが異なる物理マシン上のネットワークを介して通信および共同作業(yè)できるようにするプロセス間通信プロトコルです。 RPC フレームワークは、開発者が分散システムの開発を容易に実裝できるため、ますます注目を集めています。この記事では、PHP を使用して RPC フレームワークを開発する方法を段階的に紹介します。 1. RPC フレームワークとは何ですか? RPC フレームワークは、リモート プロシージャ コールを?qū)g裝するために使用されるフレームワークです。 RPCベースの場(chǎng)合

Go 言語 RPC フレームワークの評(píng)価: パフォーマンス、使いやすさ、コミュニティ サポートの比較 Go 言語 RPC フレームワークの評(píng)価: パフォーマンス、使いやすさ、コミュニティ サポートの比較 Feb 27, 2024 pm 09:12 PM

Go 言語は重要な現(xiàn)代プログラミング言語として、分散システム開発でますます使用されています。分散システムを構(gòu)築する場(chǎng)合、多くの場(chǎng)合、RPC (リモート プロシージャ コール) フレームワークの選択が重要になります。この記事では、現(xiàn)在主流の Go 言語 RPC フレームワークの水平評(píng)価を?qū)g施し、パフォーマンス、使いやすさ、コミュニティ サポートの観點(diǎn)から長(zhǎng)所と短所を比較し、具體的なコード例を添付します。 1. パフォーマンスの比較 分散システムでは、多くの場(chǎng)合、パフォーマンスは開発者が注目する主要な指標(biāo)の 1 つです。以下は主なものです

PHP7.0 の RPC フレームワークとは何ですか? PHP7.0 の RPC フレームワークとは何ですか? May 29, 2023 am 11:10 AM

コンピュータ技術(shù)の継続的な発展に伴い、分散システムが主流になり、リモート プロシージャ コール (RPC) は分散システムを?qū)g裝する重要な手段です。人気の Web プログラミング言語として、PHP には獨(dú)自の RPC フレームワークもあり、その中には PHP7.0 バージョンでいくつかの新しい RPC フレームワークが導(dǎo)入されました。この記事では、PHP7.0における一般的なRPCフレームワークとその特徴を紹介します。 PHPRemoteProcedureCall(phpRPC)phpRPC は軽量 RP です

Go 言語で同時(shí)実行性の高い RPC フレームワークを?qū)g裝する方法 Go 言語で同時(shí)実行性の高い RPC フレームワークを?qū)g裝する方法 Aug 05, 2023 pm 12:49 PM

Go 言語で高同時(shí)実行 RPC フレームワークを?qū)g裝する方法の紹介: インターネットの急速な発展に伴い、高同時(shí)実行アプリケーションがますます注目を集めています。 RPC (RemoteProcedureCall) フレームワークを使用するのが一般的な解決策です。この記事では、Go 言語で同時(shí)実行性の高い RPC フレームワークを?qū)g裝する方法をコード例とともに紹介します。 RPC フレームワークの紹介: RPC は、コンピュータ プログラムが別のアドレス空間 (通常はリモート コンピュータ上にある) にあるサブルーチンを、何もせずに呼び出すことを可能にする通信プロトコルです。

See all articles