Skip to content

Commit ff797d0

Browse files
committed
Fix check for sendMessages for PartitionedComputer
1 parent 2e2545f commit ff797d0

File tree

5 files changed

+24
-15
lines changed

5 files changed

+24
-15
lines changed

pregel/src/main/java/org/neo4j/gds/beta/pregel/ForkJoinComputeStep.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void compute() {
121121
this.compute();
122122
} else {
123123
computeBatch();
124-
hasSentMessage.set(computeContext.hasSentMessage());
124+
hasSentMessage.compareAndSet(false, computeContext.hasSentMessage());
125125
tryComplete();
126126
}
127127
}

pregel/src/main/java/org/neo4j/gds/beta/pregel/ForkJoinComputer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.neo4j.gds.core.utils.partition.Partition;
3131
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
3232

33+
import java.util.Optional;
3334
import java.util.concurrent.ForkJoinPool;
3435
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.function.Supplier;
@@ -108,6 +109,7 @@ void release() {
108109
messenger,
109110
voteBits,
110111
iteration,
112+
Optional.empty(),
111113
progressTracker
112114
);
113115

@@ -148,6 +150,7 @@ void release() {
148150
messenger,
149151
voteBits,
150152
iteration,
153+
Optional.empty(),
151154
progressTracker
152155
);
153156

pregel/src/main/java/org/neo4j/gds/beta/pregel/PartitionedComputeStep.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@
1919
*/
2020
package org.neo4j.gds.beta.pregel;
2121

22+
import org.apache.commons.lang3.mutable.MutableBoolean;
2223
import org.apache.commons.lang3.mutable.MutableInt;
2324
import org.neo4j.gds.beta.pregel.context.ComputeContext;
2425
import org.neo4j.gds.beta.pregel.context.InitContext;
2526
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
2627
import org.neo4j.gds.core.utils.partition.Partition;
2728
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
2829

29-
import java.util.concurrent.atomic.AtomicBoolean;
30-
3130
public final class PartitionedComputeStep<
3231
CONFIG extends PregelConfig,
3332
ITERATOR extends Messages.MessageIterator,
@@ -45,7 +44,7 @@ public final class PartitionedComputeStep<
4544
private final Messenger<ITERATOR> messenger;
4645

4746
private final MutableInt iteration;
48-
private final AtomicBoolean hasSentMessage;
47+
private final MutableBoolean hasSentMessage;
4948
private final NodeValue nodeValue;
5049

5150
PartitionedComputeStep(
@@ -58,7 +57,7 @@ public final class PartitionedComputeStep<
5857
Messenger<ITERATOR> messenger,
5958
HugeAtomicBitSet voteBits,
6059
MutableInt iteration,
61-
AtomicBoolean hasSentMessage,
60+
MutableBoolean hasSentMessage,
6261
ProgressTracker progressTracker
6362
) {
6463
this.initFunction = initFunction;
@@ -78,7 +77,6 @@ public final class PartitionedComputeStep<
7877
@Override
7978
public void run() {
8079
computeBatch();
81-
hasSentMessage.set(computeContext().hasSentMessage());
8280
}
8381

8482
@Override
@@ -128,10 +126,10 @@ public ProgressTracker progressTracker() {
128126

129127
void init(int iteration) {
130128
this.iteration.setValue(iteration);
131-
this.hasSentMessage.set(false);
129+
hasSentMessage.setValue(false);
132130
}
133131

134132
boolean hasSentMessage() {
135-
return hasSentMessage.get();
133+
return hasSentMessage.getValue();
136134
}
137135
}

pregel/src/main/java/org/neo4j/gds/beta/pregel/PartitionedComputer.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
package org.neo4j.gds.beta.pregel;
2121

22+
import org.apache.commons.lang3.mutable.MutableBoolean;
2223
import org.apache.commons.lang3.mutable.MutableInt;
2324
import org.jetbrains.annotations.NotNull;
2425
import org.neo4j.gds.api.Graph;
@@ -35,7 +36,6 @@
3536
import java.util.List;
3637
import java.util.Optional;
3738
import java.util.concurrent.ExecutorService;
38-
import java.util.concurrent.atomic.AtomicBoolean;
3939
import java.util.function.Function;
4040

4141
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
@@ -137,7 +137,7 @@ void release() {
137137
Partition partition
138138
) {
139139
MutableInt iteration = new MutableInt(0);
140-
var hasSentMessages = new AtomicBoolean(false);
140+
var hasSentMessages = new MutableBoolean(false);
141141

142142
var initContext = new InitContext<>(
143143
graph,
@@ -154,6 +154,7 @@ void release() {
154154
messenger,
155155
voteBits,
156156
iteration,
157+
Optional.of(hasSentMessages),
157158
progressTracker
158159
);
159160

@@ -179,7 +180,7 @@ void release() {
179180
Partition partition
180181
) {
181182
MutableInt iteration = new MutableInt(0);
182-
var hasSentMessages = new AtomicBoolean(false);
183+
var hasSentMessages = new MutableBoolean(false);
183184

184185
var initContext = new BidirectionalInitContext<>(
185186
graph,
@@ -196,6 +197,7 @@ void release() {
196197
messenger,
197198
voteBits,
198199
iteration,
200+
Optional.of(hasSentMessages),
199201
progressTracker
200202
);
201203

pregel/src/main/java/org/neo4j/gds/beta/pregel/context/ComputeContext.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
*/
2020
package org.neo4j.gds.beta.pregel.context;
2121

22+
import org.apache.commons.lang3.mutable.MutableBoolean;
2223
import org.apache.commons.lang3.mutable.MutableInt;
2324
import org.neo4j.gds.api.Graph;
2425
import org.neo4j.gds.beta.pregel.BasePregelComputation;
@@ -28,6 +29,8 @@
2829
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
2930
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
3031

32+
import java.util.Optional;
33+
3134
/**
3235
* A context that is used during the computation. It allows an implementation
3336
* to send messages to other nodes and change the state of the currently
@@ -39,7 +42,7 @@ public class ComputeContext<CONFIG extends PregelConfig> extends NodeCentricCont
3942

4043
private final Messenger<?> messenger;
4144
private final MutableInt iteration;
42-
private boolean hasSendMessage;
45+
private final MutableBoolean hasSendMessage;
4346

4447
protected BasePregelComputation<CONFIG> computation;
4548

@@ -50,6 +53,7 @@ public ComputeContext(Graph graph,
5053
Messenger<?> messenger,
5154
HugeAtomicBitSet voteBits,
5255
MutableInt iteration,
56+
Optional<MutableBoolean> hasSendMessage,
5357
ProgressTracker progressTracker) {
5458
super(graph, config, nodeValue, progressTracker);
5559
this.computation = computation;
@@ -59,7 +63,7 @@ public ComputeContext(Graph graph,
5963
this.messenger = messenger;
6064
this.voteBits = voteBits;
6165
this.iteration = iteration;
62-
this.hasSendMessage = false;
66+
this.hasSendMessage = hasSendMessage.orElse(new MutableBoolean(false));
6367
}
6468

6569
private final SendMessagesFunction sendMessagesFunction;
@@ -150,7 +154,7 @@ public void sendToNeighbors(double message) {
150154
*/
151155
public void sendTo(long targetNodeId, double message) {
152156
messenger.sendTo(targetNodeId, message);
153-
this.hasSendMessage = true;
157+
this.hasSendMessage.setValue(true);
154158
}
155159

156160
private void sendToNeighbors(long sourceNodeId, double message) {
@@ -168,7 +172,7 @@ private void sendToNeighborsWeighted(long sourceNodeId, double message) {
168172
}
169173

170174
public boolean hasSentMessage() {
171-
return hasSendMessage;
175+
return hasSendMessage.getValue();
172176
}
173177

174178
@FunctionalInterface
@@ -188,6 +192,7 @@ public BidirectionalComputeContext(
188192
Messenger<?> messenger,
189193
HugeAtomicBitSet voteBits,
190194
MutableInt iteration,
195+
Optional<MutableBoolean> hasSendMessage,
191196
ProgressTracker progressTracker
192197
) {
193198
super(
@@ -198,6 +203,7 @@ public BidirectionalComputeContext(
198203
messenger,
199204
voteBits,
200205
iteration,
206+
hasSendMessage,
201207
progressTracker
202208
);
203209

0 commit comments

Comments
 (0)