Skip to content

Commit 2e2545f

Browse files
committed
Remove contention on hasSendMessage atomic boolean
1 parent 620ad77 commit 2e2545f

File tree

5 files changed

+22
-22
lines changed

5 files changed

+22
-22
lines changed

pregel/src/main/java/org/neo4j/gds/beta/pregel/ForkJoinComputeStep.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ public final class ForkJoinComputeStep<
4343

4444
private final InitFunction<CONFIG, INIT_CONTEXT> initFunction;
4545
private final ComputeFunction<CONFIG, COMPUTE_CONTEXT> computeFunction;
46-
private final Supplier<INIT_CONTEXT> initContext;
47-
private final Supplier<COMPUTE_CONTEXT> computeContext;
46+
private final Supplier<INIT_CONTEXT> initContextSupplier;
47+
private final Supplier<COMPUTE_CONTEXT> computeContextSupplier;
48+
private final COMPUTE_CONTEXT computeContext;
4849
private final NodeValue nodeValue;
4950
private final HugeAtomicBitSet voteBits;
5051
private final Messenger<ITERATOR> messenger;
@@ -56,8 +57,8 @@ public final class ForkJoinComputeStep<
5657
ForkJoinComputeStep(
5758
InitFunction<CONFIG, INIT_CONTEXT> initFunction,
5859
ComputeFunction<CONFIG, COMPUTE_CONTEXT> computeFunction,
59-
Supplier<INIT_CONTEXT> initContext,
60-
Supplier<COMPUTE_CONTEXT> computeContext,
60+
Supplier<INIT_CONTEXT> initContextSupplier,
61+
Supplier<COMPUTE_CONTEXT> computeContextSupplier,
6162
MutableInt iteration,
6263
Partition nodeBatch,
6364
NodeValue nodeValue,
@@ -70,15 +71,16 @@ public final class ForkJoinComputeStep<
7071
super(parent);
7172
this.initFunction = initFunction;
7273
this.computeFunction = computeFunction;
73-
this.initContext = initContext;
74-
this.computeContext = computeContext;
74+
this.initContextSupplier = initContextSupplier;
75+
this.computeContextSupplier = computeContextSupplier;
7576
this.iteration = iteration;
7677
this.voteBits = voteBits;
7778
this.nodeBatch = nodeBatch;
7879
this.nodeValue = nodeValue;
7980
this.messenger = messenger;
8081
this.hasSentMessage = sentMessage;
8182
this.progressTracker = progressTracker;
83+
this.computeContext = computeContextSupplier.get();
8284
}
8385

8486
@Override
@@ -99,8 +101,8 @@ public void compute() {
99101
var leftTask = new ForkJoinComputeStep<>(
100102
initFunction,
101103
computeFunction,
102-
initContext,
103-
computeContext,
104+
initContextSupplier,
105+
computeContextSupplier,
104106
iteration,
105107
leftBatch,
106108
nodeValue,
@@ -119,6 +121,7 @@ public void compute() {
119121
this.compute();
120122
} else {
121123
computeBatch();
124+
hasSentMessage.set(computeContext.hasSentMessage());
122125
tryComplete();
123126
}
124127
}
@@ -155,12 +158,12 @@ public Partition nodeBatch() {
155158

156159
@Override
157160
public INIT_CONTEXT initContext() {
158-
return initContext.get();
161+
return initContextSupplier.get();
159162
}
160163

161164
@Override
162165
public COMPUTE_CONTEXT computeContext() {
163-
return computeContext.get();
166+
return computeContext;
164167
}
165168

166169
@Override

pregel/src/main/java/org/neo4j/gds/beta/pregel/ForkJoinComputer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@ void release() {
108108
messenger,
109109
voteBits,
110110
iteration,
111-
hasSentMessages,
112111
progressTracker
113112
);
114113

@@ -149,7 +148,6 @@ void release() {
149148
messenger,
150149
voteBits,
151150
iteration,
152-
hasSentMessages,
153151
progressTracker
154152
);
155153

pregel/src/main/java/org/neo4j/gds/beta/pregel/PartitionedComputeStep.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public final class PartitionedComputeStep<
7878
@Override
7979
public void run() {
8080
computeBatch();
81+
hasSentMessage.set(computeContext().hasSentMessage());
8182
}
8283

8384
@Override

pregel/src/main/java/org/neo4j/gds/beta/pregel/PartitionedComputer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ void release() {
154154
messenger,
155155
voteBits,
156156
iteration,
157-
hasSentMessages,
158157
progressTracker
159158
);
160159

@@ -197,7 +196,6 @@ void release() {
197196
messenger,
198197
voteBits,
199198
iteration,
200-
hasSentMessages,
201199
progressTracker
202200
);
203201

pregel/src/main/java/org/neo4j/gds/beta/pregel/context/ComputeContext.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
2929
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
3030

31-
import java.util.concurrent.atomic.AtomicBoolean;
32-
3331
/**
3432
* A context that is used during the computation. It allows an implementation
3533
* to send messages to other nodes and change the state of the currently
@@ -41,7 +39,8 @@ public class ComputeContext<CONFIG extends PregelConfig> extends NodeCentricCont
4139

4240
private final Messenger<?> messenger;
4341
private final MutableInt iteration;
44-
private final AtomicBoolean hasSendMessage;
42+
private boolean hasSendMessage;
43+
4544
protected BasePregelComputation<CONFIG> computation;
4645

4746
public ComputeContext(Graph graph,
@@ -51,7 +50,6 @@ public ComputeContext(Graph graph,
5150
Messenger<?> messenger,
5251
HugeAtomicBitSet voteBits,
5352
MutableInt iteration,
54-
AtomicBoolean hasSendMessage,
5553
ProgressTracker progressTracker) {
5654
super(graph, config, nodeValue, progressTracker);
5755
this.computation = computation;
@@ -61,7 +59,7 @@ public ComputeContext(Graph graph,
6159
this.messenger = messenger;
6260
this.voteBits = voteBits;
6361
this.iteration = iteration;
64-
this.hasSendMessage = hasSendMessage;
62+
this.hasSendMessage = false;
6563
}
6664

6765
private final SendMessagesFunction sendMessagesFunction;
@@ -152,7 +150,7 @@ public void sendToNeighbors(double message) {
152150
*/
153151
public void sendTo(long targetNodeId, double message) {
154152
messenger.sendTo(targetNodeId, message);
155-
this.hasSendMessage.set(true);
153+
this.hasSendMessage = true;
156154
}
157155

158156
private void sendToNeighbors(long sourceNodeId, double message) {
@@ -169,6 +167,10 @@ private void sendToNeighborsWeighted(long sourceNodeId, double message) {
169167
});
170168
}
171169

170+
public boolean hasSentMessage() {
171+
return hasSendMessage;
172+
}
173+
172174
@FunctionalInterface
173175
interface SendMessagesFunction {
174176
void sendToNeighbors(long sourceNodeId, double message);
@@ -186,7 +188,6 @@ public BidirectionalComputeContext(
186188
Messenger<?> messenger,
187189
HugeAtomicBitSet voteBits,
188190
MutableInt iteration,
189-
AtomicBoolean hasSendMessage,
190191
ProgressTracker progressTracker
191192
) {
192193
super(
@@ -197,7 +198,6 @@ public BidirectionalComputeContext(
197198
messenger,
198199
voteBits,
199200
iteration,
200-
hasSendMessage,
201201
progressTracker
202202
);
203203

0 commit comments

Comments
 (0)