@@ -9,12 +9,12 @@ defmodule GenEvent.Stream do
99 * `:id` - the event stream id for cancellation
1010 * `:timeout` - the timeout in between events, defaults to `:infinity`
1111 * `:duration` - the duration of the subscription, defaults to `:infinity`
12- * `:mode` - if the subscription mode is sync or async, defaults to `:sync `
12+ * `:mode` - if the subscription mode is ack, sync or async, defaults to `:ack `
1313 """
14- defstruct manager: nil , id: nil , timeout: :infinity , duration: :infinity , mode: :sync
14+ defstruct manager: nil , id: nil , timeout: :infinity , duration: :infinity , mode: :ack
1515
1616 @ typedoc "The stream mode"
17- @ type mode :: :sync | :async
17+ @ type mode :: :ack | : sync | :async
1818
1919 @ type t :: % __MODULE__ {
2020 manager: GenEvent . manager ,
@@ -277,7 +277,7 @@ defmodule GenEvent do
277277 The stream is a `GenEvent` struct that implements the `Enumerable`
278278 protocol. Consumption of events only begins when enumeration starts.
279279
280- `The supported options are:
280+ ## Options
281281
282282 * `:id` - an id to identify all live stream instances; when an `:id` is
283283 given, existing streams can be called with via `cancel_streams`.
@@ -287,9 +287,23 @@ defmodule GenEvent do
287287 * `:duration` (Enumerable) - only consume events during the X milliseconds
288288 from the streaming start.
289289
290- * `:mode` - the mode to consume events, can be `:sync` (default) or
291- `:async`. On sync, the event manager waits for the event to be consumed
292- before moving on to the next event handler.
290+ * `:mode` - the mode to consume events, can be `:ack` (default), `:sync`
291+ or `:async`. See modes below.
292+
293+ ## Modes
294+
295+ GenEvent stream supports three different modes.
296+
297+ On `:ack`, the stream acknowledges each event, providing back pressure,
298+ but processing of the message happens asynchronously, allowing the event
299+ manager to move on to the next handler as soon as the event is
300+ acknowledged.
301+
302+ On `:sync`, the event manager waits for the event to be consumed
303+ before moving on to the next event handler.
304+
305+ On `:async`, all events are processed asynchronously but there is no
306+ ack (which means there is no backpressure).
293307
294308 """
295309 @ spec stream ( manager , Keyword . t ) :: GenEvent.Stream . t
@@ -299,7 +313,7 @@ defmodule GenEvent do
299313 id: Keyword . get ( options , :id ) ,
300314 timeout: Keyword . get ( options , :timeout , :infinity ) ,
301315 duration: Keyword . get ( options , :duration , :infinity ) ,
302- mode: Keyword . get ( options , :mode , :sync ) }
316+ mode: Keyword . get ( options , :mode , :ack ) }
303317 end
304318
305319 @ doc """
@@ -478,7 +492,7 @@ defimpl Enumerable, for: GenEvent.Stream do
478492 end
479493
480494 @ doc false
481- def handle_event ( event , { :sync , mon_pid , pid , ref } = state ) do
495+ def handle_event ( event , { mode , mon_pid , pid , ref } = state ) when mode in [ :sync , :ack ] do
482496 sync = Process . monitor ( mon_pid )
483497 send pid , { ref , sync , event }
484498 receive do
@@ -515,11 +529,17 @@ defimpl Enumerable, for: GenEvent.Stream do
515529
516530 defp wrap_reducer ( fun ) do
517531 fn
518- { nil , _manager , event } , acc ->
519- fun . ( event , acc )
520- { ref , manager , event } , acc ->
532+ { :ack , ref , manager , event } , acc ->
521533 send manager , { ref , :next }
522534 fun . ( event , acc )
535+ { :async , _ , _manager , event } , acc ->
536+ fun . ( event , acc )
537+ { :sync , ref , manager , event } , acc ->
538+ try do
539+ fun . ( event , acc )
540+ after
541+ send manager , { ref , :next }
542+ end
523543 end
524544 end
525545
@@ -548,7 +568,7 @@ defimpl Enumerable, for: GenEvent.Stream do
548568 send ( self ( ) , { :DOWN , mon_ref , :process , mon_pid , :normal } )
549569 exit ( { reason , { __MODULE__ , :next , [ stream , acc ] } } )
550570 { ^ mon_ref , sync_ref , event } ->
551- { { sync_ref , manager_pid , event } , acc }
571+ { { stream . mode , sync_ref , manager_pid , event } , acc }
552572 after
553573 timeout ->
554574 exit ( { :timeout , { __MODULE__ , :next , [ stream , acc ] } } )
0 commit comments