Skip to content

Commit 9a53a98

Browse files
Register requested concurrency before running algorithm
Co-authored-by: Ioannis Panagiotas <ioannis.panagiotas@neotechnology.com>
1 parent 84ea16a commit 9a53a98

File tree

13 files changed

+370
-77
lines changed

13 files changed

+370
-77
lines changed

applications/algorithms/centrality/src/main/java/org/neo4j/gds/applications/algorithms/centrality/CentralityAlgorithms.java

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,12 @@ BitSet articulationPoints(Graph graph, AlgoBaseConfig configuration) {
125125

126126
var algorithm = new ArticulationPoints(graph, progressTracker);
127127

128-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
128+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
129+
algorithm,
130+
progressTracker,
131+
true,
132+
configuration.concurrency()
133+
);
129134
}
130135

131136
BetwennessCentralityResult betweennessCentrality(Graph graph, BetweennessCentralityBaseConfig configuration) {
@@ -168,7 +173,12 @@ public BetwennessCentralityResult betweennessCentrality(
168173
terminationFlag
169174
);
170175

171-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
176+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
177+
algorithm,
178+
progressTracker,
179+
true,
180+
parameters.concurrency()
181+
);
172182
}
173183

174184
BridgeResult bridges(Graph graph, AlgoBaseConfig configuration) {
@@ -178,7 +188,12 @@ BridgeResult bridges(Graph graph, AlgoBaseConfig configuration) {
178188

179189
var algorithm = new Bridges(graph, progressTracker);
180190

181-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
191+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
192+
algorithm,
193+
progressTracker,
194+
true,
195+
configuration.concurrency()
196+
);
182197
}
183198

184199
public CELFResult celf(Graph graph, InfluenceMaximizationBaseConfig configuration) {
@@ -191,7 +206,12 @@ public CELFResult celf(Graph graph, InfluenceMaximizationBaseConfig configuratio
191206

192207
var algorithm = new CELF(graph, configuration.toParameters(), DefaultPool.INSTANCE, progressTracker);
193208

194-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
209+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
210+
algorithm,
211+
progressTracker,
212+
true,
213+
configuration.concurrency()
214+
);
195215
}
196216

197217
ClosenessCentralityResult closenessCentrality(Graph graph, ClosenessCentralityBaseConfig configuration) {
@@ -218,7 +238,12 @@ ClosenessCentralityResult closenessCentrality(Graph graph, ClosenessCentralityBa
218238
terminationFlag
219239
);
220240

221-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
241+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
242+
algorithm,
243+
progressTracker,
244+
true,
245+
configuration.concurrency()
246+
);
222247
}
223248

224249
DegreeCentralityResult degreeCentrality(Graph graph, DegreeCentralityConfig configuration) {
@@ -237,7 +262,12 @@ DegreeCentralityResult degreeCentrality(Graph graph, DegreeCentralityConfig conf
237262
progressTracker
238263
);
239264

240-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
265+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
266+
algorithm,
267+
progressTracker,
268+
true,
269+
configuration.concurrency()
270+
);
241271
}
242272

243273
PageRankResult eigenVector(Graph graph, EigenvectorConfig configuration) {
@@ -279,7 +309,12 @@ HarmonicResult harmonicCentrality(Graph graph, AlgoBaseConfig configuration) {
279309
terminationFlag
280310
);
281311

282-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
312+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
313+
algorithm,
314+
progressTracker,
315+
true,
316+
configuration.concurrency()
317+
);
283318
}
284319

285320
PregelResult hits(Graph graph, HitsConfig configuration) {
@@ -297,7 +332,12 @@ PregelResult hits(Graph graph, HitsConfig configuration) {
297332
progressTracker
298333
);
299334

300-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
335+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
336+
algorithm,
337+
progressTracker,
338+
true,
339+
configuration.concurrency()
340+
);
301341
}
302342

303343
IndirectExposureResult indirectExposure(Graph graph, IndirectExposureConfig configuration) {
@@ -315,7 +355,12 @@ IndirectExposureResult indirectExposure(Graph graph, IndirectExposureConfig conf
315355
progressTracker
316356
);
317357

318-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
358+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
359+
algorithm,
360+
progressTracker,
361+
true,
362+
configuration.concurrency()
363+
);
319364
}
320365

321366
public PageRankResult pageRank(Graph graph, PageRankConfig configuration) {

applications/algorithms/community/src/main/java/org/neo4j/gds/applications/algorithms/community/CommunityAlgorithms.java

Lines changed: 84 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,12 @@ ApproxMaxKCutResult approximateMaximumKCut(Graph graph, ApproxMaxKCutBaseConfig
120120
terminationFlag
121121
);
122122

123-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
123+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
124+
algorithm,
125+
progressTracker,
126+
true,
127+
configuration.concurrency()
128+
);
124129
}
125130

126131
ConductanceResult conductance(Graph graph, ConductanceBaseConfig configuration) {
@@ -144,7 +149,12 @@ ConductanceResult conductance(Graph graph, ConductanceBaseConfig configuration)
144149
progressTracker
145150
);
146151

147-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
152+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
153+
algorithm,
154+
progressTracker,
155+
true,
156+
configuration.concurrency()
157+
);
148158
}
149159

150160
K1ColoringResult k1Coloring(Graph graph, K1ColoringBaseConfig configuration) {
@@ -164,7 +174,12 @@ K1ColoringResult k1Coloring(Graph graph, K1ColoringBaseConfig configuration) {
164174
terminationFlag
165175
);
166176

167-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
177+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
178+
algorithm,
179+
progressTracker,
180+
true,
181+
configuration.concurrency()
182+
);
168183
}
169184

170185
KCoreDecompositionResult kCore(Graph graph, AlgoBaseConfig configuration) {
@@ -173,7 +188,12 @@ KCoreDecompositionResult kCore(Graph graph, AlgoBaseConfig configuration) {
173188

174189
var algorithm = new KCoreDecomposition(graph, configuration.concurrency(), progressTracker, terminationFlag);
175190

176-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
191+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
192+
algorithm,
193+
progressTracker,
194+
true,
195+
configuration.concurrency()
196+
);
177197
}
178198

179199
public KmeansResult kMeans(Graph graph, KmeansBaseConfig configuration) {
@@ -194,7 +214,12 @@ public KmeansResult kMeans(Graph graph, KmeansBaseConfig configuration) {
194214
.build();
195215
var algorithm = Kmeans.createKmeans(graph, configuration.toParameters(), kmeansContext, terminationFlag);
196216

197-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
217+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
218+
algorithm,
219+
progressTracker,
220+
true,
221+
configuration.concurrency()
222+
);
198223
}
199224

200225
LabelPropagationResult labelPropagation(Graph graph, LabelPropagationBaseConfig configuration) {
@@ -217,7 +242,12 @@ LabelPropagationResult labelPropagation(Graph graph, LabelPropagationBaseConfig
217242
terminationFlag
218243
);
219244

220-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
245+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
246+
algorithm,
247+
progressTracker,
248+
true,
249+
configuration.concurrency()
250+
);
221251
}
222252

223253
LocalClusteringCoefficientResult lcc(Graph graph, LocalClusteringCoefficientBaseConfig configuration) {
@@ -240,7 +270,12 @@ LocalClusteringCoefficientResult lcc(Graph graph, LocalClusteringCoefficientBase
240270
terminationFlag
241271
);
242272

243-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
273+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
274+
algorithm,
275+
progressTracker,
276+
true,
277+
configuration.concurrency()
278+
);
244279
}
245280

246281
LeidenResult leiden(Graph graph, LeidenBaseConfig configuration) {
@@ -284,7 +319,12 @@ LeidenResult leiden(Graph graph, LeidenBaseConfig configuration) {
284319
terminationFlag
285320
);
286321

287-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
322+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
323+
algorithm,
324+
progressTracker,
325+
true,
326+
configuration.concurrency()
327+
);
288328
}
289329

290330
LouvainResult louvain(Graph graph, LouvainBaseConfig configuration) {
@@ -307,7 +347,12 @@ LouvainResult louvain(Graph graph, LouvainBaseConfig configuration) {
307347
terminationFlag
308348
);
309349

310-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
350+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
351+
algorithm,
352+
progressTracker,
353+
true,
354+
configuration.concurrency()
355+
);
311356
}
312357

313358
ModularityResult modularity(Graph graph, ModularityBaseConfig configuration) {
@@ -350,7 +395,12 @@ ModularityOptimizationResult modularityOptimization(Graph graph, ModularityOptim
350395
terminationFlag
351396
);
352397

353-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
398+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
399+
algorithm,
400+
progressTracker,
401+
true,
402+
configuration.concurrency()
403+
);
354404
}
355405

356406
HugeLongArray scc(Graph graph, SccCommonBaseConfig configuration) {
@@ -361,7 +411,12 @@ HugeLongArray scc(Graph graph, SccCommonBaseConfig configuration) {
361411

362412
var algorithm = new Scc(graph, progressTracker, terminationFlag);
363413

364-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
414+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
415+
algorithm,
416+
progressTracker,
417+
true,
418+
configuration.concurrency()
419+
);
365420
}
366421

367422
TriangleCountResult triangleCount(Graph graph, TriangleCountBaseConfig configuration) {
@@ -379,7 +434,12 @@ TriangleCountResult triangleCount(Graph graph, TriangleCountBaseConfig configura
379434
terminationFlag
380435
);
381436

382-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
437+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
438+
algorithm,
439+
progressTracker,
440+
true,
441+
configuration.concurrency()
442+
);
383443
}
384444

385445
Stream<TriangleResult> triangles(Graph graph, ConcurrencyConfig configuration) {
@@ -411,7 +471,12 @@ DisjointSetStruct wcc(Graph graph, WccBaseConfig configuration) {
411471
terminationFlag
412472
);
413473

414-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
474+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
475+
algorithm,
476+
progressTracker,
477+
true,
478+
configuration.concurrency()
479+
);
415480
}
416481

417482
private Task constructKMeansProgressTask(Graph graph, KmeansBaseConfig configuration) {
@@ -482,6 +547,11 @@ PregelResult speakerListenerLPA(Graph graph, SpeakerListenerLPAConfig configurat
482547
Optional.empty()
483548
);
484549

485-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
550+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
551+
algorithm,
552+
progressTracker,
553+
true,
554+
configuration.concurrency()
555+
);
486556
}
487557
}

applications/algorithms/machine-learning/src/main/java/org/neo4j/gds/applications/algorithms/machinelearning/MachineLearningAlgorithms.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
import com.carrotsearch.hppc.BitSet;
2323
import org.neo4j.gds.algorithms.machinelearning.KGEPredictBaseConfig;
24-
import org.neo4j.gds.algorithms.machinelearning.KGEPredictResult;
2524
import org.neo4j.gds.algorithms.machinelearning.KGEPredictConfigTransformer;
25+
import org.neo4j.gds.algorithms.machinelearning.KGEPredictResult;
2626
import org.neo4j.gds.algorithms.machinelearning.TopKMapComputer;
2727
import org.neo4j.gds.api.Graph;
2828
import org.neo4j.gds.api.GraphStore;
@@ -79,7 +79,12 @@ KGEPredictResult kge(Graph graph, KGEPredictBaseConfig configuration) {
7979
terminationFlag
8080
);
8181

82-
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(algorithm, progressTracker, true);
82+
return algorithmMachinery.runAlgorithmsAndManageProgressTracker(
83+
algorithm,
84+
progressTracker,
85+
true,
86+
configuration.concurrency()
87+
);
8388
}
8489

8590
EdgeSplitter.SplitResult splitRelationships(GraphStore graphStore, SplitRelationshipsBaseConfig configuration) {

applications/algorithms/machinery/src/main/java/org/neo4j/gds/applications/algorithms/machinery/AlgorithmMachinery.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.neo4j.gds.applications.algorithms.machinery;
2121

2222
import org.neo4j.gds.Algorithm;
23+
import org.neo4j.gds.core.concurrency.Concurrency;
2324
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
2425

2526
/**
@@ -40,9 +41,11 @@ public class AlgorithmMachinery {
4041
public <RESULT> RESULT runAlgorithmsAndManageProgressTracker(
4142
Algorithm<RESULT> algorithm,
4243
ProgressTracker progressTracker,
43-
boolean shouldReleaseProgressTracker
44+
boolean shouldReleaseProgressTracker,
45+
Concurrency concurrency
4446
) {
4547
try {
48+
progressTracker.requestedConcurrency(concurrency);
4649
return algorithm.compute();
4750
} catch (Exception e) {
4851
progressTracker.endSubTaskWithFailure();

0 commit comments

Comments
 (0)