早上过来发现昨天上线的代码还是有个问题,好在很快解决了,觉得有必要做个小总结了。
python实现mq消息接收处理
框架选择
因为想不到怎么在Django里加上mq消息处理,所以就暴露出一个接口直接来调用。公司使用的是activemq,其支持4种协议:
- OpenWire for high performance clients in Java, C, C++, C#
- Stomp support so that clients can be written easily in C, Ruby, Perl, Python, PHP, ActionScript/Flash, Smalltalk to talk to ActiveMQ as well as any other popular Message Broker
- AMQP v1.0 support
- MQTT v3.1 support allowing for connections in an IoT environment.
从中可以看到,最适合python的就是Stomp协议了。在客户端列表中可以找到不同实现语言对应的客户端,这里我选择了stomp.py,谁让他排在搜索页面前面呢(其名称就是一种很好的SEO方式)。
mq客户端的实现
这里曾经遇到困扰好几天的坑:
协议的选择
之前没接触过消息队列这块,天真地以为activemq就是mq的一种协议。豆油给我一个mq服务器地址和端口号(61616)后,使用stomp.py连接总是出错,连接时可以收到mq服务器返回的消息,解析却总是出错:
1
2
3
4
5
6
7
8
9
10
11
12
13Exception in thread StompReceiverThread-1:
Traceback (most recent call last):
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "path/of/project/lib/python3.6/site-packages/stomp.py-4.1.19-py3.6.egg/stomp/transport.py", line 332, in __receiver_loop
f = utils.parse_frame(frame)
File "path/of/project/lib/python3.6/site-packages/stomp.py-4.1.19-py3.6.egg/stomp/utils.py", line 138, in parse_frame
preamble = decode(frame[0:preamble_end])
File "path/of/project/lib/python3.6/site-packages/stomp.py-4.1.19-py3.6.egg/stomp/backward3.py", line 29, in decode
return byte_data.decode()
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xf0 in position 0: invalid continuation byte
调试几天都是不行,就去提了个issue。
后来才发现是activemq实现了四种协议的服务,而不同协议开放的端口号不一样。stomp默认端口号为61613,端口号一换立马可以接收到消息。连接问题解决。
如何加入到Django里
被这个问题也是困扰了好久,单独的一个脚本到底如何加入到django里。曾经想过在启动django里随即运行该脚本,却一直找不到方法。因为我想把更多的精力放到对客户分数的处理上,而不是花太多的时间来处理后端的问题。所以比较急着想把这个问题解决,然而越急越无法找到实现的办法,也舍不得花时间来思考是不是这条路是不是对的路。正所谓我们都在不断赶路忘记了出路。
久久无果后,索性第一个版本就没加入消息队列的处理,回头处理数据去!
这两天忽然想到要不就把接收消息的代码单独拎出来运行,接收到消息就直接调用本地接口就可以了。然后直接用python启动就好了,调用本地接口也OK。
有时候,被一个问题困扰太久就容易陷进去,不可自拔。
又出问题了
然后把这个接收消息的脚本scp到服务器再运行又出现了2个问题:
- 服务器没有python3
- 还要安装各种依赖库(会出现各种问题)
没办法要为这个单独的脚本制作一个docker镜像了,有点高射炮打蚊子的感觉。但是好在可以做到平台无关性,不用去解决各种依赖的问题。
折腾一通后,可以正常启动运行了。然而天有不测风云,在本地可以正常运行的脚本,到了这里却出错了:
1
2
3
4
5
6
7
8
9> r = requests.delete('http://0.0.0.0:8000/credit/apply-del/ED201******53')
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/urllib3/connection.py", line 141, in _new_conn
(self.host, self.port), self.timeout, **extra_kw)
File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 83, in create_connection
raise err
File "/usr/local/lib/python3.6/site-packages/urllib3/util/connection.py", line 73, in create_connection
sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused悲剧(被拒)了。
一开始以为是不是iptables配置的问题,可是没听说过iptables用来防本地访问的呀,在终端里用curl执行了一下:
1
2$ curl -X "DELETE" "http://0.0.0.0:8000/credit/apply-del/ED201******53"
{"detail":"Not found.","status_code":404}没问题(忽略404),也就是不是防火墙的问题之类的。又怀疑是requests库的问题,又进这个镜像里用python3自带的urllib.request执行也是被拒。正想着难道非要执行curl命令才行?不合常理呀。
这时候机智的我灵光一现,难不成是在docker里运行的问题?docker就算是一个轻量级的虚拟机了,网络应该默认是bridge形式的。
嗯,改为**–net=host**,搞定!
最后附上处理消息队列的脚本(关键地方已经打码):
1 | #!/usr/bin/env python |
以及启动脚本:
1 |
|
算了Dockerfile也放出来吧:
1 | FROM python:3 |