HynoO 网络聊天室
使用 Rust 作服务端,基于 WebSocket 的类 IRC 网络聊天室
HynoO 网络聊天室
为什么做这个?
简单介绍 WebSocket
通常而言,Web 应用采用轮询的方式实现推送,即浏览器每隔一段时间向服务器发送 HTTP 请求,然后服务器处理信息并返回给浏览器。这种方式(HTTP 协议)的问题在于,必须由客户端去发起请求,浏览器用固定频率去请求服务器,不仅实时性不够,而且容易造成资源浪费。
WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行的全双工通讯协议,服务器可以主动发消息给客户端。客户端与服务器建立连接后,双方即可互相发送消息,而不需要向 HTTP 协议那样请求
客户端的 WebSocket 连接报文中有 Upgrade: websocket 字段,更详细的属性介绍可见[WebSocket - Web API | MDN (mozilla.org)](https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket) |
什么是 IRC
IRC (Internet Relay Chat),是一种应用层的协议,主要用来群体聊天。其原理非常简单,本质上就是个广播过程。IRC 用户与服务器连接后,向服务器发送消息,服务器会把消息广播给所有用户,服务器只起到中继作用。
一个比较特殊的是,IRC 的聊天服务通常有“频道”频道名称以 # 符号开始,用户在相同的频道时可以互相交流。
IRC 是 1988 年出现的,即使到了现在,还有一群技术遗老人在使用这项服务聊天交流,然而如今有 WebSocket 这种协议了,我认为 WebSocket 是比 IRC 更适合做聊天室的,WebSocket 的实时性可以实现更多东西。
在 Rust 里使用 WebSocket
使用第三方库 warp
提供的 web 服务器框架和 tokio
的异步运行框架
在 Cargo.toml
里添加依赖:
1
2
3
[dependencies]
warp = "0.3"
tokio = { version = "1", features = ["full"] }
建立 WebSocket 过滤器:
1
2
3
4
5
6
7
8
9
10
async fn main() {
let rooms: Rooms = Arc::new(Mutex::new(HashMap::new()));
let chat = warp::path("chat")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let rooms = rooms.clone();
ws.on_upgrade(move |socket| handle_init(socket, rooms))
});
warp::serve(chat).run(([127, 0, 0, 1], 3030)).await;
}
warp::path("chat").and(warp::ws())
是创建了一个过滤器,只匹配路径为 “/chat” 且请求升级到 WebSocket 的 HTTP 请求。warp::serve(chat).run(([127, 0, 0, 1], 3030)).await;
使用 chat
过滤器创建一个 Warp 服务器,启动并监听 127.0.0.1:3030
1
2
3
4
5
6
7
8
9
10
let (user_ws_tx, mut user_ws_rx) = ws.split(); // 将 ws: WebSocket 分解为发送和接收两部分
// 接收消息
let msg = user_ws_rx
.next()
.await
.unwrap()
.unwrap()
.to_str()
.unwrap()
.to_string();
SplitStream<WebSocket>
→ Next<SplitStream<WebSocket>>
→ Option<Result<Message, Error>>
→ Result<Message, Error>
→ Message
→ Result<&str, ()>
→ &str
→ String
注意这里的 Message
是 warp
自己的结构体,不是我自己定义的:
1
2
3
4
#[derive(Eq, PartialEq, Clone)]
pub struct Message {
inner: protocol::Message,
}
发送消息如下:
1
2
3
let message = ...; // String
let ws_msg = warp::ws::Message::text(message); // Message
user_ws_tx.send(ws_msg).await.unwarp();
后端部分🦀
首先先介绍一下 unbounded_channel()
,它是 tokio
库里的一个函数,该函数创建一个 unbounded 的mpsc channel。
mpsc(Multi-producer, Single-consumer)
bounded channel:
mpsc::channel()
,需要指定通道容量限制unbounded channel:通道无限存放消息直到内存耗尽,Sender 可以无需等待地不断向通道发送消息。需要保证内存不会耗尽。
unbounded_channel()
返回一个发送器 Sender 和一个接收器 Receiver,其中 Sender 可被多个线程共享,实现了 Clone
特征,Receiver 则不能。使用 Sender 的 send
方法发送消息,Receiver 的 recv
方法接收消息,这两个方法都是异步的。
我的想法是:对于每一个 WebSocket 连接 ws
,都为其创建一个 unbounded_channel()
:tx
和 rx
。用一个 HashMap 来存 <String, tx>
。当该 ws
接收到消息时,使用 HashMap 里的所有 tx
来发送消息到对应的 rx
。给 rx
一个异步处理部分,当 rx
接收到消息时,使用 rx
对应的 ws
向客户端发现消息。
使用 unbounded_channel()
作为中间人,可以让多个消息存在消息链表里依次处理。消息可以一口气全发到 rx
里,然后 rx
一个个发送到对应的 ws
。
频道的实现就是在前面 HashMap 外面再套一个 HashMap,发送消息时对频道名称进行判断。
定义结构体和类型:
1
2
3
4
5
6
7
struct MessageData {
username: String,
content: String,
}
type Users =
Arc<Mutex<HashMap<String, tokio::sync::mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
type Rooms = Arc<Mutex<HashMap<String, Users>>>;
建立 WebSocket 过滤器并运行服务器:
1
2
3
4
5
6
7
8
let chat = warp::path("chat")
.and(warp::ws())
.map(move |ws: warp::ws::Ws| {
let rooms = rooms.clone();
//let users = users.clone();
ws.on_upgrade(move |socket| handle_init(socket, rooms))
});
warp::serve(chat).run(([127, 0, 0, 1], 3030)).await;
当有 WebSocket 连接时,转到处理函数 handle_init
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async fn handle_init(ws: WebSocket, rooms: Rooms) {
let (user_ws_tx, mut user_ws_rx) = ws.split(); //分解 WebSocket 为 Sender 和 Receiver
let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); //建立一个 unbounded_channel
let msg = user_ws_rx //直接尝试获得消息,规定连接时的第一个消息为{room_id, username}
.next()
.await
.unwrap()
.unwrap()
.to_str()
.unwrap()
.to_string();
let v: Value = serde_json::from_str(&msg).unwrap(); // msg 处理成 json
let room_id = v["room_id"].as_str().unwrap().to_string(); //取得 room_id
let username = v["username"].as_str().unwrap().to_string(); //取得 username
//判断是否存在 room,有则插入这个 user,否则创建 room
if rooms.lock().unwrap().contains_key(&room_id) {
rooms
.lock()
.unwrap()
.get(&room_id)
.unwrap()
.lock()
.unwrap()
.insert(username.clone(), tx.clone());
} else {
let users = Arc::new(Mutex::new(HashMap::new()));
users.lock().unwrap().insert(username.clone(), tx.clone());
rooms.lock().unwrap().insert(room_id.clone(), users);
}
//转到 handle_connection
handle_connection(username, room_id, user_ws_tx, user_ws_rx, rooms.clone(), rx).await;
}
handle_connection
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
async fn handle_connection(
user_id: String,
room_id: String,
mut user_ws_tx: SplitSink<WebSocket, Message>,
mut user_ws_rx: SplitStream<WebSocket>,
rooms: Rooms,
mut rx: tokio::sync::mpsc::UnboundedReceiver<Result<Message, warp::Error>>,
) {
//为 rx 创建异步任务,收到消息时用 user_ws_tx 发送消息
tokio::task::spawn(async move {
while let Some(result) = rx.recv().await {
let message = match result {
Ok(msg) => msg.to_str().unwrap().to_string(),
Err(e) => {
eprintln!("error {}", e);
break;
}
};
let ws_msg = warp::ws::Message::text(message);
user_ws_tx.send(ws_msg).await.unwrap();
}
});
//user_ws_rx 接收消息时,用对应 room 里的每一个 tx 发送消息,对应的 rx 会接受到,也就是上面那部分
while let Some(result) = user_ws_rx.next().await {
match result {
Ok(msg) => {
let v: Value = serde_json::from_str(&msg.to_str().unwrap()).unwrap();
let room_id = v["room_id"].as_str().unwrap().to_string();
if rooms.lock().unwrap().contains_key(&room_id) {
for (roomname, users) in rooms.lock().unwrap().iter() {
if roomname != &room_id {
continue;
}
for (_, user) in users.lock().unwrap().iter() {
user.send(Ok(msg.clone())).unwrap();
}
}
}
}
Err(e) => {
eprintln!("websocket error(uid={}): {}", user_id, e);
break;
}
}
}
// remove user from users
rooms
.lock()
.unwrap()
.get(&room_id)
.unwrap()
.lock()
.unwrap()
.remove(&user_id);
// if users is empty, remove room
if rooms
.lock()
.unwrap()
.get(&room_id)
.unwrap()
.lock()
.unwrap()
.is_empty()
{
rooms.lock().unwrap().remove(&room_id);
}
}
前端部分
使用 React+Vite,并没有使用什么 UI 库
注意 WebSocket 要用 useRef
,不然组件重新渲染时会重置连接。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
const Room = () => {
const { name: roomName } = useParams();
const [username, setUsername] = useState<string | null>();
const [messages, setMessages] = useState<
{ username: string; content: string }[]
>([]);
const wsRef = useRef<WebSocket | null>(null);
//Get room name
if (roomName === undefined) return null;
let name = roomName;
if (name[0] === ">") {
name = name.slice(1);
} else return null;
//Get username
useEffect(() => {
const username_tmp = window.prompt("Please enter your username");
setUsername(username_tmp);
}, []);
//Connect to WebSocket
useEffect(() => {
wsRef.current = new WebSocket("ws://localhost:3030/chat");
wsRef.current.onopen = () => {
console.log("connected");
const message = {
room_id: roomName,
username: username,
};
wsRef.current?.send(JSON.stringify(message));
};
wsRef.current.onmessage = (e) => {
const newMessage = JSON.parse(e.data);
setMessages((oldMessages) => [...oldMessages, newMessage]);
};
}, [username]);
//Send message
const sendMessage = () => {
const input = document.getElementById("message") as HTMLInputElement;
const content = input.value;
if (content === "") return;
const message = {
room_id: roomName,
username: username,
content: content,
};
wsRef.current?.send(JSON.stringify(message));
input.value = "";
};
return (
<div className="room">
<h2>Room: {name}</h2>
<div className="partingLine"></div>
<div style=>
<div className="roomBox">
{messages.map((message, index) => (
<MessageBox
key={index}
username={message.username}
content={message.content}
/>
))}
</div>
</div>
<div className="input">
<textarea
id="message"
onKeyDown={(e) => {
if (e.key === "Enter" && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
}}
/>
<button onClick={sendMessage}>Send</button>
</div>
</div>
);
};
TODO
- 异常错误处理
- 管理员
- 编辑器:数学公式、图片等
tokio
似乎有broadcast
这种东西,不知能不能用- 觉得服务端写的实在草率(