1use crate::core::{
10 gc::{__HeapRoot, Block, Context, Slot, ThreadSafeRootSet, collect_garbage_raw},
11 object::{CloneIn, Object},
12};
13use std::sync::OnceLock;
14use std::{
15 collections::{HashMap, VecDeque},
16 sync::{
17 Arc, Condvar, Mutex, RwLock,
18 atomic::{AtomicU64, AtomicUsize, Ordering},
19 },
20};
21
22pub(crate) type ChannelId = u64;
24
25struct GcState {
27 block: Block<false>,
29 roots_static: &'static ThreadSafeRootSet,
31 next_limit: usize,
33}
34
35impl GcState {
36 fn new(roots_static: &'static ThreadSafeRootSet) -> Self {
37 Self {
38 block: Block::new_local_unchecked(),
39 roots_static,
40 next_limit: Context::MIN_GC_BYTES,
41 }
42 }
43}
44
45struct ChannelData {
47 queue: VecDeque<__HeapRoot<Slot<Object<'static>>>>,
49 capacity: usize,
51 sender_count: usize,
53 receiver_count: usize,
55 sender_closed: bool,
57 receiver_closed: bool,
59}
60
61impl ChannelData {
62 fn new(capacity: usize) -> Self {
63 Self {
64 queue: VecDeque::with_capacity(capacity),
65 capacity,
66 sender_count: 0,
67 receiver_count: 0,
68 sender_closed: false,
69 receiver_closed: false,
70 }
71 }
72
73 fn is_empty(&self) -> bool {
74 self.queue.is_empty()
75 }
76
77 fn is_full(&self) -> bool {
78 self.queue.len() >= self.capacity
79 }
80}
81
82struct ChannelState {
84 data: Mutex<ChannelData>,
85 not_full: Condvar,
87 not_empty: Condvar,
89}
90
91impl ChannelState {
92 fn new(capacity: usize) -> Self {
93 Self {
94 data: Mutex::new(ChannelData::new(capacity)),
95 not_full: Condvar::new(),
96 not_empty: Condvar::new(),
97 }
98 }
99}
100
101pub(crate) struct ChannelManager {
103 gc_state: Mutex<GcState>,
106 gc_access: RwLock<()>,
110 channels: RwLock<HashMap<ChannelId, Arc<ChannelState>>>,
112 next_channel_id: AtomicU64,
114 recv_count: AtomicUsize,
116 gc_threshold: usize,
118}
119
120impl ChannelManager {
121 fn new() -> Self {
122 let roots_static: &'static ThreadSafeRootSet =
126 Box::leak(Box::new(ThreadSafeRootSet::default()));
127
128 Self {
129 gc_state: Mutex::new(GcState::new(roots_static)),
130 gc_access: RwLock::new(()),
131 channels: RwLock::new(HashMap::new()),
132 next_channel_id: AtomicU64::new(1),
133 recv_count: AtomicUsize::new(0),
134 gc_threshold: 10,
135 }
136 }
137
138 fn create_channel(&self, capacity: usize) -> ChannelId {
140 let id = self.next_channel_id.fetch_add(1, Ordering::SeqCst);
141 let mut channels = self.channels.write().unwrap();
142 channels.insert(id, Arc::new(ChannelState::new(capacity)));
143 id
144 }
145
146 pub(crate) fn increment_sender(&self, channel_id: ChannelId) {
148 let channels = self.channels.read().unwrap();
149 if let Some(state) = channels.get(&channel_id) {
150 let mut data = state.data.lock().unwrap();
151 data.sender_count += 1;
152 }
153 }
154
155 pub(crate) fn increment_receiver(&self, channel_id: ChannelId) {
157 let channels = self.channels.read().unwrap();
158 if let Some(state) = channels.get(&channel_id) {
159 let mut data = state.data.lock().unwrap();
160 data.receiver_count += 1;
161 }
162 }
163
164 pub(crate) fn try_send<'ob>(
166 &self,
167 channel_id: ChannelId,
168 obj: Object<'ob>,
169 ) -> Result<(), SendError> {
170 let channels = self.channels.read().unwrap();
171 let state = channels.get(&channel_id).ok_or(SendError::Closed)?;
172 let state = Arc::clone(state);
173 drop(channels);
174
175 let mut data = state.data.lock().unwrap();
176
177 if data.receiver_count == 0 || data.receiver_closed {
178 return Err(SendError::Closed);
179 }
180
181 if data.is_full() {
182 return Err(SendError::Full);
183 }
184
185 let _gc_guard = self.gc_access.read().unwrap();
187 let gc_state = self.gc_state.lock().unwrap();
188 let cloned = unsafe {
189 let obj_static = obj.clone_in(&gc_state.block);
190 std::mem::transmute::<Object<'_>, Object<'static>>(obj_static)
191 };
192 let rooted = __HeapRoot::new(Slot::new(cloned), gc_state.roots_static);
194 drop(gc_state);
195 drop(_gc_guard);
196
197 data.queue.push_back(rooted);
198
199 state.not_empty.notify_one();
200
201 Ok(())
202 }
203
204 pub(crate) fn send<'ob>(
206 &self,
207 channel_id: ChannelId,
208 obj: Object<'ob>,
209 ) -> Result<(), SendError> {
210 let channels = self.channels.read().unwrap();
211 let state = channels.get(&channel_id).ok_or(SendError::Closed)?;
212 let state = Arc::clone(state);
213 drop(channels);
214
215 let mut data = state.data.lock().unwrap();
216
217 while data.is_full() {
218 if data.receiver_count == 0 || data.receiver_closed {
219 return Err(SendError::Closed);
220 }
221 data = state.not_full.wait(data).unwrap();
222 }
223
224 if data.receiver_count == 0 || data.receiver_closed {
225 return Err(SendError::Closed);
226 }
227
228 let _gc_guard = self.gc_access.read().unwrap();
230 let gc_state = self.gc_state.lock().unwrap();
231 let cloned = unsafe {
232 let obj_static = obj.clone_in(&gc_state.block);
233 std::mem::transmute::<Object<'_>, Object<'static>>(obj_static)
234 };
235 let rooted = __HeapRoot::new(Slot::new(cloned), gc_state.roots_static);
237 drop(gc_state);
238 drop(_gc_guard);
239
240 data.queue.push_back(rooted);
241
242 state.not_empty.notify_one();
243
244 Ok(())
245 }
246
247 pub(crate) fn try_recv<'ob>(
249 &self,
250 channel_id: ChannelId,
251 target_block: &'ob Block<false>,
252 ) -> Result<Object<'ob>, RecvError> {
253 let channels = self.channels.read().unwrap();
254 let state = channels.get(&channel_id).ok_or(RecvError::Closed)?;
255 let state = Arc::clone(state);
256 drop(channels);
257
258 let mut data = state.data.lock().unwrap();
259
260 if let Some(rooted) = data.queue.pop_front() {
261 let _gc_guard = self.gc_access.read().unwrap();
262 let obj: Object<'static> = rooted.get_inner();
263 let result = obj.clone_in(target_block);
264 drop(_gc_guard);
265
266 drop(rooted);
268 drop(data);
269
270 state.not_full.notify_one();
271
272 let recv_count = self.recv_count.fetch_add(1, Ordering::SeqCst) + 1;
273 if recv_count >= self.gc_threshold {
274 self.recv_count.store(0, Ordering::SeqCst);
275 self.collect_garbage();
276 }
277
278 Ok(result)
279 } else if (data.sender_count == 0 || data.sender_closed) && data.is_empty() {
280 Err(RecvError::Closed)
281 } else {
282 Err(RecvError::Empty)
283 }
284 }
285
286 pub(crate) fn recv<'ob>(
288 &self,
289 channel_id: ChannelId,
290 target_block: &'ob Block<false>,
291 ) -> Result<Object<'ob>, RecvError> {
292 let channels = self.channels.read().unwrap();
293 let state = channels.get(&channel_id).ok_or(RecvError::Closed)?;
294 let state = Arc::clone(state);
295 drop(channels);
296
297 let mut data = state.data.lock().unwrap();
298
299 while data.is_empty() {
300 if data.sender_count == 0 || data.sender_closed {
301 return Err(RecvError::Closed);
302 }
303 data = state.not_empty.wait(data).unwrap();
304 }
305
306 if let Some(rooted) = data.queue.pop_front() {
307 let _gc_guard = self.gc_access.read().unwrap();
308 let obj: Object<'static> = rooted.get_inner();
309 let result = obj.clone_in(target_block);
310 drop(_gc_guard);
311
312 drop(rooted);
314 drop(data);
315
316 state.not_full.notify_one();
317
318 let recv_count = self.recv_count.fetch_add(1, Ordering::SeqCst) + 1;
319 if recv_count >= self.gc_threshold {
320 self.recv_count.store(0, Ordering::SeqCst);
321 self.collect_garbage();
322 }
323
324 Ok(result)
325 } else {
326 Err(RecvError::Closed)
327 }
328 }
329
330 pub(crate) fn close_sender(&self, channel_id: ChannelId) {
332 let channels = self.channels.read().unwrap();
333 if let Some(state) = channels.get(&channel_id) {
334 let mut data = state.data.lock().unwrap();
335 if data.sender_count > 0 {
336 data.sender_count -= 1;
337 }
338 if data.sender_count == 0 {
339 data.sender_closed = true;
340 drop(data);
341 state.not_empty.notify_all();
342 }
343 }
344 }
345
346 pub(crate) fn close_receiver(&self, channel_id: ChannelId) {
348 let channels = self.channels.read().unwrap();
349 if let Some(state) = channels.get(&channel_id) {
350 let mut data = state.data.lock().unwrap();
351 if data.receiver_count > 0 {
352 data.receiver_count -= 1;
353 }
354 if data.receiver_count == 0 {
355 data.receiver_closed = true;
356 let _gc_guard = self.gc_access.read().unwrap();
358 data.queue.clear();
359 drop(_gc_guard);
360 drop(data);
361 state.not_full.notify_all();
362 }
363 }
364 }
365
366 fn collect_garbage(&self) {
368 let _gc_guard = self.gc_access.write().unwrap();
369 let mut gc_state = self.gc_state.lock().unwrap();
370
371 unsafe {
375 let GcState { ref mut block, ref mut next_limit, roots_static } = *gc_state;
376 collect_garbage_raw(block, roots_static, false, next_limit);
377 }
378 }
379
380 pub(crate) fn cleanup_channel(&self, channel_id: ChannelId) {
382 let channels = self.channels.read().unwrap();
383 let should_remove = if let Some(state) = channels.get(&channel_id) {
384 let data = state.data.lock().unwrap();
385 data.sender_count == 0 && data.receiver_count == 0
386 } else {
387 false
388 };
389 drop(channels);
390
391 if should_remove {
392 let mut channels = self.channels.write().unwrap();
393 channels.remove(&channel_id);
394 }
395 }
396}
397
398unsafe impl Send for ChannelManager {}
409unsafe impl Sync for ChannelManager {}
410
411static CHANNEL_MANAGER: OnceLock<Arc<ChannelManager>> = OnceLock::new();
413
414pub(crate) fn get_manager() -> Arc<ChannelManager> {
416 Arc::clone(CHANNEL_MANAGER.get_or_init(|| Arc::new(ChannelManager::new())))
417}
418
419#[derive(Debug, Clone, Copy, PartialEq, Eq)]
421pub(crate) enum SendError {
422 Closed,
424 Full,
426 Timeout,
428}
429
430#[derive(Debug, Clone, Copy, PartialEq, Eq)]
432pub(crate) enum RecvError {
433 Closed,
435 Empty,
437 Timeout,
439}
440
441impl ChannelManager {
443 pub(crate) fn new_channel_pair(&self, capacity: usize) -> (ChannelId, ChannelId) {
444 let id = self.create_channel(capacity);
445 (id, id)
446 }
447}