在上一遍“Python RabbitMQ使用示例”中简单记录了在Python中RabbitMQ的使用示例。而在"Python gRPC小白使用示例"中也记录了使用gRPC进行远程过程调用的示例。但是,使用RabbitMQ也可以实现RPC的高并发调用。
实现方式
在RabbitMQ中,基本的消息如下图:

当将RabbitMQ作为RPC通道时,实际上使用了两个队列,其中一个队列发送调用请求,在请求的时候,将返回请求的队列主动告诉服务器,服务器调用完成后,将返回值写下到对应的队列中,示意图如下:

测试示例
知道了实现过程,就可以进行相应的测试:
RCPClient.py
import pika
import uuid
import settings
class RPCClient:
def __init__(self):
params = pika.ConnectionParameters(host='localhost')
self.connection = pika.BlockingConnection(params)
self.channel = self.connection.channel()
result = self.channel.queue_declare(#定义接收返回值队列
queue = str(uuid.uuid4()),
exclusive=True
)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue = self.callback_queue,
on_message_callback = self.on_response
)
#调用远程方法
def call(self, n):
self.response = None
#调用远程方法
self.corr_id = str(uuid.uuid4()) #调用唯一标识
self.channel.basic_publish(
exchange = '',
routing_key = 'rpc_queue', #消息发送队列
properties=pika.BasicProperties(
correlation_id=self.corr_id,
reply_to=self.callback_queue
),
body = str(n)
)
#等待响应
while self.response is None:
self.connection.process_data_events() # 非阻塞版的start_consuming()
return int(self.response)
#接收到消息后调用
def on_response(self, ch, method, props, body):
# 如果收到的ID和本机生成的相同,则返回的结果就是我想要的指令返回的结果
if(self.corr_id == props.correlation_id):
self.response = body
if __name__ == "__main__":
client = RPCClient()
print(" [x] Requesting fib(7)")
response = client.call(7)
print(" [.] Got %r" % response)
RPCClient有以下几个需要注意的:
1、定义接收返回值的队列,并赋值给reply_to,主动告之服务端;
2、每次调用都生成一个唯一标识,并检查返回队列中,标识是否一致;
3、发送调用请求后到接收到值值,中间会有一定延时,需要进行等待。
RPCServer.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pika
def fib(n):
if(n == 0):
return 0
if(n == 1):
return 1
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
#执行方法
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
#返回消息给客户端
ch.basic_publish(
exchange = '',
routing_key = props.reply_to, #消息发送队列
body = str(response),
properties=pika.BasicProperties(
correlation_id=props.correlation_id
)
)
ch.basic_ack(delivery_tag = method.delivery_tag) #任务完成,告诉客户端
if __name__ == "__main__":
params = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue = 'rpc_queue') #指定一个队列
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue = 'rpc_queue',
on_message_callback = on_request
)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
RPCServer有以下几个需要注意的:
1、返回返回值的时候,需要将调用方的correlation_id同时返回
相关链接
基于Rabbit实现的RPC:https://www.cnblogs.com/goldsunshine/p/8665456.htm
Python RabbitMQ使用示例:http://www.mirthsoft.com/post/2019/12/18/python-rabbitmq
Python gRPC小白使用示例:http://www.mirthsoft.com/post/2019/12/18/python-grpc-0011