22
33use Carbon \Carbon ;
44use Illuminate \Queue \DatabaseQueue ;
5+ use Illuminate \Queue \Jobs \DatabaseJob ;
6+ use MongoDB \Operation \FindOneAndUpdate ;
7+ use DB ;
58
69class MongoQueue extends DatabaseQueue
710{
811 /**
9- * Get the next available job for the queue.
12+ * Pop the next job off of the queue.
13+ *
14+ * @param string $queue
15+ *
16+ * @return \Illuminate\Contracts\Queue\Job|null
17+ */
18+ public function pop ($ queue = null )
19+ {
20+ $ queue = $ this ->getQueue ($ queue );
21+
22+ if (!is_null ($ this ->expire ))
23+ {
24+ $ this ->releaseJobsThatHaveBeenReservedTooLong ($ queue );
25+ }
26+
27+ if ($ job = $ this ->getNextAvailableJobAndReserve ($ queue ))
28+ {
29+ return new DatabaseJob (
30+ $ this ->container , $ this , $ job , $ queue
31+ );
32+ }
33+ }
34+
35+ /**
36+ * Get the next available job for the queue and mark it as reserved.
37+ *
38+ * When using multiple daemon queue listeners to process jobs there
39+ * is a possibility that multiple processes can end up reading the
40+ * same record before one has flagged it as reserved.
41+ *
42+ * This race condition can result in random jobs being run more then
43+ * once. To solve this we use findOneAndUpdate to lock the next jobs
44+ * record while flagging it as reserved at the same time.
45+ *
46+ * @param string|null $queue
1047 *
11- * @param string|null $queue
1248 * @return \StdClass|null
1349 */
14- protected function getNextAvailableJob ($ queue )
50+ protected function getNextAvailableJobAndReserve ($ queue )
1551 {
16- $ job = $ this ->database ->table ($ this ->table )
17- ->lockForUpdate ()
18- ->where ('queue ' , $ this ->getQueue ($ queue ))
19- ->where ('reserved ' , 0 )
20- ->where ('available_at ' , '<= ' , $ this ->getTime ())
21- ->orderBy ('id ' , 'asc ' )
22- ->first ();
52+ $ job = DB ::getCollection ($ this ->table )->findOneAndUpdate (
53+ [
54+ 'queue ' => $ this ->getQueue ($ queue ),
55+ 'reserved ' => 0 ,
56+ 'available_at ' => ['$lte ' => $ this ->getTime ()],
2357
24- if ($ job ) {
25- $ job = (object ) $ job ;
58+ ],
59+ [
60+ '$set ' => [
61+ 'reserved ' => 1 ,
62+ 'reserved_at ' => $ this ->getTime (),
63+ ],
64+ ],
65+ [
66+ 'returnNewDocument ' => true ,
67+ 'sort ' => ['available_at ' => 1 ],
68+ ]
69+ );
70+
71+ if ($ job )
72+ {
2673 $ job ->id = $ job ->_id ;
2774 }
2875
29- return $ job ?: null ;
76+ return $ job ;
3077 }
3178
3279 /**
@@ -40,16 +87,16 @@ protected function releaseJobsThatHaveBeenReservedTooLong($queue)
4087 $ expired = Carbon::now ()->subSeconds ($ this ->expire )->getTimestamp ();
4188
4289 $ reserved = $ this ->database ->collection ($ this ->table )
43- ->where ('queue ' , $ this ->getQueue ($ queue ))
44- ->where ('reserved ' , 1 )
45- ->where ('reserved_at ' , '<= ' , $ expired )->get ();
90+ ->where ('queue ' , $ this ->getQueue ($ queue ))
91+ ->where ('reserved ' , 1 )
92+ ->where ('reserved_at ' , '<= ' , $ expired )->get ();
4693
4794 foreach ($ reserved as $ job ) {
4895 $ attempts = $ job ['attempts ' ] + 1 ;
4996 $ this ->releaseJob ($ job ['_id ' ], $ attempts );
5097 }
5198 }
52-
99+
53100 /**
54101 * Release the given job ID from reservation.
55102 *
0 commit comments