Python 使用channels实现websocket从编码到部署详解

python使用channels实现websocket的推送信息功能

想法来源:

之前刷Leetcode的时候嘛,右上角都会有全服的刷题消息推送以及个人的收到的回复和关注消息推送。

因此就去查了查如何实现这种消息推送的功能。

在阅读了一番官方文档后,有了大致的一个理解和思路,毕竟阅读纯英文文档还是没有读中文文档那么直观(哈哈哈,就当考研锻炼英语阅读了,好了接下来进入正题!)

以下是我使用channels从消息推送功能实现到云服务器部署daphne(asgi服务器)的全过程,以及中间遇到的一些BUG.


一、什么是websocket

websocket是一个全双工的通信协议,它和HTTP协议存在交集,但不完全相同,可以用“对HTTP协议的一种升级”来解释websocket。它的诞生主要用于弥补AJAX段轮询,HTTP长轮询的不足,它可以有效的减小带宽资源的浪费。


二、AJAX短轮询和HTTP长轮询,websocket的区别

1.AJAX短轮询:使用ajax技术,http协议,采用poll方式进行轮询,其轮询时间间隔较短,可以缩小到1s,但是细想,这种方法,每次都要建立TCP连接,而且HTTP协议有着无连接,无状态的特性,那么每次建立TCP连接,发送HTTP请求,所传输的数据报会非常多,这样很占用带宽资源。

2.HTTP长轮询:long poll指的是通过ajax发送请求,服务器在相应头中会包含Connection:keep-alive。这样会客户端发起请求后,服务器端保持一段时间,直到有数据了,再将数据发送给客户端。期间,客户端和服务端的连接会一直保持。这种方法,虽然后请求的次数可能减少,但是还是会占用很多带宽,同时,因为还是基于HTTP的,所以还是会不断建立连接,断开连接,建立连接,每次建立连接都会进行3次握手,断开连接进行4次握手。

注:对于HTTP请求的缺点,我们可以看出,服务端只能被动的处理客户端发来的信息,而不能主动的向客户端发送数据。当然可以通过ajax长短轮询实现,但是弊端还是很明显的:浪费带宽。因此就诞生了基于全双工协议通信的websocket。

3.websocket协议:websocket协议很大程度上解决了HTTP协议中服务端无法主动发送数据给客户端的问题。websocket好处在于只需要依赖一次HTTP的握手,就可以建立websocket通信,这样服务端和客户端就可以创建持久性的连接,并可以相互发送消息。

三、F12分析协议的升级过程

{width=80%}

分析:

这是我博客建立websocket的一张图,我所用红箭头指的地方都跟websocket协议有关,接下来依次分析。

① 请求URL:以ws开头表示采用websocket协议,当然这是一种协议规定,我在nginx中还需要反向代理到相应能够支持处理websocket协议的异步框架的服务器上。

② 状态码为101表示切换协议成功。

③ upgrade:表示我要从HTTP协议升级到Websocket协议。

④ sec-WebSocket-Key:在Request中请求头中添加该行,表示对websocket协议的一种加密,服务端接收请求信息后,会对其进行再次加密,然后放到Response的Sec-webScoket-Accept,最后客户端对sec-WebSocket-Key加密和Sec-webScoket-Accept比较,相同的话表示建立websocket协议成功。

⑤ sec-WebSocket-Version:这个表示websocket的版本,固定为13。

四、python使用channels实现websocket协议

首先附上channels的官方文档(虽然都是英文)

https://channels.readthedocs.io/en/latest

以我的例子作为讲解:

1.首先安装channels和channels_redis

pip install channels

pip install channels_redis

2.配置asgi应用程序,方便云服务器部署使用

因为我是的django3.0降级下来的。所以会自带asgi.py,没有的话可以在根目录创建一个文件。

在其中加入:

1
2
3
4
5
6
7
8
9
import os
import django
from channels.routing import get_default_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mblog.settings")

django.setup() # 加载异步application

application = get_default_application()

注:主要作用就是启动django时候,同时创建asgi的application。

3.配置项目总routing

项目总routing,官方推荐放在根目录中,跟urls.py靠在一起,其实他们俩的用处类似,同样是经过中间件,然后匹配路由进入相应的处理函数。只不过遵守的是不同的协议,各自的叫法也不一样。

我这里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

from channels.routing import ProtocolTypeRouter,URLRouter
from channels.auth import AuthMiddlewareStack,AuthMiddleware
from chat import routing as chat_routing
from mainsite import routing as inform_routing

application = ProtocolTypeRouter({
# (http->django views is added by default)
# 下面跟着不同协议路由,可以支持多个协议
'websocket': AuthMiddlewareStack(
URLRouter(
# chat_routing.websocket_urlpatterns,
# 这里的路由路径只能有一个
inform_routing.websocket_urlpatterns,
)
),
})


注:如果里面没有,它会默认自动添加django视图所支持HTTP的协议。

4.配置具体app中的子路由

子路由是分布在不同的app中,如上所示,我的子routing添加在inform_routing.websocket_urlpatterns中。

1
2
3
4
5
6
7
8
from django.urls import path, re_path
from mainsite import consumers

websocket_urlpatterns = [
# 官方解释path可能存在某种bug,用re_path既可以支持正则,也可以支持path路由匹配规则
re_path(r'inform/',consumers.InformConsumer),
]

5.编写Consumer

Consumer的主要作用类似django的视图,不过可以运行编写异步代码或者多线程。

以往使用async/await的时候,通常会将他们放到asyncio.get_event_loop()建立的循环事件队列中,然后调用loop.run_until_complete(asyncio.wait(tasks))循环执行异步程序。

而Consumer提供了一种一种机制,可以在其中编写任何异步函数,当事件发生时,这些函数将会被回调。

我的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85

import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer

import logging

common_log = logging.getLogger('django') # 自制的logging系统


class InformConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.group_name = 'yunbo_inform'

# 将用户加进组里
await self.channel_layer.group_add( # 所用的通道方法都必须是异步方法
self.group_name,
self.channel_name
)

await self.accept()

async def disconnect(self, code):
# 将用户移除组
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)

async def receive(self, text_data=None, bytes_data=None, **kwargs):
# 用于接收客户端的消息,可以保存到redis中
text_data_json = json.loads(text_data)
message = text_data_json['message']

# 群发笔记相关消息
await self.channel_layer.group_send(
self.group_name,
# 事件
{
'type': 'web_update_inform', # 事件名
'message': message # 参数
}
)

async def web_update_inform(self, event):
"""网站更新消息发送"""
func = event['func']
username = event['username']
article = event['article']
head_image = event['head_image']
trigger_username = event['trigger_username']
message = {
'func': func,
'username': username,
'article': article,
'head_image': head_image,
'trigger_username':trigger_username
}
await self.send(text_data=json.dumps(message))


# 外部发送消息到channels
def send_inform(func, username, article, head_image, trigger_username=None, group_name=None):
"""从外部发送消息到channels"""

if group_name == None:
group_name = 'yunbo_inform'

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
# 事件跟随组名
'type': 'web_update_inform',
'func': func,
'username': username,
'article': article,
'head_image': head_image,
'trigger_username':trigger_username
}
)


分析:

群发消息的实现原理:receive消息 –>发送到group中 –>通过group广播出去

关于单独两个consumer互相发送消息的,我还没有用到,用到了我会做相关笔记。

AsyncWebsocketConsumer表示继承了异步AsyncConsumer。还有些其他的例如WebsocketConsumerJsonWebsocketConsumer(自动解码和解码成json格式的同步Consumer)AsyncJsonWebsocketConsume,AsyncHttpConsumer,具体的方法可以参考官方文档。

前端创建websocket对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

//创建websocket对象
const informSocket = new WebSocket('ws://' + window.location.host + '/inform/');
// 第一次建立websocket连接成功执行
informSocket.onopen = function() {
var data_get_inform = {
'username': username
}
$.ajax({
url: notes_add_url,
type: 'POST',
headers: {
"X-CSRFToken": csrftoken,
},
....
//当服务器端推送数据过来的时候执行
informSocket.onmessage = function(e) {
let message_ = JSON.parse(e.data);
var head_image = message_.head_image;
cur_notice = $(".badge").text();
cur_notice = parseInt(cur_notice) + 1;
$(".badge").text(cur_notice);
var data_get_inform = {
'username': username
}
$.ajax({
url: notes_add_url,
type: 'POST',
headers: {
"X-CSRFToken": csrftoken,
},
data: data_get_inform,
...

以上是部分Js代码。

其他主要方法:

websocket四大事件:

open Socket.onopen 连接建立时触发

message Socket.onmessage 客户端接收服务端数据时触发

error Socket.onerror 通信发生错误时触发

close Socket.onclose 连接关闭时触发

websocket两大方法:

Socket.send() 使用连接发送数据

Socket.close() 关闭连接

五、Ubuntu16+supervisor+daphne+nginx部署asgi

在云服务器上安装daphne
pip install daphne

我安装的daphne默认在/home/admin/.local/bin/下了,会提示script daphne ... not on path,意思是你不能直接在中断输入daphne,运行command,我试了网上的一些方法,什么export PATH=/home/admin/.local/bin:$PATH作用只能是暂时的。因此我直接写了绝对路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[program:daphne]
directory=/home/admin/mblog
# 这里的8006要和nginx中代理的服务器端口一致
command=/home/admin/.local/bin/daphne -b 127.0.0.1 -p 8006 --proxy-headers mblog.asgi:application
user=admin
autostart=true
autorestart=true
startsecs=10
stdout_logfile=/var/log/websockets.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
redirect_stderr=True
stderr_logfile=/var/log/websocket_worker_err.log
stderr_logfile_maxbytes=10MB
stderr_logfile_backups=10

配置完毕后,通过supervisorctl启动,sudo supervisorcd -c /etc/supervisor/supervisord.conf

sudo supervisorctl start dephna

注意:注释要单独一行,不要有拼写错误


配置好了daphne后,接下来配置nginx

1
2
3
4
# 首先配置在nginx.conf中配置代理节点,配置在upstream好处是,以后如果想要部署集群的话,就很方便了
upstream websockets{
server 127.0.0.1:8006;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 然后进入到default.conf中配置具体的server,设置具体的location路由匹配
# 这里的/表示从根路径起的匹配,这里要配置正确,不然前端就算建立websocket通信,还是会返回404错误
location /inform {
proxy_pass http://websockets;
# proxy_connect_timeout 2s
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection 'upgrade';
proxy_redirect off;
proxy_set_header Host $host;
# proxy_set_header X-Real_IP $remote_addr_IP;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Host $server_name;
# proxy_read_timeout 60s;#默认为60s
# proxy_send_timeout 60s;#默认为60s
}

配置完后,重启nginx就可以了,sudo service nginx restart


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!