RPC(远程过程调用)
解决的问题:之前的模式都是单向的:一端发送,另一端接收,接收方无法把处理结果回传给发送方。
使用多个队列在多个消费者之间分配耗时的任务,并通过回调队列把处理结果返回给请求方。
客户端
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3
4 import pika
5 import uuid
6
7
class FibonacciRpcClient(object):
    """Blocking RPC client that asks the 'rpc_queue' server for Fibonacci numbers.

    Each request is published with a ``reply_to`` queue and a unique
    ``correlation_id``; the reply whose correlation id matches the pending
    request is stored in ``self.response``.
    """

    # Class-level defaults keep on_response() safe even when a message is
    # delivered before the first call() has set the per-request state.
    response = None
    corr_id = None

    def __init__(self):
        """Connect to the broker and start consuming from a private reply queue."""
        self.credentials = pika.PlainCredentials('admin', 'admin123456')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='192.168.1.6', credentials=self.credentials))
        self.channel = self.connection.channel()

        # Per-request state: no request is pending yet.
        self.response = None
        self.corr_id = None

        # Declare an exclusive queue with an empty name and use the queue name
        # reported back in the declare result as the reply (callback) queue.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the pending request.

        Messages whose correlation id does not match the pending request are
        ignored, as is every message while no request is pending.
        """
        if self.corr_id is not None and self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to the RPC server and return the reply converted to ``int``.

        Polls the connection until on_response() has stored a matching reply.
        """
        self.response = None
        # Unique tag for this request so the reply can be matched to it.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',  # request queue declared by the server
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,  # queue the server should reply to
                correlation_id=self.corr_id,  # lets the reply be matched to this request
            ),
            body=str(n))
        count = 0
        while self.response is None:
            # Keep polling for incoming data; each pass is counted and printed
            # to show that the loop keeps running while waiting for the reply.
            self.connection.process_data_events()
            count += 1
            print(".......select.....", count)
        return int(self.response)
45
46
# Demo driver: open the RPC client, request fib(5) and print the reply.
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(5)")
response = fibonacci_rpc.call(5)
print(" [.] Got %r" % (response,))
服务端
1 #!/usr/bin/env python
2 # -*- coding:utf-8 -*-
3
4 import pika
5
# Open a blocking connection to the broker with plain credentials.
credentials = pika.PlainCredentials('admin', 'admin123456')
connection_params = pika.ConnectionParameters(
    host='192.168.1.6',
    credentials=credentials,
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()

# Declare the queue that RPC requests are read from.
channel.queue_declare(queue='rpc_queue')
11
12
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively, so the cost grows linearly with ``n`` instead of
    exponentially as with naive double recursion.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev
20
21
def on_request(ch, method, props, body):
    """Handle one RPC request: compute fib(n) and publish it to the reply queue.

    Args:
        ch: channel the request was delivered on; used to publish and ack.
        method: delivery metadata; supplies ``delivery_tag``.
        props: message properties; supplies ``reply_to`` and ``correlation_id``.
        body: request payload, expected to be the text of a non-negative integer.

    A body that is not a non-negative integer is rejected without requeue, so
    one bad message cannot raise out of the consumer callback again and again.
    """
    try:
        n = int(body)
    except ValueError:
        n = None

    if n is None or n < 0:
        print(" [!] Rejecting invalid request body: %r" % (body,))
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return

    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Reply on the queue named by the caller and echo its correlation id back
    # so the client can match this reply to its request.
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id),
                     body=str(response))
    # Acknowledge only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)
34
35
# Match delivery to processing capacity: with prefetch_count=1 the server does
# not take on further tasks while it cannot keep up with the current one.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='rpc_queue',
    on_message_callback=on_request,
)

print(" [x] Awaiting RPC requests")
channel.start_consuming()
运行结果:
1 D:\python\python.exe F:/abc/messagequeue/rpc_client.py
2 [x] Requesting fib(5)
3 .......select..... 1
4 .......select..... 2
5 .......select..... 3
6 .......select..... 4
7 .......select..... 5
8 .......select..... 6
9 .......select..... 7
10 .......select..... 8
11 .......select..... 9
12 .......select..... 10
13 .......select..... 11
14 .......select..... 12
15 .......select..... 13
16 .......select..... 14
17 .......select..... 15
18 .......select..... 16
19 .......select..... 17
20 .......select..... 18
21 .......select..... 19
22 .......select..... 20
23 .......select..... 21
24 .......select..... 22
25 .......select..... 23
26 .......select..... 24
27 .......select..... 25
28 .......select..... 26
29 .......select..... 27
30 .......select..... 28
31 [.] Got 5
知识兔