2020package org .neo4j .gds .pregel .proc ;
2121
2222import org .neo4j .gds .Algorithm ;
23+ import org .neo4j .gds .RelationshipType ;
24+ import org .neo4j .gds .api .GraphStore ;
2325import org .neo4j .gds .api .properties .nodes .DoubleArrayNodePropertyValues ;
2426import org .neo4j .gds .api .properties .nodes .LongArrayNodePropertyValues ;
2527import org .neo4j .gds .api .properties .nodes .NodePropertyValues ;
28+ import org .neo4j .gds .beta .indexInverse .InverseRelationshipsAlgorithmFactory ;
29+ import org .neo4j .gds .beta .indexInverse .InverseRelationshipsConfigImpl ;
2630import org .neo4j .gds .beta .pregel .PregelConfig ;
2731import org .neo4j .gds .beta .pregel .PregelResult ;
2832import org .neo4j .gds .beta .pregel .PregelSchema ;
2933import org .neo4j .gds .core .utils .paged .HugeObjectArray ;
34+ import org .neo4j .gds .core .utils .progress .TaskRegistryFactory ;
3035import org .neo4j .gds .core .write .ImmutableNodeProperty ;
3136import org .neo4j .gds .core .write .NodeProperty ;
3237import org .neo4j .gds .executor .ComputationResult ;
38+ import org .neo4j .gds .executor .validation .AfterLoadValidation ;
39+ import org .neo4j .gds .executor .validation .ValidationConfiguration ;
40+ import org .neo4j .gds .utils .StringJoining ;
41+ import org .neo4j .logging .Log ;
3342
43+ import java .util .Collection ;
3444import java .util .List ;
45+ import java .util .Locale ;
3546import java .util .stream .Collectors ;
3647
3748import static org .neo4j .gds .utils .StringFormatting .formatWithLocale ;
3849
39- final class PregelBaseProc {
50+ public final class PregelBaseProc {
4051
41- static <ALGO extends Algorithm <PregelResult >, CONFIG extends PregelConfig >
42- List <NodeProperty > nodeProperties (
43- ComputationResult <ALGO , PregelResult , CONFIG > computationResult ,
44- String propertyPrefix
52+ public static <CONFIG extends PregelConfig > ValidationConfiguration <CONFIG > ensureIndexValidation (
53+ Log log , TaskRegistryFactory taskRegistryFactory
54+ ) {
55+ return new ValidationConfiguration <>() {
56+ @ Override
57+ public List <AfterLoadValidation <CONFIG >> afterLoadValidations () {
58+ return List .of (
59+ (graphStore , graphProjectConfig , config ) -> ensureDirectedRelationships (
60+ graphStore , config .internalRelationshipTypes (graphStore )
61+ ),
62+ (graphStore , graphProjectConfig , config ) -> ensureInverseIndexesExist (graphStore ,
63+ config .internalRelationshipTypes (graphStore ),
64+ config .concurrency (),
65+ log ,
66+ taskRegistryFactory
67+ )
68+ );
69+ }
70+ };
71+ }
72+
73+ static void ensureInverseIndexesExist (
74+ GraphStore graphStore ,
75+ Collection <RelationshipType > relationshipTypes ,
76+ int concurrency ,
77+ Log log ,
78+ TaskRegistryFactory taskRegistryFactory
79+ ) {
80+ relationshipTypes
81+ .stream ()
82+ .filter (relType -> !graphStore .inverseIndexedRelationshipTypes ().contains (relType ))
83+ .forEach (relType -> {
84+ var inverseConfig = InverseRelationshipsConfigImpl
85+ .builder ()
86+ .concurrency (concurrency )
87+ .relationshipType (relType .name )
88+ .build ();
89+
90+ var inverseIndex = new InverseRelationshipsAlgorithmFactory ()
91+ .build (graphStore , inverseConfig , log , taskRegistryFactory )
92+ .compute ();
93+
94+ graphStore .addInverseIndex (relType , inverseIndex );
95+ });
96+ }
97+
98+ static void ensureDirectedRelationships (
99+ GraphStore graphStore , Collection <RelationshipType > relationshipTypes
100+ ) {
101+ var relationshipSchema = graphStore .schema ().relationshipSchema ();
102+ var undirectedTypes = relationshipTypes
103+ .stream ()
104+ .filter (relationshipSchema ::isUndirected )
105+ .map (RelationshipType ::name )
106+ .collect (Collectors .toList ());
107+
108+ if (!undirectedTypes .isEmpty ()) {
109+ throw new IllegalArgumentException (String .format (
110+ Locale .US ,
111+ "This algorithm requires a directed graph, but the following configured relationship types are undirected: %s." ,
112+ StringJoining .join (undirectedTypes )
113+ ));
114+ }
115+ }
116+
117+ static <ALGO extends Algorithm <PregelResult >, CONFIG extends PregelConfig > List <NodeProperty > nodeProperties (
118+ ComputationResult <ALGO , PregelResult , CONFIG > computationResult , String propertyPrefix
45119 ) {
46120 var compositeNodeValue = computationResult .result ().nodeValues ();
47121 var schema = compositeNodeValue .schema ();
48122 // TODO change this to generic prefix setting
49123
50- return schema .elements ()
124+ return schema
125+ .elements ()
51126 .stream ()
52127 .filter (element -> element .visibility () == PregelSchema .Visibility .PUBLIC )
53128 .map (element -> {
@@ -62,24 +137,22 @@ List<NodeProperty> nodeProperties(
62137 nodePropertyValues = compositeNodeValue .doubleProperties (propertyKey ).asNodeProperties ();
63138 break ;
64139 case LONG_ARRAY :
65- nodePropertyValues = new HugeObjectArrayLongArrayPropertyValues (
66- compositeNodeValue .longArrayProperties (propertyKey )
67- );
140+ nodePropertyValues = new HugeObjectArrayLongArrayPropertyValues (compositeNodeValue .longArrayProperties (
141+ propertyKey ));
68142 break ;
69143 case DOUBLE_ARRAY :
70- nodePropertyValues = new HugeObjectArrayDoubleArrayPropertyValues (
71- compositeNodeValue .doubleArrayProperties (propertyKey )
72- );
144+ nodePropertyValues = new HugeObjectArrayDoubleArrayPropertyValues (compositeNodeValue .doubleArrayProperties (
145+ propertyKey ));
73146 break ;
74147 default :
75148 throw new IllegalArgumentException ("Unsupported property type: " + element .propertyType ());
76149 }
77150
78- return ImmutableNodeProperty .of (
79- formatWithLocale ("%s%s" , propertyPrefix , propertyKey ),
151+ return ImmutableNodeProperty .of (formatWithLocale ("%s%s" , propertyPrefix , propertyKey ),
80152 nodePropertyValues
81153 );
82- }).collect (Collectors .toList ());
154+ })
155+ .collect (Collectors .toList ());
83156 }
84157
85158 private PregelBaseProc () {}
0 commit comments