• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Rust:Websocket 服务端Warp

武飞扬头像
爱学习的佳
帮助1

Cargo.toml文件修改为如下所示:

  1.  
    [package]
  2.  
    name = "server"
  3.  
    version = "0.1.0"
  4.  
    edition = "2022"
  5.  
     
  6.  
    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
  7.  
     
  8.  
    [dependencies]
  9.  
    tokio = { version = "1", features = ["full"] }
  10.  
    warp = "0.3"
  11.  
    futures-util = { version = "0.3", default-features = false, features = ["sink"] }
  12.  
    tokio-stream = "0.1"
  13.  
    pretty_env_logger = "0.4"
  1.  
    // #![deny(warnings)]
  2.  
    use std::collections::HashMap;
  3.  
    use std::convert::Infallible;
  4.  
    use std::sync::{
  5.  
    atomic::{AtomicUsize, Ordering},
  6.  
    Arc,
  7.  
    };
  8.  
     
  9.  
    use futures_util::{SinkExt, StreamExt, TryFutureExt};
  10.  
    use tokio::sync::{mpsc, RwLock};
  11.  
    use tokio_stream::wrappers::UnboundedReceiverStream;
  12.  
    use warp::ws::{Message, WebSocket};
  13.  
    use warp::Filter;
  14.  
     
  15.  
    /// 我们的全局唯一用户 ID 计数器。
  16.  
    static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
  17.  
     
  18.  
    /// 我们当前连接用户的状态。
  19.  
    ///
  20.  
    /// - 键是他们的 id
  21.  
    /// - 值是 `warp::ws::Message` 的发送者
  22.  
    type Users = Arc<RwLock<HashMap<usize, mpsc::UnboundedSender<Message>>>>;
  23.  
     
  24.  
    #[tokio::main]
  25.  
    async fn main() {
  26.  
    pretty_env_logger::init();
  27.  
     
  28.  
    // 跟踪所有连接的用户,key是usize,value
  29.  
    // 是一个 websocket 发送者。
  30.  
    let users = Users::default();
  31.  
    // 把我们的“状态”变成一个新的过滤器...
  32.  
    //注释掉了
  33.  
    //let users = warp::any().map(move || users.clone());
  34.  
     
  35.  
    // GET /chat -> websocket 升级
  36.  
    let chat = warp::path("chat")
  37.  
    // `ws()` 过滤器将准备 Websocket 握手...
  38.  
    .and(warp::ws())
  39.  
    .and(with_users(users.clone()))
  40.  
    .map(|ws: warp::ws::Ws, users| {
  41.  
    // 如果握手成功,这将调用我们的函数。
  42.  
    ws.on_upgrade(move |socket| user_connected(socket, users))
  43.  
    });
  44.  
     
  45.  
    let files = warp::fs::dir("static");
  46.  
    let routes = chat.or(files);
  47.  
     
  48.  
    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
  49.  
    }
  50.  
     
  51.  
    //包一下,让它可以克隆
  52.  
    fn with_users(users: Users) -> impl Filter<Extract = (Users,), Error = Infallible> Clone {
  53.  
    warp::any().map(move || users.clone())
  54.  
    }
  55.  
     
  56.  
    async fn user_connected(ws: WebSocket, users: Users) {
  57.  
    // 使用计数器为该用户分配一个新的唯一 ID。
  58.  
    let my_id = NEXT_USER_ID.fetch_add(1, Ordering::Relaxed);
  59.  
     
  60.  
    eprintln!("new chat user: {}", my_id);
  61.  
     
  62.  
    // 将套接字拆分为消息的发送者和接收者。
  63.  
    let (mut user_ws_tx, mut user_ws_rx) = ws.split();
  64.  
     
  65.  
    // 使用无界通道来处理消息的缓冲和刷新
  66.  
    // 到 websocket...
  67.  
    let (tx, rx) = mpsc::unbounded_channel();
  68.  
    let mut rx = UnboundedReceiverStream::new(rx);
  69.  
     
  70.  
    tokio::task::spawn(async move {
  71.  
    while let Some(message) = rx.next().await {
  72.  
    user_ws_tx
  73.  
    .send(message)
  74.  
    .unwrap_or_else(|e| {
  75.  
    eprintln!("websocket send error: {}", e);
  76.  
    })
  77.  
    .await;
  78.  
    }
  79.  
    });
  80.  
     
  81.  
    // 将发件人保存在我们的已连接用户列表中。
  82.  
    users.write().await.insert(my_id, tx.clone());
  83.  
     
  84.  
    // 返回一个 `Future`,它基本上是一个状态机管理
  85.  
    // 这个特定用户的连接。
  86.  
     
  87.  
    // 用户每次发送消息,广播给
  88.  
    // 所有其他用户...
  89.  
    while let Some(result) = user_ws_rx.next().await {
  90.  
    let msg = match result {
  91.  
    Ok(msg) => {
  92.  
     
  93.  
    msg
  94.  
     
  95.  
    }
  96.  
    Err(e) => {
  97.  
    eprintln!("websocket error(uid={}): {}", my_id, e);
  98.  
    break;
  99.  
    }
  100.  
    };
  101.  
     
  102.  
    //因为我不需要把用户每次发送消息,广播给所有其他用户,先注释掉了
  103.  
    user_message(my_id, msg, &users).await;
  104.  
    }
  105.  
     
  106.  
    // 只要用户停留,user_ws_rx 流将继续处理
  107.  
    // 连接的。 一旦他们断开连接,然后...
  108.  
    user_disconnected(my_id, &users).await;
  109.  
    }
  110.  
     
  111.  
    async fn user_message(my_id: usize, msg: Message, users: &Users) {
  112.  
    // 跳过任何非文本消息...
  113.  
    let msg = if let Ok(s) = msg.to_str() {
  114.  
    s
  115.  
    } else {
  116.  
    return;
  117.  
    };
  118.  
     
  119.  
    let new_msg = format!("<User#{}>: {}", my_id, msg);
  120.  
     
  121.  
    // 来自该用户的新消息,将其发送给其他所有人(相同的 uid 除外)...
  122.  
    for (&uid, tx) in users.read().await.iter() {
  123.  
    if my_id != uid {
  124.  
    if let Err(_disconnected) = tx.send(Message::text(new_msg.clone())) {
  125.  
    // tx 断开连接,我们的 `user_disconnected` 代码
  126.  
    // 应该发生在另一个任务中,仅此而已
  127.  
    // 在这里做。
  128.  
    }
  129.  
    }
  130.  
    }
  131.  
    }
  132.  
     
  133.  
    async fn user_disconnected(my_id: usize, users: &Users) {
  134.  
    eprintln!("good bye user: {}", my_id);
  135.  
     
  136.  
    // 流关闭,所以从用户列表中删除
  137.  
    users.write().await.remove(&my_id);
  138.  
    }

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgaecaa
系列文章
更多 icon
同类精品
更多 icon
继续加载