Rust—无锁并发模型编程


Rust语言在并发编程领域的一大特色是其无锁并发模型,这主要得益于其所有权系统和生命周期的概念,以及对内存安全的静态保证。在Rust中,无锁编程是指在不使用传统的互斥锁、读写锁等同步原语的情况下,实现线程安全的并发数据结构和算法。下面是Rust无锁并发编程的关键点:

所有权系统
  • 所有权:Rust的核心特性之一,每个值都有一个明确的所有者,当所有者超出作用域时,值会被自动清理,这减少了内存管理的复杂性和竞态条件的风险。
  • 借用:通过引用和借用检查器(Borrow Checker),Rust在编译时强制执行所有权规则,确保同一时间内只有一个可变引用或多个不可变引用存在,避免数据竞争。
生命周期
  • 生命周期管理:Rust的生命周期概念帮助编译器理解数据的存活时间,确保引用不会悬空,这对于无锁编程至关重要,因为它防止了并发访问已释放的内存。
原子操作
  • 原子操作是无锁并发原语的核心
  • 原子类型:Rust标准库提供了std::sync::atomic模块,其中包括原子整型和布尔类型,以及原子引用等。原子操作允许在多线程环境中进行安全的内存访问,而不必加锁。
  • 无锁数据结构:通过原子操作和内存序(memory ordering)控制,可以构建无锁数据结构,如无锁队列、栈等,这些数据结构在多线程环境中可以安全地并发读写,而不需要显式锁。

  1. Rust实现无锁并发编程的关键在于利用原子类型原子操作。这些操作可以在多线程环境中直接在硬件层面进行,无需操作系统介入,从而避免了传统锁机制的开销和竞争条件。Rust标准库中的std::sync::atomic模块提供了多种原子类型和操作,包括:
    • AtomicPtr, AtomicBool, AtomicUsize, AtomicIsize, AtomicU8...AtomicU64, AtomicI8...AtomicI64:这些类型代表可以原子地读取和修改的整数或布尔值。
    • 原子操作:包括但不限于compare_and_swap (CAS),loadstorefetch_add等,这些操作提供了在多线程环境下安全修改原子类型的方法,同时确保操作的原子性和内存顺序。
  2. 利用原子操作,可以构建一系列无锁数据结构,例如:
    • 无锁队列(如crossbeam-queue库中的队列):通过CAS操作实现入队和出队操作,允许多个生产者和消费者并发操作。
    • 无锁栈:同样利用原子操作来实现压栈和弹栈的原子性,确保并发安全性。
    • 原子引用计数(Arc):允许在多个所有者之间共享数据,是一种实现无锁共享的基础工具。


以下通过示例直观的观察无锁的实现:

use std::sync::atomic::{AtomicUsize, Ordering,AtomicPtr};
use std::sync::Arc;
use std::thread;

#[derive(Clone, Debug)]
struct SharedData {
    // 使用AtomicUsize实现线程安全的计数器
    counter: Arc<AtomicUsize>,
    // 通过Arc共享不可变的字符串
    message: Arc<String>,
}

impl SharedData {
    fn new(message: &str) -> Self {
        SharedData {
            counter: Arc::new(AtomicUsize::new(0)),
            message: Arc::from(message.to_owned()),
        }
    }

    // 原子地增加计数器的值
    fn increment_counter(&self) {
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    // 获取当前计数器的值
    fn get_counter(&self) -> usize {
        self.counter.load(Ordering::SeqCst)
    }

    // 安全地获取共享的不可变消息
    fn get_message(&self) -> &str {
        &self.message
    }
}

#[test]
fn testmain() {
    // 创建共享数据实例
    let shared_data = SharedData::new("Initial Message");

    // 创建并运行线程来并发增加计数器的值
    let mut handles = vec![];
    for _ in 0..10 {
        let data_ref = shared_data.clone();
        let handle = thread::spawn(move || {
            // 增加计数器
            for _ in 0..100 {
                data_ref.increment_counter();
            }
            // 可以安全地读取但不可修改消息
            println!("Thread sees message: {}", data_ref.get_message());
        });
        handles.push(handle);
    }

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    // 输出最终结果
    println!("Final Counter: {}", shared_data.get_counter());
    println!("Shared Message: {}", shared_data.get_message());
}

说明:

    • 程序中通过AtomicUsize类型的counter字段实现了无锁的计数器。fetch_add方法在增加计数器时使用了原子操作,确保了即使在多线程环境下,每次增加操作也是不可分割的,避免了数据竞争和不一致,替代传统的锁机制(如互斥锁),消除了锁带来的性能开销和潜在的死锁风险。
    • 使用Arc<T>(原子引用计数指针)来共享不可变数据(在这里是String类型的消息),允许多个线程安全地访问同一个数据副本,而无需额外的锁来保护数据。Arc内部通过原子操作来管理引用计数,确保了当所有引用消失时数据能够被正确清理,这也是无锁编程的一种体现,通过数据共享而非数据复制来提高效率和减少同步开销
    • 通过clone方法为每个线程创建SharedData的引用,不会引起数据的深拷贝或加锁操作,这使得线程间的数据传递高效且无锁。每个线程独立地执行increment_counter操作,完全异步且无须等待其他线程释放锁资源。
    • 在原子操作中选择内存序(Memory Ordering),如在fetch_add中使用了Ordering::Relaxed,在load中使用了Ordering::SeqCst保证了必要的同步要求,又避免了不必要的性能损失。

掌握上述示例中的一系列原子操作,可以举一反三,来实现rust中的无锁编程。

    • Rust标准库提供了std::sync::atomic模块,但Rust标准库不直接提供针对复杂类型如String的原子操作原语,需要用户自定义实现,这里实现一个类似AtomicUsize的 AtomicString类型,应用于多线程无锁模式中,考虑到在并发原因的CAS中,并不能避免ABA问题(ABA问题这里不做详述)。所以,我在 AtomicString加入版本控制来解决这个问题。
    • AtomicString.rs
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{ptr, thread};

struct AtomicString {
    ptr: Arc<AtomicPtr<String>>,
    version: Arc<AtomicUsize>,
}

impl AtomicString {
    fn new(s: String) -> Self {
        AtomicString {
            ptr: Arc::new(AtomicPtr::new(Box::into_raw(Box::new(s)))),
            version: Arc::new(AtomicUsize::new(0)),
        }
    }

    fn update(&self, new_val: String) {
        let mut spin_count = 0;
        let mut yield_count = 0;
        let new_ptr = Box::into_raw(Box::new(new_val));
        loop {
            let current_ptr = self.ptr.load(Ordering::Acquire);
            let current_version = self.version.load(Ordering::Acquire);
            let new_version = current_version + 1;

            if self
                .ptr
                .compare_exchange_weak(current_ptr, new_ptr, Ordering::Release, Ordering::Relaxed)
                .is_ok()
                && self
                    .version
                    .compare_exchange_weak(
                        current_version,
                        new_version,
                        Ordering::Release,
                        Ordering::Relaxed,
                    )
                    .is_ok()
            {
                unsafe {
                    if !current_ptr.is_null() {
                        drop(Box::from_raw(current_ptr));
                    }
                }
                break;
            } else {
                spin_count += 1;
                if spin_count >= 30 {
                    spin_count = 0;
                    yield_count += 1;
                    thread::yield_now();
                    if yield_count >= 15 {
                        thread::sleep(Duration::from_millis(1));
                        yield_count = 0;
                    }
                }
            }
        }
    }

    fn get(&self) -> Option<(String, usize)> {
        let ptr = self.ptr.load(Ordering::Acquire);
        if ptr.is_null() {
            None
        } else {
            let version = self.version.load(Ordering::Acquire);
            unsafe { Some(((*ptr).clone(), version)) }
        }
    }

}

说明:实现的核心在于update函数的实现。整个实现主要利用原子操作来实现无锁的更新和读取操作。

  • new
    • 接受一个String实例作为参数,将其转换为Box然后得到裸指针,用AtomicPtr持有,同时初始化版本号为0。
  • update
    • 自旋锁策略: 通过循环尝试更新字符串和版本号直到成功。使用compare_exchange_weak尝试原子地更新ptrversion,如果当前值未被其他线程改变,则更新成功。要说明的是,自旋锁并非必要的。自旋锁如果持续时间过长,会损耗大量CPU资源,所以我这里加入了让步策略。
    • 让步策略: 当更新失败时,通过自旋(spin_count)和线程让步(thread::yield_now())来减少CPU占用,达到一定次数后甚至短暂休眠(thread::sleep(Duration::from_millis(1))),以减少争用情况下的资源消耗。
    • 释放内存: 成功更新后,通过unsafe块释放旧字符串的内存,确保内存安全。
  • get
    • 安全访问: 使用load(Ordering::Acquire)来获取当前字符串的指针,确保之前对字符串的更新对这次读取可见。
    • 空检查与克隆: 检查指针是否为空,若非空,则通过unsafe调用(*ptr).clone()来安全地克隆出一个String实例返回。

当然,为了程序功能可以比较完整,还需要实现 Drop trait,使其在离开作用域时能及时清理释放资源。

impl Drop for AtomicString {
    fn drop(&mut self) {
        let ptr = self.ptr.load(Ordering::Relaxed);
        unsafe {
            if !ptr.is_null() {
                drop(Box::from_raw(ptr));
            }
        }
    }
}

Rust的无锁并发编程模型通过其强大的类型系统和所有权模型,在编译时提供了内存安全的并发保证,减少了运行时的开销,提升了程序的性能和可靠性。通过原子操作和精心设计的数据结构,开发者能够实现高性能的并发逻辑,避免了传统锁机制的潜在死锁和性能瓶颈。对此开发者可能需要更多深入理解并发编程的原理和Rust的特性。