Post

HynoO 网络聊天室

使用 Rust 作服务端,基于 WebSocket 的类 IRC 网络聊天室

HynoO 网络聊天室

为什么做这个?

  • 久仰 Rust 大名,虽然我对 Rust 什么都不知道,梗图倒是看了一堆,忍不了一点,学学看🦀。

    image-20240505143244955

  • 从我大一第一次接触网站制作时,网络聊天室一直是我考虑的一个东西。无头骑士异闻录里的匿名聊天室 DOLLARS,Lain 里的连线世界,以及在尝试 CTF 时了解到黑客匿名聊天网站 HackChat 等等,兄弟,真的帅。

简单介绍 WebSocket

​ 通常而言,Web 应用采用轮询的方式实现推送,即浏览器每隔一段时间向服务器发送 HTTP 请求,然后服务器处理信息并返回给浏览器。这种方式(HTTP 协议)的问题在于,必须由客户端去发起请求,浏览器用固定频率去请求服务器,不仅实时性不够,而且容易造成资源浪费。

​ WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行的全双工通讯协议,服务器可以主动发消息给客户端。客户端与服务器建立连接后,双方即可互相发送消息,而不需要向 HTTP 协议那样请求

​ 客户端的 WebSocket 连接报文中有 Upgrade: websocket 字段,更详细的属性介绍可见[WebSocket - Web APIMDN (mozilla.org)](https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket)

什么是 IRC

​ IRC (Internet Relay Chat),是一种应用层的协议,主要用来群体聊天。其原理非常简单,本质上就是个广播过程。IRC 用户与服务器连接后,向服务器发送消息,服务器会把消息广播给所有用户,服务器只起到中继作用。

​ 一个比较特殊的是,IRC 的聊天服务通常有“频道”频道名称以 # 符号开始,用户在相同的频道时可以互相交流。

​ IRC 是 1988 年出现的,即使到了现在,还有一群技术遗老人在使用这项服务聊天交流,然而如今有 WebSocket 这种协议了,我认为 WebSocket 是比 IRC 更适合做聊天室的,WebSocket 的实时性可以实现更多东西。

在 Rust 里使用 WebSocket

​ 使用第三方库 warp 提供的 web 服务器框架和 tokio 的异步运行框架

​ 在 Cargo.toml 里添加依赖:

1
2
3
[dependencies]
warp = "0.3"
tokio = { version = "1", features = ["full"] }

​ 建立 WebSocket 过滤器:

1
2
3
4
5
6
7
8
9
10
async fn main() {
    let rooms: Rooms = Arc::new(Mutex::new(HashMap::new()));
    let chat = warp::path("chat")
        .and(warp::ws())
        .map(move |ws: warp::ws::Ws| {
            let rooms = rooms.clone();
            ws.on_upgrade(move |socket| handle_init(socket, rooms))
        });
    warp::serve(chat).run(([127, 0, 0, 1], 3030)).await;
}

warp::path("chat").and(warp::ws()) 是创建了一个过滤器,只匹配路径为 “/chat” 且请求升级到 WebSocket 的 HTTP 请求。warp::serve(chat).run(([127, 0, 0, 1], 3030)).await; 使用 chat 过滤器创建一个 Warp 服务器,启动并监听 127.0.0.1:3030

1
2
3
4
5
6
7
8
9
10
	let (user_ws_tx, mut user_ws_rx) = ws.split(); // 将 ws: WebSocket 分解为发送和接收两部分
	// 接收消息
    let msg = user_ws_rx	
        .next()
        .await
        .unwrap()
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();

SplitStream<WebSocket>Next<SplitStream<WebSocket>>Option<Result<Message, Error>>Result<Message, Error>MessageResult<&str, ()>&strString

​ 注意这里的 Messagewarp 自己的结构体,不是我自己定义的:

1
2
3
4
#[derive(Eq, PartialEq, Clone)]
pub struct Message {
    inner: protocol::Message,
}

​ 发送消息如下:

1
2
3
let message = ...;	// String
let ws_msg = warp::ws::Message::text(message);	// Message
user_ws_tx.send(ws_msg).await.unwarp();

后端部分🦀

​ 首先先介绍一下 unbounded_channel() ,它是 tokio 库里的一个函数,该函数创建一个 unbounded 的mpsc channel。

mpsc(Multi-producer, Single-consumer)

bounded channel: mpsc::channel() ,需要指定通道容量限制

unbounded channel:通道无限存放消息直到内存耗尽,Sender 可以无需等待地不断向通道发送消息。需要保证内存不会耗尽。

unbounded_channel() 返回一个发送器 Sender 和一个接收器 Receiver,其中 Sender 可被多个线程共享,实现了 Clone 特征,Receiver 则不能。使用 Sender 的 send 方法发送消息,Receiver 的 recv 方法接收消息,这两个方法都是异步的。

​ 我的想法是:对于每一个 WebSocket 连接 ws,都为其创建一个 unbounded_channel()txrx。用一个 HashMap 来存 <String, tx>。当该 ws 接收到消息时,使用 HashMap 里的所有 tx 来发送消息到对应的 rx。给 rx 一个异步处理部分,当 rx 接收到消息时,使用 rx 对应的 ws 向客户端发现消息。

​ 使用 unbounded_channel() 作为中间人,可以让多个消息存在消息链表里依次处理。消息可以一口气全发到 rx 里,然后 rx 一个个发送到对应的 ws

​ 频道的实现就是在前面 HashMap 外面再套一个 HashMap,发送消息时对频道名称进行判断。


定义结构体和类型:

1
2
3
4
5
6
7
struct MessageData {
    username: String,
    content: String,
}
type Users =
    Arc<Mutex<HashMap<String, tokio::sync::mpsc::UnboundedSender<Result<Message, warp::Error>>>>>;
type Rooms = Arc<Mutex<HashMap<String, Users>>>;

建立 WebSocket 过滤器并运行服务器:

1
2
3
4
5
6
7
8
    let chat = warp::path("chat")
        .and(warp::ws())
        .map(move |ws: warp::ws::Ws| {
            let rooms = rooms.clone();
            //let users = users.clone();
            ws.on_upgrade(move |socket| handle_init(socket, rooms))
        });
    warp::serve(chat).run(([127, 0, 0, 1], 3030)).await;

当有 WebSocket 连接时,转到处理函数 handle_init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
async fn handle_init(ws: WebSocket, rooms: Rooms) {
    let (user_ws_tx, mut user_ws_rx) = ws.split();	//分解 WebSocket 为 Sender 和 Receiver
    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();	//建立一个 unbounded_channel
    let msg = user_ws_rx	//直接尝试获得消息,规定连接时的第一个消息为{room_id, username}
        .next()
        .await
        .unwrap()
        .unwrap()
        .to_str()
        .unwrap()
        .to_string();

    let v: Value = serde_json::from_str(&msg).unwrap();	// msg 处理成 json
    let room_id = v["room_id"].as_str().unwrap().to_string();	//取得 room_id
    let username = v["username"].as_str().unwrap().to_string();	//取得 username
    //判断是否存在 room,有则插入这个 user,否则创建 room 
    if rooms.lock().unwrap().contains_key(&room_id) {	
        rooms
            .lock()
            .unwrap()
            .get(&room_id)
            .unwrap()
            .lock()
            .unwrap()
            .insert(username.clone(), tx.clone());
    } else {
        let users = Arc::new(Mutex::new(HashMap::new()));
        users.lock().unwrap().insert(username.clone(), tx.clone());
        rooms.lock().unwrap().insert(room_id.clone(), users);
    }
    //转到 handle_connection
    handle_connection(username, room_id, user_ws_tx, user_ws_rx, rooms.clone(), rx).await;
}

handle_connection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
async fn handle_connection(
    user_id: String,
    room_id: String,
    mut user_ws_tx: SplitSink<WebSocket, Message>,
    mut user_ws_rx: SplitStream<WebSocket>,
    rooms: Rooms,
    mut rx: tokio::sync::mpsc::UnboundedReceiver<Result<Message, warp::Error>>,
) {
    //为 rx 创建异步任务,收到消息时用 user_ws_tx 发送消息
    tokio::task::spawn(async move {
        while let Some(result) = rx.recv().await {
            let message = match result {
                Ok(msg) => msg.to_str().unwrap().to_string(),
                Err(e) => {
                    eprintln!("error {}", e);
                    break;
                }
            };
            let ws_msg = warp::ws::Message::text(message);
            user_ws_tx.send(ws_msg).await.unwrap();
        }
    });
	
    //user_ws_rx 接收消息时,用对应 room 里的每一个 tx 发送消息,对应的 rx 会接受到,也就是上面那部分
    while let Some(result) = user_ws_rx.next().await {
        match result {
            Ok(msg) => {
                let v: Value = serde_json::from_str(&msg.to_str().unwrap()).unwrap();
                let room_id = v["room_id"].as_str().unwrap().to_string();
                if rooms.lock().unwrap().contains_key(&room_id) {
                    for (roomname, users) in rooms.lock().unwrap().iter() {
                        if roomname != &room_id {
                            continue;
                        }
                        for (_, user) in users.lock().unwrap().iter() {
                            user.send(Ok(msg.clone())).unwrap();
                        }
                    }
                }
            }
            Err(e) => {
                eprintln!("websocket error(uid={}): {}", user_id, e);
                break;
            }
        }
    }
    // remove user from users
    rooms
        .lock()
        .unwrap()
        .get(&room_id)
        .unwrap()
        .lock()
        .unwrap()
        .remove(&user_id);
    // if users is empty, remove room
    if rooms
        .lock()
        .unwrap()
        .get(&room_id)
        .unwrap()
        .lock()
        .unwrap()
        .is_empty()
    {
        rooms.lock().unwrap().remove(&room_id);
    }
}

前端部分

使用 React+Vite,并没有使用什么 UI 库

注意 WebSocket 要用 useRef,不然组件重新渲染时会重置连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
const Room = () => {
  const { name: roomName } = useParams();
  const [username, setUsername] = useState<string | null>();
  const [messages, setMessages] = useState<
    { username: string; content: string }[]
  >([]);
  const wsRef = useRef<WebSocket | null>(null);
  //Get room name
  if (roomName === undefined) return null;
  let name = roomName;
  if (name[0] === ">") {
    name = name.slice(1);
  } else return null;

  //Get username
  useEffect(() => {
    const username_tmp = window.prompt("Please enter your username");
    setUsername(username_tmp);
  }, []);

  //Connect to WebSocket
  useEffect(() => {
    wsRef.current = new WebSocket("ws://localhost:3030/chat");
    wsRef.current.onopen = () => {
      console.log("connected");
      const message = {
        room_id: roomName,
        username: username,
      };
      wsRef.current?.send(JSON.stringify(message));
    };
    wsRef.current.onmessage = (e) => {
      const newMessage = JSON.parse(e.data);
      setMessages((oldMessages) => [...oldMessages, newMessage]);
    };
  }, [username]);

  //Send message
  const sendMessage = () => {
    const input = document.getElementById("message") as HTMLInputElement;
    const content = input.value;
    if (content === "") return;
    const message = {
      room_id: roomName,
      username: username,
      content: content,
    };
    wsRef.current?.send(JSON.stringify(message));
    input.value = "";
  };

  return (
    <div className="room">
      <h2>Room: {name}</h2>
      <div className="partingLine"></div>
      <div style=>
        <div className="roomBox">
          {messages.map((message, index) => (
            <MessageBox
              key={index}
              username={message.username}
              content={message.content}
            />
          ))}
        </div>
      </div>
      <div className="input">
        <textarea
          id="message"
          onKeyDown={(e) => {
            if (e.key === "Enter" && !e.shiftKey) {
              e.preventDefault();
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

TODO

  • 异常错误处理
  • 管理员
  • 编辑器:数学公式、图片等
  • tokio 似乎有 broadcast 这种东西,不知能不能用
  • 觉得服务端写的实在草率(
This post is licensed under CC BY 4.0 by the author.