@@ -145,6 +145,7 @@ defmodule Task.Supervised do
145145 def stream ( enumerable , acc , reducer , mfa , options , spawn ) do
146146 next = & Enumerable . reduce ( enumerable , & 1 , fn x , acc -> { :suspend , [ x | acc ] } end )
147147 max_concurrency = Keyword . get ( options , :max_concurrency , System . schedulers_online )
148+ ordered? = Keyword . get ( options , :ordered , true )
148149 timeout = Keyword . get ( options , :timeout , 5000 )
149150 on_timeout = Keyword . get ( options , :on_timeout , :exit )
150151 parent = self ( )
@@ -168,6 +169,7 @@ defmodule Task.Supervised do
168169 reducer: reducer ,
169170 monitor_pid: monitor_pid ,
170171 monitor_ref: monitor_ref ,
172+ ordered: ordered? ,
171173 timeout: timeout ,
172174 on_timeout: on_timeout ,
173175 }
@@ -200,44 +202,51 @@ defmodule Task.Supervised do
200202 when max == 0
201203 when next == :done do
202204 % { monitor_pid: monitor_pid , monitor_ref: monitor_ref ,
203- timeout: timeout , on_timeout: on_timeout } = config
205+ timeout: timeout , on_timeout: on_timeout , ordered: ordered? } = config
204206
205207 receive do
206208 # The task at position "position" replied with "value". We put the
207209 # response in the "waiting" map and do nothing, since we'll only act on
208210 # this response when the replying task dies (we'll notice in the :down
209211 # message).
210- { { ^ monitor_ref , position } , value } ->
211- % { ^ position => { pid , :running } } = waiting
212- waiting = Map . put ( waiting , position , { pid , { :ok , value } } )
212+ { { ^ monitor_ref , position } , reply } ->
213+ % { ^ position => { pid , :running , element } } = waiting
214+ waiting = Map . put ( waiting , position , { pid , { :ok , reply } , element } )
213215 stream_reduce ( { :cont , acc } , max , spawned , delivered , waiting , next , config )
214216
215217 # The task at position "position" died for some reason. We check if it
216218 # replied already (then the death is peaceful) or if it's still running
217219 # (then the reply from this task will be {:exit, reason}). This message is
218220 # sent to us by the monitor process, not by the dying task directly.
219221 { kind , { ^ monitor_ref , position } , reason } when kind in [ :down , :timed_out ] ->
220- waiting =
222+ result =
221223 case waiting do
222224 # If the task replied, we don't care whether it went down for timeout
223225 # or for normal reasons.
224- % { ^ position => { _ , { :ok , _ } = ok } } ->
225- Map . put ( waiting , position , { nil , ok } )
226+ % { ^ position => { _ , { :ok , _ } = ok , _ } } ->
227+ ok
226228 # If the task exited by itself before replying, we emit {:exit, reason}.
227- % { ^ position => { _ , :running } } when kind == :down ->
228- Map . put ( waiting , position , { nil , { :exit , reason } } )
229+ % { ^ position => { _ , :running , element } } when kind == :down ->
230+ { :exit , reason , element }
229231 # If the task timed out before replying, we either exit (on_timeout: :exit)
230232 # or emit {:exit, :timeout} (on_timeout: :kill_task) (note the task is already
231233 # dead at this point).
232- % { ^ position => { _ , :running } } when kind == :timed_out ->
234+ % { ^ position => { _ , :running , element } } when kind == :timed_out ->
233235 if on_timeout == :exit do
234236 stream_cleanup_inbox ( monitor_pid , monitor_ref )
235- exit ( { :timeout , { __MODULE__ , :stream , [ timeout ] } } )
237+ exit ( { :timeout , { __MODULE__ , :stream , [ element , timeout ] } } )
236238 else
237- Map . put ( waiting , position , { nil , { :exit , :timeout } } )
239+ { :exit , :timeout , element }
238240 end
239241 end
240- stream_deliver ( { :cont , acc } , max + 1 , spawned , delivered , waiting , next , config )
242+
243+ if ordered? do
244+ waiting = Map . put ( waiting , position , { :done , result } )
245+ stream_deliver ( { :cont , acc } , max + 1 , spawned , delivered , waiting , next , config )
246+ else
247+ pair = deliver_now ( result , acc , next , config )
248+ stream_reduce ( pair , max + 1 , spawned , delivered + 1 , waiting , next , config )
249+ end
241250
242251 # The monitor process died. We just cleanup the messages from the monitor
243252 # process and exit.
@@ -269,6 +278,20 @@ defmodule Task.Supervised do
269278 end
270279 end
271280
281+ defp deliver_now ( reply , acc , next , config ) do
282+ % { reducer: reducer , monitor_pid: monitor_pid ,
283+ monitor_ref: monitor_ref , timeout: timeout } = config
284+ try do
285+ reducer . ( reply , acc )
286+ catch
287+ kind , reason ->
288+ stacktrace = System . stacktrace ( )
289+ is_function ( next ) && next . ( { :halt , [ ] } )
290+ stream_close ( monitor_pid , monitor_ref , timeout )
291+ :erlang . raise ( kind , reason , stacktrace )
292+ end
293+ end
294+
272295 defp stream_deliver ( { :suspend , acc } , max , spawned , delivered , waiting , next , config ) do
273296 continuation = & stream_deliver ( & 1 , max , spawned , delivered , waiting , next , config )
274297 { :suspended , acc , continuation }
@@ -283,7 +306,7 @@ defmodule Task.Supervised do
283306 monitor_ref: monitor_ref , timeout: timeout } = config
284307
285308 case waiting do
286- % { ^ delivered => { nil , reply } } ->
309+ % { ^ delivered => { :done , reply } } ->
287310 try do
288311 reducer . ( reply , acc )
289312 catch
@@ -341,7 +364,7 @@ defmodule Task.Supervised do
341364 receive do
342365 { :spawned , { ^ monitor_ref , ^ spawned } , pid } ->
343366 send ( pid , { self ( ) , { monitor_ref , spawned } } )
344- Map . put ( waiting , spawned , { pid , :running } )
367+ Map . put ( waiting , spawned , { pid , :running , value } )
345368 { :DOWN , ^ monitor_ref , _ , ^ monitor_pid , reason } ->
346369 stream_cleanup_inbox ( monitor_pid , monitor_ref )
347370 exit ( { reason , { __MODULE__ , :stream , [ timeout ] } } )
0 commit comments