rune/
channels.rs

1//! Channel operations for CSP-style concurrency.
2
3use crate::core::{
4    cons::Cons,
5    gc::Context,
6    object::{
7        ChannelReceiver, ChannelSender, Gc, IntoObject, NIL, Object, Symbol, make_channel_pair,
8    },
9};
10use rune_macros::defun;
11use std::time::Duration;
12
13#[defun]
14fn make_channel<'ob>(capacity: Option<i64>, cx: &'ob Context) -> Object<'ob> {
15    let cap = capacity.unwrap_or(1).max(1) as usize;
16    let (sender, receiver) = make_channel_pair(cap);
17    // Return (sender . receiver) as a cons pair
18    Cons::new(sender.into_obj(&cx.block), receiver.into_obj(&cx.block), cx).into()
19}
20
21#[defun]
22fn channel_send<'ob>(sender: Gc<&ChannelSender>, obj: Object<'ob>) -> Object<'ob> {
23    match sender.send(obj) {
24        Ok(()) => NIL,
25        Err(e) => Symbol::from(e).into(),
26    }
27}
28
29#[defun]
30fn channel_recv<'ob>(receiver: Gc<&ChannelReceiver>, cx: &'ob Context) -> Object<'ob> {
31    match receiver.recv(cx) {
32        Ok(obj) => obj,
33        Err(e) => Symbol::from(e).into(),
34    }
35}
36
37#[defun]
38fn channel_try_send<'ob>(sender: Gc<&ChannelSender>, obj: Object<'ob>) -> Object<'ob> {
39    match sender.try_send(obj) {
40        Ok(()) => NIL,
41        Err(e) => Symbol::from(e).into(),
42    }
43}
44
45#[defun]
46fn channel_try_recv<'ob>(receiver: Gc<&ChannelReceiver>, cx: &'ob Context) -> Object<'ob> {
47    match receiver.try_recv(cx) {
48        Ok(obj) => obj,
49        Err(e) => Symbol::from(e).into(),
50    }
51}
52
53#[defun]
54fn channel_send_timeout<'ob>(
55    sender: Gc<&ChannelSender>,
56    obj: Object<'ob>,
57    timeout_secs: f64,
58) -> Object<'ob> {
59    let timeout = Duration::from_secs_f64(timeout_secs.max(0.0));
60    match sender.send_timeout(obj, timeout) {
61        Ok(()) => NIL,
62        Err(e) => Symbol::from(e).into(),
63    }
64}
65
66#[defun]
67fn channel_recv_timeout<'ob>(
68    receiver: Gc<&ChannelReceiver>,
69    timeout_secs: f64,
70    cx: &'ob Context,
71) -> Object<'ob> {
72    let timeout = Duration::from_secs_f64(timeout_secs.max(0.0));
73    match receiver.recv_timeout(cx, timeout) {
74        Ok(obj) => obj,
75        Err(e) => Symbol::from(e).into(),
76    }
77}
78
79/// Close the sender side of a channel.
80///
81/// After closing, no more values can be sent through this sender.
82/// Any receivers waiting on this channel will be woken up.
83/// If all senders are closed or dropped, receivers will get 'channel-closed.
84#[defun]
85fn channel_sender_close<'ob>(sender: Gc<&ChannelSender>) -> Object<'ob> {
86    sender.close();
87    NIL
88}
89
90/// Close the receiver side of a channel.
91///
92/// After closing, no more values can be received from this receiver.
93/// This is useful for "moving" a receiver to another thread by cloning
94/// it into a new context and then closing the original.
95///
96/// This is idempotent - calling it multiple times has no additional effect.
97#[defun]
98fn channel_receiver_close<'ob>(receiver: Gc<&ChannelReceiver>) -> Object<'ob> {
99    receiver.close();
100    NIL
101}