rune/channel_manager.rs

//! Channel Manager - manages all channel state in a dedicated garbage-collected block
//!
//! This module provides a singleton ChannelManager that
//! can be accessed from multiple threads.
//! It has its own Block and manages its roots manually using the Heap.
//! All channel operations go through this manager, allowing proper garbage
//! collection of in-transit objects.

9use crate::core::{
10    gc::{__HeapRoot, Block, Context, Slot, ThreadSafeRootSet, collect_garbage_raw},
11    object::{CloneIn, Object},
12};
13use std::sync::OnceLock;
14use std::{
15    collections::{HashMap, VecDeque},
16    sync::{
17        Arc, Condvar, Mutex, RwLock,
18        atomic::{AtomicU64, AtomicUsize, Ordering},
19    },
20};
21
/// Unique identifier for a channel.
///
/// Allocated monotonically by `ChannelManager::create_channel` starting at 1;
/// IDs are never reused within a process.
pub(crate) type ChannelId = u64;
24
/// GC state for the channel manager, all guarded by a single Mutex to prevent deadlocks.
///
/// Lock ordering note: this struct is only locked after `gc_access` (see
/// `ChannelManager`), never the other way around.
struct GcState {
    /// The block where all in-transit objects are stored
    block: Block<false>,
    /// Static reference to the root set (points into gc_state.roots);
    /// leaked once in `ChannelManager::new` so HeapRoots can hold `&'static`
    roots_static: &'static ThreadSafeRootSet,
    /// Next GC threshold (in bytes), updated by `collect_garbage_raw`
    next_limit: usize,
}
34
35impl GcState {
36    fn new(roots_static: &'static ThreadSafeRootSet) -> Self {
37        Self {
38            block: Block::new_local_unchecked(),
39            roots_static,
40            next_limit: Context::MIN_GC_BYTES,
41        }
42    }
43}
44
/// Data for a single channel stored in the manager.
///
/// All fields are guarded by the enclosing `ChannelState::data` mutex.
struct ChannelData {
    /// Queue of objects rooted via HeapRoot; dropping an entry unroots it
    queue: VecDeque<__HeapRoot<Slot<Object<'static>>>>,
    /// Maximum capacity of the channel
    capacity: usize,
    /// Number of active senders
    sender_count: usize,
    /// Number of active receivers
    receiver_count: usize,
    /// Whether senders have been explicitly closed (set when the last sender drops)
    sender_closed: bool,
    /// Whether receivers have been explicitly closed (set when the last receiver drops)
    receiver_closed: bool,
}
60
61impl ChannelData {
62    fn new(capacity: usize) -> Self {
63        Self {
64            queue: VecDeque::with_capacity(capacity),
65            capacity,
66            sender_count: 0,
67            receiver_count: 0,
68            sender_closed: false,
69            receiver_closed: false,
70        }
71    }
72
73    fn is_empty(&self) -> bool {
74        self.queue.is_empty()
75    }
76
77    fn is_full(&self) -> bool {
78        self.queue.len() >= self.capacity
79    }
80}
81
/// State for a single channel, with synchronization.
///
/// Shared between the manager's registry and in-flight operations via `Arc`,
/// so a channel can be used even while the registry lock is released.
struct ChannelState {
    data: Mutex<ChannelData>,
    /// Condvar to wake senders waiting for space
    not_full: Condvar,
    /// Condvar to wake receivers waiting for data
    not_empty: Condvar,
}
90
91impl ChannelState {
92    fn new(capacity: usize) -> Self {
93        Self {
94            data: Mutex::new(ChannelData::new(capacity)),
95            not_full: Condvar::new(),
96            not_empty: Condvar::new(),
97        }
98    }
99}
100
/// The channel manager state.
///
/// Lock ordering (to avoid deadlock): a channel's `data` mutex may be held
/// while taking `gc_access` (read) and then `gc_state`; GC takes `gc_access`
/// (write) then `gc_state` without touching any channel's `data`.
pub(crate) struct ChannelManager {
    /// GC state (block, roots, and next_limit) all guarded by a single Mutex
    /// We also leak a static reference to the RootSet for HeapRoot creation
    gc_state: Mutex<GcState>,
    /// Ensures mutual exclusion between GC and channel operations.
    /// Channel operations acquire read lock when accessing rooted data,
    /// GC acquires write lock for exclusive access during trace.
    gc_access: RwLock<()>,
    /// All channels managed by this manager
    channels: RwLock<HashMap<ChannelId, Arc<ChannelState>>>,
    /// Counter for generating unique channel IDs (starts at 1)
    next_channel_id: AtomicU64,
    /// Counter for receives since last GC
    recv_count: AtomicUsize,
    /// Trigger GC after this many receives (fixed at 10 in `new`)
    gc_threshold: usize,
}
119
120impl ChannelManager {
121    fn new() -> Self {
122        // It is safe to leak the rootset here, `ChannelManager` is intended
123        // to be lazily instantiated once for the whole remaining lifetime of
124        // the program
125        let roots_static: &'static ThreadSafeRootSet =
126            Box::leak(Box::new(ThreadSafeRootSet::default()));
127
128        Self {
129            gc_state: Mutex::new(GcState::new(roots_static)),
130            gc_access: RwLock::new(()),
131            channels: RwLock::new(HashMap::new()),
132            next_channel_id: AtomicU64::new(1),
133            recv_count: AtomicUsize::new(0),
134            gc_threshold: 10,
135        }
136    }
137
138    /// Create a new channel and return its ID
139    fn create_channel(&self, capacity: usize) -> ChannelId {
140        let id = self.next_channel_id.fetch_add(1, Ordering::SeqCst);
141        let mut channels = self.channels.write().unwrap();
142        channels.insert(id, Arc::new(ChannelState::new(capacity)));
143        id
144    }
145
146    /// Increment sender count for a channel
147    pub(crate) fn increment_sender(&self, channel_id: ChannelId) {
148        let channels = self.channels.read().unwrap();
149        if let Some(state) = channels.get(&channel_id) {
150            let mut data = state.data.lock().unwrap();
151            data.sender_count += 1;
152        }
153    }
154
155    /// Increment receiver count for a channel
156    pub(crate) fn increment_receiver(&self, channel_id: ChannelId) {
157        let channels = self.channels.read().unwrap();
158        if let Some(state) = channels.get(&channel_id) {
159            let mut data = state.data.lock().unwrap();
160            data.receiver_count += 1;
161        }
162    }
163
164    /// Try to send an object to a channel (non-blocking)
165    pub(crate) fn try_send<'ob>(
166        &self,
167        channel_id: ChannelId,
168        obj: Object<'ob>,
169    ) -> Result<(), SendError> {
170        let channels = self.channels.read().unwrap();
171        let state = channels.get(&channel_id).ok_or(SendError::Closed)?;
172        let state = Arc::clone(state);
173        drop(channels);
174
175        let mut data = state.data.lock().unwrap();
176
177        if data.receiver_count == 0 || data.receiver_closed {
178            return Err(SendError::Closed);
179        }
180
181        if data.is_full() {
182            return Err(SendError::Full);
183        }
184
185        // Clone the object into the manager's block
186        let _gc_guard = self.gc_access.read().unwrap();
187        let gc_state = self.gc_state.lock().unwrap();
188        let cloned = unsafe {
189            let obj_static = obj.clone_in(&gc_state.block);
190            std::mem::transmute::<Object<'_>, Object<'static>>(obj_static)
191        };
192        // Wrap in HeapRoot to protect from GC
193        let rooted = __HeapRoot::new(Slot::new(cloned), gc_state.roots_static);
194        drop(gc_state);
195        drop(_gc_guard);
196
197        data.queue.push_back(rooted);
198
199        state.not_empty.notify_one();
200
201        Ok(())
202    }
203
204    /// Send an object to a channel (blocking)
205    pub(crate) fn send<'ob>(
206        &self,
207        channel_id: ChannelId,
208        obj: Object<'ob>,
209    ) -> Result<(), SendError> {
210        let channels = self.channels.read().unwrap();
211        let state = channels.get(&channel_id).ok_or(SendError::Closed)?;
212        let state = Arc::clone(state);
213        drop(channels);
214
215        let mut data = state.data.lock().unwrap();
216
217        while data.is_full() {
218            if data.receiver_count == 0 || data.receiver_closed {
219                return Err(SendError::Closed);
220            }
221            data = state.not_full.wait(data).unwrap();
222        }
223
224        if data.receiver_count == 0 || data.receiver_closed {
225            return Err(SendError::Closed);
226        }
227
228        // Clone the object into the manager's block
229        let _gc_guard = self.gc_access.read().unwrap();
230        let gc_state = self.gc_state.lock().unwrap();
231        let cloned = unsafe {
232            let obj_static = obj.clone_in(&gc_state.block);
233            std::mem::transmute::<Object<'_>, Object<'static>>(obj_static)
234        };
235        // Wrap in HeapRoot to protect from GC
236        let rooted = __HeapRoot::new(Slot::new(cloned), gc_state.roots_static);
237        drop(gc_state);
238        drop(_gc_guard);
239
240        data.queue.push_back(rooted);
241
242        state.not_empty.notify_one();
243
244        Ok(())
245    }
246
247    /// Try to receive an object from a channel (non-blocking)
248    pub(crate) fn try_recv<'ob>(
249        &self,
250        channel_id: ChannelId,
251        target_block: &'ob Block<false>,
252    ) -> Result<Object<'ob>, RecvError> {
253        let channels = self.channels.read().unwrap();
254        let state = channels.get(&channel_id).ok_or(RecvError::Closed)?;
255        let state = Arc::clone(state);
256        drop(channels);
257
258        let mut data = state.data.lock().unwrap();
259
260        if let Some(rooted) = data.queue.pop_front() {
261            let _gc_guard = self.gc_access.read().unwrap();
262            let obj: Object<'static> = rooted.get_inner();
263            let result = obj.clone_in(target_block);
264            drop(_gc_guard);
265
266            // HeapRoot is dropped here, unrooting the object
267            drop(rooted);
268            drop(data);
269
270            state.not_full.notify_one();
271
272            let recv_count = self.recv_count.fetch_add(1, Ordering::SeqCst) + 1;
273            if recv_count >= self.gc_threshold {
274                self.recv_count.store(0, Ordering::SeqCst);
275                self.collect_garbage();
276            }
277
278            Ok(result)
279        } else if (data.sender_count == 0 || data.sender_closed) && data.is_empty() {
280            Err(RecvError::Closed)
281        } else {
282            Err(RecvError::Empty)
283        }
284    }
285
286    /// Receive an object from a channel (blocking)
287    pub(crate) fn recv<'ob>(
288        &self,
289        channel_id: ChannelId,
290        target_block: &'ob Block<false>,
291    ) -> Result<Object<'ob>, RecvError> {
292        let channels = self.channels.read().unwrap();
293        let state = channels.get(&channel_id).ok_or(RecvError::Closed)?;
294        let state = Arc::clone(state);
295        drop(channels);
296
297        let mut data = state.data.lock().unwrap();
298
299        while data.is_empty() {
300            if data.sender_count == 0 || data.sender_closed {
301                return Err(RecvError::Closed);
302            }
303            data = state.not_empty.wait(data).unwrap();
304        }
305
306        if let Some(rooted) = data.queue.pop_front() {
307            let _gc_guard = self.gc_access.read().unwrap();
308            let obj: Object<'static> = rooted.get_inner();
309            let result = obj.clone_in(target_block);
310            drop(_gc_guard);
311
312            // HeapRoot is dropped here, unrooting the object
313            drop(rooted);
314            drop(data);
315
316            state.not_full.notify_one();
317
318            let recv_count = self.recv_count.fetch_add(1, Ordering::SeqCst) + 1;
319            if recv_count >= self.gc_threshold {
320                self.recv_count.store(0, Ordering::SeqCst);
321                self.collect_garbage();
322            }
323
324            Ok(result)
325        } else {
326            Err(RecvError::Closed)
327        }
328    }
329
330    /// Close the sender side of a channel
331    pub(crate) fn close_sender(&self, channel_id: ChannelId) {
332        let channels = self.channels.read().unwrap();
333        if let Some(state) = channels.get(&channel_id) {
334            let mut data = state.data.lock().unwrap();
335            if data.sender_count > 0 {
336                data.sender_count -= 1;
337            }
338            if data.sender_count == 0 {
339                data.sender_closed = true;
340                drop(data);
341                state.not_empty.notify_all();
342            }
343        }
344    }
345
346    /// Close the receiver side of a channel
347    pub(crate) fn close_receiver(&self, channel_id: ChannelId) {
348        let channels = self.channels.read().unwrap();
349        if let Some(state) = channels.get(&channel_id) {
350            let mut data = state.data.lock().unwrap();
351            if data.receiver_count > 0 {
352                data.receiver_count -= 1;
353            }
354            if data.receiver_count == 0 {
355                data.receiver_closed = true;
356                // Clear the queue (HeapRoots will be dropped and unrooted)
357                let _gc_guard = self.gc_access.read().unwrap();
358                data.queue.clear();
359                drop(_gc_guard);
360                drop(data);
361                state.not_full.notify_all();
362            }
363        }
364    }
365
366    /// Trigger garbage collection in the manager's block
367    fn collect_garbage(&self) {
368        let _gc_guard = self.gc_access.write().unwrap();
369        let mut gc_state = self.gc_state.lock().unwrap();
370
371        // SAFETY: We have exclusive access via both locks:
372        // - gc_access write lock ensures no concurrent HeapRoot access
373        // - gc_state mutex ensures exclusive access to Block's RefCells
374        unsafe {
375            let GcState { ref mut block, ref mut next_limit, roots_static } = *gc_state;
376            collect_garbage_raw(block, roots_static, false, next_limit);
377        }
378    }
379
380    /// Remove a channel that has no more senders or receivers
381    pub(crate) fn cleanup_channel(&self, channel_id: ChannelId) {
382        let channels = self.channels.read().unwrap();
383        let should_remove = if let Some(state) = channels.get(&channel_id) {
384            let data = state.data.lock().unwrap();
385            data.sender_count == 0 && data.receiver_count == 0
386        } else {
387            false
388        };
389        drop(channels);
390
391        if should_remove {
392            let mut channels = self.channels.write().unwrap();
393            channels.remove(&channel_id);
394        }
395    }
396}
397
// SAFETY: ChannelManager is safe to Send/Sync across threads because:
// 1. All state is protected by Mutex or is Atomic
// 2. The Block<false> contains RefCell fields (drop_stack, lisp_hashtables) but:
//    - These are only accessed during GC, which holds the gc_state mutex
//    - The mutex ensures no concurrent access to these RefCells
//    - No references to Block internals escape the mutex guard
// 3. The ThreadSafeRootSet uses Mutex internally and is 'static
// 4. HeapRoot instances in channel queues are Send/Sync
//    (NOTE(review): not visible from this file — confirm against the gc module)
// 5. All public methods are designed to be called concurrently from multiple threads

unsafe impl Send for ChannelManager {}
unsafe impl Sync for ChannelManager {}
410
411/// Global singleton channel manager
412static CHANNEL_MANAGER: OnceLock<Arc<ChannelManager>> = OnceLock::new();
413
414/// Get a reference to the global channel manager
415pub(crate) fn get_manager() -> Arc<ChannelManager> {
416    Arc::clone(CHANNEL_MANAGER.get_or_init(|| Arc::new(ChannelManager::new())))
417}
418
/// Error type for send operations
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SendError {
    /// Receiver has been dropped (or the channel no longer exists)
    Closed,
    /// Channel is full (only for try_send)
    Full,
    /// Send operation timed out
    // NOTE(review): no path in this module returns `Timeout`; presumably a
    // timed send elsewhere uses it — confirm before relying on it.
    Timeout,
}
429
/// Error type for receive operations
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecvError {
    /// All senders have been dropped (or the channel no longer exists)
    Closed,
    /// Channel is empty (only for try_recv)
    Empty,
    /// Receive operation timed out
    // NOTE(review): no path in this module returns `Timeout`; presumably a
    // timed receive elsewhere uses it — confirm before relying on it.
    Timeout,
}
440
// Public API for channel operations
impl ChannelManager {
    /// Create a channel and return its ID twice.
    ///
    /// Both elements of the tuple are the same `ChannelId` — presumably the
    /// caller treats them as the sender-side and receiver-side handles of one
    /// channel (TODO confirm at call sites).
    pub(crate) fn new_channel_pair(&self, capacity: usize) -> (ChannelId, ChannelId) {
        let id = self.create_channel(capacity);
        (id, id)
    }
}
447}