Skip to content

Commit b0b12e5

Browse files
committed
Initialize relationship schema per type
1 parent d35742e commit b0b12e5

File tree

3 files changed

+59
-48
lines changed

3 files changed

+59
-48
lines changed

core/src/main/java/org/neo4j/gds/core/loading/construction/GraphFactory.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,10 @@ static RelationshipsBuilder relationshipsBuilder(
222222
Optional<Boolean> validateRelationships,
223223
Optional<Integer> concurrency,
224224
Optional<Boolean> indexInverse,
225-
Optional<ExecutorService> executorService
225+
Optional<ExecutorService> executorService,
226+
Optional<Boolean> loadRelationshipProperties
226227
) {
227-
var loadRelationshipProperties = !propertyConfigs.isEmpty();
228+
var doLoadRelationshipProperties = loadRelationshipProperties.orElse(!propertyConfigs.isEmpty());
228229

229230
var aggregations = propertyConfigs.isEmpty()
230231
? new Aggregation[]{aggregation.orElse(Aggregation.DEFAULT)}
@@ -290,7 +291,7 @@ static RelationshipsBuilder relationshipsBuilder(
290291
.bufferSize(bufferSize)
291292
.propertyConfigs(propertyConfigs)
292293
.isMultiGraph(isMultiGraph)
293-
.loadRelationshipProperty(loadRelationshipProperties)
294+
.loadRelationshipProperty(doLoadRelationshipProperties)
294295
.direction(Direction.fromOrientation(actualOrientation))
295296
.executorService(executorService.orElse(Pools.DEFAULT))
296297
.concurrency(finalConcurrency);

cypher-aggregation/src/main/java/org/neo4j/gds/projection/GraphAggregator.java

Lines changed: 54 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.neo4j.gds.api.schema.ImmutableGraphSchema;
3030
import org.neo4j.gds.api.schema.RelationshipPropertySchema;
3131
import org.neo4j.gds.api.schema.RelationshipSchema;
32+
import org.neo4j.gds.api.schema.RelationshipSchemaEntry;
3233
import org.neo4j.gds.compat.CompatUserAggregator;
3334
import org.neo4j.gds.core.Aggregation;
3435
import org.neo4j.gds.core.ConfigKeyValidation;
@@ -41,6 +42,7 @@
4142
import org.neo4j.gds.core.loading.ReadHelper;
4243
import org.neo4j.gds.core.loading.RelationshipImportResult;
4344
import org.neo4j.gds.core.loading.construction.GraphFactory;
45+
import org.neo4j.gds.core.loading.construction.ImmutablePropertyConfig;
4446
import org.neo4j.gds.core.loading.construction.NodeLabelToken;
4547
import org.neo4j.gds.core.loading.construction.NodeLabelTokens;
4648
import org.neo4j.gds.core.loading.construction.PropertyValues;
@@ -59,6 +61,7 @@
5961

6062
import java.util.ArrayList;
6163
import java.util.Arrays;
64+
import java.util.HashMap;
6265
import java.util.List;
6366
import java.util.Map;
6467
import java.util.Optional;
@@ -331,7 +334,7 @@ private static final class GraphImporter {
331334
private final String graphName;
332335
private final GraphProjectFromCypherAggregationConfig config;
333336
private final LazyIdMapBuilder idMapBuilder;
334-
private final @Nullable List<RelationshipPropertySchema> relationshipPropertySchemas;
337+
// private final @Nullable List<RelationshipPropertySchema> relationshipPropertySchemas;
335338

336339
private final boolean canWriteToDatabase;
337340
private final ExtractNodeId extractNodeId;
@@ -350,7 +353,7 @@ private GraphImporter(
350353
this.graphName = graphName;
351354
this.config = config;
352355
this.idMapBuilder = idMapBuilder;
353-
this.relationshipPropertySchemas = relationshipPropertySchemas;
356+
// this.relationshipPropertySchemas = relationshipPropertySchemas;
354357
this.canWriteToDatabase = canWriteToDatabase;
355358
this.lock = lock;
356359
this.relImporters = new ConcurrentHashMap<>();
@@ -469,23 +472,34 @@ void update(
469472
var intermediateSourceId = loadNode(sourceNode, sourceNodeLabels, sourceNodePropertyValues);
470473

471474
if (targetNode != NoValue.NO_VALUE) {
472-
var relImporter = this.relImporters.computeIfAbsent(relationshipType, this::newRelImporter);
475+
RelationshipsBuilder relImporter;
476+
// we do the check before to avoid having to create a new lambda instance on every call
477+
if (this.relImporters.containsKey(relationshipType)) {
478+
relImporter = this.relImporters.get(relationshipType);
479+
} else {
480+
var finalRelationshipProperties = relationshipProperties;
481+
relImporter = this.relImporters.computeIfAbsent(
482+
relationshipType,
483+
type -> newRelImporter(type, finalRelationshipProperties)
484+
);
485+
}
486+
473487
var intermediateTargetId = loadNode(targetNode, targetNodeLabels, targetNodePropertyValues);
474488

475-
if (this.relationshipPropertySchemas != null) {
476-
assert relationshipProperties != null;
477-
if (this.relationshipPropertySchemas.size() == 1) {
478-
var relationshipProperty = this.relationshipPropertySchemas.get(0).key();
479-
double propertyValue = loadOneRelationshipProperty(
480-
relationshipProperties,
481-
relationshipProperty
482-
);
483-
relImporter.addFromInternal(intermediateSourceId, intermediateTargetId, propertyValue);
489+
if (relationshipProperties != null) {
490+
if (relationshipProperties.size() == 1) {
491+
relationshipProperties.foreach((key, value) -> {
492+
var property = ReadHelper.extractValue(value, DefaultValue.DOUBLE_DEFAULT_FALLBACK);
493+
relImporter.addFromInternal(intermediateSourceId, intermediateTargetId, property);
494+
});
484495
} else {
485-
var propertyValues = loadMultipleRelationshipProperties(
486-
relationshipProperties,
487-
this.relationshipPropertySchemas
488-
);
496+
var propertyValues = new double[relationshipProperties.size()];
497+
int[] index = {0};
498+
relationshipProperties.foreach((key, value) -> {
499+
var property = ReadHelper.extractValue(value, DefaultValue.DOUBLE_DEFAULT_FALLBACK);
500+
var i = index[0]++;
501+
propertyValues[i] = property;
502+
});
489503
relImporter.addFromInternal(intermediateSourceId, intermediateTargetId, propertyValues);
490504
}
491505
} else {
@@ -528,8 +542,7 @@ AggregationResult result(String username, DatabaseId databaseId, ProgressTimer t
528542
.build();
529543
}
530544

531-
private RelationshipsBuilder newRelImporter(RelationshipType relType) {
532-
545+
private RelationshipsBuilder newRelImporter(RelationshipType relType, @Nullable MapValue properties) {
533546
var undirectedTypes = this.config.undirectedRelationshipTypes();
534547
var orientation = undirectedTypes.contains(relType.name) || undirectedTypes.contains("*")
535548
? UNDIRECTED
@@ -546,27 +559,12 @@ private RelationshipsBuilder newRelImporter(RelationshipType relType) {
546559
.indexInverse(indexInverse)
547560
.concurrency(this.config.readConcurrency());
548561

549-
// There is a potential race between initializing the relationships builder and the
550-
// relationship property schemas. Both happen under lock, but under different ones.
551-
// Relationship builders are initialized as part of computeIfAbsent which uses the
552-
// lock inside ConcurrentHashMap, while `this.relationshipPropertySchemas` is initialized
553-
// using the lock in this class.
554-
//
555-
// We have to ensure that the property schemas field is fully initialized, before we
556-
// create the relationships builder. This can only be achieved by using the same lock
557-
// for both actions. This should not affect performance, as we are doing this inside of
558-
// computeIfAbsent which is only called once.
559-
this.lock.lock();
560-
try {
561-
if (this.relationshipPropertySchemas != null) {
562-
for (var relationshipPropertySchema : this.relationshipPropertySchemas) {
563-
relationshipsBuilderBuilder.addPropertyConfig(
564-
GraphFactory.PropertyConfig.of(relationshipPropertySchema.key())
565-
);
566-
}
562+
if (properties != null) {
563+
for (String propertyKey : properties.keySet()) {
564+
relationshipsBuilderBuilder.addPropertyConfig(
565+
ImmutablePropertyConfig.builder().propertyKey(propertyKey).build()
566+
);
567567
}
568-
} finally {
569-
this.lock.unlock();
570568
}
571569

572570
return relationshipsBuilderBuilder.build();
@@ -643,20 +641,27 @@ private void buildRelationshipsWithProperties(
643641
AdjacencyCompressor.ValueMapper valueMapper
644642
) {
645643
var relationshipImportResultBuilder = RelationshipImportResult.builder();
646-
var relationshipSchemas = new ArrayList<RelationshipSchema>();
644+
var relationshipSchemas = new HashMap<RelationshipType, RelationshipSchemaEntry>();
647645

648646
this.relImporters.forEach((relationshipType, relImporter) -> {
649647
var relationships = relImporter.build(
650648
Optional.of(valueMapper),
651649
Optional.empty()
652650
);
653-
relationshipSchemas.add(relationships.relationshipSchema(relationshipType));
651+
var schema = relationships.relationshipSchema(relationshipType);
652+
relationshipSchemas.put(relationshipType, schema.get(relationshipType));
653+
// schema
654+
// .entries()
655+
// .forEach(entry -> relationshipSchemas.merge(
656+
// relationshipType,
657+
// entry,
658+
// RelationshipSchemaEntry::union
659+
// ));
660+
654661
relationshipImportResultBuilder.putImportResult(relationshipType, relationships);
655662
});
656663

657-
var relationshipSchema = relationshipSchemas
658-
.stream()
659-
.reduce(RelationshipSchema.empty(), RelationshipSchema::union);
664+
var relationshipSchema = new RelationshipSchema(relationshipSchemas);
660665

661666
graphStoreBuilder.relationshipImportResult(relationshipImportResultBuilder.build());
662667
this.graphSchemaBuilder.relationshipSchema(relationshipSchema);
@@ -674,8 +679,13 @@ static MapValue propertiesConfig(
674679
var nodeProperties = propertiesConfig.get(propertyKey);
675680

676681
if (nodeProperties instanceof MapValue) {
677-
return (MapValue) nodeProperties;
682+
var mapProperties = (MapValue) nodeProperties;
683+
if (mapProperties.isEmpty()) {
684+
return null;
685+
}
686+
return mapProperties;
678687
}
688+
679689
if (nodeProperties == NoValue.NO_VALUE) {
680690
return null;
681691
}

graph-schema-api/src/main/java/org/neo4j/gds/api/schema/RelationshipSchemaEntry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ boolean isUndirected() {
6565
}
6666

6767
@Override
68-
RelationshipSchemaEntry union(RelationshipSchemaEntry other) {
68+
public RelationshipSchemaEntry union(RelationshipSchemaEntry other) {
6969
if (!other.identifier().equals(this.identifier())) {
7070
throw new UnsupportedOperationException(
7171
formatWithLocale(

0 commit comments

Comments
 (0)