Skip to content

Commit bdcb343

Browse files
committed
Blocking async NIFs
1 parent 5ab4ac1 commit bdcb343

File tree

10 files changed

+882
-23
lines changed

10 files changed

+882
-23
lines changed

rustler/src/codegen_runtime.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ unsafe impl NifReturnable for OwnedBinary {
5858
}
5959
}
6060

61+
// Allow returning NifReturned directly from NIFs
62+
// This is useful for advanced use cases like yielding NIFs
63+
unsafe impl NifReturnable for NifReturned {
64+
unsafe fn into_returned(self, _env: Env) -> NifReturned {
65+
self
66+
}
67+
}
68+
6169
pub enum NifReturned {
6270
Term(NIF_TERM),
6371
Raise(NIF_TERM),

rustler/src/runtime/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ pub use tokio::TokioRuntime;
1111
#[cfg(feature = "tokio-rt")]
1212
pub use tokio::{ConfigError, RuntimeConfig};
1313

14+
#[cfg(feature = "async-rt")]
15+
pub mod yielding;
16+
17+
#[cfg(feature = "async-rt")]
18+
pub use yielding::{yield_now, yielding_nif_run, YieldingTaskState};
19+
1420
#[cfg(rustler_unstable)]
1521
pub mod channel;
1622

rustler/src/runtime/yielding.rs

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
/// True cooperative yielding NIFs using enif_schedule_nif
2+
///
3+
/// This approach makes NIF calls appear synchronous to Elixir while yielding internally.
4+
use crate::codegen_runtime::NifReturned;
5+
use crate::schedule::SchedulerFlags;
6+
use crate::wrapper::NIF_TERM;
7+
use crate::{Encoder, Env, ResourceArc};
8+
use std::ffi::CString;
9+
use std::future::Future;
10+
use std::pin::Pin;
11+
use std::sync::Mutex;
12+
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
13+
14+
// Type-erased poll function that takes Env and returns encoded result
15+
type PollFn = dyn FnMut(&mut Context<'_>, Env) -> Poll<NIF_TERM> + Send;
16+
17+
/// Saved state for a yielding computation
18+
pub struct YieldingTaskState {
19+
/// Type-erased future polling function
20+
poll_fn: Mutex<Pin<Box<PollFn>>>,
21+
}
22+
23+
impl crate::Resource for YieldingTaskState {}
24+
25+
// Auto-register the resource
26+
crate::codegen_runtime::inventory::submit! {
27+
crate::resource::Registration::new::<YieldingTaskState>()
28+
}
29+
30+
/// Run a future cooperatively, yielding to the BEAM scheduler as needed.
31+
///
32+
/// This is the main entry point for yielding NIFs. Call this with your async code
33+
/// and it will handle yielding automatically.
34+
///
35+
/// # Example
36+
///
37+
/// ```ignore
38+
/// use rustler::codegen_runtime::NifReturned;
39+
///
40+
/// #[rustler::nif]
41+
/// fn my_yielding_nif(env: Env) -> NifReturned {
42+
/// yielding_nif_run(env, async {
43+
/// // Your async code here - yields automatically
44+
/// let mut sum = 0;
45+
/// for i in 0..1000 {
46+
/// sum += i;
47+
/// // Yield periodically to avoid blocking
48+
/// yield_now().await;
49+
/// }
50+
/// sum
51+
/// })
52+
/// }
53+
/// ```
54+
///
55+
/// From Elixir, this appears as a normal blocking call:
56+
/// ```elixir
57+
/// result = MyNif.my_yielding_nif() # Blocks cooperatively until done
58+
/// ```
59+
pub fn yielding_nif_run<F, T>(env: Env, future: F) -> NifReturned
60+
where
61+
F: Future<Output = T> + Send + 'static,
62+
T: Encoder + Send + 'static,
63+
{
64+
start_yielding(env, future)
65+
}
66+
67+
/// Internal function for managing continuation state.
68+
///
69+
/// This should not be called directly by users.
70+
pub fn yielding_nif<F, T>(
71+
env: Env,
72+
state: Option<ResourceArc<YieldingTaskState>>,
73+
future: F,
74+
) -> NifReturned
75+
where
76+
F: Future<Output = T> + Send + 'static,
77+
T: Encoder + Send + 'static,
78+
{
79+
match state {
80+
None => {
81+
// Initial call - create state and start
82+
start_yielding(env, future)
83+
}
84+
Some(state_resource) => {
85+
// Continuation - resume from state
86+
resume_yielding(env, state_resource)
87+
}
88+
}
89+
}
90+
91+
/// Start a new yielding computation
92+
fn start_yielding<F, T>(env: Env, future: F) -> NifReturned
93+
where
94+
F: Future<Output = T> + Send + 'static,
95+
T: Encoder + Send + 'static,
96+
{
97+
// Box and pin the future
98+
let mut future = Box::pin(future);
99+
100+
// Create type-erased poll function
101+
let poll_fn: Pin<Box<PollFn>> =
102+
Box::pin(
103+
move |ctx: &mut Context<'_>, env: Env| match future.as_mut().poll(ctx) {
104+
Poll::Ready(result) => Poll::Ready(result.encode(env).as_c_arg()),
105+
Poll::Pending => Poll::Pending,
106+
},
107+
);
108+
109+
// Create task state resource
110+
let task_state = YieldingTaskState {
111+
poll_fn: Mutex::new(poll_fn),
112+
};
113+
let resource = ResourceArc::new(task_state);
114+
115+
// Poll immediately
116+
poll_and_return(env, resource)
117+
}
118+
119+
/// Resume a yielding computation from saved state
120+
fn resume_yielding(env: Env, state: ResourceArc<YieldingTaskState>) -> NifReturned {
121+
poll_and_return(env, state)
122+
}
123+
124+
/// Poll the future and return appropriate NifReturned
125+
fn poll_and_return(env: Env, state: ResourceArc<YieldingTaskState>) -> NifReturned {
126+
// Create a simple waker that does nothing (we'll poll again on reschedule)
127+
let waker = noop_waker();
128+
let mut context = Context::from_waker(&waker);
129+
130+
// Poll the future first - don't check timeslice before giving it a chance to complete
131+
let result = {
132+
let mut poll_fn = state
133+
.poll_fn
134+
.lock()
135+
.expect("YieldingTaskState mutex poisoned");
136+
137+
// SAFETY: We're not moving the function, just calling it
138+
let f = unsafe { poll_fn.as_mut().get_unchecked_mut() };
139+
f(&mut context, env)
140+
};
141+
142+
match result {
143+
Poll::Ready(term) => {
144+
// Future completed - return result
145+
NifReturned::Term(term)
146+
}
147+
Poll::Pending => {
148+
// Future still running - check if we should yield
149+
// Consume a small amount of timeslice (10%) and check if we should continue
150+
if crate::schedule::consume_timeslice(env, 10) {
151+
// Still have timeslice - could poll again immediately
152+
// But for now, let's reschedule to give other work a chance
153+
reschedule_continuation(env, state)
154+
} else {
155+
// Timeslice exhausted - definitely reschedule
156+
reschedule_continuation(env, state)
157+
}
158+
}
159+
}
160+
}
161+
162+
/// Reschedule the continuation to run again
163+
fn reschedule_continuation(env: Env, state: ResourceArc<YieldingTaskState>) -> NifReturned {
164+
// Encode the state resource as an argument for the continuation
165+
let state_term = state.encode(env).as_c_arg();
166+
167+
NifReturned::Reschedule {
168+
fun_name: CString::new("__yielding_continuation").unwrap(),
169+
flags: SchedulerFlags::Normal,
170+
fun: yielding_continuation_raw,
171+
args: vec![state_term],
172+
}
173+
}
174+
175+
/// Raw C-ABI continuation function called by enif_schedule_nif
176+
unsafe extern "C" fn yielding_continuation_raw(
177+
env_ptr: *mut crate::sys::ErlNifEnv,
178+
argc: i32,
179+
argv: *const NIF_TERM,
180+
) -> NIF_TERM {
181+
// Create Env from the pointer
182+
let env = Env::new_internal(&env_ptr, env_ptr, crate::env::EnvKind::Callback);
183+
184+
// Decode the state resource from argv[0]
185+
if argc != 1 {
186+
return env.error_tuple("Expected 1 argument").as_c_arg();
187+
}
188+
189+
let state_term = crate::Term::new(env, *argv);
190+
191+
match state_term.decode::<ResourceArc<YieldingTaskState>>() {
192+
Ok(state) => {
193+
// Resume the computation
194+
match resume_yielding(env, state) {
195+
NifReturned::Term(term) => term,
196+
NifReturned::Reschedule {
197+
fun_name,
198+
flags,
199+
fun,
200+
args,
201+
} => {
202+
// Call enif_schedule_nif to reschedule again
203+
unsafe {
204+
crate::sys::enif_schedule_nif(
205+
env_ptr,
206+
fun_name.as_ptr(),
207+
flags as i32,
208+
fun,
209+
args.len() as i32,
210+
args.as_ptr(),
211+
)
212+
}
213+
}
214+
NifReturned::BadArg => crate::types::atom::error().encode(env).as_c_arg(),
215+
NifReturned::Raise(term) => term,
216+
}
217+
}
218+
Err(_) => {
219+
// Failed to decode state - return error
220+
env.error_tuple("Invalid task state").as_c_arg()
221+
}
222+
}
223+
}
224+
225+
/// Create a no-op waker
226+
///
227+
/// Since we're using cooperative yielding with enif_schedule_nif, we don't need
228+
/// the waker to do anything. We'll poll again when we're rescheduled.
229+
fn noop_waker() -> Waker {
230+
fn noop_clone(_: *const ()) -> RawWaker {
231+
noop_raw_waker()
232+
}
233+
fn noop(_: *const ()) {}
234+
235+
fn noop_raw_waker() -> RawWaker {
236+
RawWaker::new(
237+
std::ptr::null(),
238+
&RawWakerVTable::new(noop_clone, noop, noop, noop),
239+
)
240+
}
241+
242+
unsafe { Waker::from_raw(noop_raw_waker()) }
243+
}
244+
245+
/// A simple future that yields once before completing.
246+
///
247+
/// This is useful for inserting yield points in your async code to check
248+
/// the timeslice and give the scheduler a chance to run other work.
249+
///
250+
/// # Example
251+
///
252+
/// ```ignore
253+
/// async fn process_large_file(path: String) -> Result<Vec<u8>> {
254+
/// let mut buffer = Vec::new();
255+
/// let mut file = std::fs::File::open(path)?;
256+
///
257+
/// loop {
258+
/// let mut chunk = vec![0u8; 4096];
259+
/// match file.read(&mut chunk)? {
260+
/// 0 => break,
261+
/// n => {
262+
/// buffer.extend_from_slice(&chunk[..n]);
263+
/// // Yield to scheduler periodically
264+
/// yield_now().await;
265+
/// }
266+
/// }
267+
/// }
268+
///
269+
/// Ok(buffer)
270+
/// }
271+
/// ```
272+
pub struct YieldNow {
273+
yielded: bool,
274+
}
275+
276+
impl Future for YieldNow {
277+
type Output = ();
278+
279+
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
280+
if self.yielded {
281+
Poll::Ready(())
282+
} else {
283+
self.yielded = true;
284+
Poll::Pending
285+
}
286+
}
287+
}
288+
289+
/// Yield control back to the BEAM scheduler.
290+
///
291+
/// This returns a future that yields once before completing, allowing
292+
/// the scheduler to run other work if needed.
293+
pub fn yield_now() -> YieldNow {
294+
YieldNow { yielded: false }
295+
}
296+
297+
#[cfg(test)]
298+
mod tests {
299+
use super::*;
300+
301+
#[test]
302+
fn test_yield_now_completes() {
303+
// YieldNow should return Pending once, then Ready
304+
let mut future = Box::pin(yield_now());
305+
let waker = noop_waker();
306+
let mut ctx = Context::from_waker(&waker);
307+
308+
assert!(matches!(future.as_mut().poll(&mut ctx), Poll::Pending));
309+
assert!(matches!(future.as_mut().poll(&mut ctx), Poll::Ready(())));
310+
}
311+
}

rustler_codegen/src/lib.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -103,17 +103,7 @@ pub fn nif(args: TokenStream, input: TokenStream) -> TokenStream {
103103

104104
let input = syn::parse_macro_input!(input as syn::ItemFn);
105105

106-
// Reject async functions in #[rustler::nif]
107-
if input.sig.asyncness.is_some() {
108-
return syn::Error::new_spanned(
109-
input.sig.asyncness,
110-
"async functions are not supported with #[rustler::nif]. Use #[rustler::task] instead.",
111-
)
112-
.to_compile_error()
113-
.into();
114-
}
115-
116-
nif::transcoder_decorator(nif_attributes, input).into()
106+
nif::transcoder_decorator(nif_attributes, input, false).into()
117107
}
118108

119109
/// Wrap an async function as a spawned task that returns a reference.
@@ -162,7 +152,7 @@ pub fn task(args: TokenStream, input: TokenStream) -> TokenStream {
162152
.into();
163153
}
164154

165-
nif::transcoder_decorator(nif_attributes, input).into()
155+
nif::transcoder_decorator(nif_attributes, input, true).into()
166156
}
167157

168158
/// Derives implementations for the `Encoder` and `Decoder` traits

0 commit comments

Comments
 (0)