@@ -83,7 +83,7 @@ defmodule Stream do
8383 like `Stream.cycle/1`, `Stream.unfold/2`, `Stream.resource/3` and more.
8484 """
8585
86- defrecord Lazy , enum: nil , funs: [ ] , accs: [ ]
86+ defrecord Lazy , enum: nil , funs: [ ] , accs: [ ] , done: nil
8787
8888 defimpl Enumerable , for: Lazy do
8989 @ compile :inline_list_funs
@@ -103,25 +103,36 @@ defmodule Stream do
103103 { :error , __MODULE__ }
104104 end
105105
106- defp do_reduce ( Lazy [ enum : enum , funs: funs , accs: accs ] , acc , fun ) do
106+ defp do_reduce ( Lazy [ enum : enum , funs: funs , accs: accs , done: done ] , acc , fun ) do
107107 composed = :lists . foldl ( fn fun , acc -> fun . ( acc ) end , fun , funs )
108- do_each ( & Enumerable . reduce ( enum , & 1 , composed ) , :lists . reverse ( accs ) , acc )
108+ do_each ( & Enumerable . reduce ( enum , & 1 , composed ) , done && { done , fun } , :lists . reverse ( accs ) , acc )
109109 end
110110
111- defp do_each ( _reduce , _accs , { :halt , acc } ) do
111+ defp do_each ( _reduce , _done , _accs , { :halt , acc } ) do
112112 { :halted , acc }
113113 end
114114
115- defp do_each ( reduce , accs , { :suspend , acc } ) do
116- { :suspended , acc , & do_each ( reduce , accs , & 1 ) }
115+ defp do_each ( reduce , done , accs , { :suspend , acc } ) do
116+ { :suspended , acc , & do_each ( reduce , done , accs , & 1 ) }
117117 end
118118
119- defp do_each ( reduce , accs , { :cont , acc } ) do
119+ defp do_each ( reduce , done , accs , { :cont , acc } ) do
120120 case reduce . ( { :cont , [ acc | accs ] } ) do
121- { reason , [ acc | _ ] } ->
122- { reason , acc }
123121 { :suspended , [ acc | accs ] , continuation } ->
124- { :suspended , acc , & do_each ( continuation , accs , & 1 ) }
122+ { :suspended , acc , & do_each ( continuation , done , accs , & 1 ) }
123+ { :halted , [ acc | _ ] } ->
124+ { :halted , acc }
125+ { :done , [ acc | _ ] = accs } ->
126+ case done do
127+ nil ->
128+ { :done , acc }
129+ { done , fun } ->
130+ case done . ( fun ) . ( accs ) do
131+ { :cont , [ acc | _ ] } -> { :done , acc }
132+ { :halt , [ acc | _ ] } -> { :halted , acc }
133+ { :suspend , [ acc | _ ] } -> { :suspended , acc , & ( { :done , & 1 |> elem ( 1 ) } ) }
134+ end
135+ end
125136 end
126137 end
127138 end
@@ -151,6 +162,33 @@ defmodule Stream do
151162
152163 ## Transformers
153164
165+ @ doc """
166+ Chunk the `enum` by buffering elements for which `fun` returns
167+ the same value and only emit them when `fun` returns a new value
168+ or the `enum` finishes,
169+
170+ ## Examples
171+
172+ iex> stream = Stream.chunks_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
173+ iex> Enum.to_list(stream)
174+ [[1], [2, 2], [3], [4, 4, 6], [7, 7]]
175+
176+ """
177+ @ spec chunks_by ( Enumerable . t , ( element -> any ) ) :: Enumerable . t
178+ def chunks_by ( enum , fun ) do
179+ lazy enum , nil ,
180+ fn ( f1 ) -> R . chunks_by ( fun , f1 ) end ,
181+ fn ( f1 ) -> & do_chunks_by ( & 1 , f1 ) end
182+ end
183+
184+ defp do_chunks_by ( acc ( _ , nil , _ ) = acc , _f1 ) do
185+ { :cont , acc }
186+ end
187+
188+ defp do_chunks_by ( acc ( h , { buffer , _ } , t ) , f1 ) do
189+ cont_with_acc ( f1 , :lists . reverse ( buffer ) , h , nil , t )
190+ end
191+
154192 @ doc """
155193 Lazily drops the next `n` items from the enumerable.
156194
@@ -720,23 +758,20 @@ defmodule Stream do
720758
721759 ## Helpers
722760
723- @ compile { :inline , lazy: 2 , lazy: 3 }
761+ @ compile { :inline , lazy: 2 , lazy: 3 , lazy: 4 }
724762
725- defp lazy ( enum , fun ) do
726- case enum do
727- Lazy [ funs : funs ] = lazy ->
728- lazy . funs ( [ fun | funs ] )
729- _ ->
730- Lazy [ enum : enum , funs: [ fun ] , accs: [ ] ]
731- end
732- end
763+ defp lazy ( Lazy [ funs : funs ] = lazy , fun ) ,
764+ do: lazy . funs ( [ fun | funs ] )
765+ defp lazy ( enum , fun ) ,
766+ do: Lazy [ enum : enum , funs: [ fun ] ]
733767
734- defp lazy ( enum , acc , fun ) do
735- case enum do
736- Lazy [ funs : funs , accs: accs ] = lazy ->
737- lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] )
738- _ ->
739- Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] ]
740- end
741- end
768+ defp lazy ( Lazy [ funs : funs , accs: accs ] = lazy , acc , fun ) ,
769+ do: lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] )
770+ defp lazy ( enum , acc , fun ) ,
771+ do: Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] ]
772+
773+ defp lazy ( Lazy [ done : nil , funs: funs , accs: accs ] = lazy , acc , fun , done ) ,
774+ do: lazy . funs ( [ fun | funs ] ) . accs ( [ acc | accs ] ) . done ( done )
775+ defp lazy ( enum , acc , fun , done ) ,
776+ do: Lazy [ enum : enum , funs: [ fun ] , accs: [ acc ] , done: done ]
742777end
0 commit comments