rune/
threads.rs

1//! Multi-threaded elisp support.
2//!
3//! The `go` function spawns a new thread to evaluate an expression and returns
4//! a channel receiver that will eventually contain the result.
5//!
6//! # Result Format
7//!
8//! - On success: The receiver contains the evaluation result
9//! - On error: The receiver contains `('thread-errored . "error message")`
10//!
11//! # Usage from Elisp
12//!
13//! ```elisp
14//! ;; Successful evaluation
15//! (let ((receiver (go (+ 1 2))))
16//!   (channel-recv receiver))  ; Returns 3
17//!
18//! ;; Error handling with pcase
19//! (let ((receiver (go (undefined-function))))
20//!   (pcase (channel-recv receiver)
21//!     (`(thread-errored . ,msg)
22//!      (message "Thread error: %s" msg))
23//!     (result
24//!      (message "Got result: %s" result))))
25//!
26//! ;; Discard result (fire and forget)
27//! (go (message "Background task"))
28//! ```
29
30use crate::core::{
31    cons::Cons,
32    env::{Env, sym},
33    gc::{Block, Context, RootSet},
34    object::{IntoObject, Object, make_channel_pair},
35};
36use rune_core::macros::root;
37use rune_macros::defun;
38use std::thread;
39
40#[defun]
41fn go<'ob>(obj: Object, cx: &'ob Context) -> Object<'ob> {
42    go_internal(obj, cx)
43}
44
45fn go_internal<'ob>(obj: Object, cx: &'ob Context) -> Object<'ob> {
46    // Create a rendezvous channel (capacity 1) for the result
47    let (sender, receiver) = make_channel_pair(1);
48
49    let (init_sender, init_receiver) = make_channel_pair(1);
50    init_sender.send(obj).expect("channel is immediately used after creation");
51    init_sender.close();
52
53    // Spawn thread to evaluate expression
54    crate::debug::enable_debug();
55    thread::spawn(move || {
56        // Create block inside the thread
57        let block = Block::new_local_unchecked();
58        let roots = &RootSet::default();
59        let cx = &mut Context::from_block(block, roots);
60        root!(env, new(Env), cx);
61
62        // Reconstruct the object from raw
63        let obj = init_receiver.recv(cx).expect("go_internal parent thread waited on the send operation in this channel before spawning the thread");
64        init_receiver.close();
65        root!(obj, cx);
66
67        // Evaluate expression and create result
68        let result = match crate::interpreter::eval(obj, None, env, cx) {
69            Ok(value) => value,
70            Err(err) => {
71                // Create ('thread-errored . "error message") cons pair
72                let error_msg = cx.add(err.to_string());
73                Cons::new(sym::THREAD_ERRORED, error_msg, cx).into()
74            }
75        };
76
77        // Send result through the channel
78        // The result is already in the thread's context (block),
79        // and will be double-copied: once into channel's buffer_block,
80        // then once into receiver's context when received
81        let _ = sender.send(result);
82        sender.close();
83        cx.garbage_collect(true);
84    });
85
86    // Return the receiver to the caller
87    receiver.into_obj(&cx.block).into()
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93    use crate::core::object::{ChannelReceiver, Gc, ObjectType};
94
95    #[test]
96    fn test_go() {
97        let roots = &RootSet::default();
98        let cx = &mut Context::new(roots);
99        let obj = cx.add("test string");
100        let receiver_obj = go_internal(obj, cx);
101
102        // Verify it returns a ChannelReceiver
103        assert!(matches!(receiver_obj.untag(), ObjectType::ChannelReceiver(_)));
104
105        // Receive and verify we got a result
106        let receiver: Gc<&ChannelReceiver> = receiver_obj.try_into().unwrap();
107        let result = receiver.recv(cx);
108        assert!(result.is_ok());
109    }
110
111    #[test]
112    fn test_go_eval() {
113        let roots = &RootSet::default();
114        let cx = &mut Context::new(roots);
115
116        let receivers = [
117            go_internal(crate::reader::read("(if nil 1 2 3)", cx).unwrap().0, cx),
118            go_internal(crate::reader::read("(progn (defvar foo 1) foo)", cx).unwrap().0, cx),
119            go_internal(crate::reader::read("(progn (defvar foo 1) (makunbound 'foo) (let ((fn #'(lambda () (defvar foo 3))) (foo 7)) (funcall fn)) foo)", cx).unwrap().0, cx),
120        ];
121
122        for recv_obj in receivers {
123            let receiver: Gc<&ChannelReceiver> = recv_obj.try_into().unwrap();
124            let result = receiver.recv(cx);
125            assert!(result.is_ok(), "Expected successful evaluation");
126        }
127    }
128
129    #[test]
130    fn test_go_message() {
131        let roots = &RootSet::default();
132        println!("hello main thread");
133        let cx = &mut Context::new(roots);
134        let obj = crate::reader::read("(message \"hello from thread\")", cx).unwrap().0;
135        let receiver_obj = go_internal(obj, cx);
136
137        let receiver: Gc<&ChannelReceiver> = receiver_obj.try_into().unwrap();
138        let result = receiver.recv(cx);
139        assert!(result.is_ok());
140    }
141
142    #[test]
143    fn test_go_error() {
144        let roots = &RootSet::default();
145        let cx = &mut Context::new(roots);
146
147        // This expression should error (undefined function)
148        let obj = crate::reader::read("(undefined-function-xyz)", cx).unwrap().0;
149        let receiver_obj = go_internal(obj, cx);
150
151        let receiver: Gc<&ChannelReceiver> = receiver_obj.try_into().unwrap();
152        let result = receiver.recv(cx).unwrap();
153
154        // Should receive ('thread-errored . "error message") cons pair
155        match result.untag() {
156            ObjectType::Cons(cons) => {
157                let car = cons.car();
158                // Verify car is 'thread-errored symbol
159                assert!(matches!(car.untag(), ObjectType::Symbol(_)));
160                // Verify cdr is a string with error message
161                let cdr = cons.cdr();
162                assert!(matches!(cdr.untag(), ObjectType::String(_)));
163            }
164            _ => panic!("Expected cons pair for error result, got: {result:?}"),
165        }
166    }
167
168    #[test]
169    fn test_go_discard_receiver() {
170        let roots = &RootSet::default();
171        let cx = &mut Context::new(roots);
172
173        // Spawn thread but discard receiver immediately
174        let _receiver = go_internal(cx.add("discarded"), cx);
175        let _ = _receiver;
176
177        cx.garbage_collect(true);
178
179        // Thread should complete normally even though receiver was dropped
180        // Wait briefly to allow thread to attempt send
181        std::thread::sleep(std::time::Duration::from_millis(50));
182
183        cx.garbage_collect(true);
184        // If we get here without panic, test passes
185    }
186}