Skip to content

Commit 247951c

Browse files
authored
Merge pull request #6646 from DarthMax/to_undirected
Implementation of To Undirected procedure
2 parents bb192eb + 1266b4c commit 247951c

File tree

21 files changed

+1242
-141
lines changed

21 files changed

+1242
-141
lines changed
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.beta.undirected;
21+
22+
import org.jetbrains.annotations.NotNull;
23+
import org.neo4j.gds.Algorithm;
24+
import org.neo4j.gds.Orientation;
25+
import org.neo4j.gds.RelationshipType;
26+
import org.neo4j.gds.api.CompositeRelationshipIterator;
27+
import org.neo4j.gds.api.Graph;
28+
import org.neo4j.gds.api.GraphStore;
29+
import org.neo4j.gds.api.PropertyState;
30+
import org.neo4j.gds.api.RelationshipIterator;
31+
import org.neo4j.gds.api.RelationshipProperty;
32+
import org.neo4j.gds.api.RelationshipPropertyStore;
33+
import org.neo4j.gds.api.schema.Direction;
34+
import org.neo4j.gds.api.schema.PropertySchema;
35+
import org.neo4j.gds.api.schema.RelationshipPropertySchema;
36+
import org.neo4j.gds.core.concurrency.RunWithConcurrency;
37+
import org.neo4j.gds.core.loading.SingleTypeRelationshipImportResult;
38+
import org.neo4j.gds.core.loading.construction.GraphFactory;
39+
import org.neo4j.gds.core.loading.construction.RelationshipsAndDirection;
40+
import org.neo4j.gds.core.loading.construction.RelationshipsBuilder;
41+
import org.neo4j.gds.core.loading.construction.RelationshipsBuilderBuilder;
42+
import org.neo4j.gds.core.utils.partition.DegreePartition;
43+
import org.neo4j.gds.core.utils.partition.PartitionUtils;
44+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
45+
import org.neo4j.values.storable.NumberType;
46+
47+
import java.util.List;
48+
import java.util.Optional;
49+
import java.util.concurrent.ExecutorService;
50+
import java.util.function.Function;
51+
import java.util.stream.Collectors;
52+
import java.util.stream.IntStream;
53+
54+
public class ToUndirected extends Algorithm<SingleTypeRelationshipImportResult> {
55+
private final GraphStore graphStore;
56+
private final ToUndirectedConfig config;
57+
private final ExecutorService executorService;
58+
59+
protected ToUndirected(
60+
GraphStore graphStore,
61+
ToUndirectedConfig config,
62+
ProgressTracker progressTracker,
63+
ExecutorService executorService
64+
) {
65+
super(progressTracker);
66+
67+
this.graphStore = graphStore;
68+
this.config = config;
69+
this.executorService = executorService;
70+
}
71+
72+
@Override
73+
public SingleTypeRelationshipImportResult compute() {
74+
progressTracker.beginSubTask();
75+
76+
RelationshipType fromRelationshipType = RelationshipType.of(config.relationshipType());
77+
78+
var propertySchemas = graphStore
79+
.schema()
80+
.relationshipSchema()
81+
.propertySchemasFor(fromRelationshipType);
82+
var propertyKeys = propertySchemas.stream().map(PropertySchema::key).collect(Collectors.toList());
83+
84+
var relationshipsBuilder = initializeRelationshipsBuilder(propertySchemas);
85+
86+
var tasks = createTasks(fromRelationshipType, propertyKeys, relationshipsBuilder);
87+
88+
progressTracker.beginSubTask();
89+
90+
RunWithConcurrency.
91+
builder()
92+
.tasks(tasks)
93+
.concurrency(config.concurrency())
94+
.executor(executorService)
95+
.terminationFlag(terminationFlag)
96+
.build()
97+
.run();
98+
99+
progressTracker.endSubTask();
100+
101+
progressTracker.beginSubTask();
102+
var relationships = relationshipsBuilder.buildAll();
103+
progressTracker.endSubTask();
104+
105+
SingleTypeRelationshipImportResult result = createResult(
106+
propertySchemas,
107+
propertyKeys,
108+
relationships
109+
);
110+
111+
progressTracker.endSubTask();
112+
113+
return result;
114+
}
115+
116+
@NotNull
117+
private RelationshipsBuilder initializeRelationshipsBuilder(List<RelationshipPropertySchema> propertySchemas) {
118+
RelationshipsBuilderBuilder relationshipsBuilderBuilder = GraphFactory.initRelationshipsBuilder()
119+
.concurrency(config.concurrency())
120+
.nodes(graphStore.nodes())
121+
.executorService(executorService)
122+
.orientation(Orientation.UNDIRECTED)
123+
.validateRelationships(false);
124+
125+
propertySchemas.forEach(propertySchema ->
126+
relationshipsBuilderBuilder.addPropertyConfig(propertySchema.aggregation(), propertySchema.defaultValue())
127+
);
128+
129+
return relationshipsBuilderBuilder.build();
130+
}
131+
132+
private static SingleTypeRelationshipImportResult createResult(
133+
List<RelationshipPropertySchema> propertySchemas,
134+
List<String> propertyKeys,
135+
List<RelationshipsAndDirection> relationships
136+
) {
137+
var topology = relationships.get(0).relationships().topology();
138+
var propertyValues = IntStream.range(0, propertyKeys.size())
139+
.boxed()
140+
.collect(Collectors.toMap(
141+
propertyKeys::get,
142+
idx -> RelationshipProperty.of(
143+
propertyKeys.get(idx),
144+
NumberType.FLOATING_POINT,
145+
PropertyState.TRANSIENT,
146+
relationships.get(idx).relationships().properties().orElseThrow(IllegalStateException::new),
147+
propertySchemas.get(idx).defaultValue(),
148+
propertySchemas.get(idx).aggregation()
149+
)
150+
));
151+
152+
RelationshipPropertyStore propertyStore = RelationshipPropertyStore.builder()
153+
.putAllRelationshipProperties(propertyValues)
154+
.build();
155+
156+
return SingleTypeRelationshipImportResult.builder()
157+
.topology(topology)
158+
.properties(propertyStore)
159+
.direction(Direction.UNDIRECTED)
160+
.build();
161+
}
162+
163+
@NotNull
164+
private List<Runnable> createTasks(
165+
RelationshipType fromRelationshipType,
166+
List<String> propertyKeys,
167+
RelationshipsBuilder relationshipsBuilder
168+
) {
169+
Function<DegreePartition, Runnable> taskCreator;
170+
if (propertyKeys.size() == 1) {
171+
Graph graph = graphStore.getGraph(fromRelationshipType, Optional.of(propertyKeys.get(0)));
172+
173+
taskCreator = (partition) -> new ToUndirectedTaskWithSingleProperty(
174+
relationshipsBuilder,
175+
graph.concurrentCopy(),
176+
partition,
177+
progressTracker
178+
);
179+
}
180+
else {
181+
CompositeRelationshipIterator relationshipIterator = graphStore.getCompositeRelationshipIterator(
182+
fromRelationshipType,
183+
propertyKeys
184+
);
185+
186+
taskCreator = (partition) -> new ToUndirectedTaskWithMultipleProperties(
187+
relationshipsBuilder,
188+
relationshipIterator.concurrentCopy(),
189+
partition,
190+
progressTracker
191+
);
192+
}
193+
194+
return PartitionUtils.degreePartition(
195+
graphStore.getGraph(fromRelationshipType),
196+
config.concurrency(),
197+
taskCreator,
198+
Optional.empty()
199+
);
200+
}
201+
202+
@Override
203+
public void release() {
204+
205+
}
206+
207+
private static final class ToUndirectedTaskWithSingleProperty implements Runnable {
208+
209+
private final RelationshipsBuilder relationshipsBuilder;
210+
private final RelationshipIterator relationshipIterator;
211+
private final DegreePartition partition;
212+
private final ProgressTracker progressTracker;
213+
214+
private ToUndirectedTaskWithSingleProperty(
215+
RelationshipsBuilder relationshipsBuilder,
216+
RelationshipIterator relationshipIterator,
217+
DegreePartition partition,
218+
ProgressTracker progressTracker
219+
) {
220+
this.relationshipsBuilder = relationshipsBuilder;
221+
this.relationshipIterator = relationshipIterator;
222+
this.partition = partition;
223+
this.progressTracker = progressTracker;
224+
}
225+
226+
@Override
227+
public void run() {
228+
for (long i = partition.startNode(); i < partition.startNode() + partition.nodeCount(); i++) {
229+
relationshipIterator.forEachRelationship(i, 0.0D, (source, target, property) -> {
230+
relationshipsBuilder.addFromInternal(target, source, property);
231+
return true;
232+
});
233+
progressTracker.logProgress();
234+
}
235+
}
236+
}
237+
238+
private static final class ToUndirectedTaskWithMultipleProperties implements Runnable {
239+
240+
private final RelationshipsBuilder relationshipsBuilder;
241+
private final CompositeRelationshipIterator relationshipIterator;
242+
private final DegreePartition partition;
243+
private final ProgressTracker progressTracker;
244+
245+
private ToUndirectedTaskWithMultipleProperties(
246+
RelationshipsBuilder relationshipsBuilder,
247+
CompositeRelationshipIterator relationshipIterator,
248+
DegreePartition partition,
249+
ProgressTracker progressTracker
250+
) {
251+
this.relationshipsBuilder = relationshipsBuilder;
252+
this.relationshipIterator = relationshipIterator;
253+
this.partition = partition;
254+
this.progressTracker = progressTracker;
255+
}
256+
257+
@Override
258+
public void run() {
259+
for (long i = partition.startNode(); i < partition.startNode() + partition.nodeCount(); i++) {
260+
relationshipIterator.forEachRelationship(i, (source, target, properties) -> {
261+
relationshipsBuilder.addFromInternal(target, source, properties);
262+
return true;
263+
});
264+
progressTracker.logProgress();
265+
}
266+
}
267+
}
268+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Neo4j is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*/
20+
package org.neo4j.gds.beta.undirected;
21+
22+
import org.neo4j.gds.GraphStoreAlgorithmFactory;
23+
import org.neo4j.gds.RelationshipType;
24+
import org.neo4j.gds.api.GraphStore;
25+
import org.neo4j.gds.core.concurrency.Pools;
26+
import org.neo4j.gds.core.huge.CompressedAdjacencyList;
27+
import org.neo4j.gds.core.huge.UncompressedAdjacencyList;
28+
import org.neo4j.gds.core.utils.mem.MemoryEstimation;
29+
import org.neo4j.gds.core.utils.mem.MemoryEstimations;
30+
import org.neo4j.gds.core.utils.mem.MemoryRange;
31+
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
32+
import org.neo4j.gds.core.utils.progress.tasks.Task;
33+
import org.neo4j.gds.core.utils.progress.tasks.Tasks;
34+
35+
public class ToUndirectedAlgorithmFactory extends GraphStoreAlgorithmFactory<ToUndirected, ToUndirectedConfig> {
36+
37+
@Override
38+
public ToUndirected build(
39+
GraphStore graphStore,
40+
ToUndirectedConfig configuration,
41+
ProgressTracker progressTracker
42+
) {
43+
return new ToUndirected(graphStore, configuration, progressTracker, Pools.DEFAULT);
44+
}
45+
46+
@Override
47+
public String taskName() {
48+
return "ToUndirected";
49+
}
50+
51+
@Override
52+
public Task progressTask(GraphStore graphStore, ToUndirectedConfig config) {
53+
return Tasks.task(
54+
"ToUndirected",
55+
Tasks.leaf("Create Undirected Relationships", graphStore.nodeCount()),
56+
Tasks.leaf("Build undirected Adjacency list")
57+
);
58+
}
59+
60+
@Override
61+
public MemoryEstimation memoryEstimation(ToUndirectedConfig configuration) {
62+
RelationshipType relationshipType = RelationshipType.of(configuration.relationshipType());
63+
64+
var builder = MemoryEstimations.builder(ToUndirected.class)
65+
.add("relationships", CompressedAdjacencyList.adjacencyListEstimation(relationshipType, true));
66+
67+
builder.perGraphDimension("properties", ((graphDimensions, concurrency) -> {
68+
long max = graphDimensions.relationshipPropertyTokens().keySet().stream().mapToLong(__ ->
69+
UncompressedAdjacencyList
70+
.adjacencyPropertiesEstimation(relationshipType, true)
71+
.estimate(graphDimensions, concurrency)
72+
.memoryUsage().max
73+
).sum();
74+
return MemoryRange.of(0, max);
75+
}));
76+
77+
return builder.build();
78+
}
79+
}

0 commit comments

Comments
 (0)