Rust语言在并发编程领域的一大特色是其无锁并发模型,这主要得益于其所有权系统和生命周期的概念,以及对内存安全的静态保证。在Rust中,无锁编程是指在不使用传统的互斥锁、读写锁等同步原语的情况下,实现线程安全的并发数据结构和算法。下面是Rust无锁并发编程的关键点:
std::sync::atomic
模块,其中包括原子整型和布尔类型,以及原子引用等。原子操作允许在多线程环境中进行安全的内存访问,而不必加锁。std::sync::atomic
模块提供了多种原子类型和操作,包括:compare_and_swap
(CAS),load
,store
,fetch_add
等,这些操作提供了在多线程环境下安全修改原子类型的方法,同时确保操作的原子性和内存顺序。crossbeam-queue
库中的队列):通过CAS操作实现入队和出队操作,允许多个生产者和消费者并发操作。以下通过示例直观的观察无锁的实现:
use std::sync::atomic::{AtomicUsize, Ordering,AtomicPtr};
use std::sync::Arc;
use std::thread;
#[derive(Clone, Debug)]
struct SharedData {
// 使用AtomicUsize实现线程安全的计数器
counter: Arc<AtomicUsize>,
// 通过Arc共享不可变的字符串
message: Arc<String>,
}
impl SharedData {
fn new(message: &str) -> Self {
SharedData {
counter: Arc::new(AtomicUsize::new(0)),
message: Arc::from(message.to_owned()),
}
}
// 原子地增加计数器的值
fn increment_counter(&self) {
self.counter.fetch_add(1, Ordering::Relaxed);
}
// 获取当前计数器的值
fn get_counter(&self) -> usize {
self.counter.load(Ordering::SeqCst)
}
// 安全地获取共享的不可变消息
fn get_message(&self) -> &str {
&self.message
}
}
#[test]
fn testmain() {
// 创建共享数据实例
let shared_data = SharedData::new("Initial Message");
// 创建并运行线程来并发增加计数器的值
let mut handles = vec![];
for _ in 0..10 {
let data_ref = shared_data.clone();
let handle = thread::spawn(move || {
// 增加计数器
for _ in 0..100 {
data_ref.increment_counter();
}
// 可以安全地读取但不可修改消息
println!("Thread sees message: {}", data_ref.get_message());
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 输出最终结果
println!("Final Counter: {}", shared_data.get_counter());
println!("Shared Message: {}", shared_data.get_message());
}
说明:
掌握上述示例中的一系列原子操作,可以举一反三,来实现rust中的无锁编程。
std::sync::atomic
模块,但Rust标准库不直接提供针对复杂类型如String的原子操作原语,需要用户自定义实现,这里实现一个类似AtomicUsize的 AtomicString类型,应用于多线程无锁模式中,考虑到在并发原因的CAS中,并不能避免ABA问题(ABA问题这里不做详述)。所以,我在 AtomicString加入版本控制来解决这个问题。use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::{ptr, thread};
struct AtomicString {
ptr: Arc<AtomicPtr<String>>,
version: Arc<AtomicUsize>,
}
impl AtomicString {
fn new(s: String) -> Self {
AtomicString {
ptr: Arc::new(AtomicPtr::new(Box::into_raw(Box::new(s)))),
version: Arc::new(AtomicUsize::new(0)),
}
}
fn update(&self, new_val: String) {
let mut spin_count = 0;
let mut yield_count = 0;
let new_ptr = Box::into_raw(Box::new(new_val));
loop {
let current_ptr = self.ptr.load(Ordering::Acquire);
let current_version = self.version.load(Ordering::Acquire);
let new_version = current_version + 1;
if self
.ptr
.compare_exchange_weak(current_ptr, new_ptr, Ordering::Release, Ordering::Relaxed)
.is_ok()
&& self
.version
.compare_exchange_weak(
current_version,
new_version,
Ordering::Release,
Ordering::Relaxed,
)
.is_ok()
{
unsafe {
if !current_ptr.is_null() {
drop(Box::from_raw(current_ptr));
}
}
break;
} else {
spin_count += 1;
if spin_count >= 30 {
spin_count = 0;
yield_count += 1;
thread::yield_now();
if yield_count >= 15 {
thread::sleep(Duration::from_millis(1));
yield_count = 0;
}
}
}
}
}
fn get(&self) -> Option<(String, usize)> {
let ptr = self.ptr.load(Ordering::Acquire);
if ptr.is_null() {
None
} else {
let version = self.version.load(Ordering::Acquire);
unsafe { Some(((*ptr).clone(), version)) }
}
}
}
说明:实现的核心在于update函数的实现。整个实现主要利用原子操作来实现无锁的更新和读取操作。
String
实例作为参数,将其转换为Box
然后得到裸指针,用AtomicPtr
持有,同时初始化版本号为0。compare_exchange_weak
尝试原子地更新ptr
和version
,如果当前值未被其他线程改变,则更新成功。要说明的是,自旋锁并非必要的。自旋锁如果持续时间过长,会损耗大量CPU资源,所以我这里加入了让步策略。spin_count
)和线程让步(thread::yield_now()
)来减少CPU占用,达到一定次数后甚至短暂休眠(thread::sleep(Duration::from_millis(1))
),以减少争用情况下的资源消耗。unsafe
块释放旧字符串的内存,确保内存安全。load(Ordering::Acquire)
来获取当前字符串的指针,确保之前对字符串的更新对这次读取可见。unsafe
调用(*ptr).clone()
来安全地克隆出一个String
实例返回。当然,为了程序功能可以比较完整,还需要实现 Drop trait,使其在离开作用域时能及时清理释放资源。
impl Drop for AtomicString {
fn drop(&mut self) {
let ptr = self.ptr.load(Ordering::Relaxed);
unsafe {
if !ptr.is_null() {
drop(Box::from_raw(ptr));
}
}
}
}
Rust的无锁并发编程模型通过其强大的类型系统和所有权模型,在编译时提供了内存安全的并发保证,减少了运行时的开销,提升了程序的性能和可靠性。通过原子操作和精心设计的数据结构,开发者能够实现高性能的并发逻辑,避免了传统锁机制的潜在死锁和性能瓶颈。对此开发者可能需要更多深入理解并发编程的原理和Rust的特性。