Skip to content

Commit 3392c94

Browse files
Implement MaxFlow
Co-authored-by: Ioannis Panagiotas <ioannis.panagiotas@neo4j.com>
1 parent 875867f commit 3392c94

File tree

4 files changed

+519
-0
lines changed

4 files changed

+519
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
/*
3+
* Copyright (c) "Neo4j"
4+
* Neo4j Sweden AB [http://neo4j.com]
5+
*
6+
* This file is part of Neo4j.
7+
*
8+
* Neo4j is free software: you can redistribute it and/or modify
9+
* it under the terms of the GNU General Public License as published by
10+
* the Free Software Foundation, either version 3 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* GNU General Public License for more details.
17+
*
18+
* You should have received a copy of the GNU General Public License
19+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
20+
*/
21+
package org.neo4j.gds.maxflow;
22+
23+
import org.neo4j.gds.core.concurrency.Concurrency;
24+
25+
public record MaxFlowParameters(Concurrency concurrency, long alpha, long beta, double freq) {
26+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.maxflow;
21+
22+
import org.neo4j.gds.Algorithm;
23+
import org.neo4j.gds.api.Graph;
24+
import org.neo4j.gds.collections.ha.HugeDoubleArray;
25+
import org.neo4j.gds.collections.ha.HugeLongArray;
26+
import org.neo4j.gds.collections.haa.HugeAtomicDoubleArray;
27+
import org.neo4j.gds.core.utils.paged.HugeAtomicBitSet;
28+
import org.neo4j.gds.core.utils.paged.ParallelDoublePageCreator;
29+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
30+
import org.neo4j.gds.termination.TerminationFlag;
31+
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.atomic.AtomicLong;
34+
35+
public final class MaxFlow extends Algorithm<FlowResult> {
36+
static final double FREQ = 0.5;
37+
static final int ALPHA = 6;
38+
static final int BETA = 12;
39+
private final Graph graph;
40+
private final long source;
41+
private final long target;
42+
private final MaxFlowParameters parameters;
43+
private final ExecutorService executorService;
44+
45+
private MaxFlow(
46+
Graph graph,
47+
long source,
48+
long target,
49+
MaxFlowParameters parameters,
50+
ExecutorService executorService,
51+
ProgressTracker progressTracker,
52+
TerminationFlag terminationFlag
53+
) {
54+
super(progressTracker);
55+
this.graph = graph;
56+
this.source = source;
57+
this.target = target;
58+
this.parameters = parameters;
59+
this.executorService = executorService;
60+
this.terminationFlag = terminationFlag;
61+
}
62+
63+
public static MaxFlow create(
64+
Graph graph,
65+
long sourceNode,
66+
long targetNode,
67+
MaxFlowParameters parameters,
68+
ExecutorService executorService,
69+
ProgressTracker progressTracker,
70+
TerminationFlag terminationFlag
71+
) {
72+
return new MaxFlow(
73+
graph,
74+
sourceNode,
75+
targetNode,
76+
parameters,
77+
executorService,
78+
progressTracker,
79+
terminationFlag
80+
);
81+
}
82+
83+
public FlowResult compute() {
84+
var preflow = initPreflow(source);
85+
maximizeFlow(preflow, source, target);
86+
maximizeFlow(preflow, target, source);
87+
return preflow.flowGraph().createFlowResult(target);
88+
}
89+
90+
private Preflow initPreflow(long source) {
91+
var excess = HugeDoubleArray.newArray(graph.nodeCount());
92+
excess.setAll(x -> 0D);
93+
var flowGraph = FlowGraph.create(graph);
94+
flowGraph.forEachRelationship(
95+
source, (s, t, relIdx, residualCapacity, isReverse) -> {
96+
flowGraph.push(relIdx, residualCapacity, isReverse);
97+
excess.set(t, residualCapacity);
98+
return true;
99+
}
100+
);
101+
var label = HugeLongArray.newArray(flowGraph.nodeCount());
102+
return new Preflow(flowGraph, excess, label);
103+
}
104+
105+
private void maximizeFlow(Preflow preflow, long sourceNode, long targetNode) { //make non-static
106+
var flowGraph = preflow.flowGraph();
107+
var excess = preflow.excess();
108+
var label = preflow.label();
109+
110+
var nodeCount = flowGraph.nodeCount();
111+
var edgeCount = flowGraph.edgeCount();
112+
113+
var addedExcess = HugeAtomicDoubleArray.of(
114+
nodeCount,
115+
ParallelDoublePageCreator.passThrough(parameters.concurrency())
116+
); //fixme
117+
var tempLabel = HugeLongArray.newArray(nodeCount);
118+
var isDiscovered = HugeAtomicBitSet.create(nodeCount);
119+
var workingSet = new AtomicWorkingSet(nodeCount);
120+
for (var nodeId = 0; nodeId < nodeCount; nodeId++) {
121+
if (excess.get(nodeId) > 0.0) {
122+
workingSet.push(nodeId);
123+
}
124+
}
125+
126+
var workSinceLastGR = new AtomicLong(Long.MAX_VALUE);
127+
128+
while (!workingSet.isEmpty()) {
129+
if (parameters.freq() * workSinceLastGR.doubleValue() > parameters.alpha() * nodeCount + edgeCount) {
130+
GlobalRelabeling.globalRelabeling(flowGraph, label, sourceNode, targetNode, parameters.concurrency());
131+
workSinceLastGR.set(0L);
132+
}
133+
Discharging.processWorkingSet(
134+
flowGraph,
135+
excess,
136+
label,
137+
tempLabel,
138+
addedExcess,
139+
isDiscovered,
140+
workingSet,
141+
targetNode,
142+
parameters.beta(),
143+
workSinceLastGR,
144+
parameters.concurrency()
145+
);
146+
}
147+
}
148+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.maxflow;
21+
22+
import org.neo4j.gds.collections.ha.HugeDoubleArray;
23+
import org.neo4j.gds.collections.ha.HugeLongArray;
24+
25+
record Preflow(FlowGraph flowGraph, HugeDoubleArray excess, HugeLongArray label) {
26+
}

0 commit comments

Comments
 (0)