Skip to content

Commit a457393

Browse files
CopilotByron
andcommitted
Implement on-demand thread for non-blocking piped data processing
Co-authored-by: Byron <63622+Byron@users.noreply.github.com>
1 parent 896cb4d commit a457393

File tree

2 files changed

+180
-2
lines changed

2 files changed

+180
-2
lines changed

gix-filter/src/driver/apply.rs

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,24 @@ impl State {
111111
) -> Result<Option<MaybeDelayed<'a>>, Error> {
112112
match self.maybe_launch_process(driver, operation, ctx.rela_path)? {
113113
Some(Process::SingleFile { mut child, command }) => {
114-
std::io::copy(src, &mut child.stdin.take().expect("configured"))?;
114+
// To avoid deadlock when the filter immediately echoes input to output (like `cat`),
115+
// we need to write to stdin and read from stdout concurrently. If we write all data
116+
// to stdin before reading from stdout, and the pipe buffer fills up, both processes
117+
// will block: the filter blocks writing to stdout (buffer full), and we block writing
118+
// to stdin (waiting for the filter to consume data).
119+
//
120+
// Solution: Read all data into a buffer, then spawn a thread to write it to stdin
121+
// while we can immediately read from stdout.
122+
let mut input_data = Vec::new();
123+
std::io::copy(src, &mut input_data)?;
124+
125+
let stdin = child.stdin.take().expect("configured");
126+
let write_thread = WriterThread::spawn(input_data, stdin)?;
127+
115128
Ok(Some(MaybeDelayed::Immediate(Box::new(ReadFilterOutput {
116129
inner: child.stdout.take(),
117130
child: driver.required.then_some((child, command)),
131+
write_thread: Some(write_thread),
118132
}))))
119133
}
120134
Some(Process::MultiFile { client, key }) => {
@@ -202,11 +216,67 @@ pub enum MaybeDelayed<'a> {
202216
Immediate(Box<dyn std::io::Read + 'a>),
203217
}
204218

219+
/// A helper to manage writing to stdin on a separate thread to avoid deadlock.
220+
struct WriterThread {
221+
handle: Option<std::thread::JoinHandle<std::io::Result<()>>>,
222+
}
223+
224+
impl WriterThread {
225+
/// Spawn a thread that will write all data from `data` to `stdin`.
226+
fn spawn(data: Vec<u8>, mut stdin: std::process::ChildStdin) -> std::io::Result<Self> {
227+
let handle = std::thread::Builder::new()
228+
.name("gix-filter-stdin-writer".into())
229+
.spawn(move || {
230+
use std::io::Write;
231+
stdin.write_all(&data)?;
232+
// Explicitly drop stdin to close the pipe and signal EOF to the child
233+
drop(stdin);
234+
Ok(())
235+
})?;
236+
237+
Ok(Self { handle: Some(handle) })
238+
}
239+
240+
/// Wait for the writer thread to complete and return any error that occurred.
241+
fn join(&mut self) -> std::io::Result<()> {
242+
if let Some(handle) = self.handle.take() {
243+
match handle.join() {
244+
Ok(result) => result,
245+
Err(panic_info) => {
246+
let msg = if let Some(s) = panic_info.downcast_ref::<String>() {
247+
format!("Writer thread panicked: {s}")
248+
} else if let Some(s) = panic_info.downcast_ref::<&str>() {
249+
format!("Writer thread panicked: {s}")
250+
} else {
251+
"Writer thread panicked while writing to filter stdin".to_string()
252+
};
253+
Err(std::io::Error::other(msg))
254+
}
255+
}
256+
} else {
257+
Ok(())
258+
}
259+
}
260+
}
261+
262+
impl Drop for WriterThread {
263+
fn drop(&mut self) {
264+
// Best effort join on drop. If the thread panicked or failed, log it for debugging.
265+
// We can't propagate errors from Drop, so we only log them. Thread panics during Drop
266+
// are unusual but can occur if the filter process closes stdin unexpectedly.
267+
if let Err(_err) = self.join() {
268+
gix_trace::debug!(err = %_err, "Failed to join writer thread during drop");
269+
}
270+
}
271+
}
272+
205273
/// A utility type to facilitate streaming the output of a filter process.
206274
struct ReadFilterOutput {
207275
inner: Option<std::process::ChildStdout>,
208276
/// The child is present if we need its exit code to be positive.
209277
child: Option<(std::process::Child, std::process::Command)>,
278+
/// The thread writing to stdin, if any. Must be joined when reading is done.
279+
write_thread: Option<WriterThread>,
210280
}
211281

212282
pub(crate) fn handle_io_err(err: &std::io::Error, running: &mut HashMap<BString, process::Client>, process: &BStr) {
@@ -222,9 +292,28 @@ impl std::io::Read for ReadFilterOutput {
222292
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
223293
match self.inner.as_mut() {
224294
Some(inner) => {
225-
let num_read = inner.read(buf)?;
295+
let num_read = match inner.read(buf) {
296+
Ok(n) => n,
297+
Err(e) => {
298+
// On read error, ensure we join the writer thread before propagating the error
299+
if let Some(mut write_thread) = self.write_thread.take() {
300+
// Try to join but prioritize the original read error
301+
if let Err(_thread_err) = write_thread.join() {
302+
gix_trace::debug!(thread_err = %_thread_err, read_err = %e, "Writer thread error during failed read");
303+
}
304+
}
305+
return Err(e);
306+
}
307+
};
308+
226309
if num_read == 0 {
227310
self.inner.take();
311+
312+
// Join the writer thread first to ensure all data has been written
313+
if let Some(mut write_thread) = self.write_thread.take() {
314+
write_thread.join()?;
315+
}
316+
228317
if let Some((mut child, cmd)) = self.child.take() {
229318
let status = child.wait()?;
230319
if !status.success() {

gix-filter/tests/filter/driver.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,95 @@ pub(crate) mod apply {
371371
Ok(())
372372
}
373373

374+
#[serial]
375+
#[test]
376+
fn large_file_with_cat_filter_does_not_hang() -> crate::Result {
377+
// This test reproduces issue #2080 where using `cat` as a filter with a large file
378+
// causes a deadlock. The pipe buffer is typically 64KB on Linux, so we use files
379+
// larger than that to ensure the buffer fills up.
380+
381+
// Typical pipe buffer sizes on Unix systems
382+
const PIPE_BUFFER_SIZE: usize = 64 * 1024; // 64KB
383+
384+
let mut state = gix_filter::driver::State::default();
385+
386+
// Create a driver that uses `cat` command (which echoes input to output immediately)
387+
let driver = Driver {
388+
name: "cat".into(),
389+
clean: Some("cat".into()),
390+
smudge: Some("cat".into()),
391+
process: None,
392+
required: false,
393+
};
394+
395+
// Test with multiple sizes to ensure robustness
396+
for size in [
397+
PIPE_BUFFER_SIZE,
398+
2 * PIPE_BUFFER_SIZE,
399+
8 * PIPE_BUFFER_SIZE,
400+
16 * PIPE_BUFFER_SIZE,
401+
] {
402+
let input = vec![b'a'; size];
403+
404+
// Apply the filter - this should not hang
405+
let mut filtered = state
406+
.apply(
407+
&driver,
408+
&mut input.as_slice(),
409+
driver::Operation::Smudge,
410+
context_from_path("large-file.txt"),
411+
)?
412+
.expect("filter present");
413+
414+
let mut output = Vec::new();
415+
filtered.read_to_end(&mut output)?;
416+
417+
assert_eq!(
418+
input.len(),
419+
output.len(),
420+
"cat should pass through all data unchanged for {size} bytes"
421+
);
422+
assert_eq!(input, output, "cat should not modify the data");
423+
}
424+
Ok(())
425+
}
426+
427+
#[serial]
428+
#[test]
429+
fn large_file_with_cat_filter_early_drop() -> crate::Result {
430+
// Test that dropping the reader early doesn't cause issues (thread cleanup)
431+
let mut state = gix_filter::driver::State::default();
432+
433+
let driver = Driver {
434+
name: "cat".into(),
435+
clean: Some("cat".into()),
436+
smudge: Some("cat".into()),
437+
process: None,
438+
required: false,
439+
};
440+
441+
let input = vec![b'x'; 256 * 1024];
442+
443+
// Apply the filter but only read a small amount
444+
let mut filtered = state
445+
.apply(
446+
&driver,
447+
&mut input.as_slice(),
448+
driver::Operation::Clean,
449+
context_from_path("early-drop.txt"),
450+
)?
451+
.expect("filter present");
452+
453+
let mut output = vec![0u8; 100];
454+
filtered.read_exact(&mut output)?;
455+
assert_eq!(output, vec![b'x'; 100], "should read first 100 bytes");
456+
457+
// Drop the reader early - the thread should still clean up properly
458+
drop(filtered);
459+
460+
Ok(())
461+
}
462+
374463
pub(crate) fn extract_delayed_key(res: Option<apply::MaybeDelayed<'_>>) -> driver::Key {
375464
match res {
376465
Some(apply::MaybeDelayed::Immediate(_)) | None => {

0 commit comments

Comments
 (0)