File tree Expand file tree Collapse file tree 4 files changed +15
-18
lines changed
main/java/org/neo4j/gds/maxflow
test/java/org/neo4j/gds/maxflow Expand file tree Collapse file tree 4 files changed +15
-18
lines changed Original file line number Diff line number Diff line change @@ -66,6 +66,15 @@ void batchPush(HugeLongArrayQueue queue) {
6666 }
6767 }
6868
69+ void batchPushAndConsume (HugeLongArrayQueue queue , Consumer <Long > consumer ) {
70+ long idx = size .getAndAdd (queue .size ());
71+ while (!queue .isEmpty ()) {
72+ var node = queue .remove ();
73+ workingSet .set (idx ++, node );
74+ consumer .accept (node );
75+ }
76+ }
77+
6978 long getAndAdd (long batchSize ) {
7079 return index .getAndAdd (batchSize );
7180 }
Original file line number Diff line number Diff line change @@ -84,8 +84,7 @@ public void run() {
8484 switch (phase ) {
8585 case PHASE .DISCHARGE -> dischargeWorkingSet ();
8686 case PHASE .SYNC_WORKING_SET -> syncWorkingSet ();
87- case PHASE .UPDATE_WORKING_SET -> updateWorkingSet ();
88- case PHASE .SYNC_NEW_WORKING_SET -> syncNewWorkingSet ();
87+ case PHASE .UPDATE_WORKING_SET -> updateAndSyncNewWorkingSet ();
8988 }
9089 }
9190
@@ -178,24 +177,18 @@ void syncWorkingSet() {
178177 phase = PHASE .UPDATE_WORKING_SET ;
179178 }
180179
181- void updateWorkingSet () {
182- workingSet .batchPush (localDiscoveredVertices );
183- phase = PHASE .SYNC_NEW_WORKING_SET ;
184- }
185-
186- void syncNewWorkingSet () {
187- batchConsumeWorkingSet ((v ) -> {
180+ void updateAndSyncNewWorkingSet () {
181+ workingSet .batchPushAndConsume (localDiscoveredVertices , (v ) -> {
188182 excess .addTo (v , addedExcess .get (v ));
189183 addedExcess .set (v , 0 );
190184 isDiscovered .clear (v );
191185 });
192- phase = PHASE .UPDATE_WORKING_SET ;
186+ phase = PHASE .DISCHARGE ;
193187 }
194188
195189 enum PHASE {
196190 DISCHARGE ,
197191 SYNC_WORKING_SET ,
198192 UPDATE_WORKING_SET ,
199- SYNC_NEW_WORKING_SET
200193 }
201194}
Original file line number Diff line number Diff line change @@ -67,11 +67,7 @@ public static void processWorkingSet(
6767 RunWithConcurrency .builder ().concurrency (concurrency ).tasks (dischargeTasks ).build ().run ();
6868 workingSet .reset ();
6969
70- //Update working set
70+ //Update and sync new working set
7171 RunWithConcurrency .builder ().concurrency (concurrency ).tasks (dischargeTasks ).build ().run ();
72-
73- //Sync new working set
74- RunWithConcurrency .builder ().concurrency (concurrency ).tasks (dischargeTasks ).build ().run ();
75- workingSet .resetIdx ();
7672 }
7773}
Original file line number Diff line number Diff line change @@ -110,9 +110,8 @@ void discharge() {
110110 assertThat (excess .get (graph .toMappedNodeId ("a" ))).isEqualTo (0D );
111111 assertThat (excess .get (graph .toMappedNodeId ("c" ))).isEqualTo (8D );
112112
113- task .updateWorkingSet ();
113+ task .updateAndSyncNewWorkingSet ();
114114
115- task .syncNewWorkingSet ();
116115 workingSet .resetIdx ();
117116
118117 assertThat (label .get (graph .toMappedNodeId ("a" ))).isEqualTo (2L );
You can’t perform that action at this time.
0 commit comments