2 changes: 0 additions & 2 deletions Lib/test/test_asyncio/test_unix_events.py
@@ -1179,8 +1179,6 @@ async def runner():
wsock.close()


# TODO: RUSTPYTHON, fork() segfaults due to stale parking_lot global state
@unittest.skip("TODO: RUSTPYTHON")
@support.requires_fork()
class TestFork(unittest.TestCase):

31 changes: 31 additions & 0 deletions crates/common/src/lock.rs
@@ -40,3 +40,34 @@ pub type PyRwLockWriteGuard<'a, T> = RwLockWriteGuard<'a, RawRwLock, T>;
pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, T>;

// can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to

/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`.
///
/// After `fork()`, locks held by dead parent threads would deadlock in the
/// child. This zeroes the raw lock bytes directly, bypassing the normal unlock
/// path which may interact with parking_lot's internal waiter queues.
///
/// # Safety
///
/// Must only be called from the single-threaded child process immediately
/// after `fork()`, before any other thread is created.
#[cfg(unix)]
pub unsafe fn reinit_mutex_after_fork<T: ?Sized>(mutex: &PyMutex<T>) {
// lock_api::Mutex<R, T> layout: raw R at offset 0, then UnsafeCell<T>.
// Zeroing R resets to unlocked for both parking_lot::RawMutex (AtomicU8)
// and RawCellMutex (Cell<bool>).
unsafe {
let ptr = mutex as *const PyMutex<T> as *mut u8;
core::ptr::write_bytes(ptr, 0, core::mem::size_of::<RawMutex>());
}
}
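
A minimal standalone sketch (not part of this change) of the behavior relied on here: a zeroed parking_lot RawMutex reads as unlocked, and zeroing only the raw lock bytes leaves the protected data intact. To stay layout-agnostic, the sketch locates the raw lock through lock_api's `raw()` accessor rather than assuming offset 0.

fn main() {
    let m: parking_lot::Mutex<i32> = parking_lot::Mutex::new(5);
    // Simulate a lock held at fork() time by a thread that is dead in the child.
    core::mem::forget(m.lock());
    assert!(m.is_locked());

    // SAFETY: single-threaded demo; mirrors the zeroing in reinit_mutex_after_fork.
    unsafe {
        let raw = m.raw() as *const parking_lot::RawMutex as *mut u8;
        core::ptr::write_bytes(raw, 0, core::mem::size_of::<parking_lot::RawMutex>());
    }

    // Unlocked again, and the value behind the mutex is untouched.
    assert_eq!(*m.lock(), 5);
}
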

/// Return the current thread's parking_lot thread ID.
///
/// This is the same ID stored in the `owner` field of `RawReentrantMutex`
/// when the current thread holds it.
#[cfg(all(unix, feature = "threading"))]
pub fn current_thread_id() -> core::num::NonZeroUsize {
use lock_api::GetThreadId;
RawThreadId.nonzero_thread_id()
}
35 changes: 35 additions & 0 deletions crates/vm/src/stdlib/imp.rs
@@ -30,6 +30,41 @@ mod lock {
fn lock_held(_vm: &VirtualMachine) -> bool {
IMP_LOCK.is_locked()
}

/// Reset the import lock after fork(), but only if a now-dead thread holds it.
///
/// `IMP_LOCK` is a reentrant mutex. If the *current* (surviving) thread
/// held it at fork time, the child must be able to release it normally.
/// Only reset if a now-dead thread was the owner.
///
/// # Safety
///
/// Must only be called from the single-threaded child after fork().
#[cfg(unix)]
pub(crate) unsafe fn reinit_after_fork() {
use core::sync::atomic::{AtomicUsize, Ordering};

unsafe {
// RawReentrantMutex layout: owner: AtomicUsize at offset 0
let owner_ptr = &IMP_LOCK as *const RawRMutex as *const AtomicUsize;
let owner = (*owner_ptr).load(Ordering::Relaxed);

if owner != 0 {
let current = rustpython_common::lock::current_thread_id().get();
if owner != current {
// Held by a dead thread — reset to unlocked
let ptr = &IMP_LOCK as *const RawRMutex as *mut u8;
core::ptr::write_bytes(ptr, 0, core::mem::size_of::<RawRMutex>());
}
}
}
}
}
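
A hedged standalone sketch of the owner check, written directly against lock_api::RawReentrantMutex. It assumes the same layout documented above (the owner word first); none of it is RustPython API.

use core::sync::atomic::{AtomicUsize, Ordering};
use lock_api::GetThreadId;

type RMutex = lock_api::RawReentrantMutex<parking_lot::RawMutex, parking_lot::RawThreadId>;

fn main() {
    static LOCK: RMutex = RMutex::INIT;
    LOCK.lock(); // this thread becomes the owner

    // Assumed layout: `owner: AtomicUsize` is the first field, as documented above.
    let owner = unsafe { (*(&LOCK as *const RMutex as *const AtomicUsize)).load(Ordering::Relaxed) };
    let current = parking_lot::RawThreadId.nonzero_thread_id().get();

    // Owned by the surviving thread, so it must be released normally, never
    // zeroed; this is exactly the case reinit_after_fork() leaves alone.
    assert_eq!(owner, current);
    unsafe { LOCK.unlock() };
}
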

/// Wrapper around `lock::reinit_after_fork` for the fork-safety code in posix.rs
#[cfg(all(unix, feature = "threading"))]
pub(crate) unsafe fn reinit_imp_lock_after_fork() {
unsafe { lock::reinit_after_fork() }
}

#[cfg(not(feature = "threading"))]
42 changes: 39 additions & 3 deletions crates/vm/src/stdlib/posix.rs
@@ -660,23 +660,59 @@ pub mod module {
}

fn py_os_after_fork_child(vm: &VirtualMachine) {
// Reset low-level state before any Python code runs in the child.
// Signal triggers from the parent must not fire in the child.
// Phase 1: Reset all internal locks FIRST.
// After fork(), locks held by dead parent threads would deadlock
// if we try to acquire them. This must happen before anything else.
#[cfg(feature = "threading")]
reinit_locks_after_fork(vm);

// Phase 2: Reset low-level atomic state (no locks needed).
crate::signal::clear_after_fork();
crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork();

// Reset weakref stripe locks that may have been held during fork.
#[cfg(feature = "threading")]
crate::object::reset_weakref_locks_after_fork();

// Mark all other threads as done before running Python callbacks
// Phase 3: Clean up thread state. Locks are now reinit'd so we can
// acquire them normally instead of using try_lock().
#[cfg(feature = "threading")]
crate::stdlib::thread::after_fork_child(vm);

// Phase 4: Run Python-level at-fork callbacks.
let after_forkers_child: Vec<PyObjectRef> = vm.state.after_forkers_child.lock().clone();
run_at_forkers(after_forkers_child, false, vm);
}
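
The phase ordering restated as a minimal sketch. The helper names (reset_locks, mark_dead_threads_done, run_child_callbacks) are placeholders for exposition, not RustPython functions; only the ordering mirrors py_os_after_fork_child.

fn reset_locks() { /* zero raw lock bytes, as reinit_locks_after_fork does */ }
fn mark_dead_threads_done() { /* plain lock() is safe once locks are reset */ }
fn run_child_callbacks() { /* Python-level os.register_at_fork callbacks */ }

fn main() {
    // Sketch only: assumes the libc crate and ignores the parent branch.
    let pid = unsafe { libc::fork() };
    if pid == 0 {
        reset_locks();            // must come first: every later step takes locks
        mark_dead_threads_done(); // thread-state cleanup (phase 3)
        run_child_callbacks();    // user code runs last (phase 4)
    }
}
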

/// Reset all parking_lot-based locks in the interpreter state after fork().
///
/// After fork(), only the calling thread survives. Any locks held by other
/// (now-dead) threads would cause deadlocks. We unconditionally reset them
/// to unlocked by zeroing the raw lock bytes, following CPython's
/// `_PyRuntimeState_ReInitThreads` pattern.
#[cfg(all(unix, feature = "threading"))]
fn reinit_locks_after_fork(vm: &VirtualMachine) {
use rustpython_common::lock::reinit_mutex_after_fork;

unsafe {
// PyGlobalState PyMutex locks
reinit_mutex_after_fork(&vm.state.before_forkers);
reinit_mutex_after_fork(&vm.state.after_forkers_child);
reinit_mutex_after_fork(&vm.state.after_forkers_parent);
reinit_mutex_after_fork(&vm.state.atexit_funcs);
reinit_mutex_after_fork(&vm.state.global_trace_func);
reinit_mutex_after_fork(&vm.state.global_profile_func);

// PyGlobalState parking_lot::Mutex locks (same type as PyMutex)
reinit_mutex_after_fork(&vm.state.thread_frames);
reinit_mutex_after_fork(&vm.state.thread_handles);
reinit_mutex_after_fork(&vm.state.shutdown_handles);

// Import lock (RawReentrantMutex<RawMutex, RawThreadId>)
crate::stdlib::imp::reinit_imp_lock_after_fork();
}
}

fn py_os_after_fork_parent(vm: &VirtualMachine) {
let after_forkers_parent: Vec<PyObjectRef> = vm.state.after_forkers_parent.lock().clone();
run_at_forkers(after_forkers_parent, false, vm);
99 changes: 44 additions & 55 deletions crates/vm/src/stdlib/thread.rs
@@ -147,15 +147,9 @@ pub(crate) mod _thread {

#[pymethod]
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
if self.mu.is_locked() {
unsafe {
self.mu.unlock();
};
}
// Casting to AtomicCell is as unsafe as CPython code.
// Using AtomicCell will prevent compiler optimizer move it to somewhere later unsafe place.
// It will be not under the cell anymore after init call.

// Reset the mutex to unlocked by directly writing the INIT value.
// Do NOT call unlock() here — after fork(), unlock_slow() would
// try to unpark stale waiters from dead parent threads.
let new_mut = RawMutex::INIT;
unsafe {
let old_mutex: &AtomicCell<RawMutex> = core::mem::transmute(&self.mu);
@@ -247,11 +241,9 @@

#[pymethod]
fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> {
if self.mu.is_locked() {
unsafe {
self.mu.unlock();
};
}
// Reset the reentrant mutex to unlocked by directly writing INIT.
// Do NOT call unlock() — after fork(), the slow path would try
// to unpark stale waiters from dead parent threads.
self.count.store(0, core::sync::atomic::Ordering::Relaxed);
let new_mut = RawRMutex::INIT;
unsafe {
@@ -884,104 +876,101 @@

/// Called after fork() in child process to mark all other threads as done.
/// This prevents join() from hanging on threads that don't exist in the child.
///
/// Precondition: `reinit_locks_after_fork()` has already been called, so all
/// parking_lot-based locks in VmState are in unlocked state.
#[cfg(unix)]
pub fn after_fork_child(vm: &VirtualMachine) {
let current_ident = get_ident();

// Update main thread ident - after fork, the current thread becomes the main thread
vm.state.main_thread_ident.store(current_ident);

// Reinitialize frame slot for current thread
// Reinitialize frame slot for current thread.
// Locks are already reinit'd, so lock() is safe.
crate::vm::thread::reinit_frame_slot_after_fork(vm);

// Clean up thread handles if we can acquire the lock.
// Use try_lock because the mutex might have been held during fork.
// If we can't acquire it, just skip - the child process will work
// correctly with new handles it creates.
if let Some(mut handles) = vm.state.thread_handles.try_lock() {
// Clean up dead weak refs and mark non-current threads as done
// Clean up thread handles. All VmState locks were reinit'd to unlocked,
// so lock() won't deadlock. Per-thread Arc<Mutex<ThreadHandleInner>>
// locks are also reinit'd below before use.
{
let mut handles = vm.state.thread_handles.lock();
handles.retain(|(inner_weak, done_event_weak): &HandleEntry| {
let Some(inner) = inner_weak.upgrade() else {
return false; // Remove dead entries
return false;
};
let Some(done_event) = done_event_weak.upgrade() else {
return false;
};

// Try to lock the inner state - skip if we can't
let Some(mut inner_guard) = inner.try_lock() else {
return false;
};
// Reinit this per-handle lock in case a dead thread held it
reinit_parking_lot_mutex(&inner);
let mut inner_guard = inner.lock();

// Skip current thread and not-started threads
if inner_guard.ident == current_ident {
return true;
}
if inner_guard.state == ThreadHandleState::NotStarted {
return true;
}

// Mark as done and notify waiters
inner_guard.state = ThreadHandleState::Done;
inner_guard.join_handle = None; // Can't join OS thread from child
inner_guard.join_handle = None;
drop(inner_guard);

// Try to notify waiters - skip if we can't acquire the lock
// Reinit and set the done event
let (lock, cvar) = &*done_event;
if let Some(mut done) = lock.try_lock() {
*done = true;
cvar.notify_all();
}
reinit_parking_lot_mutex(lock);
*lock.lock() = true;
cvar.notify_all();

true
});
}

// Clean up shutdown_handles as well.
// This is critical to prevent _shutdown() from waiting on threads
// that don't exist in the child process after fork.
if let Some(mut handles) = vm.state.shutdown_handles.try_lock() {
// Mark all non-current threads as done in shutdown_handles
// Clean up shutdown_handles.
{
let mut handles = vm.state.shutdown_handles.lock();
handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| {
let Some(inner) = inner_weak.upgrade() else {
return false; // Remove dead entries
return false;
};
let Some(done_event) = done_event_weak.upgrade() else {
return false;
};

// Try to lock the inner state - skip if we can't
let Some(mut inner_guard) = inner.try_lock() else {
return false;
};
reinit_parking_lot_mutex(&inner);
let mut inner_guard = inner.lock();

// Skip current thread
if inner_guard.ident == current_ident {
return true;
}

// Keep handles for threads that have not been started yet.
// They are safe to start in the child process.
if inner_guard.state == ThreadHandleState::NotStarted {
return true;
}

// Mark as done so _shutdown() won't wait on it
inner_guard.state = ThreadHandleState::Done;
drop(inner_guard);

// Notify waiters
let (lock, cvar) = &*done_event;
if let Some(mut done) = lock.try_lock() {
*done = true;
cvar.notify_all();
}
reinit_parking_lot_mutex(lock);
*lock.lock() = true;
cvar.notify_all();

false // Remove from shutdown_handles - these threads don't exist in child
false
});
}
}

/// Reset a parking_lot::Mutex to unlocked state after fork.
#[cfg(unix)]
fn reinit_parking_lot_mutex<T: ?Sized>(mutex: &parking_lot::Mutex<T>) {
unsafe {
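// Same layout assumption as reinit_mutex_after_fork in lock.rs: the raw lock sits at offset 0.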
let ptr = mutex as *const parking_lot::Mutex<T> as *mut u8;
core::ptr::write_bytes(ptr, 0, core::mem::size_of::<parking_lot::RawMutex>());
}
}
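
The _at_fork_reinit methods above reset locks with a different technique: overwriting the raw lock with INIT through an AtomicCell rather than zeroing bytes. A hedged standalone sketch of that pattern, assuming crossbeam's AtomicCell and parking_lot's RawMutex as the surrounding code appears to:

use crossbeam_utils::atomic::AtomicCell;
use lock_api::RawMutex as _; // brings the INIT const and lock()/is_locked() into scope
use parking_lot::RawMutex;

fn reset_raw_mutex(mu: &RawMutex) {
    // SAFETY: only sound in the single-threaded child right after fork(),
    // where no other thread can observe the store. Writing INIT avoids
    // unlock()'s slow path, which may unpark stale waiters from dead threads.
    unsafe {
        let cell: &AtomicCell<RawMutex> = core::mem::transmute(mu);
        cell.store(RawMutex::INIT);
    }
}

fn main() {
    let mu = RawMutex::INIT;
    mu.lock(); // leave it "held", as a dead parent thread would have
    reset_raw_mutex(&mu);
    assert!(!mu.is_locked());
}
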

// Thread handle state enum
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThreadHandleState {
15 changes: 8 additions & 7 deletions crates/vm/src/vm/thread.rs
@@ -86,19 +86,20 @@ pub fn cleanup_current_thread_frames(vm: &VirtualMachine) {

/// Reinitialize frame slot after fork. Called in child process.
/// Creates a fresh slot and registers it for the current thread.
///
/// Precondition: `reinit_locks_after_fork()` has already reset all
/// VmState locks to unlocked.
#[cfg(feature = "threading")]
pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) {
let current_ident = crate::stdlib::thread::get_ident();
let new_slot = Arc::new(parking_lot::Mutex::new(None));

// Try to update the global registry. If we can't get the lock
// (parent thread might have been holding it during fork), skip.
if let Some(mut registry) = vm.state.thread_frames.try_lock() {
registry.clear();
registry.insert(current_ident, new_slot.clone());
}
// Lock is safe: reinit_locks_after_fork() already reset it to unlocked.
let mut registry = vm.state.thread_frames.lock();
registry.clear();
registry.insert(current_ident, new_slot.clone());
drop(registry);

// Always update thread-local to point to the new slot
CURRENT_FRAME_SLOT.with(|s| {
*s.borrow_mut() = Some(new_slot);
});