• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Rust Programming The Complete Developer's Guide--05 Smart Pointers & Thread

武飞扬头像
血_影
帮助1

Type Conversion From/Into

From/Into

  • Rust has a robust type system
    • More reliable & maintainable code
    • Cumbersome to work with similar & wrapper types
      • Usually requires extra repeated code
  • Traits can be used to easily convert between types:
    • From
      • Convert from one type to another
    • Into
      • Convert one type into another type
        Traits: From/Into
  • From:
    • Associated method on a type
      • TypeName::from()
    • Implementing From automatically implements Into
  • Into:
    • self method on a type
      • variable.into()

From/Into Example

// Associated-function form: the target type is named at the call site.
let owned = String::from("slice");
// Method form: the target type comes from the annotation on the binding.
let owned: String = "slice".into();
/// Returns an owned `String` holding a copy of `slice`.
///
/// Equivalent to `slice.into()`, where the declared return type selects the
/// conversion.
fn to_owned(slice: &str) -> String {
    String::from(slice)
}

Implementing From

/// Status decoded from a `u8` code.
enum Status {
    /// Any non-zero code; the code is carried along unchanged.
    Broken(u8),
    /// Code `0`.
    Working,
}

impl From<u8> for Status {
    /// Maps code `0` to `Status::Working` and every other code to
    /// `Status::Broken` holding that code.
    fn from(code: u8) -> Self {
        if code == 0 {
            Self::Working
        } else {
            Self::Broken(code)
        }
    }
}

/// Stand-in for an API that reports its status as a bare `u8`; always returns `5`.
fn legacy_interface() -> u8 {
    5
}

// Both lines run the same `From<u8> for Status` conversion; the first needs the
// type annotation, the second names the target type in the call.
let status: Status = legacy_interface().into();
let status = Status::from(legacy_interface());
学新通

Pro Tips

  • From/Into cannot fail
  • Almost always want to implement From for errors
  • Prefer implementing From instead of Into
    • Into is automatically implemented with From
  • Use .into() when:
    • Obvious what resulting type will be
  • Use Type::from() when:
    • Important to know the resulting type

Question Mark operator

/// Unit type standing in for a job to run.
struct Job;

/// Failure reasons decoded from a `u8` code.
enum JobError {
    /// Code `1`.
    Expired,
    /// Code `2`.
    Missing,
    /// Any other code, carried along unchanged.
    Other(u8),
}

impl From<u8> for JobError {
    /// Decodes `code`: `1` is `Expired`, `2` is `Missing`, anything else is `Other`.
    fn from(code: u8) -> Self {
        if code == 1 {
            JobError::Expired
        } else if code == 2 {
            JobError::Missing
        } else {
            JobError::Other(code)
        }
    }
}

/// Always fails with error code `2`.
///
/// The `?` operator converts the error value it is applied to into the error
/// type of the enclosing function's return type, using the `From` impl above.
fn execute_job(job: Job) -> Result<(), JobError> {
    let failure: Result<(), u8> = Err(2);
    failure?;
    Ok(())
}
学新通

Recap

  • From/Into allow conversion between types
    • The conversion cannot fail
  • Prefer implementing From over Into
    • Into gets implemented automatically when From is implemented
  • The Question Mark operator will automatically use a From implementation to convert errors

TryFrom/TryInto

  • Fallible type conversion
    • Use when there is the possibility of failure
  • Just like From/Into, except it returns a Result
    • TryFrom will auto-implement TryInto
      Implementing TryFrom
use std::convert::TryFrom;

/// Error returned when an `i32` cannot be turned into a `NonZero`.
enum NonZeroError {
    /// The input was `0`.
    IsZero,
}

/// Wrapper around an `i32`; the `TryFrom<i32>` conversion rejects `0`.
struct NonZero(i32);

impl TryFrom<i32> for NonZero {
    type Error = NonZeroError;

    /// Wraps `value`, failing with `NonZeroError::IsZero` when it is `0`.
    fn try_from(value: i32) -> Result<Self, Self::Error> {
        match value {
            0 => Err(NonZeroError::IsZero),
            nonzero => Ok(Self(nonzero)),
        }
    }
}
学新通

Usage

use std::convert::{TryFrom, TryInto};
/// Error returned when an `i32` cannot be turned into a `NonZero`.
enum NonZeroError {
    /// The input was `0`.
    IsZero,
}

/// Wrapper around an `i32`; the `TryFrom<i32>` conversion rejects `0`.
struct NonZero(i32);

impl TryFrom<i32> for NonZero {
    type Error = NonZeroError;

    /// Wraps `value`, failing with `NonZeroError::IsZero` when it is `0`.
    fn try_from(value: i32) -> Result<Self, Self::Error> {
        if value != 0 {
            return Ok(Self(value));
        }
        Err(NonZeroError::IsZero)
    }
}

/// Shows both entry points of the fallible conversion: `TryFrom::try_from`
/// and `TryInto::try_into`.
fn main() {
    // Only the outcome matters here, so the payloads are matched with `_`
    // instead of being bound to unused names.
    match NonZero::try_from(9) {
        Ok(_) => println!("not zero"),
        Err(_) => println!("is zero"),
    }

    // `try_into` needs the target type spelled out; the error type is inferred.
    let whoops: Result<NonZero, _> = 0_i32.try_into();
    match whoops {
        Ok(_) => println!("not zero"),
        Err(_) => println!("is zero"),
    }
}
学新通

Recap

  • TryFrom/TryInto allow conversion between types
    • Conversion can fail
  • Prefer implementing TryFrom over TryInto
    • TryInto gets implemented automatically when TryFrom is implemented

Demo From/Into

Example1

/// Newtype whose inner `String` is upper-cased when built through `From`.
struct Uppercase(String);

impl From<String> for Uppercase {
    /// Upper-cases an owned string.
    fn from(data: String) -> Self {
        Self(data.to_uppercase())
    }
}

impl From<&str> for Uppercase {
    /// Upper-cases a borrowed string slice.
    fn from(data: &str) -> Self {
        Self(data.to_uppercase())
    }
}

impl std::fmt::Display for Uppercase {
    /// Writes the stored text without applying any width or fill flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
/// Builds an `Uppercase` through `From::from` and through `Into::into`,
/// printing each result.
fn main() {
    let from_call = Uppercase::from("lowercase");
    println!("{}", from_call);

    let via_into: Uppercase = "lowercase".into();
    println!("{}", via_into);
}
学新通

Example

/// Direction of a key transition.
enum KeyPress {
    Down,
    Up,
}

/// A key transition together with the code of the key involved.
struct KeyEvent {
    keycode: u16,
    state: KeyPress,
}

/// Input event; key events carry their key code and transition.
enum InputEvent {
    Key(u16, KeyPress),
    Mouse,
}

impl From<KeyEvent> for InputEvent {
    /// Repackages a `KeyEvent` as `InputEvent::Key`, moving both fields across.
    fn from(ev: KeyEvent) -> Self {
        let KeyEvent { keycode, state } = ev;
        Self::Key(keycode, state)
    }
}

/// Converts two key events into input events, once with `From::from` and once
/// with `Into::into`.
fn main() {
    let input_ev = InputEvent::from(KeyEvent {
        keycode: 5,
        state: KeyPress::Down,
    });

    let released = KeyEvent {
        keycode: 6,
        state: KeyPress::Up,
    };
    let input_ev: InputEvent = released.into();
}
学新通

Example

use thiserror::Error;

/// Network-level failures.
#[derive(Debug, Error)]
enum NetworkError {
    /// Displayed as "connection timed out".
    #[error("connection timed out")]
    Timeout,
}

/// Database-level failures.
#[derive(Debug, Error)]
enum DatabaseError {
    /// Displayed as "error querying database".
    #[error("error querying database")]
    QueryFailure,
}

#[derive(Debug, Error)]
enum ApiError {
    #[error("network error:{0}")]
    Network(NetworkError),
    #[error("databse error:{0}")]
    Database(DatabaseError),
}

impl From<NetworkError> for ApiError {
    fn from(err: NetworkError) -> Self {
        Self::Network(err)
    }
}

impl From<DatabaseError> for ApiError {
    fn from(err: DatabaseError) -> Self {
        Self::Database(err)
    }
}

fn do_stuff() -> Result<(), ApiError> {
    Err(NetworkError::Timeout)?
}
学新通

Activity TryFrom/TryInto

use std::convert::TryFrom;
use thiserror::Error;

/// Reasons a string cannot be converted into an `Rgb` color.
#[derive(Debug, Error)]
enum RgbError {
    /// The input does not start with `#`.
    #[error("hex colors must begin with a hash (#)")]
    MissingHash,
    /// A two-digit channel failed to parse; wraps the integer parse error.
    #[error("failed to parse hex digit:{0}")]
    ParseError(std::num::ParseIntError),
    /// The input does not have the expected length.
    #[error("invalid hex color length (must be 6)")]
    LengthError,
}

/// An RGB color with one byte per channel: `(red, green, blue)`.
#[derive(Debug, Eq, PartialEq)]
struct Rgb(u8, u8, u8);

impl TryFrom<&str> for Rgb {
    type Error = RgbError;

    /// Parses a color written as `#rrggbb`.
    ///
    /// # Errors
    ///
    /// * `RgbError::MissingHash` when `hex` does not start with `#`.
    /// * `RgbError::LengthError` when anything other than six bytes follow the `#`.
    /// * `RgbError::ParseError` when any of those bytes is not an ASCII hex digit.
    fn try_from(hex: &str) -> Result<Self, Self::Error> {
        let digits = match hex.strip_prefix('#') {
            Some(rest) => rest,
            None => return Err(RgbError::MissingHash),
        };

        if digits.len() != 6 {
            return Err(RgbError::LengthError);
        }

        // Validate up front: `from_str_radix` accepts a leading '+', so "#+1+2+3"
        // would otherwise parse, and slicing by byte offsets panics when an offset
        // falls inside a multi-byte character.
        if let Some(bad) = digits.chars().find(|c| !c.is_ascii_hexdigit()) {
            let mut buf = [0u8; 4];
            // A lone non-hex character never parses, so this `?` always returns
            // the matching `ParseIntError`.
            u8::from_str_radix(bad.encode_utf8(&mut buf), 16)?;
        }

        // All six bytes are ASCII from here on, so two-byte slices are safe.
        let channel = |start: usize| u8::from_str_radix(&digits[start..start + 2], 16);

        Ok(Self(channel(0)?, channel(2)?, channel(4)?))
    }
}

impl From<std::num::ParseIntError> for RgbError {
    fn from(err: std::num::ParseIntError) -> Self {
        Self::ParseError(err)
    }
}

// Intentionally empty: this snippet is exercised through its unit tests.
fn main() {}

#[cfg(test)]
mod test {
    use super::Rgb;
    use std::convert::TryFrom;

    /// A well-formed color converts to the expected channel values.
    #[test]
    fn converts_valid_hex_color() {
        let expected = Rgb(0, 204, 102);
        let actual =
            Rgb::try_from("#00cc66").expect("valid hex code should be converted to Rgb");
        assert_eq!(actual, expected, "wrong converted to Rgb");
    }

    /// Characters outside `0-9a-f` are rejected.
    #[test]
    fn fails_on_invalid_hex_digits() {
        assert!(
            Rgb::try_from("#0011yy").is_err(),
            "should be an error with invalid hex color"
        );
    }

    /// Input without the leading `#` is rejected.
    #[test]
    fn fails_when_missing_hash() {
        assert!(
            Rgb::try_from("001100").is_err(),
            "should be an error with missing hash symbol"
        );
    }

    /// Too few or too many digits after the `#` are rejected.
    #[test]
    fn fails_on_wrong_length() {
        assert!(
            Rgb::try_from("#00cc6").is_err(),
            "should be an error with too few digits"
        );
        assert!(
            Rgb::try_from("#00cc660").is_err(),
            "should be an error with too many digits"
        );
    }
}
学新通

Numeric Primitives Limits & Type Conversion

Many Numeric Types

  • 8, 16, 32, 64, and 128 bit integers
    • Signed & unsigned
  • isize & usize
    • Pointer sized numeric types
      • usize used to index into arrays
    • Depends on architecture: 16bit, 64bit, etc
  • 32bit & 64bit floating point

Min/Max: Unsigned Integer

  Type     Min     Max
  u8        0      255
  u16       0      65535
  u32       0      4294967295
  u64       0      18446744073709551615
  u128      0      <BIG>

Min/Max: Signed Integer

             Min
    Type     Max
            -128
    i8       127
            -32768
    i16      32767
            -2147483648
    i32      2147483647
            -9223372036854775808
    i64      9223372036854775807
            -<BIG>
    i128     <BIG>

Literal Numeric Annotations

// A type suffix on the literal fixes its type.
15u8;
// The suffix also works on negative values.
-12i16;
// An underscore may separate the digits from the suffix...
999_usize;
// ...or group the digits for readability.
13_456_019u32;
// Floating-point literals take `f32` / `f64` suffixes.
17.7f32;

Type Safety

// Rejected at compile time: 300 is outside `u8`'s range `0..=255`.
let whoops = 300u8;

error: literal out of range for `u8`
 --> src/main.rs:2:18
  |
2 |     let whoops = 300u8;
  |                  ^^^^^
  |
  = note: `#[deny(overflowing_literals)]` on by default
  = note: the literal `300u8` does not fit into the type `u8` whose range is `0..=255`

Conversion

  • Integers can be converted between types
    • u8 will always fit into a u16
      • Lossless conversion
    • u16 cannot fit into u8, but it can still be converted
      • Value will be a number in the range of the target type
  • Math operations require all operands to be the same type
    • Convert to the largest type needed

Cast Syntax

// Widening cast: every `u8` value fits in a `u16`.
let a = 15u8 as u16;
// `as` binds tighter than `+`, so each operand is cast to `u8` before the addition.
let b = a as u8 + 20u16 as u8;

Casting to less bits

  • (Source value) - (Target max + 1)
    • Repeat until the value fits in the type
  • Alternatively: (Source value) modulus (Target max + 1)
  • This happens automatically when using as to convert
                      Source        Target
                        u16 ----->    u8
 600u16 as u8 /*=88*/ 0..65535      0..255
                         600 - 256 = 344
                         344 - 256 = 88

Converting Floats To Integer

  • Float to integer is a saturating conversion
    • The value will be clamped to the minimum or maximum of the target type
  • Decimal points are truncated/dropped
800.5f32 as u8 // =255
-300f32  as u8 // = 0          Source     Target
                                f32  -->  u8  0..255
800.5f32 as i8 // = 127         f32  --> i8 -128..127
-300f32  as i8 // = -128

Checked Casting

// Checked alternative to `as`: yields a `Result`, which is `Err` here because
// 300 does not fit in a `u8`.
u8::try_from(300u16);

Recap

  • Numeric types can be cast using the as keyword
  • Use TryFrom when you want to be sure the value will properly fit
  • Annotations can be used with numeric literals to specify the type
    • Can use underscore (_) as a digit separator
  • Compiler error to create a numeric literal outside of appropriate range

Demo Advanced Closures

/// Applies the boxed binary operation `op` to `a` and `b` and returns its result.
fn math(a: i32, b: i32, op: Box<dyn Fn(i32, i32) -> i32>) -> i32 {
    let apply: &dyn Fn(i32, i32) -> i32 = &*op;
    apply(a, b)
}

/// Boxes three closures and passes each to `math`, printing the results.
fn main() {
    let name = "Jayson";
    // `move` copies the `&str` into the closure. Because `&str` is `Copy`,
    // `name` itself remains usable after this point.
    let add = Box::new(move |a, b| {
        println!("add name:{}", name);
        a + b
    });

    let sub = Box::new(|a, b| a - b);
    let mul = Box::new(|a, b| a * b);

    println!("{}", math(2, 2, add));
    println!("{}", math(2, 2, sub));
    println!("{}", math(2, 2, mul));
}
学新通

Parallel Execution Threads

Thread Basics

  • A thread uses serial execution
    • Each line of code is executed one at a time
  • Multicore CPUs can have multiple threads
    • Threads still execute serially
    • Each thread can execute different tasks
      • Better CPU utilization
  • Threads are isolated from one another
    • Require additional work to communicate
      • Should communicate infrequently for performance reasons
        Working With Threads
  • Threads are “spawned”(created)
    • Threads can spawn threads
    • Use the “main” thread for spawning in most cases
      • fn main() is the main thread
  • Code is no longer executed line-by-line with threads
    • Requires careful planning
  • When a thread completes work, it should be “joined” back into the main thread
    • Ensures that the thread has completed
      Thread Memory
  • Threads have “thread-local” memory
    • Owned by the thread
    • Only accessible in the thread
  • Data can be copied or moved into threads
    • Can be done when thread created
    • Becomes thread-local
      Spawning a Thread
use std::thread;
// `thread::spawn` returns a `JoinHandle<T>`, where `T` is the closure's return type.
let handle = thread::spawn(move||{
    // .. code ..
});
// Waits for the spawned thread to finish.
handle.join();

Recap

  • Threads are non-deterministic
    • Execution order will vary each time the program runs
  • Ending the main thread will terminate all spawned threads
    • Join on the main thread to wait for threads to complete
  • Each thread has its own chunk of memory

Demo Threads

use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Spawns several threads and waits for each of them: two counters that share
/// a copied loop bound, one thread that returns a number, and one that takes
/// ownership of a vector.
fn main() {
    let iterations = 10;
    let a = thread::spawn(move || {
        for i in 1..=iterations {
            println!("A: {}", i);
        }
    });

    let b = thread::spawn(move || {
        for i in 1..=iterations {
            println!("    B: {}", i);
        }
    });

    // `join` returns `Err` when the thread panicked; report it the same way the
    // later joins do instead of discarding the `Result`.
    if let Err(e) = a.join() {
        println!("error joining thread:{:?}", e);
    }
    if let Err(e) = b.join() {
        println!("error joining thread:{:?}", e);
    }

    let value: JoinHandle<usize> = thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        42
    });

    match value.join() {
        Ok(n) => println!("value:{}", n),
        Err(e) => println!("error joining thread:{:?}", e),
    }

    // Sending a vector to the thread: `move` transfers ownership of `data`.
    let data = vec!['a', 'b', 'c', 'd'];
    let caps = thread::spawn(move || {
        data.iter()
            .map(char::to_ascii_uppercase)
            .collect::<Vec<char>>()
    });

    println!("waiting for value");
    match caps.join() {
        Ok(n) => println!("value:{:?}", n),
        Err(e) => println!("error joining thread:{:?}", e),
    }
}
学新通

Activity Threads

use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Sleeps for one second, then returns the first part of the message.
fn msg_hello() -> &'static str {
    thread::sleep(Duration::from_secs(1));
    "Hello, "
}

/// Sleeps for one second, then returns the second part of the message.
fn msg_thread() -> &'static str {
    thread::sleep(Duration::from_secs(1));
    "Thread, "
}

/// Sleeps for one second, then returns the last part of the message.
fn msg_exciting() -> &'static str {
    thread::sleep(Duration::from_secs(1));
    "Exciting"
}

fn main() {
    let msg_one = thread::spawn(move || msg_hello());
    let msg_two = thread::spawn(move || msg_thread());
    let msg_three = thread::spawn(move || msg_exciting());

    let msg_one = msg_one.join().expect("failed to join msg one");
    let msg_two = msg_two.join().expect("failed to join msg two");
    let msg_three = msg_three.join().expect("failed to join msg three");

    println!("{} {} {}", msg_one, msg_two, msg_three);
}
学新通

Parallel Execution Channels

Channels

  • One-way communication between threads
    • Message passing
    • Sender and Receiver
  • Can have limited or unlimited capacity
  • crossbeam-channel crate
    • Use docs.rs website to view documentation for crates
[dependencies]
crossbeam-channel = "*"

Message Passing

  • enum commonly used for messages
    • match allows easy message handling
  • Guaranteed in-order delivery
  • Can be blocking or non-blocking
    • Block on Sender: Channel full
    • Block on Receiver: No messages
    • Behavior determined by function, not by channel

Example

use crossbeam_channel::unbounded;
/// Sends one message through an unbounded channel and prints what is received.
fn main() {
    // `unbounded` returns both ends of the channel: (Sender<T>, Receiver<T>).
    let (sender, receiver) = unbounded();

    // `send` returns a `Result`; check it instead of silently dropping it.
    if let Err(e) = sender.send("hello channle") {
        println!("send error: {:?}", e);
    }

    // Blocks until there is a message on the channel.
    match receiver.recv() {
        Ok(msg) => println!("receive msg: {}", msg),
        Err(e) => println!("receive error: {:?}", e),
    }
}

Recap

  • Channels offer unidirectional communication
  • Composed of Send and Receive ends
    • Ends can be cloned and sent to threads
  • Channel operations can be blocking or non-blocking
  • Any data can be sent across a channel
    • enum is useful because of variants

Demo Channels

use crossbeam_channel::unbounded;

use std::thread;
/// Messages the main thread sends to the spawned thread.
enum ThreadMsg {
    /// Text to print.
    PrintData(String),
    /// Two operands whose sum is printed.
    Sum(i64, i64),
    /// Asks the receiving loop to stop.
    Quit,
}

/// Starts a thread that processes `ThreadMsg` values until it is told to quit
/// or the channel disconnects, then sends it three messages and joins it.
fn main() {
    let (s, r) = unbounded();

    let handle = thread::spawn(move || loop {
        match r.recv() {
            Ok(ThreadMsg::PrintData(d)) => println!("{}", d),
            Ok(ThreadMsg::Sum(lhs, rhs)) => println!("{} + {} = {}", lhs, rhs, lhs + rhs),
            Ok(ThreadMsg::Quit) => {
                println!("thread terminating");
                break;
            }
            Err(_) => {
                println!("disconnected");
                break;
            }
        }
    });

    // The receiving thread only stops after `Quit`, which is sent last, so a
    // failed send here would be a bug worth surfacing.
    s.send(ThreadMsg::PrintData("hello from main".to_owned()))
        .expect("failed to send PrintData");
    s.send(ThreadMsg::Sum(10, 20)).expect("failed to send Sum");
    s.send(ThreadMsg::Quit).expect("failed to send Quit");

    drop(s);

    handle.join().expect("failed to join thread");
}
学新通

Demo Bidirectional Thread Communication

use crossbeam_channel::unbounded;

use std::thread;
/// Messages sent from the main thread to the worker.
enum WorkerMsg {
    /// Text for the worker to print.
    PrintData(String),
    /// Two operands; the worker reports their sum back to the main thread.
    Sum(i64, i64),
    /// Asks the worker to stop.
    Quit,
}

/// Messages sent from the worker back to the main thread.
enum MainMsg {
    /// Result of a `WorkerMsg::Sum` request.
    SumResult(i64),
    /// The worker is shutting down.
    WorkerQuit,
}

/// Two-way communication: one channel carries requests to the worker, a second
/// one carries its replies back to the main thread.
fn main() {
    let (worker_tx, worker_rx) = unbounded();
    let (main_tx, main_rx) = unbounded();

    let worker = thread::spawn(move || loop {
        match worker_rx.recv() {
            Ok(WorkerMsg::PrintData(d)) => println!("{}", d),
            Ok(WorkerMsg::Sum(lhs, rhs)) => {
                println!("Worker: summing......");
                if main_tx.send(MainMsg::SumResult(lhs + rhs)).is_err() {
                    // Nobody is listening for results any more.
                    break;
                }
            }
            Ok(WorkerMsg::Quit) => {
                println!("Worker: terminating");
                // Best effort: the worker stops whether or not this arrives.
                let _ = main_tx.send(MainMsg::WorkerQuit);
                break;
            }
            Err(_) => {
                println!("disconnected");
                // Best effort, as above.
                let _ = main_tx.try_send(MainMsg::WorkerQuit);
                break;
            }
        }
    });

    // The worker only stops after `Quit`, which is sent last, so a failed send
    // here would be a bug worth surfacing.
    worker_tx
        .send(WorkerMsg::PrintData("hello from main".to_owned()))
        .expect("failed to send PrintData");
    worker_tx
        .send(WorkerMsg::Sum(10, 20))
        .expect("failed to send Sum");
    worker_tx
        .send(WorkerMsg::Quit)
        .expect("failed to send Quit");

    // Ends once the worker thread finishes and its `main_tx` is dropped.
    while let Ok(msg) = main_rx.recv() {
        match msg {
            MainMsg::SumResult(answer) => println!("Main: answer = {}", answer),
            MainMsg::WorkerQuit => println!("Main: worker terminated"),
        }
    }

    worker.join().expect("failed to join worker thread");
}
学新通

Activity Channels

use colored::*;
use crossbeam_channel::{unbounded, Receiver};
use std::thread::{self, JoinHandle};

/// Commands understood by the light thread.
enum LightMsg {
    /// Show the given red, green and blue values.
    ChangeColor(u8, u8, u8),
    /// Stop the light thread.
    Disconnect,
    /// Switch the light off.
    Off,
    /// Switch the light on.
    On,
}

/// Whether the light is currently on or off.
enum LightStatus {
    Off,
    On,
}

/// Spawns a thread that applies `LightMsg` commands from `receiver` until it
/// gets `Disconnect` or the channel closes; the thread returns the final status.
fn spawn_light_thread(receiver: Receiver<LightMsg>) -> JoinHandle<LightStatus> {
    thread::spawn(move || {
        let mut state = LightStatus::Off;

        loop {
            let command = match receiver.recv() {
                Ok(command) => command,
                Err(_) => {
                    println!("channel disconnecting");
                    state = LightStatus::Off;
                    break;
                }
            };

            match command {
                LightMsg::ChangeColor(r, g, b) => {
                    println!("color changed to:{}", "     ".on_truecolor(r, g, b));
                    match state {
                        LightStatus::Off => println!("Light is OFF"),
                        LightStatus::On => println!("Light is ON"),
                    }
                }
                LightMsg::On => {
                    println!("Truned light on");
                    state = LightStatus::On;
                }
                LightMsg::Off => {
                    println!("Truned light off");
                    state = LightStatus::Off;
                }
                LightMsg::Disconnect => {
                    println!("disconnecting");
                    state = LightStatus::Off;
                    break;
                }
            }
        }

        state
    })
}

/// Drives the light thread through a fixed script of commands and waits for it
/// to finish.
fn main() {
    let (s, r) = unbounded();
    let light = spawn_light_thread(r);

    let script = vec![
        LightMsg::On,
        LightMsg::ChangeColor(255, 0, 0),
        LightMsg::ChangeColor(0, 255, 0),
        LightMsg::ChangeColor(0, 0, 255),
        LightMsg::Off,
        LightMsg::Disconnect,
    ];
    for msg in script {
        // The light thread keeps receiving until `Disconnect`, the last
        // message, so a failed send would be a bug worth surfacing.
        s.send(msg).expect("light thread stopped receiving");
    }

    let _final_status = light.join().expect("failed to join light thread");
}
学新通

Shared Ownership Smart Pointers

Smart Pointers

  • Allow multiple owners of data
  • Reference counted - “Rc”
    • Data deleted only when last owner is dropped
  • Atomic reference counted - “Arc”
    • Safe to use with multiple threads
      Recap
  • Rc & Arc are used to share ownership
  • Data is dropped once all owners are dropped
  • Rc for single-threading
    • Rc::clone to make a new reference
  • Arc for multi-threading
    • Arc::clone to make a new reference

Interior Mutability Cell & RefCell

Interior Mutability

  • Mutable data is sometimes problematic
    • Compiler, errors, ownership issue, etc.
  • Possible to create permanently mutable memory
    • Less restrictive than compiler
      • Trade-offs in implementation & performance
        Cell
  • Permanently mutable memory location
    • Can always be mutated,even if the containing structure is immutable
  • Accessing Cell data always results in a move or copy
  • Data should be copy-able
    • #[derive(Clone, Copy)]
    • Inefficient for large data types
      • Limit to numbers and booleans
  • Prefer mut
    RefCell
  • Permanently mutable memory location
    • Can always be mutated, even if the containing structure is immutable
  • Accessing RefCell data always results in a borrow
    • Efficient data access(compared to Cell)
    • Borrow checked at runtime
      • Will panic at runtime if rules are broken
      • Only one mutable borrow at a time
  • Prefer &mut
  • Not thread-safe
    Recap
  • Cell & RefCell allow permanent mutation
    • Cell returns owned data
    • RefCell returns borrowed data
  • RefCell borrowing can panic at runtime
    • try_borrow and try_borrow_mut are the non-panicking versions
  • Prefer to use mut and &mut
    • Use Cell & RefCell only when it’s not possible to express intentions otherwise
  • Not thread-safe

Demo Smart Pointers & Interior Mutability

use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::rc::Rc;

/// Things that can be ordered.
#[derive(Debug)]
enum MenuItem {
    Drink,
    Salad,
}

/// One line of an order: a menu item and how many of it.
#[derive(Debug)]
struct ItemOrder {
    item: MenuItem,
    quantity: u32,
}

/// All items ordered by one table.
#[derive(Debug)]
struct TableOrder {
    items: Vec<ItemOrder>,
}

/// Builds a table order containing a single drink.
fn new_table_order() -> TableOrder {
    let single_drink = ItemOrder {
        item: MenuItem::Drink,
        quantity: 1,
    };

    TableOrder {
        items: vec![single_drink],
    }
}

/// Shared, mutable list of table orders: `Rc` provides the shared ownership,
/// `RefCell` the runtime-checked mutation.
type Order = Rc<RefCell<Vec<TableOrder>>>;

/// Holds one handle to the shared order list.
#[derive(Debug)]
struct Chef(Order);

/// Holds one handle to the shared order list.
#[derive(Debug)]
struct WaitStaff(Order);

/// Holds one handle to the shared order list.
#[derive(Debug)]
struct Accounting(Order);

/// Shares one order list between three owners, pushes an order through the
/// original handle and prints the list through each owner's handle.
fn main() {
    // The shared list starts out empty.
    let orders = Rc::new(RefCell::new(vec![]));

    // Each role gets its own `Rc` handle to the same `RefCell`.
    let chef = Chef(Rc::clone(&orders));
    let wait_staff = WaitStaff(Rc::clone(&orders));
    let account = Accounting(Rc::clone(&orders));

    let order = new_table_order();
    {
        // NOTE(review): the explicit `(*orders)` looks like it is there so that
        // `RefCell::borrow_mut` is picked rather than the imported
        // `std::borrow::BorrowMut` trait method on the `Rc` — confirm.
        (*orders).borrow_mut().push(order);
    }

    dbg!(chef.0.borrow());
    // Dropping one owner leaves the data available to the remaining handles.
    drop(chef);
    dbg!(wait_staff.0.borrow());
    dbg!(account.0.borrow());
}
学新通

Activity Smart Pointers & RefCell

use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::rc::Rc;

/// Kind of vehicle offered for rent.
#[derive(Debug)]
enum Vehicle {
    Car,
    Truck,
}

/// Rental state of a vehicle; comparable so that tests can assert on it.
#[derive(Debug, Eq, PartialEq)]
enum Status {
    Available,
    Unavailable,
    Rented,
    Maintenance,
}

/// One vehicle in the fleet with its current status.
#[derive(Debug)]
struct Rental {
    status: Status,
    vehicle: Vehicle,
    // presumably the vehicle identification number — confirm
    vin: String,
}

/// Holds a handle to the shared rental list.
struct Corporate(Rc<RefCell<Vec<Rental>>>);

/// Holds a handle to the shared rental list.
struct StoreFront(Rc<RefCell<Vec<Rental>>>);

#[cfg(test)]
mod test {
    use super::*;

    /// A status change made through one handle is visible through the other.
    #[test]
    fn updata_status() {
        let vehicles = vec![
            Rental {
                status: Status::Available,
                vehicle: Vehicle::Car,
                vin: "123".to_owned(),
            },
            Rental {
                status: Status::Maintenance,
                vehicle: Vehicle::Truck,
                vin: "abc".to_owned(),
            },
        ];
        let vehicle = Rc::new(RefCell::new(vehicles));

        let corporate = Corporate(Rc::clone(&vehicle));
        let storefront = StoreFront(Rc::clone(&vehicle));

        // Each mutable borrow lives in its own scope so that it is released
        // before the next one is taken. `expect` is used instead of `if let`
        // so that the assertions cannot be skipped if the entry were missing.
        {
            let mut rentals = (*storefront.0).borrow_mut();
            let car = rentals.get_mut(0).expect("first rental should exist");
            assert_eq!(car.status, Status::Available);
            car.status = Status::Rented;
        }

        {
            let mut rentals = (*corporate.0).borrow_mut();
            let car = rentals.get_mut(0).expect("first rental should exist");
            assert_eq!(car.status, Status::Rented);
            car.status = Status::Available;
        }

        let rentals = (*storefront.0).borrow();
        let car = rentals.get(0).expect("first rental should exist");
        assert_eq!(car.status, Status::Available);
    }
}
学新通

Shared Ownership Threads & Mutex

Shared Data w/Threading

  • Threads execute non-deterministically
    • Can read/write at random times
  • Multiple threads can work with the same data
    • Data can become corrupted easily
      • Difficult to work with threads
        Synchronization
  • Data needs to be synchronized for safe access
  • Common synchronization primitive is a Mutex
    • Mutually Exclusive lock
  • Uses atomic operations to ensure that data is only accessed by one thread at a time
    • Atomic operations are “all or nothing” operations, enforced by the CPU
      • Data stays consistent
        Mutex
  • Mutexes wrap data, making data mutually exclusive
    • Only one thread can access at a time
    • All other threads will wait until finished
  • Mutexes cannot be shared among threads
    • Wrap with a smart pointer(Arc)
    • Share the Arc among threads
  • Use parking_lot crate for a Mutex
    • Better API & performance than stdlib
      How Mutex Works: Locks
use parking_lot::Mutex;
use std::sync::Arc;
use std::thread;

/// Counter incremented by both threads.
struct Counter(usize);

/// Increments one shared counter from two threads and prints the total.
fn main() {
    let counter = Counter(0);
    // Arc<Mutex<Counter>>: `Arc` shares ownership across threads, `Mutex`
    // serializes access to the counter.
    let shared_counter = Arc::new(Mutex::new(counter));
    let thread_1_counter = Arc::clone(&shared_counter);
    // Same effect as `Arc::clone(&shared_counter)`.
    let thread_2_counter = shared_counter.clone();

    let thread_1 = thread::spawn(move || {
        // The lock is released when `counter` goes out of scope.
        let mut counter = thread_1_counter.lock();
        counter.0 += 1;
    });

    let thread_2 = thread::spawn(move || {
        let mut counter = thread_2_counter.lock();
        counter.0 += 1;
    });

    thread_1
        .join()
        .and_then(|_| thread_2.join())
        .expect("a counter thread panicked");
    println!("{}", shared_counter.lock().0);
}
学新通

Recap

  • Data access from threads must be synchronized
    • Wrap data in a Mutex
    • Use .lock() to acquire a lock
    • Unlocking occurs when the lock is dropped
  • Mutexes cannot be shared
    • Wrap in Arc to share between threads
  • Lock a minimum amount of time by performing computations before taking a lock

Threading Deadlocks

  • A deadlock is a situation where locks are waiting on one another
    • Threads become “stuck” and are unable to continue
  • Deadlocks can occur when:
    • Using multiple locks
    • Recursing while taking a lock
    • Locking the same lock twice

Recursive Deadlock Example

use parking_lot::Mutex;
// Deadlock demonstration: the guard in `locked` is still held when the function
// calls itself, and the recursive call then tries to lock the same mutex again.
// NOTE(review): `Rc` is used here but no `use std::rc::Rc;` is visible in this snippet.
fn recurse(data: Rc<Mutex<u32>>, remaining: usize) -> usize{
    let mut locked = data.lock();
    match remaining {
        rem if rem == 0 => 0,
        // `locked` is still alive here, so the nested `data.lock()` waits forever.
        rem => recurse(Rc::clone(&data), rem - 1),
    }
}

Fix Deadlock - ReentrantMutex

use parking_lot::ReentrantMutex;
// Same shape as the deadlocking version, but a `ReentrantMutex` allows the same
// thread to lock it again, so the nested calls can proceed.
// NOTE(review): `Rc` is used here but no `use std::rc::Rc;` is visible in this snippet.
fn recurse(data: Rc<ReentrantMutex<u32>>, remaining: usize) -> usize{
    let mut locked = data.lock();
    match remaining {
        rem if rem == 0 => 0,
        // The guard in `locked` is still held while recursing.
        rem => recurse(Rc::clone(&data), rem - 1),
    }
}

Threaded Deadlock Example

/// An account shared between threads.
type ArcAccount = Arc<Mutex<Account>>;

/// Account with a signed balance.
struct Account {
    balance: i64,
}

// Deadlock demonstration: the two locks are taken in argument order. When two
// threads call this with the accounts swapped, each can end up holding one lock
// while waiting for the other.
fn transfer(from: ArcAccount, to: ArcAccount, amount: i64) {
    let mut from = from.lock();
    let mut to = to.lock();
    from.balance -= amount;
    to.balance += amount;
}

// NOTE(review): slide fragment; `a` and `b` are not defined in this snippet and
// are presumably two `ArcAccount` handles, cloned per thread — confirm.
let t1 = thread::spawn(move || {
    transfer(a, b, 500);
});
let t2 = thread::spawn(move || {
    transfer(b, a, 800);
});
学新通

Fix Deadlock - Retry On Failure

fn transfer(from: ArcAccount, to: ArcAccount, amount: i64) {
    loop {
        if let Some(mut from) = from.try_lock(); {
            if let Some(mut to) = to.try_lock(); {
                from.balance -= amount;
                to.balance  = amount;
            }
        }
       
        thread::sleep(Duration::from_millis(2));
    }
}

Thread Contention / Backoff

use backoff::ExponentialBackoff;

// Same try-lock approach as the retry loop, but the retry timing is delegated
// to the `backoff` crate instead of a fixed sleep.
fn transfer(from: ArcAccount, to: ArcAccount, amount: i64) {
    let op = || {
        if let Some(mut from) = from.try_lock() {
            if let Some(mut to) = to.try_lock() {
                from.balance -= amount;
                to.balance += amount;
                return Ok(());
            }
        }
        // At least one lock was busy: report an error so that the attempt is
        // retried. `?` converts the plain value into the error type the retry
        // helper expects.
        Err(0)?
    };
    let backoff = ExponentialBackoff::default();
    // Retrying can give up; say so instead of discarding the result.
    if backoff::retry(backoff, op).is_err() {
        eprintln!("transfer abandoned: could not acquire both locks");
    }
}
学新通

Recap

  • Deadlocks are permanently stuck locks
  • ReentrantMutex allows multiple locks from the same thread
    • Use for recursive functions
    • Anytime you need to lock the same lock more than once
  • try_lock() can prevent deadlocks
    • Drop all locks used in function and try again after a short period
    • Use the backoff crate for optimal performance

Demo Mutex

use parking_lot::Mutex;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Text shown on the sign, shared between threads behind a mutex.
type SharedSignData = Arc<Mutex<String>>;

/// A sign board that prints whatever the shared text currently is.
struct DigitalSignBoard {
    display: SharedSignData,
}

impl DigitalSignBoard {
    /// Locks the shared text and prints it; the lock is held for the duration
    /// of the print.
    fn update(&self) {
        let guard = self.display.lock();
        println!("sign data = '{}'", *guard);
    }
}

/// Starts a detached thread that prints the shared text every 200 ms, forever.
fn spawn_display_thread(display_data: SharedSignData) {
    let board = DigitalSignBoard {
        display: display_data,
    };

    thread::spawn(move || loop {
        board.update();
        thread::sleep(Duration::from_millis(200));
    });
}

/// Replaces the shared text with a copy of `new_data`.
fn change_data(display_data: SharedSignData, new_data: &str) {
    let replacement = new_data.to_owned();
    *display_data.lock() = replacement;
}

/// Starts the display thread, then changes the shared text three times with
/// pauses in between so that the display thread can show each value.
fn main() {
    let display_data = Arc::new(Mutex::new("initial".to_owned()));
    spawn_display_thread(Arc::clone(&display_data));

    // (pause before the change in milliseconds, new text)
    let schedule = [
        (100, "message 1"),
        (600, "another message"),
        (600, "goodbye"),
    ];
    for &(pause_ms, text) in schedule.iter() {
        thread::sleep(Duration::from_millis(pause_ms));
        change_data(Arc::clone(&display_data), text);
    }

    thread::sleep(Duration::from_millis(600));
}
学新通

Activity Mutex

use crossbeam_channel::{unbounded, Receiver, Sender};
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::vec;

/// A job given to a worker.
#[derive(Clone)]
enum Job {
    /// Print the text.
    Print(String),
    /// Print the sum of the two operands.
    Sum(isize, isize),
}

/// A message sent to a worker.
enum Message {
    /// Queue a job on the worker.
    AddJob(Job),
    /// Ask the worker thread to return.
    Quit,
}

/// Handle to a spawned worker: the sending end of its channel, a receiver kept
/// alongside it, and the thread's join handle.
struct Worker<M> {
    tx: Sender<M>,
    // NOTE(review): never read; presumably kept so the channel always has a
    // receiver while the `Worker` exists — confirm.
    _rx: Receiver<M>,
    handle: JoinHandle<()>,
}

impl Worker<Message> {
    /// Queues `job` on this worker.
    ///
    /// Panics with "failed to add job" when the message cannot be sent.
    fn add_job(&self, job: Job) {
        let queued = self.tx.send(Message::AddJob(job));
        queued.expect("failed to add job")
    }

    /// Consumes the worker and waits for its thread to finish.
    ///
    /// Panics with "failed to join thread" when the thread panicked.
    fn join(self) {
        let joined = self.handle.join();
        joined.expect("failed to join thread");
    }

    /// Sends an arbitrary message to this worker.
    ///
    /// Panics with "failed to send message" when the message cannot be sent.
    fn send_msg(&self, msg: Message) {
        let sent = self.tx.send(msg);
        sent.expect("failed to send message");
    }
}

/// Creates a new worker thread that receives jobs over its own channel.
///
/// The thread runs every queued job, adding one to `counter` per finished job,
/// and returns when it receives `Message::Quit`. When no message is waiting it
/// sleeps for 100 ms before polling again.
fn spawn_worker(counter: Arc<Mutex<isize>>) -> Worker<Message> {
    let (tx, rx) = unbounded();

    let rx_thread = rx.clone();
    let handle = thread::spawn(move || {
        let mut jobs = VecDeque::new();
        loop {
            loop {
                // Drain the queue before looking for new messages.
                while let Some(job) = jobs.pop_front() {
                    match job {
                        Job::Print(msg) => println!("{}", msg),
                        Job::Sum(lhs, rhs) => println!("{} + {} = {}", lhs, rhs, lhs + rhs),
                    }
                    // The lock is taken only for the increment.
                    *counter.lock() += 1;
                }

                match rx_thread.try_recv() {
                    // Go round the inner loop again so the new job runs at once.
                    Ok(Message::AddJob(job)) => jobs.push_back(job),
                    Ok(Message::Quit) => return,
                    // Nothing waiting: leave the inner loop and sleep.
                    Err(_) => break,
                }
            }

            thread::sleep(Duration::from_millis(100));
        }
    });

    Worker {
        tx,
        _rx: rx,
        handle,
    }
}

/// Distributes a fixed list of jobs round-robin over four workers, asks them
/// to quit, waits for them and reports how many jobs were sent and completed.
fn main() {
    let jobs = vec![
        Job::Print("hello".to_owned()),
        Job::Sum(2, 2),
        Job::Print("world".to_owned()),
        Job::Sum(4, 4),
        Job::Print("two words".to_owned()),
        Job::Sum(1, 1),
        Job::Print("a print job".to_owned()),
        Job::Sum(10, 10),
        Job::Print("message".to_owned()),
        Job::Sum(3, 4),
        Job::Print("thread".to_owned()),
        Job::Sum(8, 8),
    ];

    let jobs_sent = jobs.len();
    // The `Arc` itself is never reassigned, so the binding needs no `mut`.
    let job_counter = Arc::new(Mutex::new(0));
    let mut workers = vec![];
    for _ in 0..4 {
        let worker = spawn_worker(Arc::clone(&job_counter));
        workers.push(worker);
    }

    // Hand the jobs out round-robin.
    let mut worker_ring = workers.iter().cycle();
    for job in jobs {
        let worker = worker_ring.next().expect("failed to get worker");
        worker.add_job(job);
    }
    // ask all workers to quit
    for worker in &workers {
        worker.send_msg(Message::Quit);
    }
    // wait for workers to terminate
    for worker in workers {
        worker.join();
    }

    println!("jobs sent:{}", jobs_sent);
    println!("jobs_completed: {}", *job_counter.lock());
}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgkkagf
系列文章
更多 icon
同类精品
更多 icon
继续加载