From 5f8fb6a41b91b6c552b3b75affe0e426ac3cec3e Mon Sep 17 00:00:00 2001 From: "Jeong, YunWon" Date: Tue, 3 Feb 2026 10:10:49 +0900 Subject: [PATCH] Reinit parking_lot locks after fork in child process MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After fork(), only the calling thread survives. Locks held by dead parent threads stay locked forever, causing deadlocks when the child tries to acquire them. - Add a reinit_mutex_after_fork utility that zeroes the raw lock bytes directly, and a current_thread_id helper for the import-lock owner check - Reset all PyGlobalState locks, thread-handle locks, and the import lock (IMP_LOCK) at the start of py_os_after_fork_child - Remove unlock() calls from _at_fork_reinit — unlock_slow() would attempt to unpark stale waiters from dead threads - Replace try_lock() with lock() in after_fork_child now that locks are guaranteed to be in unlocked state Note that this does not resolve any data deadlock issue --- Lib/test/test_asyncio/test_unix_events.py | 2 - crates/common/src/lock.rs | 31 +++++++ crates/vm/src/stdlib/imp.rs | 35 ++++++++ crates/vm/src/stdlib/posix.rs | 42 +++++++++- crates/vm/src/stdlib/thread.rs | 99 ++++++++++------------- crates/vm/src/vm/thread.rs | 15 ++-- 6 files changed, 157 insertions(+), 67 deletions(-) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 0faf32f79ea..520f5c733c3 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1179,8 +1179,6 @@ async def runner(): wsock.close() -# TODO: RUSTPYTHON, fork() segfaults due to stale parking_lot global state -@unittest.skip("TODO: RUSTPYTHON") @support.requires_fork() class TestFork(unittest.TestCase): diff --git a/crates/common/src/lock.rs b/crates/common/src/lock.rs index ca5ffe8de37..4ca5a0329bc 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -40,3 +40,34 @@ pub type PyRwLockWriteGuard<'a, T> = RwLockWriteGuard<'a, 
RawRwLock, T>; pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, T>; // can add fn const_{mutex,rw_lock}() if necessary, but we probably won't need to + +/// Reset a `PyMutex` to its initial (unlocked) state after `fork()`. +/// +/// After `fork()`, locks held by dead parent threads would deadlock in the +/// child. This zeroes the raw lock bytes directly, bypassing the normal unlock +/// path which may interact with parking_lot's internal waiter queues. +/// +/// # Safety +/// +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { + // lock_api::Mutex layout: raw R at offset 0, then UnsafeCell. + // Zeroing R resets to unlocked for both parking_lot::RawMutex (AtomicU8) + // and RawCellMutex (Cell). + unsafe { + let ptr = mutex as *const PyMutex as *mut u8; + core::ptr::write_bytes(ptr, 0, core::mem::size_of::()); + } +} + +/// Return the current thread's parking_lot thread ID. +/// +/// This is the same ID stored in the `owner` field of `RawReentrantMutex` +/// when the current thread holds it. +#[cfg(all(unix, feature = "threading"))] +pub fn current_thread_id() -> core::num::NonZeroUsize { + use lock_api::GetThreadId; + RawThreadId.nonzero_thread_id() +} diff --git a/crates/vm/src/stdlib/imp.rs b/crates/vm/src/stdlib/imp.rs index 1c78e835a2d..e147eea127b 100644 --- a/crates/vm/src/stdlib/imp.rs +++ b/crates/vm/src/stdlib/imp.rs @@ -30,6 +30,41 @@ mod lock { fn lock_held(_vm: &VirtualMachine) -> bool { IMP_LOCK.is_locked() } + + /// Reset import lock after fork() — only if held by a dead thread. + /// + /// `IMP_LOCK` is a reentrant mutex. If the *current* (surviving) thread + /// held it at fork time, the child must be able to release it normally. + /// Only reset if a now-dead thread was the owner. 
+ /// + /// # Safety + /// + /// Must only be called from single-threaded child after fork(). + #[cfg(unix)] + pub(crate) unsafe fn reinit_after_fork() { + use core::sync::atomic::{AtomicUsize, Ordering}; + + unsafe { + // RawReentrantMutex layout: owner: AtomicUsize at offset 0 + let owner_ptr = &IMP_LOCK as *const RawRMutex as *const AtomicUsize; + let owner = (*owner_ptr).load(Ordering::Relaxed); + + if owner != 0 { + let current = rustpython_common::lock::current_thread_id().get(); + if owner != current { + // Held by a dead thread — reset to unlocked + let ptr = &IMP_LOCK as *const RawRMutex as *mut u8; + core::ptr::write_bytes(ptr, 0, core::mem::size_of::()); + } + } + } + } +} + +/// Re-export for fork safety code in posix.rs +#[cfg(all(unix, feature = "threading"))] +pub(crate) unsafe fn reinit_imp_lock_after_fork() { + unsafe { lock::reinit_after_fork() } } #[cfg(not(feature = "threading"))] diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index 5bbfef0f93b..3fcb6a74b59 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -660,8 +660,13 @@ pub mod module { } fn py_os_after_fork_child(vm: &VirtualMachine) { - // Reset low-level state before any Python code runs in the child. - // Signal triggers from the parent must not fire in the child. + // Phase 1: Reset all internal locks FIRST. + // After fork(), locks held by dead parent threads would deadlock + // if we try to acquire them. This must happen before anything else. + #[cfg(feature = "threading")] + reinit_locks_after_fork(vm); + + // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork(); crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork(); @@ -669,14 +674,45 @@ pub mod module { #[cfg(feature = "threading")] crate::object::reset_weakref_locks_after_fork(); - // Mark all other threads as done before running Python callbacks + // Phase 3: Clean up thread state. 
Locks are now reinit'd so we can + // acquire them normally instead of using try_lock(). #[cfg(feature = "threading")] crate::stdlib::thread::after_fork_child(vm); + // Phase 4: Run Python-level at-fork callbacks. let after_forkers_child: Vec = vm.state.after_forkers_child.lock().clone(); run_at_forkers(after_forkers_child, false, vm); } + /// Reset all parking_lot-based locks in the interpreter state after fork(). + /// + /// After fork(), only the calling thread survives. Any locks held by other + /// (now-dead) threads would cause deadlocks. We unconditionally reset them + /// to unlocked by zeroing the raw lock bytes, following CPython's + /// `_PyRuntimeState_ReInitThreads` pattern. + #[cfg(all(unix, feature = "threading"))] + fn reinit_locks_after_fork(vm: &VirtualMachine) { + use rustpython_common::lock::reinit_mutex_after_fork; + + unsafe { + // PyGlobalState PyMutex locks + reinit_mutex_after_fork(&vm.state.before_forkers); + reinit_mutex_after_fork(&vm.state.after_forkers_child); + reinit_mutex_after_fork(&vm.state.after_forkers_parent); + reinit_mutex_after_fork(&vm.state.atexit_funcs); + reinit_mutex_after_fork(&vm.state.global_trace_func); + reinit_mutex_after_fork(&vm.state.global_profile_func); + + // PyGlobalState parking_lot::Mutex locks (same type as PyMutex) + reinit_mutex_after_fork(&vm.state.thread_frames); + reinit_mutex_after_fork(&vm.state.thread_handles); + reinit_mutex_after_fork(&vm.state.shutdown_handles); + + // Import lock (RawReentrantMutex) + crate::stdlib::imp::reinit_imp_lock_after_fork(); + } + } + fn py_os_after_fork_parent(vm: &VirtualMachine) { let after_forkers_parent: Vec = vm.state.after_forkers_parent.lock().clone(); run_at_forkers(after_forkers_parent, false, vm); diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index 22457b3f17f..97fcb87c387 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -147,15 +147,9 @@ pub(crate) mod _thread { #[pymethod] fn 
_at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - if self.mu.is_locked() { - unsafe { - self.mu.unlock(); - }; - } - // Casting to AtomicCell is as unsafe as CPython code. - // Using AtomicCell will prevent compiler optimizer move it to somewhere later unsafe place. - // It will be not under the cell anymore after init call. - + // Reset the mutex to unlocked by directly writing the INIT value. + // Do NOT call unlock() here — after fork(), unlock_slow() would + // try to unpark stale waiters from dead parent threads. let new_mut = RawMutex::INIT; unsafe { let old_mutex: &AtomicCell = core::mem::transmute(&self.mu); @@ -247,11 +241,9 @@ pub(crate) mod _thread { #[pymethod] fn _at_fork_reinit(&self, _vm: &VirtualMachine) -> PyResult<()> { - if self.mu.is_locked() { - unsafe { - self.mu.unlock(); - }; - } + // Reset the reentrant mutex to unlocked by directly writing INIT. + // Do NOT call unlock() — after fork(), the slow path would try + // to unpark stale waiters from dead parent threads. self.count.store(0, core::sync::atomic::Ordering::Relaxed); let new_mut = RawRMutex::INIT; unsafe { @@ -884,6 +876,9 @@ pub(crate) mod _thread { /// Called after fork() in child process to mark all other threads as done. /// This prevents join() from hanging on threads that don't exist in the child. + /// + /// Precondition: `reinit_locks_after_fork()` has already been called, so all + /// parking_lot-based locks in VmState are in unlocked state. #[cfg(unix)] pub fn after_fork_child(vm: &VirtualMachine) { let current_ident = get_ident(); @@ -891,29 +886,27 @@ pub(crate) mod _thread { // Update main thread ident - after fork, the current thread becomes the main thread vm.state.main_thread_ident.store(current_ident); - // Reinitialize frame slot for current thread + // Reinitialize frame slot for current thread. + // Locks are already reinit'd, so lock() is safe. 
crate::vm::thread::reinit_frame_slot_after_fork(vm); - // Clean up thread handles if we can acquire the lock. - // Use try_lock because the mutex might have been held during fork. - // If we can't acquire it, just skip - the child process will work - // correctly with new handles it creates. - if let Some(mut handles) = vm.state.thread_handles.try_lock() { - // Clean up dead weak refs and mark non-current threads as done + // Clean up thread handles. All VmState locks were reinit'd to unlocked, + // so lock() won't deadlock. Per-thread Arc> + // locks are also reinit'd below before use. + { + let mut handles = vm.state.thread_handles.lock(); handles.retain(|(inner_weak, done_event_weak): &HandleEntry| { let Some(inner) = inner_weak.upgrade() else { - return false; // Remove dead entries + return false; }; let Some(done_event) = done_event_weak.upgrade() else { return false; }; - // Try to lock the inner state - skip if we can't - let Some(mut inner_guard) = inner.try_lock() else { - return false; - }; + // Reinit this per-handle lock in case a dead thread held it + reinit_parking_lot_mutex(&inner); + let mut inner_guard = inner.lock(); - // Skip current thread and not-started threads if inner_guard.ident == current_ident { return true; } @@ -921,67 +914,63 @@ pub(crate) mod _thread { return true; } - // Mark as done and notify waiters inner_guard.state = ThreadHandleState::Done; - inner_guard.join_handle = None; // Can't join OS thread from child + inner_guard.join_handle = None; drop(inner_guard); - // Try to notify waiters - skip if we can't acquire the lock + // Reinit and set the done event let (lock, cvar) = &*done_event; - if let Some(mut done) = lock.try_lock() { - *done = true; - cvar.notify_all(); - } + reinit_parking_lot_mutex(lock); + *lock.lock() = true; + cvar.notify_all(); true }); } - // Clean up shutdown_handles as well. - // This is critical to prevent _shutdown() from waiting on threads - // that don't exist in the child process after fork. 
- if let Some(mut handles) = vm.state.shutdown_handles.try_lock() { - // Mark all non-current threads as done in shutdown_handles + // Clean up shutdown_handles. + { + let mut handles = vm.state.shutdown_handles.lock(); handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| { let Some(inner) = inner_weak.upgrade() else { - return false; // Remove dead entries + return false; }; let Some(done_event) = done_event_weak.upgrade() else { return false; }; - // Try to lock the inner state - skip if we can't - let Some(mut inner_guard) = inner.try_lock() else { - return false; - }; + reinit_parking_lot_mutex(&inner); + let mut inner_guard = inner.lock(); - // Skip current thread if inner_guard.ident == current_ident { return true; } - - // Keep handles for threads that have not been started yet. - // They are safe to start in the child process. if inner_guard.state == ThreadHandleState::NotStarted { return true; } - // Mark as done so _shutdown() won't wait on it inner_guard.state = ThreadHandleState::Done; drop(inner_guard); - // Notify waiters let (lock, cvar) = &*done_event; - if let Some(mut done) = lock.try_lock() { - *done = true; - cvar.notify_all(); - } + reinit_parking_lot_mutex(lock); + *lock.lock() = true; + cvar.notify_all(); - false // Remove from shutdown_handles - these threads don't exist in child + false }); } } + /// Reset a parking_lot::Mutex to unlocked state after fork. 
+ #[cfg(unix)] + fn reinit_parking_lot_mutex(mutex: &parking_lot::Mutex) { + unsafe { + let ptr = mutex as *const parking_lot::Mutex as *mut u8; + core::ptr::write_bytes(ptr, 0, core::mem::size_of::()); + } + } + // Thread handle state enum #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ThreadHandleState { diff --git a/crates/vm/src/vm/thread.rs b/crates/vm/src/vm/thread.rs index fb8621d1526..376daa83707 100644 --- a/crates/vm/src/vm/thread.rs +++ b/crates/vm/src/vm/thread.rs @@ -86,19 +86,20 @@ pub fn cleanup_current_thread_frames(vm: &VirtualMachine) { /// Reinitialize frame slot after fork. Called in child process. /// Creates a fresh slot and registers it for the current thread. +/// +/// Precondition: `reinit_locks_after_fork()` has already reset all +/// VmState locks to unlocked. #[cfg(feature = "threading")] pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { let current_ident = crate::stdlib::thread::get_ident(); let new_slot = Arc::new(parking_lot::Mutex::new(None)); - // Try to update the global registry. If we can't get the lock - // (parent thread might have been holding it during fork), skip. - if let Some(mut registry) = vm.state.thread_frames.try_lock() { - registry.clear(); - registry.insert(current_ident, new_slot.clone()); - } + // Lock is safe: reinit_locks_after_fork() already reset it to unlocked. + let mut registry = vm.state.thread_frames.lock(); + registry.clear(); + registry.insert(current_ident, new_slot.clone()); + drop(registry); - // Always update thread-local to point to the new slot CURRENT_FRAME_SLOT.with(|s| { *s.borrow_mut() = Some(new_slot); });