@@ -128,7 +128,6 @@ defmodule Stream do
128128 end
129129 end
130130
131- @ type t :: Lazy . t | ( acc , ( element , acc -> acc ) -> acc )
132131 @ type acc :: any
133132 @ type element :: any
134133 @ type index :: non_neg_integer
@@ -146,7 +145,7 @@ defmodule Stream do
146145 [6,7,8,9,10]
147146
148147 """
149- @ spec drop ( Enumerable . t , non_neg_integer ) :: t
148+ @ spec drop ( Enumerable . t , non_neg_integer ) :: Enumerable . t
150149 def drop ( enum , n ) when n >= 0 do
151150 lazy enum , n , fn ( f1 ) ->
152151 fn
@@ -170,7 +169,7 @@ defmodule Stream do
170169 [6,7,8,9,10]
171170
172171 """
173- @ spec drop_while ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: t
172+ @ spec drop_while ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: Enumerable . t
174173 def drop_while ( enum , f ) do
175174 lazy enum , true , fn ( f1 ) ->
176175 fn entry , [ h , bool | t ] = orig ->
@@ -195,7 +194,7 @@ defmodule Stream do
195194 [2]
196195
197196 """
198- @ spec filter ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: t
197+ @ spec filter ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: Enumerable . t
199198 def filter ( enum , f ) do
200199 lazy enum , fn ( f1 ) ->
201200 fn ( entry , acc ) ->
@@ -215,7 +214,7 @@ defmodule Stream do
215214 [2,4,6]
216215
217216 """
218- @ spec map ( Enumerable . t , ( element -> any ) ) :: t
217+ @ spec map ( Enumerable . t , ( element -> any ) ) :: Enumerable . t
219218 def map ( enum , f ) do
220219 lazy enum , fn ( f1 ) ->
221220 fn ( entry , acc ) ->
@@ -235,7 +234,7 @@ defmodule Stream do
235234 [1, 2, 2, 4, 3, 6]
236235
237236 """
238- @ spec flat_map ( Enumerable . t , ( element -> any ) ) :: t
237+ @ spec flat_map ( Enumerable . t , ( element -> any ) ) :: Enumerable . t
239238 def flat_map ( enum , f ) do
240239 lazy enum , fn ( f1 ) ->
241240 fn ( entry , acc ) ->
@@ -276,7 +275,7 @@ defmodule Stream do
276275 [1,3]
277276
278277 """
279- @ spec reject ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: t
278+ @ spec reject ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: Enumerable . t
280279 def reject ( enum , f ) do
281280 lazy enum , fn ( f1 ) ->
282281 fn ( entry , acc ) ->
@@ -300,7 +299,7 @@ defmodule Stream do
300299 [1,2,3,1,2]
301300
302301 """
303- @ spec take ( Enumerable . t , non_neg_integer ) :: t
302+ @ spec take ( Enumerable . t , non_neg_integer ) :: Enumerable . t
304303 def take ( enum , n ) when n > 0 do
305304 lazy enum , n , fn ( f1 ) ->
306305 fn ( entry , [ h , n | t ] = orig ) ->
@@ -327,7 +326,7 @@ defmodule Stream do
327326 [1,2,3,4,5]
328327
329328 """
330- @ spec take_while ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: t
329+ @ spec take_while ( Enumerable . t , ( element -> as_boolean ( term ) ) ) :: Enumerable . t
331330 def take_while ( enum , f ) do
332331 lazy enum , fn ( f1 ) ->
333332 fn ( entry , acc ) ->
@@ -351,7 +350,7 @@ defmodule Stream do
351350 [{1,0},{2,1},{3,2}]
352351
353352 """
354- @ spec with_index ( Enumerable . t ) :: t
353+ @ spec with_index ( Enumerable . t ) :: Enumerable . t
355354 def with_index ( enum ) do
356355 lazy enum , 0 , fn ( f1 ) ->
357356 fn ( entry , [ h , counter | t ] ) ->
@@ -373,7 +372,7 @@ defmodule Stream do
373372 [1,2,3,4,5,6,7,8,9]
374373
375374 """
376- @ spec concat ( Enumerable . t ) :: t
375+ @ spec concat ( Enumerable . t ) :: Enumerable . t
377376 def concat ( enumerables ) do
378377 & do_concat ( enumerables , & 1 , & 2 )
379378 end
@@ -394,37 +393,87 @@ defmodule Stream do
394393 [1,2,3,1,2,3]
395394
396395 """
397- @ spec concat ( Enumerable . t , Enumerable . t ) :: t
396+ @ spec concat ( Enumerable . t , Enumerable . t ) :: Enumerable . t
398397 def concat ( first , second ) do
399398 & do_concat ( [ first , second ] , & 1 , & 2 )
400399 end
401400
402401 defp do_concat( enumerables, acc , fun ) do
403- fun = & do_concat_each ( fun , & 1 , & 2 )
404- Enumerable. reduce ( enumerables , acc , fn x , acc ->
405- do_concat_reduce ( & Enumerable . reduce ( x , & 1 , fun ) , { :cont , acc } )
406- end )
402+ step = &do_concat_step / 2
403+ next = & Enumerable . reduce ( enumerables , & 1 , step )
404+ do _concat ( [ ] , next , acc , fun )
407405 end
408406
409- defp do_concat_reduce ( reduce , acc ) do
410- try do
411- reduce . ( acc )
412- catch
413- { :stream_concat , acc } -> acc
414- else
415- { :done , acc } -> { :cont , acc }
416- { :halted , acc } -> { :cont , acc }
417- { :suspended , acc , c } -> { :suspend , acc , & do_concat_reduce ( c , & 1 ) }
407+ defp do_concat ( next_acc , next , acc , fun ) do
408+ case next . ( { :cont , next_acc } ) do
409+ { :suspended , [ enum | next_acc ] , next } ->
410+ do_concat ( next_acc , next , acc , fun , & Enumerable . reduce ( enum , & 1 , fun ) )
411+ { reason , _ } ->
412+ { reason , elem ( acc , 1 ) }
418413 end
419414 end
420415
421- defp do_concat_each ( fun , x , acc ) do
422- case fun . ( x , acc ) do
423- { :halt , _ } = h -> throw ( { :stream_concat , h } )
424- { _ , _ } = o -> o
416+ defp do_concat ( next_acc , next , acc , fun , reduce ) do
417+ case reduce . ( acc ) do
418+ { :done , acc } -> do_concat ( next_acc , next , { :cont , acc } , fun )
419+ { :halted , acc } = h -> h
420+ { :suspended , acc , c } -> { :suspended , acc , & do_concat ( next_acc , next , & 1 , fun , c ) }
421+ end
422+ end
423+
424+ defp do_concat_step ( x , acc ) do
425+ { :suspend , [ x | acc ] }
426+ end
427+
428+ @ doc """
429+ Zips two collections together, lazily.
430+
431+ The zipping finishes as soon as any enumerable completes.
432+
433+ ## Examples
434+
435+ iex> concat = Stream.concat(1..3, 4..6)
436+ iex> cycle = Stream.cycle([:a, :b, :c])
437+ iex> Stream.zip(concat, cycle) |> Enum.to_list
438+ [{1,:a},{2,:b},{3,:c},{4,:a},{5,:b},{6,:c}]
439+
440+ """
441+ @spec zip( Enumerable. t , Enumerable . t ) :: Enumerable . t
442+ def zip ( left , right ) do
443+ step = & do_zip_step ( & 1 , & 2 )
444+ left_fun = & Enumerable . reduce ( left , & 1 , step )
445+ right_fun = & Enumerable . reduce ( right , & 1 , step )
446+
447+ # Return a function as a lazy enumerator.
448+ & do_zip ( left_fun , [ ] , right_fun , [ ] , & 1 , & 2 )
449+ end
450+
451+ defp do_zip( _left_fun, _left_acc , _right_fun , _right_acc , { :halt , acc } , _fun ) do
452+ { :halted , acc }
453+ end
454+
455+ defp do_zip ( left_fun , left_acc , right_fun , right_acc , { :suspend , acc } , fun ) do
456+ { :suspended , acc , & do_zip ( left_fun , left_acc , right_fun , right_acc , & 1 , fun ) }
457+ end
458+
459+ defp do_zip ( left_fun , left_acc , right_fun , right_acc , { :cont , acc } , callback ) do
460+ case left_fun . ( { :cont , left_acc } ) do
461+ { :suspended , [ x | left_acc ] , left_fun } ->
462+ case right_fun . ( { :cont , right_acc } ) do
463+ { :suspended , [ y | right_acc ] , right_fun } ->
464+ do_zip ( left_fun , left_acc , right_fun , right_acc , callback . ( { x , y } , acc ) , callback )
465+ { reason , _ } ->
466+ { reason , acc }
467+ end
468+ { reason , _ } ->
469+ { reason , acc }
425470 end
426471 end
427472
473+ defp do_zip_step ( x , acc ) do
474+ { :suspend , [ x | acc ] }
475+ end
476+
428477 ## Sources
429478
430479 @ doc """
@@ -438,7 +487,7 @@ defmodule Stream do
438487 [1,2,3,1,2]
439488
440489 """
441- @ spec cycle ( Enumerable . t ) :: t
490+ @ spec cycle ( Enumerable . t ) :: Enumerable . t
442491 def cycle ( enumerable ) do
443492 fn acc , fun ->
444493 reduce = & Enumerable . reduce ( enumerable , & 1 , fun )
@@ -475,7 +524,7 @@ defmodule Stream do
475524 [0,1,2,3,4]
476525
477526 """
478- @ spec iterate ( element , ( element -> element ) ) :: t
527+ @ spec iterate ( element , ( element -> element ) ) :: Enumerable . t
479528 def iterate ( start_value , next_fun ) do
480529 unfold ( { :ok , start_value } , fn
481530 { :ok , value } ->
@@ -495,7 +544,7 @@ defmodule Stream do
495544 [0.4435846174457203, 0.7230402056221108, 0.94581636451987]
496545
497546 """
498- @ spec repeatedly ( ( ( ) -> element ) ) :: t
547+ @ spec repeatedly ( ( ( ) -> element ) ) :: Enumerable . t
499548 def repeatedly ( generator_fun ) when is_function ( generator_fun , 0 ) do
500549 & do_repeatedly ( generator_fun , & 1 , & 2 )
501550 end
@@ -539,7 +588,7 @@ defmodule Stream do
539588 fn file -> File.close!(file) end)
540589
541590 """
542- @ spec resource ( ( ( ) -> acc ) , ( acc -> { element , acc } | nil ) , ( acc -> term ) ) :: t
591+ @ spec resource ( ( ( ) -> acc ) , ( acc -> { element , acc } | nil ) , ( acc -> term ) ) :: Enumerable . t
543592 def resource ( start_fun , next_fun , after_fun ) do
544593 fn acc , fun ->
545594 next_acc = start_fun . ( )
@@ -563,7 +612,7 @@ defmodule Stream do
563612 iex> Stream.unfold(5, fn 0 -> nil; n -> {n, n-1} end) |> Enum.to_list()
564613 [5, 4, 3, 2, 1]
565614 """
566- @ spec unfold ( acc , ( acc -> { element , acc } | nil ) ) :: t
615+ @ spec unfold ( acc , ( acc -> { element , acc } | nil ) ) :: Enumerable . t
567616 def unfold ( next_acc , next_fun ) do
568617 & do_unfold ( next_acc , next_fun , & 1 , & 2 )
569618 end
0 commit comments