Skip to content

Commit 56d2ea2

Browse files
s1cksoerenreichardt
andcommitted
Make property schema building thread local
Co-authored-by: Sören Reichardt <soren.reichardt@neotechnology.com>
1 parent 499f54f commit 56d2ea2

File tree

4 files changed

+75
-15
lines changed

4 files changed

+75
-15
lines changed

core/src/main/java/org/neo4j/gds/core/loading/construction/GraphFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ static NodesBuilder nodesBuilder(
121121
maxOriginalId,
122122
threadCount,
123123
TokenToNodeLabel.lazy(),
124-
NodeLabelTokenToPropertyKeys.lazy(),
124+
NodeLabelTokenToPropertyKeys::lazy,
125125
new ConcurrentHashMap<>(),
126126
idMapBuilder,
127127
labelInformation,
@@ -162,7 +162,7 @@ private static NodesBuilder fromSchema(
162162
maxOriginalId,
163163
concurrency,
164164
TokenToNodeLabel.fixed(elementIdentifierLabelTokenMapping, labelTokenNodeLabelMapping),
165-
NodeLabelTokenToPropertyKeys.fixed(nodeSchema),
165+
() -> NodeLabelTokenToPropertyKeys.fixed(nodeSchema),
166166
new ConcurrentHashMap<>(propertyBuildersByPropertyKey),
167167
idMapBuilder,
168168
hasLabelInformation,

core/src/main/java/org/neo4j/gds/core/loading/construction/NodeLabelTokenToPropertyKeys.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import org.neo4j.gds.api.schema.PropertySchema;
2525
import org.neo4j.gds.utils.StringJoining;
2626

27+
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.Map;
2930
import java.util.Set;
30-
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.stream.Collectors;
3232

3333
abstract class NodeLabelTokenToPropertyKeys {
@@ -70,6 +70,25 @@ abstract Map<String, PropertySchema> propertySchemas(
7070
Map<String, PropertySchema> importPropertySchemas
7171
);
7272

73+
static NodeLabelTokenToPropertyKeys merge(
74+
NodeLabelTokenToPropertyKeys left,
75+
NodeLabelTokenToPropertyKeys right,
76+
Map<String, PropertySchema> importPropertySchemas
77+
) {
78+
var merge = NodeLabelTokenToPropertyKeys.lazy();
79+
80+
left.nodeLabels().forEach(nodeLabel -> {
81+
var propertyKeys = left.propertySchemas(nodeLabel, importPropertySchemas).keySet();
82+
merge.add(NodeLabelTokens.ofNodeLabel(nodeLabel), propertyKeys);
83+
});
84+
right.nodeLabels().forEach(nodeLabel -> {
85+
var propertyKeys = right.propertySchemas(nodeLabel, importPropertySchemas).keySet();
86+
merge.add(NodeLabelTokens.ofNodeLabel(nodeLabel), propertyKeys);
87+
});
88+
89+
return merge;
90+
}
91+
7392
private static class Fixed extends NodeLabelTokenToPropertyKeys {
7493

7594
private final NodeSchema nodeSchema;
@@ -136,10 +155,10 @@ Map<String, PropertySchema> propertySchemas(
136155

137156
private static class Lazy extends NodeLabelTokenToPropertyKeys {
138157

139-
private final ConcurrentHashMap<NodeLabelToken, Set<String>> labelToPropertyKeys;
158+
private final Map<NodeLabelToken, Set<String>> labelToPropertyKeys;
140159

141160
Lazy() {
142-
this.labelToPropertyKeys = new ConcurrentHashMap<>();
161+
this.labelToPropertyKeys = new HashMap<>();
143162
}
144163

145164
@Override

core/src/main/java/org/neo4j/gds/core/loading/construction/NodesBuilder.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.neo4j.values.virtual.MapValue;
5151

5252
import java.util.ArrayList;
53+
import java.util.Collection;
5354
import java.util.HashSet;
5455
import java.util.List;
5556
import java.util.Map;
@@ -58,6 +59,7 @@
5859
import java.util.concurrent.atomic.LongAdder;
5960
import java.util.function.Function;
6061
import java.util.function.LongPredicate;
62+
import java.util.function.Supplier;
6163
import java.util.stream.Collectors;
6264

6365
import static java.util.stream.Collectors.toMap;
@@ -81,13 +83,11 @@ public final class NodesBuilder {
8183

8284
private final ConcurrentMap<String, NodePropertiesFromStoreBuilder> propertyBuildersByPropertyKey;
8385

84-
private final NodeLabelTokenToPropertyKeys nodeLabelTokenToPropertyKeys;
85-
8686
NodesBuilder(
8787
long maxOriginalId,
8888
int concurrency,
8989
TokenToNodeLabel tokenToNodeLabel,
90-
NodeLabelTokenToPropertyKeys nodeLabelTokenToPropertyKeys,
90+
Supplier<NodeLabelTokenToPropertyKeys> nodeLabelTokenToPropertyKeysSupplier,
9191
ConcurrentMap<String, NodePropertiesFromStoreBuilder> propertyBuildersByPropertyKey,
9292
IdMapBuilder idMapBuilder,
9393
boolean hasLabelInformation,
@@ -97,7 +97,6 @@ public final class NodesBuilder {
9797
) {
9898
this.maxOriginalId = maxOriginalId;
9999
this.concurrency = concurrency;
100-
this.nodeLabelTokenToPropertyKeys = nodeLabelTokenToPropertyKeys;
101100
this.idMapBuilder = idMapBuilder;
102101
this.propertyStates = propertyStates;
103102
this.labelInformationBuilder = !hasLabelInformation
@@ -128,7 +127,7 @@ public final class NodesBuilder {
128127
hasLabelInformation,
129128
hasProperties,
130129
tokenToNodeLabel,
131-
nodeLabelTokenToPropertyKeys,
130+
nodeLabelTokenToPropertyKeysSupplier.get(),
132131
propertyBuilderFn
133132
)
134133
);
@@ -202,12 +201,15 @@ public Nodes build() {
202201
public Nodes build(long highestNeoId) {
203202
// Flush remaining buffer contents
204203
this.threadLocalBuilder.forEach(ThreadLocalBuilder::flush);
204+
// Collect token to property keys for final merge
205+
var nodeLabelTokenToPropertyKeysList = new ArrayList<NodeLabelTokenToPropertyKeys>();
206+
this.threadLocalBuilder.forEach(tlb -> nodeLabelTokenToPropertyKeysList.add(tlb.nodeLabelTokenToPropertyKeys));
205207
// Clean up resources held by local builders
206208
this.threadLocalBuilder.close();
207209

208210
var idMap = this.idMapBuilder.build(labelInformationBuilder, highestNeoId, concurrency);
209211
var nodeProperties = buildProperties(idMap);
210-
var nodeSchema = buildNodeSchema(idMap, nodeProperties);
212+
var nodeSchema = buildNodeSchema(idMap, nodeLabelTokenToPropertyKeysList, nodeProperties);
211213
var nodePropertyStore = NodePropertyStore.builder().properties(nodeProperties).build();
212214

213215
return ImmutableNodes.builder()
@@ -217,7 +219,11 @@ public Nodes build(long highestNeoId) {
217219
.build();
218220
}
219221

220-
private NodeSchema buildNodeSchema(IdMap idMap, Map<String, NodeProperty> nodeProperties) {
222+
private NodeSchema buildNodeSchema(
223+
IdMap idMap,
224+
Collection<NodeLabelTokenToPropertyKeys> localNodeLabelTokenToPropertyKeys,
225+
Map<String, NodeProperty> nodeProperties
226+
) {
221227
var nodeSchema = NodeSchema.empty();
222228
var importPropertySchemas = nodeProperties
223229
.entrySet()
@@ -227,12 +233,19 @@ private NodeSchema buildNodeSchema(IdMap idMap, Map<String, NodeProperty> nodePr
227233
// consider node labels without properties
228234
var nodeLabels = new HashSet<>(idMap.availableNodeLabels());
229235
// and also node labels with associated properties
230-
nodeLabels.addAll(nodeLabelTokenToPropertyKeys.nodeLabels());
231-
236+
localNodeLabelTokenToPropertyKeys.forEach(mapping -> nodeLabels.addAll(mapping.nodeLabels()));
237+
// merge into a global mapping
238+
var globalNodeLabelTokenToPropertyKeys = localNodeLabelTokenToPropertyKeys
239+
.stream()
240+
.reduce(
241+
NodeLabelTokenToPropertyKeys.lazy(),
242+
(left, right) -> NodeLabelTokenToPropertyKeys.merge(left, right, importPropertySchemas)
243+
);
244+
// and construct final node property schema
232245
nodeLabels.forEach(nodeLabel -> {
233246
nodeSchema.addLabel(
234247
nodeLabel,
235-
this.nodeLabelTokenToPropertyKeys.propertySchemas(nodeLabel, importPropertySchemas)
248+
globalNodeLabelTokenToPropertyKeys.propertySchemas(nodeLabel, importPropertySchemas)
236249
);
237250
});
238251

@@ -331,6 +344,10 @@ private static class ThreadLocalBuilder implements AutoCloseable {
331344
this.batchNodeProperties = new ArrayList<>(buffer.capacity());
332345
}
333346

347+
NodeLabelTokenToPropertyKeys nodeLabelTokenToPropertyKeys() {
348+
return this.nodeLabelTokenToPropertyKeys;
349+
}
350+
334351
public void addNode(long originalId, NodeLabelToken nodeLabels) {
335352
if (!seenNodeIdPredicate.test(originalId)) {
336353
long[] labels = getOrCreateLabelTokens(nodeLabels);

core/src/test/java/org/neo4j/gds/core/loading/construction/NodeLabelTokenToPropertyKeysTest.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,5 +199,29 @@ void testNodeLabelsFixed() {
199199

200200
assertThat(mapping.nodeLabels()).containsExactlyInAnyOrder(NodeLabel.ALL_NODES, NodeLabel.of("A"), NodeLabel.of("B"), NodeLabel.of("C"));
201201
}
202+
203+
@Test
204+
void testMerge() {
205+
var importPropertySchemas = Map.of(
206+
"foo", PropertySchema.of("foo", ValueType.LONG, DefaultValue.forLong(), PropertyState.TRANSIENT),
207+
"bar", PropertySchema.of("bar", ValueType.DOUBLE, DefaultValue.forDouble(), PropertyState.PERSISTENT)
208+
);
209+
210+
var left = NodeLabelTokenToPropertyKeys.lazy();
211+
left.add(NodeLabelTokens.ofStrings("A"), List.of("foo"));
212+
213+
var right = NodeLabelTokenToPropertyKeys.lazy();
214+
right.add(NodeLabelTokens.ofStrings("B"), List.of("bar"));
215+
216+
var mapping = NodeLabelTokenToPropertyKeys.merge(left, right, importPropertySchemas);
217+
218+
assertThat(mapping.nodeLabels()).containsExactlyInAnyOrder(NodeLabel.of("A"), NodeLabel.of("B"));
219+
assertThat(mapping.propertySchemas(NodeLabel.of("A"), importPropertySchemas)).isEqualTo(Map.of(
220+
"foo", PropertySchema.of("foo", ValueType.LONG, DefaultValue.forLong(), PropertyState.TRANSIENT)
221+
));
222+
assertThat(mapping.propertySchemas(NodeLabel.of("B"), importPropertySchemas)).isEqualTo(Map.of(
223+
"bar", PropertySchema.of("bar", ValueType.DOUBLE, DefaultValue.forDouble(), PropertyState.PERSISTENT)
224+
));
225+
}
202226
}
203227

0 commit comments

Comments
 (0)