Skip to content

Commit b0e1c09

Browse files
author
José Valim
committed
Add Stream.transform/3
Note: the function name may be modified before the next release.
1 parent 96a1628 commit b0e1c09

File tree

2 files changed

+179
-93
lines changed

2 files changed

+179
-93
lines changed

lib/elixir/lib/stream.ex

Lines changed: 159 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,22 @@ defmodule Stream do
360360
end
361361
end
362362

363+
@doc """
364+
Creates a stream that will apply the given function on enumeration and
365+
flatten the result.
366+
367+
## Examples
368+
369+
iex> stream = Stream.flat_map([1, 2, 3], fn(x) -> [x, x * 2] end)
370+
iex> Enum.to_list(stream)
371+
[1, 2, 2, 4, 3, 6]
372+
373+
"""
374+
@spec flat_map(Enumerable.t, (element -> Enumerable.t)) :: Enumerable.t
375+
def flat_map(enum, mapper) do
376+
transform(enum, nil, fn val, nil -> { mapper.(val), nil } end)
377+
end
378+
363379
@doc """
364380
Creates a stream that filters elements according to
365381
the given function on enumeration.
@@ -410,99 +426,6 @@ defmodule Stream do
410426
lazy enum, fn(f1) -> R.map(fun, f1) end
411427
end
412428

413-
@doc """
414-
Creates a stream that applies the given function to each
415-
element, emits the result and uses the same result as the accumulator
416-
for the next computation.
417-
418-
## Examples
419-
420-
iex> stream = Stream.scan(1..5, &(&1 + &2))
421-
iex> Enum.to_list(stream)
422-
[1,3,6,10,15]
423-
424-
"""
425-
@spec scan(Enumerable.t, (element, acc -> any)) :: Enumerable.t
426-
def scan(enum, fun) do
427-
lazy enum, :first, fn(f1) -> R.scan_2(fun, f1) end
428-
end
429-
430-
@doc """
431-
Creates a stream that applies the given function to each
432-
element, emits the result and uses the same result as the accumulator
433-
for the next computation. Uses the given `acc` as the starting value.
434-
435-
## Examples
436-
437-
iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
438-
iex> Enum.to_list(stream)
439-
[1,3,6,10,15]
440-
441-
"""
442-
@spec scan(Enumerable.t, acc, (element, acc -> any)) :: Enumerable.t
443-
def scan(enum, acc, fun) do
444-
lazy enum, acc, fn(f1) -> R.scan_3(fun, f1) end
445-
end
446-
447-
@doc """
448-
Creates a stream that will apply the given function on enumeration and
449-
flatten the result.
450-
451-
## Examples
452-
453-
iex> stream = Stream.flat_map([1, 2, 3], fn(x) -> [x, x * 2] end)
454-
iex> Enum.to_list(stream)
455-
[1, 2, 2, 4, 3, 6]
456-
457-
"""
458-
@spec flat_map(Enumerable.t, (element -> any)) :: Enumerable.t
459-
def flat_map(enum, mapper) do
460-
&do_flat_map(enum, mapper, &1, &2)
461-
end
462-
463-
defp do_flat_map(enumerables, mapper, acc, fun) do
464-
fun = &do_flat_map_each(fun, &1, &2)
465-
step = &do_flat_map_step/2
466-
next = &Enumerable.reduce(enumerables, &1, step)
467-
do_flat_map([], next, mapper, acc, fun)
468-
end
469-
470-
defp do_flat_map(next_acc, next, mapper, acc, fun) do
471-
case next.({ :cont, next_acc }) do
472-
{ :suspended, [val|next_acc], next } ->
473-
do_flat_map(next_acc, next, mapper, acc, fun, &Enumerable.reduce(mapper.(val), &1, fun))
474-
{ reason, _ } ->
475-
{ reason, elem(acc, 1) }
476-
end
477-
end
478-
479-
defp do_flat_map(next_acc, next, mapper, acc, fun, reduce) do
480-
try do
481-
reduce.(acc)
482-
catch
483-
{ :stream_flat_map, h } ->
484-
next.({ :halt, next_acc })
485-
{ :halted, h }
486-
kind, reason ->
487-
next.({ :halt, next_acc })
488-
:erlang.raise(kind, reason, :erlang.get_stacktrace)
489-
else
490-
{ _, acc } -> do_flat_map(next_acc, next, mapper, { :cont, acc }, fun)
491-
{ :suspended, acc, c } -> { :suspended, acc, &do_flat_map(next_acc, next, mapper, &1, fun, c) }
492-
end
493-
end
494-
495-
defp do_flat_map_each(f, x, acc) do
496-
case f.(x, acc) do
497-
{ :halt, h } -> throw({ :stream_flat_map, h })
498-
{ _, _ } = o -> o
499-
end
500-
end
501-
502-
defp do_flat_map_step(x, acc) do
503-
{ :suspend, [x|acc] }
504-
end
505-
506429
@doc """
507430
Creates a stream that will reject elements according to
508431
the given function on enumeration.
@@ -543,6 +466,40 @@ defmodule Stream do
543466
:ok
544467
end
545468

469+
@doc """
470+
Creates a stream that applies the given function to each
471+
element, emits the result and uses the same result as the accumulator
472+
for the next computation.
473+
474+
## Examples
475+
476+
iex> stream = Stream.scan(1..5, &(&1 + &2))
477+
iex> Enum.to_list(stream)
478+
[1,3,6,10,15]
479+
480+
"""
481+
@spec scan(Enumerable.t, (element, acc -> any)) :: Enumerable.t
482+
def scan(enum, fun) do
483+
lazy enum, :first, fn(f1) -> R.scan_2(fun, f1) end
484+
end
485+
486+
@doc """
487+
Creates a stream that applies the given function to each
488+
element, emits the result and uses the same result as the accumulator
489+
for the next computation. Uses the given `acc` as the starting value.
490+
491+
## Examples
492+
493+
iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
494+
iex> Enum.to_list(stream)
495+
[1,3,6,10,15]
496+
497+
"""
498+
@spec scan(Enumerable.t, acc, (element, acc -> any)) :: Enumerable.t
499+
def scan(enum, acc, fun) do
500+
lazy enum, acc, fn(f1) -> R.scan_3(fun, f1) end
501+
end
502+
546503
@doc """
547504
Lazily takes the next `n` items from the enumerable and stops
548505
enumeration.
@@ -639,6 +596,115 @@ defmodule Stream do
639596
lazy enum, fn(f1) -> R.take_while(fun, f1) end
640597
end
641598

599+
600+
@doc """
601+
Transforms an existing stream.
602+
603+
It expects an accumulator and a function that receives each stream item
604+
and an accumulator, and must return a tuple containing a new stream
605+
(often a list) with the new accumulator or simply return the atom `:halt`.
606+
607+
## Examples
608+
609+
`Stream.transform/3` is a useful as it can be used as basis to implement
610+
many of the functions defined in this module. For example, we can implement
611+
`Stream.take(enum, n)` as follows:
612+
613+
iex> enum = 1..100
614+
iex> n = 3
615+
iex> stream = Stream.transform(enum, 0, fn i, acc ->
616+
...> if acc < n, do: { [i], acc + 1 }, else: :halt
617+
...> end)
618+
iex> Enum.to_list(stream)
619+
[1,2,3]
620+
621+
"""
622+
@spec transform(Enumerable.t, acc, fun) :: Enumerable.t when
623+
fun: (element, acc -> { Enumerable.t, acc } | :halt),
624+
acc: any
625+
def transform(enum, acc, reducer) do
626+
&do_transform(enum, acc, reducer, &1, &2)
627+
end
628+
629+
defp do_transform(enumerables, user_acc, user, inner_acc, fun) do
630+
inner = &do_transform_each(&1, &2, fun)
631+
step = &do_transform_step(&1, &2)
632+
next = &Enumerable.reduce(enumerables, &1, step)
633+
do_transform(user_acc, user, fun, [], next, inner_acc, inner)
634+
end
635+
636+
defp do_transform(user_acc, user, fun, next_acc, next, inner_acc, inner) do
637+
case next.({ :cont, next_acc }) do
638+
{ :suspended, [val|next_acc], next } ->
639+
try do
640+
user.(val, user_acc)
641+
catch
642+
kind, reason ->
643+
next.({ :halt, next_acc })
644+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
645+
else
646+
{ [], user_acc } ->
647+
do_transform(user_acc, user, fun, next_acc, next, inner_acc, inner)
648+
{ list, user_acc } when is_list(list) ->
649+
do_list_transform(user_acc, user, fun, next_acc, next, inner_acc, inner, &Enumerable.List.reduce(list, &1, fun))
650+
{ other, user_acc } ->
651+
do_other_transform(user_acc, user, fun, next_acc, next, inner_acc, inner, &Enumerable.reduce(other, &1, inner))
652+
:halt ->
653+
next.({ :halt, next_acc })
654+
{ :halted, elem(inner_acc, 1) }
655+
end
656+
{ reason, _ } ->
657+
{ reason, elem(inner_acc, 1) }
658+
end
659+
end
660+
661+
defp do_list_transform(user_acc, user, fun, next_acc, next, inner_acc, inner, reduce) do
662+
try do
663+
reduce.(inner_acc)
664+
catch
665+
kind, reason ->
666+
next.({ :halt, next_acc })
667+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
668+
else
669+
{ :done, acc } ->
670+
do_transform(user_acc, user, fun, next_acc, next, { :cont, acc }, inner)
671+
{ :halted, acc } ->
672+
next.({ :halt, next_acc })
673+
{ :halted, acc }
674+
{ :suspended, acc, c } ->
675+
{ :suspended, acc, &do_list_transform(user_acc, user, fun, next_acc, next, &1, inner, c) }
676+
end
677+
end
678+
679+
defp do_other_transform(user_acc, user, fun, next_acc, next, inner_acc, inner, reduce) do
680+
try do
681+
reduce.(inner_acc)
682+
catch
683+
{ :stream_transform, h } ->
684+
next.({ :halt, next_acc })
685+
{ :halted, h }
686+
kind, reason ->
687+
next.({ :halt, next_acc })
688+
:erlang.raise(kind, reason, :erlang.get_stacktrace)
689+
else
690+
{ _, acc } ->
691+
do_transform(user_acc, user, fun, next_acc, next, { :cont, acc }, inner)
692+
{ :suspended, acc, c } ->
693+
{ :suspended, acc, &do_other_transform(user_acc, user, fun, next_acc, next, &1, inner, c) }
694+
end
695+
end
696+
697+
defp do_transform_each(x, acc, f) do
698+
case f.(x, acc) do
699+
{ :halt, h } -> throw({ :stream_transform, h })
700+
{ _, _ } = o -> o
701+
end
702+
end
703+
704+
defp do_transform_step(x, acc) do
705+
{ :suspend, [x|acc] }
706+
end
707+
642708
@doc """
643709
Creates a stream that only emits elements if they are unique.
644710

lib/elixir/test/elixir/stream_test.exs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,26 @@ defmodule StreamTest do
300300
assert Process.get(:stream_flat_map)
301301
end
302302

303+
test "transform" do
304+
stream = Stream.transform([1, 2, 3], 0, &{ [&1, &2], &1 + &2 })
305+
assert is_lazy(stream)
306+
assert Enum.to_list(stream) == [1, 0, 2, 1, 3, 3]
307+
308+
nats = Stream.iterate(1, &(&1 + 1))
309+
assert Stream.transform(nats, 0, &{ [&1, &2], &1 + &2 }) |> Enum.take(6) == [1, 0, 2, 1, 3, 3]
310+
end
311+
312+
test "transform with halt" do
313+
stream = Stream.resource(fn -> 1 end,
314+
fn acc -> { acc, acc + 1 } end,
315+
fn _ -> Process.put(:stream_transform, true) end)
316+
stream = Stream.transform(stream, 0, fn i, acc -> if acc < 3, do: { [i], acc + 1 }, else: :halt end)
317+
318+
Process.put(:stream_transform, false)
319+
assert Enum.to_list(stream) == [1,2,3]
320+
assert Process.get(:stream_transform)
321+
end
322+
303323
test "iterate" do
304324
stream = Stream.iterate(0, &(&1+2))
305325
assert Enum.take(stream, 5) == [0,2,4,6,8]

0 commit comments

Comments
 (0)