不积跬步,无以至千里;不积小流,无以成江海。

Dean's blog

  • Join Us on Facebook!
  • Follow Us on Twitter!
  • LinkedIn
  • Subcribe to Our RSS Feed

Python使用RabbitMQ实现RPC调用示例

在上一遍“Python RabbitMQ使用示例”中简单记录了在Python中RabbitMQ的使用示例。而在"Python gRPC小白使用示例"中也记录了使用gRPC进行远程过程调用的示例。但是,使用RabbitMQ也可以实现RPC的高并发调用。

实现方式

在RabbitMQ中,基本的消息如下图:

    

当将RabbitMQ作为RPC通道时,实际上使用了两个队列,其中一个队列发送调用请求,在请求的时候,将返回请求的队列主动告诉服务器,服务器调用完成后,将返回值写下到对应的队列中,示意图如下:

测试示例

知道了实现过程,就可以进行相应的测试:

RCPClient.py

import pika
import uuid
import settings

class RPCClient:
    def __init__(self):
        params = pika.ConnectionParameters(host='localhost')
        self.connection = pika.BlockingConnection(params)
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(#定义接收返回值队列
            queue = str(uuid.uuid4()), 
            exclusive=True
        )
        self.callback_queue = result.method.queue
        self.channel.basic_consume(
            queue = self.callback_queue,
            on_message_callback = self.on_response
        )
        
    #调用远程方法
    def call(self, n):
        self.response = None

        #调用远程方法
        self.corr_id = str(uuid.uuid4()) #调用唯一标识
        self.channel.basic_publish(
            exchange = '',
            routing_key = 'rpc_queue',  #消息发送队列
            properties=pika.BasicProperties(
                correlation_id=self.corr_id,
                reply_to=self.callback_queue
            ),
            body = str(n)
        )

        #等待响应
        while self.response is None:
            self.connection.process_data_events() # 非阻塞版的start_consuming()

        return int(self.response)


    #接收到消息后调用
    def on_response(self, ch, method, props, body):
        # 如果收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果
        if(self.corr_id == props.correlation_id):
            self.response = body

if __name__ == "__main__":
    client = RPCClient()

    print(" [x] Requesting fib(7)")
    response = client.call(7)
    print(" [.] Got %r" % response)

RPCClient有以下几个需要注意的:

        1、定义接收返回值的队列,并赋值给reply_to,主动告之服务端;

        2、每次调用都生成一个唯一标识,并检查返回队列中,标识是否一致;

        3、发送调用请求后到接收到值值,中间会有一定延时,需要进行等待。

 

RPCServer.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import pika

def fib(n):
    if(n == 0): 
        return 0
    if(n == 1):
        return 1
    return fib(n - 1) + fib(n - 2)

def on_request(ch, method, props, body):
    #执行方法
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    #返回消息给客户端
    ch.basic_publish(
        exchange = '',
        routing_key = props.reply_to,  #消息发送队列
        body = str(response),
        properties=pika.BasicProperties(
            correlation_id=props.correlation_id
        )
    )
    ch.basic_ack(delivery_tag = method.delivery_tag)  #任务完成,告诉客户端

if __name__ == "__main__":
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue = 'rpc_queue') #指定一个队列
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue = 'rpc_queue', 
        on_message_callback = on_request
    )
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

RPCServer有以下几个需要注意的:

        1、返回返回值的时候,需要将调用方的correlation_id同时返回

 

相关链接

基于Rabbit实现的RPC:https://www.cnblogs.com/goldsunshine/p/8665456.htm

Python RabbitMQ使用示例:http://www.mirthsoft.com/post/2019/12/18/python-rabbitmq

Python gRPC小白使用示例:http://www.mirthsoft.com/post/2019/12/18/python-grpc-0011

不允许评论
粤ICP备17049187号-1