Skip to content

Commit 8e73d84

Browse files
hindogIoannisPanagiotas
authored andcommitted
Allow termination in RandomWalk while filling buffer
1 parent af4c7e9 commit 8e73d84

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

algo/src/main/java/org/neo4j/gds/traversal/RandomWalk.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.BlockingQueue;
3939
import java.util.concurrent.CompletableFuture;
4040
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.TimeUnit;
4142
import java.util.concurrent.atomic.AtomicInteger;
4243
import java.util.concurrent.atomic.AtomicLong;
4344
import java.util.stream.Collectors;
@@ -178,7 +179,10 @@ private void tasksRunner(
178179
progressTracker.endSubTask("create walks");
179180

180181
try {
181-
walks.put(tombstone);
182+
boolean finished = false;
183+
while (!finished && terminationFlag.running()) {
184+
finished = walks.offer(tombstone, 100, TimeUnit.MILLISECONDS);
185+
}
182186
} catch (InterruptedException exception) {
183187
Thread.currentThread().interrupt();
184188
}
@@ -326,9 +330,13 @@ public void run() {
326330
private boolean flushBuffer(int bufferLength) {
327331
bufferLength = Math.min(bufferLength, this.buffer.length);
328332

329-
for (int i = 0; i < bufferLength && terminationFlag.running(); i++) {
333+
int i = 0;
334+
while (i < bufferLength && terminationFlag.running()) {
330335
try {
331-
walks.put(this.buffer[i]);
336+
// allow termination to occur if queue is full
337+
if (walks.offer(this.buffer[i], 100, TimeUnit.MILLISECONDS)) {
338+
i++;
339+
}
332340
} catch (InterruptedException e) {
333341
Thread.currentThread().interrupt();
334342
return false;

0 commit comments

Comments
 (0)