Django 实现 Websocket 广播、点对点发送消息的代码

1. Django实现WebSocket广播的代码

1.1 安装Django Channels

首先,我们需要安装Django Channels来实现WebSocket功能。可以使用以下命令安装:

pip install channels

安装完成后,需要将Channels添加到Django项目的INSTALLED_APPS配置中:

INSTALLED_APPS = [

...

'channels',

...

]

1.2 创建WebSocket消费者

接下来,我们需要创建一个名为consumers.py的文件,在其中定义WebSocket消费者类:

from channels.generic.websocket import WebsocketConsumer

import json

class BroadcastConsumer(WebsocketConsumer):

def connect(self):

self.accept()

def disconnect(self, close_code):

pass

def receive(self, text_data):

text_data_json = json.loads(text_data)

message = text_data_json['message']

# 在这里可以对消息进行处理

# 广播消息给所有连接的客户端

self.send(message)

# 点对点发送消息给指定的客户端

self.send(text_data=json.dumps({

'message': 'This is a private message.'

}))

1.3 配置Routing

为了让Django能够正确地路由WebSocket请求到对应的消费者,我们需要在项目的routing.py文件中添加配置:

from django.urls import path

from . import consumers

websocket_urlpatterns = [

path('ws/broadcast/', consumers.BroadcastConsumer.as_asgi()),

]

1.4 配置ASGI应用

为了让Django能够正确地处理WebSocket请求,我们需要在项目的asgi.py文件中添加配置:

from channels.routing import ProtocolTypeRouter, URLRouter

from django.core.asgi import get_asgi_application

import myapp.routing

application = ProtocolTypeRouter({

'http': get_asgi_application(),

'websocket': URLRouter(

myapp.routing.websocket_urlpatterns

),

})

1.5 创建WebSocket客户端

最后,我们可以创建一个WebSocket客户端来测试广播功能:

from websocket import create_connection

import json

# 连接WebSocket服务器

ws = create_connection('ws://localhost:8000/ws/broadcast/')

# 发送消息给服务器

message = json.dumps({'message': 'Hello, World!'})

ws.send(message)

# 接收服务器返回的消息

result = ws.recv()

print(result)

# 关闭连接

ws.close()

2. Django实现WebSocket点对点发送消息的代码

2.1 修改消费者代码

为了实现点对点发送消息的功能,我们需要修改消费者代码。首先,我们需要在WebSocket消费者类中添加一个成员变量来保存每个连接的客户端:

from channels.generic.websocket import WebsocketConsumer

import json

class ChatConsumer(WebsocketConsumer):

def connect(self):

self.accept()

# 将当前连接的客户端添加到群组中

self.channel_group_name = 'chat_room'

async_to_sync(self.channel_layer.group_add)(

self.channel_group_name,

self.channel_name

)

def disconnect(self, close_code):

# 将当前连接的客户端从群组中移除

async_to_sync(self.channel_layer.group_discard)(

self.channel_group_name,

self.channel_name

)

def receive(self, text_data):

text_data_json = json.loads(text_data)

message = text_data_json['message']

# 在这里可以对消息进行处理

# 广播消息给群组内的所有客户端

async_to_sync(self.channel_layer.group_send)(

self.channel_group_name,

{

'type': 'chat_message',

'message': message

}

)

def chat_message(self, event):

message = event['message']

# 发送消息给当前连接的客户端

self.send(text_data=json.dumps({

'message': message

}))

2.2 修改Routing配置

为了让Django能够正确地路由WebSocket请求至聊天消费者,我们需要将之前的BroadcastConsumer修改为ChatConsumer并修改Routing配置:

from django.urls import path

from . import consumers

websocket_urlpatterns = [

path('ws/chat/', consumers.ChatConsumer.as_asgi()),

]

2.3 运行WebSocket服务器

现在,我们可以运行WebSocket服务器并在浏览器中打开两个窗口,分别发送和接收消息:

# 运行服务器

python manage.py runserver

# 在浏览器中打开两个窗口,分别输入以下JavaScript代码发送和接收消息

var socket = new WebSocket('ws://localhost:8000/ws/chat/');

// 发送消息

socket.onopen = function(event) {

socket.send(JSON.stringify({'message': 'Hello, World!'}));

};

// 接收消息

socket.onmessage = function(event) {

console.log(event.data);

};

总结

本文介绍了如何使用Django实现WebSocket的广播和点对点发送消息的功能。首先,我们安装了Django Channels来支持WebSocket功能。然后,我们创建了一个WebSocket消费者类,并在其中处理连接、断开连接和接收消息的逻辑。接下来,我们通过配置Routing和ASGI应用,将WebSocket请求正确地路由到消费者类。最后,我们使用WebSocket客户端来测试广播和点对点发送消息的功能。

通过本文的介绍,我们可以看到,使用Django实现WebSocket功能非常简单。WebSocket广播和点对点发送消息可以帮助我们实现实时通信功能,非常适合于聊天室、在线游戏等场景。

后端开发标签