Rust:Websocket 服务端Warp
将Cargo.toml
文件修改为如下所示:
-
[package]
-
name = "server"
-
version = "0.1.0"
-
edition = "2022"
-
-
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-
[dependencies]
-
tokio = { version = "1", features = ["full"] }
-
warp = "0.3"
-
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
-
tokio-stream = "0.1"
-
pretty_env_logger = "0.4"
-
// #![deny(warnings)]
-
use std::collections::HashMap;
-
use std::convert::Infallible;
-
use std::sync::{
-
atomic::{AtomicUsize, Ordering},
-
Arc,
-
};
-
-
use futures_util::{SinkExt, StreamExt, TryFutureExt};
-
use tokio::sync::{mpsc, RwLock};
-
use tokio_stream::wrappers::UnboundedReceiverStream;
-
use warp::ws::{Message, WebSocket};
-
use warp::Filter;
-
-
/// 我们的全局唯一用户 ID 计数器。
-
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
-
-
/// 我们当前连接用户的状态。
-
///
-
/// - 键是他们的 id
-
/// - 值是 `warp::ws::Message` 的发送者
-
type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
-
-
-
async fn main() {
-
pretty_env_logger::init();
-
-
// 跟踪所有连接的用户,key是usize,value
-
// 是一个 websocket 发送者。
-
let users = Users::default();
-
// 把我们的“状态”变成一个新的过滤器...
-
//注释掉了
-
//let users = warp::any().map(move || users.clone());
-
-
// GET /chat -> websocket 升级
-
let chat = warp::path("chat")
-
// `ws()` 过滤器将准备 Websocket 握手...
-
.and(warp::ws())
-
.and(with_users(users.clone()))
-
.map(|ws: warp::ws::Ws, users| {
-
// 如果握手成功,这将调用我们的函数。
-
ws.on_upgrade(move |socket| user_connected(socket, users))
-
});
-
-
let files = warp::fs::dir("static");
-
let routes = chat.or(files);
-
-
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
-
}
-
-
//包一下,让它可以克隆
-
fn with_users(users: Users) -> impl Filter<Extract = (Users,), Error = Infallible> Clone {
-
warp::any().map(move || users.clone())
-
}
-
-
async fn user_connected(ws: WebSocket, users: Users) {
-
// 使用计数器为该用户分配一个新的唯一 ID。
-
let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
-
-
eprintln!("new chat user: {}", my_id);
-
-
// 将套接字拆分为消息的发送者和接收者。
-
let (mut user_ws_tx, mut user_ws_rx) = ws.split();
-
-
// 使用无界通道来处理消息的缓冲和刷新
-
// 到 websocket...
-
let (tx, rx) = mpsc::unbounded_channel();
-
let mut rx = UnboundedReceiverStream::new(rx);
-
-
tokio::task::spawn(async move {
-
while let Some(message) = rx.next().await {
-
user_ws_tx
-
.send(message)
-
.unwrap_or_else(|e| {
-
eprintln!("websocket send error: {}", e);
-
})
-
.await;
-
}
-
});
-
-
// 将发件人保存在我们的已连接用户列表中。
-
users.write().await.insert(my_id, tx.clone());
-
-
// 返回一个 `Future`,它基本上是一个状态机管理
-
// 这个特定用户的连接。
-
-
// 用户每次发送消息,广播给
-
// 所有其他用户...
-
while let Some(result) = user_ws_rx.next().await {
-
let msg = match result {
-
Ok(msg) => {
-
-
msg
-
-
}
-
Err(e) => {
-
eprintln!("websocket error(uid={}): {}", my_id, e);
-
break;
-
}
-
};
-
-
//因为我不需要把用户每次发送消息,广播给所有其他用户,先注释掉了
-
user_message(my_id, msg, &users).await;
-
}
-
-
// 只要用户停留,user_ws_rx 流将继续处理
-
// 连接的。 一旦他们断开连接,然后...
-
user_disconnected(my_id, &users).await;
-
}
-
-
async fn user_message(my_id: usize, msg: Message, users: &Users) {
-
// 跳过任何非文本消息...
-
let msg = if let Ok(s) = msg.to_str() {
-
s
-
} else {
-
return;
-
};
-
-
let new_msg = format!("<User#{}>: {}", my_id, msg);
-
-
// 来自该用户的新消息,将其发送给其他所有人(相同的 uid 除外)...
-
for (&uid, tx) in users.read().await.iter() {
-
if my_id != uid {
-
if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
-
// tx 断开连接,我们的 `user_disconnected` 代码
-
// 应该发生在另一个任务中,仅此而已
-
// 在这里做。
-
}
-
}
-
}
-
}
-
-
async fn user_disconnected(my_id: usize, users: &Users) {
-
eprintln!("good bye user: {}", my_id);
-
-
// 流关闭,所以从用户列表中删除
-
users.write().await.remove(&my_id);
-
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgaecaa
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13