python使用channels实现websocket的推送信息功能 想法来源:
之前刷Leetcode的时候嘛,右上角都会有全服的刷题消息推送以及个人的收到的回复和关注消息推送。
因此就去查了查如何实现这种消息推送的功能。
在阅读了一番官方文档后,有了大致的一个理解和思路,毕竟阅读纯英文文档还是没有读中文文档那么直观(哈哈哈,就当考研锻炼英语阅读了,好了接下来进入正题!)
以下是我使用channels从消息推送功能实现到云服务器部署daphne(asgi服务器)的全过程,以及中间遇到的一些BUG.
一、什么是websocket websocket是一个全双工的通信协议,它和HTTP协议存在交集,但不完全相同,可以用“对HTTP协议的一种升级”来解释websocket。它的诞生主要用于弥补AJAX段轮询,HTTP长轮询的不足,它可以有效的减小带宽资源的浪费。
二、AJAX短轮询和HTTP长轮询,websocket的区别 1.AJAX短轮询:使用ajax技术,http协议,采用poll方式进行轮询,其轮询时间间隔较短,可以缩小到1s,但是细想,这种方法,每次都要建立TCP连接,而且HTTP协议有着无连接,无状态的特性,那么每次建立TCP连接,发送HTTP请求,所传输的数据报会非常多,这样很占用带宽资源。
2.HTTP长轮询:long poll指的是通过ajax发送请求,服务器在相应头中会包含Connection:keep-alive。这样会客户端发起请求后,服务器端保持一段时间,直到有数据了,再将数据发送给客户端。期间,客户端和服务端的连接会一直保持。这种方法,虽然后请求的次数可能减少,但是还是会占用很多带宽,同时,因为还是基于HTTP的,所以还是会不断建立连接,断开连接,建立连接,每次建立连接都会进行3次握手,断开连接进行4次握手。
注: 对于HTTP请求的缺点,我们可以看出,服务端只能被动的处理客户端发来的信息,而不能主动的向客户端发送数据。当然可以通过ajax长短轮询实现,但是弊端还是很明显的:浪费带宽。因此就诞生了基于全双工协议通信的websocket。
3.websocket协议:websocket协议很大程度上解决了HTTP协议中服务端无法主动发送数据给客户端的问题。websocket好处在于只需要依赖一次HTTP的握手,就可以建立websocket通信,这样服务端和客户端就可以创建持久性的连接,并可以相互发送消息。
三、F12分析协议的升级过程 {width=80%}
分析:
这是我博客建立websocket的一张图,我所用红箭头指的地方都跟websocket协议有关,接下来依次分析。
① 请求URL:以ws开头表示采用websocket协议,当然这是一种协议规定,我在nginx中还需要反向代理到相应能够支持处理websocket协议的异步框架的服务器上。
② 状态码为101表示切换协议成功。
③ upgrade:表示我要从HTTP协议升级到Websocket协议。
④ sec-WebSocket-Key:在Request中请求头中添加该行,表示对websocket协议的一种加密,服务端接收请求信息后,会对其进行再次加密,然后放到Response的Sec-webScoket-Accept,最后客户端对sec-WebSocket-Key加密和Sec-webScoket-Accept比较,相同的话表示建立websocket协议成功。
⑤ sec-WebSocket-Version:这个表示websocket的版本,固定为13。
四、python使用channels实现websocket协议 首先附上channels的官方文档(虽然都是英文)
https://channels.readthedocs.io/en/latest
以我的例子作为讲解:
1.首先安装channels和channels_redis pip install channels
pip install channels_redis
2.配置asgi应用程序,方便云服务器部署使用 因为我是的django3.0降级下来的。所以会自带asgi.py
,没有的话可以在根目录创建一个文件。
在其中加入:
1 2 3 4 5 6 7 8 9 import osimport djangofrom channels.routing import get_default_application os.environ.setdefault("DJANGO_SETTINGS_MODULE" , "mblog.settings" ) django.setup() application = get_default_application()
注: 主要作用就是启动django时候,同时创建asgi的application。
3.配置项目总routing 项目总routing,官方推荐放在根目录中,跟urls.py靠在一起,其实他们俩的用处类似,同样是经过中间件,然后匹配路由进入相应的处理函数。只不过遵守的是不同的协议,各自的叫法也不一样。
我这里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from channels.routing import ProtocolTypeRouter,URLRouterfrom channels.auth import AuthMiddlewareStack,AuthMiddlewarefrom chat import routing as chat_routingfrom mainsite import routing as inform_routing application = ProtocolTypeRouter({ 'websocket' : AuthMiddlewareStack( URLRouter( inform_routing.websocket_urlpatterns, ) ), })
注: 如果里面没有,它会默认自动添加django视图所支持HTTP的协议。
4.配置具体app中的子路由 子路由是分布在不同的app中,如上所示,我的子routing添加在inform_routing.websocket_urlpatterns
中。
1 2 3 4 5 6 7 8 from django.urls import path, re_pathfrom mainsite import consumers websocket_urlpatterns = [ re_path(r'inform/' ,consumers.InformConsumer), ]
5.编写Consumer Consumer的主要作用类似django的视图,不过可以运行编写异步代码或者多线程。
以往使用async/await的时候,通常会将他们放到asyncio.get_event_loop()
建立的循环事件队列中,然后调用loop.run_until_complete(asyncio.wait(tasks))
循环执行异步程序。
而Consumer提供了一种一种机制,可以在其中编写任何异步函数,当事件发生时,这些函数将会被回调。
我的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import jsonfrom asgiref.sync import async_to_syncfrom channels.generic.websocket import AsyncWebsocketConsumerfrom channels.layers import get_channel_layerimport logging common_log = logging.getLogger('django' ) class InformConsumer (AsyncWebsocketConsumer ): async def connect (self ): self.group_name = 'yunbo_inform' await self.channel_layer.group_add( self.group_name, self.channel_name ) await self.accept() async def disconnect (self, code ): await self.channel_layer.group_discard( self.group_name, self.channel_name ) async def receive (self, text_data=None , bytes_data=None , **kwargs ): text_data_json = json.loads(text_data) message = text_data_json['message' ] await self.channel_layer.group_send( self.group_name, { 'type' : 'web_update_inform' , 'message' : message } ) async def web_update_inform (self, event ): """网站更新消息发送""" func = event['func' ] username = event['username' ] article = event['article' ] head_image = event['head_image' ] trigger_username = event['trigger_username' ] message = { 'func' : func, 'username' : username, 'article' : article, 'head_image' : head_image, 'trigger_username' :trigger_username } await self.send(text_data=json.dumps(message))def send_inform (func, username, article, head_image, trigger_username=None , group_name=None ): """从外部发送消息到channels""" if group_name == None : group_name = 'yunbo_inform' channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( group_name, { 'type' : 'web_update_inform' , 'func' : func, 'username' : username, 'article' : article, 'head_image' : head_image, 'trigger_username' :trigger_username } )
分析:
群发消息的实现原理:receive消息 –>发送到group中 –>通过group广播出去
关于单独两个consumer互相发送消息的,我还没有用到,用到了我会做相关笔记。
AsyncWebsocketConsumer
表示继承了异步AsyncConsumer
。还有些其他的例如WebsocketConsumer
,JsonWebsocketConsumer
(自动解码和解码成json格式的同步Consumer)AsyncJsonWebsocketConsume
,AsyncHttpConsumer
,具体的方法可以参考官方文档。
前端创建websocket对象 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 const informSocket = new WebSocket('ws://' + window .location.host + '/inform/' ); informSocket.onopen = function ( ) { var data_get_inform = { 'username' : username } $.ajax({ url: notes_add_url, type: 'POST' , headers: { "X-CSRFToken" : csrftoken, }, .... informSocket.onmessage = function (e ) { let message_ = JSON .parse(e.data); var head_image = message_.head_image; cur_notice = $(".badge" ).text(); cur_notice = parseInt (cur_notice) + 1 ; $(".badge" ).text(cur_notice); var data_get_inform = { 'username' : username } $.ajax({ url: notes_add_url, type: 'POST' , headers: { "X-CSRFToken" : csrftoken, }, data: data_get_inform, ...
以上是部分Js代码。
其他主要方法:
websocket四大事件:
open Socket.onopen
连接建立时触发
message Socket.onmessage
客户端接收服务端数据时触发
error Socket.onerror
通信发生错误时触发
close Socket.onclose
连接关闭时触发
websocket两大方法:
Socket.send()
使用连接发送数据
Socket.close()
关闭连接
五、Ubuntu16+supervisor+daphne+nginx部署asgi 在云服务器上安装daphnepip install daphne
我安装的daphne默认在/home/admin/.local/bin/
下了,会提示script daphne ... not on path
,意思是你不能直接在中断输入daphne
,运行command,我试了网上的一些方法,什么export PATH=/home/admin/.local/bin:$PATH
作用只能是暂时的。因此我直接写了绝对路径。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 [program:daphne] directory=/home/admin/mblog # 这里的8006 要和nginx中代理的服务器端口一致 command=/home/admin/.local/bin/daphne -b 127 .0 .0 .1 -p 8006 --proxy-headers mblog.asgi:application user=admin autostart=true autorestart=true startsecs=10 stdout_logfile=/var/log/websockets.log stdout_logfile_maxbytes=10 MB stdout_logfile_backups=10 redirect_stderr=True stderr_logfile=/var/log/websocket_worker_err.log stderr_logfile_maxbytes=10 MB stderr_logfile_backups=10
配置完毕后,通过supervisorctl启动,sudo supervisorcd -c /etc/supervisor/supervisord.conf
sudo supervisorctl start dephna
注意:注释要单独一行,不要有拼写错误
配置好了daphne后,接下来配置nginx
1 2 3 4 upstream websockets{ server 127.0 .0 .1 :8006 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 location /inform { proxy_pass http://websockets; proxy_http_version 1.1 ; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection 'upgrade' ; proxy_redirect off; proxy_set_header Host $host; proxy_set_header X-Real_IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Host $server_name; }
配置完后,重启nginx就可以了,sudo service nginx restart