File tree Expand file tree Collapse file tree 2 files changed +38
-5
lines changed
main/java/org/neo4j/gds/traversal
test/java/org/neo4j/gds/traversal Expand file tree Collapse file tree 2 files changed +38
-5
lines changed Original file line number Diff line number Diff line change @@ -96,7 +96,7 @@ public Stream<long[]> compute() {
9696 ? new NextNodeSupplier .GraphNodeSupplier (graph .nodeCount ())
9797 : NextNodeSupplier .ListNodeSupplier .of (config , graph );
9898
99- var terminationFlag = new ExternalTerminationFlag (this . terminationFlag );
99+ var terminationFlag = new ExternalTerminationFlag (this );
100100
101101 BlockingQueue <long []> walks = new ArrayBlockingQueue <>(config .walkBufferSize ());
102102 long [] TOMB = new long [0 ];
@@ -198,15 +198,15 @@ private Stream<long[]> walksQueueConsumer(
198198
199199 private static final class ExternalTerminationFlag implements TerminationFlag {
200200 private volatile boolean running = true ;
201- private final TerminationFlag inner ;
201+ private final Algorithm <?> algo ;
202202
203- ExternalTerminationFlag (TerminationFlag inner ) {
204- this .inner = inner ;
203+ ExternalTerminationFlag (Algorithm <?> algo ) {
204+ this .algo = algo ;
205205 }
206206
207207 @ Override
208208 public boolean running () {
209- return this .running && this .inner .running ();
209+ return this .running && this .algo . getTerminationFlag () .running ();
210210 }
211211
212212 void stop () {
Original file line number Diff line number Diff line change 3636import org .neo4j .gds .compat .Neo4jProxy ;
3737import org .neo4j .gds .compat .TestLog ;
3838import org .neo4j .gds .core .concurrency .Pools ;
39+ import org .neo4j .gds .core .utils .TerminationFlag ;
3940import org .neo4j .gds .core .utils .progress .GlobalTaskStore ;
4041import org .neo4j .gds .core .utils .progress .TaskRegistryFactory ;
4142import org .neo4j .gds .core .utils .progress .tasks .ProgressTracker ;
@@ -411,6 +412,38 @@ void testWithConfiguredOffsetStartNodes() {
411412 .anyMatch (walk -> walk [0 ] == bInternalId );
412413 }
413414
415+ /**
416+ * Ensure that when termination flag is set externally, we terminate the walk
417+ * @throws InterruptedException
418+ */
419+ @ Test
420+ void testPartialReadMultipleRuns () {
421+ for (int i = 0 ; i < 3 ; i ++) {
422+ Node2VecStreamConfig config = ImmutableNode2VecStreamConfig .builder ()
423+ .walkBufferSize (1 )
424+ .build ();
425+
426+ var randomWalk = RandomWalk .create (
427+ graph ,
428+ config ,
429+ ProgressTracker .NULL_TRACKER ,
430+ Pools .DEFAULT
431+ );
432+
433+ var stream = randomWalk .compute ();
434+ long count = stream .limit (10 ).count ();
435+
436+ randomWalk .setTerminationFlag (new TerminationFlag () {
437+ @ Override
438+ public boolean running () {
439+ return false ;
440+ }
441+ });
442+
443+ assertEquals (10 , count );
444+ }
445+ }
446+
414447 @ Nested
415448 class ProgressTracking {
416449
You can’t perform that action at this time.
0 commit comments