11defmodule Stream.Reducers do
2- # Collection of reducers shared by Enum and Stream.
2+ # Collection of reducers and utilities shared by Enum and Stream.
33 @ moduledoc false
44
5- defmacro chunk ( amount , step , limit , fun \\ nil ) do
6- quote do
7- fn entry , acc ( head , { buffer , count } , tail ) ->
8- buffer = [ entry | buffer ]
9- count = count + 1
10-
11- new_state =
12- if count >= unquote ( limit ) do
13- left = count - unquote ( step )
14- { Enum . take ( buffer , left ) , left }
15- else
16- { buffer , count }
17- end
5+ def chunk ( chunk_by , enumerable , count , step , leftover ) do
6+ limit = :erlang . max ( count , step )
7+ chunk_by . ( enumerable , { [ ] , 0 } , fn entry , { acc_buffer , acc_count } ->
8+ acc_buffer = [ entry | acc_buffer ]
9+ acc_count = acc_count + 1
1810
19- if count == unquote ( amount ) do
20- next_with_acc ( unquote ( fun ) , :lists . reverse ( buffer ) , head , new_state , tail )
11+ new_state =
12+ if acc_count >= limit do
13+ remaining = acc_count - step
14+ { Enum . take ( acc_buffer , remaining ) , remaining }
2115 else
22- skip ( acc ( head , new_state , tail ) )
16+ { acc_buffer , acc_count }
2317 end
18+
19+ if acc_count == count do
20+ { :cont , :lists . reverse ( acc_buffer ) , new_state }
21+ else
22+ { :cont , new_state }
2423 end
25- end
24+ end , fn { acc_buffer , acc_count } ->
25+ if is_nil ( leftover ) || acc_count == 0 do
26+ { :cont , [ ] }
27+ else
28+ { :cont , :lists . reverse ( acc_buffer , Enum . take ( leftover , count - acc_count ) ) , [ ] }
29+ end
30+ end )
31+ end
32+
33+ def chunk_by ( chunk_by , enumerable , fun ) do
34+ chunk_by . ( enumerable , nil , fn
35+ entry , nil ->
36+ { :cont , { [ entry ] , fun . ( entry ) } }
37+ entry , { acc , value } ->
38+ case fun . ( entry ) do
39+ ^ value -> { :cont , { [ entry | acc ] , value } }
40+ new_value -> { :cont , :lists . reverse ( acc ) , { [ entry ] , new_value } }
41+ end
42+ end , fn
43+ nil -> { :cont , :done }
44+ { acc , _value } -> { :cont , :lists . reverse ( acc ) , :done }
45+ end )
2646 end
2747
2848 defmacro chunk_by ( callback , fun \\ nil ) do
@@ -38,7 +58,7 @@ defmodule Stream.Reducers do
3858
3959 defmacro dedup ( callback , fun \\ nil ) do
4060 quote do
41- fn ( entry , acc ( head , prev , tail ) = acc ) ->
61+ fn entry , acc ( head , prev , tail ) = acc ->
4262 value = unquote ( callback ) . ( entry )
4363 case prev do
4464 { :value , ^ value } -> skip ( acc )
@@ -84,7 +104,7 @@ defmodule Stream.Reducers do
84104
85105 defmacro filter ( callback , fun \\ nil ) do
86106 quote do
87- fn ( entry , acc ) ->
107+ fn entry , acc ->
88108 if unquote ( callback ) . ( entry ) do
89109 next ( unquote ( fun ) , entry , acc )
90110 else
@@ -96,7 +116,7 @@ defmodule Stream.Reducers do
96116
97117 defmacro filter_map ( filter , mapper , fun \\ nil ) do
98118 quote do
99- fn ( entry , acc ) ->
119+ fn entry , acc ->
100120 if unquote ( filter ) . ( entry ) do
101121 next ( unquote ( fun ) , unquote ( mapper ) . ( entry ) , acc )
102122 else
@@ -108,7 +128,7 @@ defmodule Stream.Reducers do
108128
109129 defmacro map ( callback , fun \\ nil ) do
110130 quote do
111- fn ( entry , acc ) ->
131+ fn entry , acc ->
112132 next ( unquote ( fun ) , unquote ( callback ) . ( entry ) , acc )
113133 end
114134 end
@@ -127,7 +147,7 @@ defmodule Stream.Reducers do
127147
128148 defmacro reject ( callback , fun \\ nil ) do
129149 quote do
130- fn ( entry , acc ) ->
150+ fn entry , acc ->
131151 unless unquote ( callback ) . ( entry ) do
132152 next ( unquote ( fun ) , entry , acc )
133153 else
@@ -151,7 +171,7 @@ defmodule Stream.Reducers do
151171
152172 defmacro scan3 ( callback , fun \\ nil ) do
153173 quote do
154- fn ( entry , acc ( head , acc , tail ) ) ->
174+ fn entry , acc ( head , acc , tail ) ->
155175 value = unquote ( callback ) . ( entry , acc )
156176 next_with_acc ( unquote ( fun ) , value , head , value , tail )
157177 end
@@ -160,7 +180,7 @@ defmodule Stream.Reducers do
160180
161181 defmacro take ( fun \\ nil ) do
162182 quote do
163- fn ( entry , acc ( head , curr , tail ) = original ) ->
183+ fn entry , acc ( head , curr , tail ) = original ->
164184 case curr do
165185 0 ->
166186 { :halt , original }
@@ -187,7 +207,7 @@ defmodule Stream.Reducers do
187207
188208 defmacro take_while ( callback , fun \\ nil ) do
189209 quote do
190- fn ( entry , acc ) ->
210+ fn entry , acc ->
191211 if unquote ( callback ) . ( entry ) do
192212 next ( unquote ( fun ) , entry , acc )
193213 else
@@ -199,7 +219,7 @@ defmodule Stream.Reducers do
199219
200220 defmacro uniq_by ( callback , fun \\ nil ) do
201221 quote do
202- fn ( entry , acc ( head , prev , tail ) = original ) ->
222+ fn entry , acc ( head , prev , tail ) = original ->
203223 value = unquote ( callback ) . ( entry )
204224 if Map . has_key? ( prev , value ) do
205225 skip ( original )
@@ -212,7 +232,7 @@ defmodule Stream.Reducers do
212232
213233 defmacro with_index ( fun \\ nil ) do
214234 quote do
215- fn ( entry , acc ( head , counter , tail ) ) ->
235+ fn entry , acc ( head , counter , tail ) ->
216236 next_with_acc ( unquote ( fun ) , { entry , counter } , head , counter + 1 , tail )
217237 end
218238 end
0 commit comments