Skip to content

Commit e3e75e5

Browse files
author
José Valim
committed
Ensure Stream.zip/2 and Stream.flat_map/2 follow up suspended calls
1 parent 5369f24 commit e3e75e5

File tree

3 files changed

+90
-21
lines changed

3 files changed

+90
-21
lines changed

lib/elixir/lib/enum.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ defprotocol Enumerable do
7272
`:suspended` tuple must be explicitly handled by the caller and
7373
never leak. In practice, this means regular enumeration functions
7474
just need to concern about `:done` and `:halted` results.
75+
76+
Furthermore, a `:suspend` call must always be followed by another call,
77+
eventually halting or continuing until the end.
7578
"""
7679
@type result :: { :done, term } | { :halted, term } | { :suspended, term, continuation }
7780

lib/elixir/lib/stream.ex

Lines changed: 40 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -450,10 +450,13 @@ defmodule Stream do
450450
try do
451451
reduce.(acc)
452452
catch
453-
{ :stream_flat_map, h } -> { :halted, h }
453+
{ :stream_flat_map, h } ->
454+
reduce.({ :halt, elem(acc, 1) })
455+
next.({ :halt, next_acc })
456+
{ :halted, h }
454457
else
455-
{ _, acc } -> do_flat_map(next_acc, next, mapper, { :cont, acc }, fun)
456-
{ :suspended, acc, c } -> { :suspended, acc, &do_flat_map(next_acc, next, mapper, &1, fun, c) }
458+
{ _, acc } -> do_flat_map(next_acc, next, mapper, { :cont, acc }, fun)
459+
{ :suspended, acc, c } -> { :suspended, acc, &do_flat_map(next_acc, next, mapper, &1, fun, c) }
457460
end
458461
end
459462

@@ -677,31 +680,43 @@ defmodule Stream do
677680
right_fun = &Enumerable.reduce(right, &1, step)
678681

679682
# Return a function as a lazy enumerator.
680-
&do_zip(left_fun, [], right_fun, [], &1, &2)
683+
&do_zip([{ left_fun, [] }, { right_fun, [] }], &1, &2)
681684
end
682685

683-
defp do_zip(_left_fun, _left_acc, _right_fun, _right_acc, { :halt, acc }, _fun) do
686+
defp do_zip(zips, { :halt, acc }, _fun) do
687+
do_zip_close(zips)
684688
{ :halted, acc }
685689
end
686690

687-
defp do_zip(left_fun, left_acc, right_fun, right_acc, { :suspend, acc }, fun) do
688-
{ :suspended, acc, &do_zip(left_fun, left_acc, right_fun, right_acc, &1, fun) }
691+
defp do_zip(zips, { :suspend, acc }, fun) do
692+
{ :suspended, acc, &do_zip(zips, &1, fun) }
689693
end
690694

691-
defp do_zip(left_fun, left_acc, right_fun, right_acc, { :cont, acc }, callback) do
692-
case left_fun.({ :cont, left_acc }) do
693-
{ :suspended, [x|left_acc], left_fun } ->
694-
case right_fun.({ :cont, right_acc }) do
695-
{ :suspended, [y|right_acc], right_fun } ->
696-
do_zip(left_fun, left_acc, right_fun, right_acc, callback.({ x, y }, acc), callback)
697-
{ reason, _ } ->
698-
{ reason, acc }
699-
end
700-
{ reason, _ } ->
701-
{ reason, acc }
695+
defp do_zip(zips, { :cont, acc }, callback) do
696+
do_zip(zips, acc, callback, [], [])
697+
end
698+
699+
defp do_zip([{ fun, fun_acc }|t], acc, callback, list, buffer) do
700+
case fun.({ :cont, fun_acc }) do
701+
{ :suspended, [i|fun_acc], fun } ->
702+
do_zip(t, acc, callback, [i|list], [{ fun, fun_acc }|buffer])
703+
{ _, _ } ->
704+
do_zip_close(:lists.reverse(buffer) ++ t)
705+
{ :done, acc }
702706
end
703707
end
704708

709+
defp do_zip([], acc, callback, list, buffer) do
710+
zipped = list_to_tuple(:lists.reverse(list))
711+
do_zip(:lists.reverse(buffer), callback.(zipped, acc), callback)
712+
end
713+
714+
defp do_zip_close([]), do: :ok
715+
defp do_zip_close([{ fun, acc }|t]) do
716+
fun.({ :halt, acc })
717+
do_zip_close(t)
718+
end
719+
705720
defp do_zip_step(x, acc) do
706721
{ :suspend, [x|acc] }
707722
end
@@ -829,7 +844,8 @@ defmodule Stream do
829844
{ :suspended, acc, &do_resource(next_acc, next_fun, &1, fun, after_fun) }
830845
end
831846

832-
defp do_resource(_next_acc, _next_fun, { :halt, acc }, _fun, _after_fun) do
847+
defp do_resource(next_acc, _next_fun, { :halt, acc }, _fun, after_fun) do
848+
after_fun.(next_acc)
833849
{ :halted, acc }
834850
end
835851

@@ -841,8 +857,11 @@ defmodule Stream do
841857
after_fun.(next_acc)
842858
:erlang.raise(kind, reason, :erlang.get_stacktrace)
843859
else
844-
nil -> { :done, acc }
845-
{ v, next_acc } -> do_resource(next_acc, next_fun, fun.(v, acc), fun, after_fun)
860+
nil ->
861+
after_fun.(next_acc)
862+
{ :done, acc }
863+
{ v, next_acc } ->
864+
do_resource(next_acc, next_fun, fun.(v, acc), fun, after_fun)
846865
end
847866
end
848867

lib/elixir/test/elixir/stream_test.exs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,28 @@ defmodule StreamTest do
242242
assert Enum.zip(list, list) == Enum.zip(stream, stream)
243243
end
244244

245+
test "flat_map does not leave stream suspended" do
246+
stream = Stream.flat_map [1,2,3],
247+
fn i ->
248+
Stream.resource(fn -> i end,
249+
fn acc -> { acc, acc + 1 } end,
250+
fn _ -> Process.put(:stream_flat_map, true) end)
251+
end
252+
253+
Process.put(:stream_flat_map, false)
254+
assert stream |> Enum.take(3) == [1,2,3]
255+
assert Process.get(:stream_flat_map)
256+
257+
stream = Stream.resource(fn -> 1 end,
258+
fn acc -> { acc, acc + 1 } end,
259+
fn _ -> Process.put(:stream_flat_map, true) end)
260+
stream = Stream.flat_map(stream, fn i -> [i, i + 1, i + 2] end)
261+
262+
Process.put(:stream_flat_map, false)
263+
assert stream |> Enum.take(3) == [1,2,3]
264+
assert Process.get(:stream_flat_map)
265+
end
266+
245267
test "iterate" do
246268
stream = Stream.iterate(0, &(&1+2))
247269
assert Enum.take(stream, 5) == [0,2,4,6,8]
@@ -402,6 +424,31 @@ defmodule StreamTest do
402424
[{1,:a},{2,:b},{3,:c},{4,:a},{5,:b},{6,:c}]
403425
end
404426

427+
test "zip/2 does not leave streams suspended" do
428+
stream = Stream.resource(fn -> 1 end,
429+
fn acc -> { acc, acc + 1 } end,
430+
fn _ -> Process.put(:stream_zip, true) end)
431+
432+
Process.put(:stream_zip, false)
433+
assert Stream.zip([:a, :b, :c], stream) |> Enum.to_list == [a: 1, b: 2, c: 3]
434+
assert Process.get(:stream_zip)
435+
436+
Process.put(:stream_zip, false)
437+
assert Stream.zip(stream, [:a, :b, :c]) |> Enum.to_list == [{ 1, :a }, { 2, :b }, { 3, :c }]
438+
assert Process.get(:stream_zip)
439+
end
440+
441+
test "zip/2 does not leave streams suspended on halt" do
442+
stream = Stream.resource(fn -> 1 end,
443+
fn acc -> { acc, acc + 1 } end,
444+
fn _ -> Process.put(:stream_zip, :done) end)
445+
446+
assert Stream.zip([:a, :b, :c, :d, :e], stream) |> Enum.take(3) ==
447+
[a: 1, b: 2, c: 3]
448+
449+
assert Process.get(:stream_zip) == :done
450+
end
451+
405452
test "with_index" do
406453
stream = Stream.with_index([1,2,3])
407454
assert is_lazy(stream)

0 commit comments

Comments
 (0)