Skip to content

Commit 405d9db

Browse files
authored
Merge pull request #6896 from s1ck/nodes-builder-threadlocal-property-schema-2.3
Make property schema building thread-local
2 parents 9bd3b22 + 5165902 commit 405d9db

File tree

4 files changed

+116
-30
lines changed

4 files changed

+116
-30
lines changed

core/src/main/java/org/neo4j/gds/core/loading/construction/GraphFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ static NodesBuilder nodesBuilder(
121121
maxOriginalId,
122122
threadCount,
123123
TokenToNodeLabel.lazy(),
124-
NodeLabelTokenToPropertyKeys.lazy(),
124+
NodeLabelTokenToPropertyKeys::lazy,
125125
new ConcurrentHashMap<>(),
126126
idMapBuilder,
127127
labelInformation,
@@ -162,7 +162,7 @@ private static NodesBuilder fromSchema(
162162
maxOriginalId,
163163
concurrency,
164164
TokenToNodeLabel.fixed(elementIdentifierLabelTokenMapping, labelTokenNodeLabelMapping),
165-
NodeLabelTokenToPropertyKeys.fixed(nodeSchema),
165+
() -> NodeLabelTokenToPropertyKeys.fixed(nodeSchema),
166166
new ConcurrentHashMap<>(propertyBuildersByPropertyKey),
167167
idMapBuilder,
168168
hasLabelInformation,

core/src/main/java/org/neo4j/gds/core/loading/construction/NodeLabelTokenToPropertyKeys.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import org.neo4j.gds.api.schema.PropertySchema;
2525
import org.neo4j.gds.utils.StringJoining;
2626

27+
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.Map;
2930
import java.util.Set;
30-
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.stream.Collectors;
3232

3333
abstract class NodeLabelTokenToPropertyKeys {
@@ -70,6 +70,29 @@ abstract Map<String, PropertySchema> propertySchemas(
7070
Map<String, PropertySchema> importPropertySchemas
7171
);
7272

73+
/**
74+
* Computes the union of the two given mappings without
75+
* changing the contents of the mappings themselves.
76+
*/
77+
static NodeLabelTokenToPropertyKeys union(
78+
NodeLabelTokenToPropertyKeys left,
79+
NodeLabelTokenToPropertyKeys right,
80+
Map<String, PropertySchema> importPropertySchemas
81+
) {
82+
var union = NodeLabelTokenToPropertyKeys.lazy();
83+
84+
left.nodeLabels().forEach(nodeLabel -> {
85+
var propertyKeys = left.propertySchemas(nodeLabel, importPropertySchemas).keySet();
86+
union.add(NodeLabelTokens.ofNodeLabel(nodeLabel), propertyKeys);
87+
});
88+
right.nodeLabels().forEach(nodeLabel -> {
89+
var propertyKeys = right.propertySchemas(nodeLabel, importPropertySchemas).keySet();
90+
union.add(NodeLabelTokens.ofNodeLabel(nodeLabel), propertyKeys);
91+
});
92+
93+
return union;
94+
}
95+
7396
private static class Fixed extends NodeLabelTokenToPropertyKeys {
7497

7598
private final NodeSchema nodeSchema;
@@ -136,10 +159,10 @@ Map<String, PropertySchema> propertySchemas(
136159

137160
private static class Lazy extends NodeLabelTokenToPropertyKeys {
138161

139-
private final ConcurrentHashMap<NodeLabelToken, Set<String>> labelToPropertyKeys;
162+
private final Map<NodeLabelToken, Set<String>> labelToPropertyKeys;
140163

141164
Lazy() {
142-
this.labelToPropertyKeys = new ConcurrentHashMap<>();
165+
this.labelToPropertyKeys = new HashMap<>();
143166
}
144167

145168
@Override

core/src/main/java/org/neo4j/gds/core/loading/construction/NodesBuilder.java

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.neo4j.values.virtual.MapValue;
5151

5252
import java.util.ArrayList;
53+
import java.util.Collection;
5354
import java.util.HashSet;
5455
import java.util.List;
5556
import java.util.Map;
@@ -58,6 +59,7 @@
5859
import java.util.concurrent.atomic.LongAdder;
5960
import java.util.function.Function;
6061
import java.util.function.LongPredicate;
62+
import java.util.function.Supplier;
6163
import java.util.stream.Collectors;
6264

6365
import static java.util.stream.Collectors.toMap;
@@ -81,13 +83,11 @@ public final class NodesBuilder {
8183

8284
private final ConcurrentMap<String, NodePropertiesFromStoreBuilder> propertyBuildersByPropertyKey;
8385

84-
private final NodeLabelTokenToPropertyKeys nodeLabelTokenToPropertyKeys;
85-
8686
NodesBuilder(
8787
long maxOriginalId,
8888
int concurrency,
8989
TokenToNodeLabel tokenToNodeLabel,
90-
NodeLabelTokenToPropertyKeys nodeLabelTokenToPropertyKeys,
90+
Supplier<NodeLabelTokenToPropertyKeys> nodeLabelTokenToPropertyKeysSupplier,
9191
ConcurrentMap<String, NodePropertiesFromStoreBuilder> propertyBuildersByPropertyKey,
9292
IdMapBuilder idMapBuilder,
9393
boolean hasLabelInformation,
@@ -97,7 +97,6 @@ public final class NodesBuilder {
9797
) {
9898
this.maxOriginalId = maxOriginalId;
9999
this.concurrency = concurrency;
100-
this.nodeLabelTokenToPropertyKeys = nodeLabelTokenToPropertyKeys;
101100
this.idMapBuilder = idMapBuilder;
102101
this.propertyStates = propertyStates;
103102
this.labelInformationBuilder = !hasLabelInformation
@@ -128,7 +127,7 @@ public final class NodesBuilder {
128127
hasLabelInformation,
129128
hasProperties,
130129
tokenToNodeLabel,
131-
nodeLabelTokenToPropertyKeys,
130+
nodeLabelTokenToPropertyKeysSupplier.get(),
132131
propertyBuilderFn
133132
)
134133
);
@@ -200,14 +199,11 @@ public Nodes build() {
200199
}
201200

202201
public Nodes build(long highestNeoId) {
203-
// Flush remaining buffer contents
204-
this.threadLocalBuilder.forEach(ThreadLocalBuilder::flush);
205-
// Clean up resources held by local builders
206-
this.threadLocalBuilder.close();
202+
var localLabelTokenToPropertyKeys = closeThreadLocalBuilders();
207203

208204
var idMap = this.idMapBuilder.build(labelInformationBuilder, highestNeoId, concurrency);
209205
var nodeProperties = buildProperties(idMap);
210-
var nodeSchema = buildNodeSchema(idMap, nodeProperties);
206+
var nodeSchema = buildNodeSchema(idMap, localLabelTokenToPropertyKeys, nodeProperties);
211207
var nodePropertyStore = NodePropertyStore.builder().properties(nodeProperties).build();
212208

213209
return ImmutableNodes.builder()
@@ -217,26 +213,53 @@ public Nodes build(long highestNeoId) {
217213
.build();
218214
}
219215

220-
private NodeSchema buildNodeSchema(IdMap idMap, Map<String, NodeProperty> nodeProperties) {
221-
var nodeSchema = NodeSchema.empty();
222-
var importPropertySchemas = nodeProperties
216+
private List<NodeLabelTokenToPropertyKeys> closeThreadLocalBuilders() {
217+
// Flush remaining buffer contents
218+
this.threadLocalBuilder.forEach(ThreadLocalBuilder::flush);
219+
// Collect token to property keys for final union
220+
var labelTokenToPropertyKeys = new ArrayList<NodeLabelTokenToPropertyKeys>();
221+
this.threadLocalBuilder.forEach(tlb -> labelTokenToPropertyKeys.add(tlb.nodeLabelTokenToPropertyKeys));
222+
// Clean up resources held by local builders
223+
this.threadLocalBuilder.close();
224+
225+
return labelTokenToPropertyKeys;
226+
}
227+
228+
private NodeSchema buildNodeSchema(
229+
IdMap idMap,
230+
Collection<NodeLabelTokenToPropertyKeys> localLabelTokenToPropertyKeys,
231+
Map<String, NodeProperty> nodeProperties
232+
) {
233+
234+
// Collect the property schemas from the imported property values.
235+
var propertyKeysToSchema = nodeProperties
223236
.entrySet()
224237
.stream()
225238
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().propertySchema()));
226-
227-
// consider node labels without properties
239+
// Union the label to property key mappings from each import thread.
240+
var globalLabelTokenToPropertyKeys = localLabelTokenToPropertyKeys
241+
.stream()
242+
.reduce(
243+
NodeLabelTokenToPropertyKeys.lazy(),
244+
(left, right) -> NodeLabelTokenToPropertyKeys.union(left, right, propertyKeysToSchema)
245+
);
246+
// Collect node labels without properties from the id map
247+
// as they are not stored in the above union mapping.
228248
var nodeLabels = new HashSet<>(idMap.availableNodeLabels());
229-
// and also node labels with associated properties
230-
nodeLabels.addAll(nodeLabelTokenToPropertyKeys.nodeLabels());
231-
232-
nodeLabels.forEach(nodeLabel -> {
233-
nodeSchema.addLabel(
234-
nodeLabel,
235-
this.nodeLabelTokenToPropertyKeys.propertySchemas(nodeLabel, importPropertySchemas)
249+
// Add labels that actually have node properties attached.
250+
localLabelTokenToPropertyKeys.forEach(localMapping -> nodeLabels.addAll(localMapping.nodeLabels()));
251+
252+
// Use all labels and the global label to property
253+
// key mapping to construct the final node schema.
254+
return nodeLabels.stream()
255+
.reduce(
256+
NodeSchema.empty(),
257+
(unionSchema, nodeLabel) -> unionSchema.addLabel(
258+
nodeLabel,
259+
globalLabelTokenToPropertyKeys.propertySchemas(nodeLabel, propertyKeysToSchema)
260+
),
261+
(lhs, rhs) -> lhs
236262
);
237-
});
238-
239-
return nodeSchema;
240263
}
241264

242265
private Map<String, NodeProperty> buildProperties(IdMap idMap) {

core/src/test/java/org/neo4j/gds/core/loading/construction/NodeLabelTokenToPropertyKeysTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,5 +199,45 @@ void testNodeLabelsFixed() {
199199

200200
assertThat(mapping.nodeLabels()).containsExactlyInAnyOrder(NodeLabel.ALL_NODES, NodeLabel.of("A"), NodeLabel.of("B"), NodeLabel.of("C"));
201201
}
202+
203+
@Test
204+
void testMerge() {
205+
var importPropertySchemas = Map.of(
206+
"foo", PropertySchema.of("foo", ValueType.LONG, DefaultValue.forLong(), PropertyState.TRANSIENT),
207+
"bar", PropertySchema.of("bar", ValueType.DOUBLE, DefaultValue.forDouble(), PropertyState.PERSISTENT),
208+
"baz", PropertySchema.of("baz", ValueType.LONG_ARRAY, DefaultValue.forLongArray(), PropertyState.PERSISTENT)
209+
);
210+
211+
var aLabel = NodeLabel.of("A");
212+
var bLabel = NodeLabel.of("B");
213+
var cLabel = NodeLabel.of("C");
214+
215+
var mapping0 = NodeLabelTokenToPropertyKeys.lazy();
216+
mapping0.add(NodeLabelTokens.ofNodeLabels(aLabel), List.of("foo"));
217+
mapping0.add(NodeLabelTokens.ofNodeLabels(cLabel), List.of("bar"));
218+
219+
var mapping1 = NodeLabelTokenToPropertyKeys.lazy();
220+
mapping1.add(NodeLabelTokens.ofNodeLabels(bLabel), List.of("bar"));
221+
222+
var mapping2 = NodeLabelTokenToPropertyKeys.lazy();
223+
mapping2.add(NodeLabelTokens.ofNodeLabels(aLabel, cLabel), List.of("baz"));
224+
225+
var union = NodeLabelTokenToPropertyKeys.union(mapping0, mapping1, importPropertySchemas);
226+
union = NodeLabelTokenToPropertyKeys.union(union, mapping2, importPropertySchemas);
227+
228+
assertThat(union.nodeLabels()).containsExactlyInAnyOrder(aLabel, bLabel, cLabel);
229+
230+
assertThat(union.propertySchemas(aLabel, importPropertySchemas)).isEqualTo(Map.of(
231+
"foo", PropertySchema.of("foo", ValueType.LONG, DefaultValue.forLong(), PropertyState.TRANSIENT),
232+
"baz", PropertySchema.of("baz", ValueType.LONG_ARRAY, DefaultValue.forLongArray(), PropertyState.PERSISTENT)
233+
));
234+
assertThat(union.propertySchemas(bLabel, importPropertySchemas)).isEqualTo(Map.of(
235+
"bar", PropertySchema.of("bar", ValueType.DOUBLE, DefaultValue.forDouble(), PropertyState.PERSISTENT)
236+
));
237+
assertThat(union.propertySchemas(cLabel, importPropertySchemas)).isEqualTo(Map.of(
238+
"bar", PropertySchema.of("bar", ValueType.DOUBLE, DefaultValue.forDouble(), PropertyState.PERSISTENT),
239+
"baz", PropertySchema.of("baz", ValueType.LONG_ARRAY, DefaultValue.forLongArray(), PropertyState.PERSISTENT)
240+
));
241+
}
202242
}
203243

0 commit comments

Comments
 (0)