44import logging
55
66from gevent .monkey import get_original
7+ from gevent .lock import Semaphore
78
89# Guarantee that Task threads will always be proper system threads, regardless of Gevent patches
910Event = get_original ("threading" , "Event" )
@@ -19,6 +20,7 @@ class ClientEvent(object):
1920
2021 def __init__ (self ):
2122 self .events = {}
23+ self ._setting_lock = Semaphore (value = 1 )
2224
2325 def wait (self , timeout : int = 5 ):
2426 """Wait for the next data frame (invoked from each client's thread)."""
@@ -40,23 +42,24 @@ def wait(self, timeout: int = 5):
4042
4143 def set (self , timeout = 5 ):
4244 """Signal that a new frame is available."""
43- now = time .time ()
44- remove = None
45- for ident , event in self .events .items ():
46- if not event [0 ].is_set ():
47- # if this client's event is not set, then set it
48- # also update the last set timestamp to now
49- event [0 ].set ()
50- event [1 ] = now
51- else :
52- # if the client's event is already set, it means the client
53- # did not process a previous frame
54- # if the event stays set for more than `timeout` seconds, then assume
55- # the client is gone and remove it
56- if now - event [1 ] >= timeout :
57- remove = ident
58- if remove :
59- del self .events [remove ]
45+ with self ._setting_lock :
46+ now = time .time ()
47+ remove_keys = set ()
48+ for ident , event in self .events .items ():
49+ if not event [0 ].is_set ():
50+ # if this client's event is not set, then set it
51+ # also update the last set timestamp to now
52+ event [0 ].set ()
53+ event [1 ] = now
54+ else :
55+ # if the client's event is already set, it means the client
56+ # did not process a previous frame
57+ # if the event stays set for more than `timeout` seconds, then
58+ # assume the client is gone and remove it
59+ if now - event [1 ] >= timeout :
60+ remove_keys .add (ident )
61+ if remove_keys :
62+ del self .events [ident ]
6063
6164 def clear (self ):
6265 """Clear frame event, once processed."""
0 commit comments