Skip to content

Commit 6230bf9

Browse files
pontusmelkeDarthMax
authored andcommitted
Consolidate error handling
Handle errors in the same way in NodeSubscriber and RelationshipSubscriber.
1 parent 1873fe3 commit 6230bf9

File tree

2 files changed

+18
-1
lines changed

2 files changed

+18
-1
lines changed

core/src/main/java/org/neo4j/gds/core/loading/CypherRelationshipLoader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,9 @@ BatchLoadResult loadSingleBatch(InternalTransaction tx, int bufferSize) {
125125
}
126126
subscriber.initialize(subscription.fieldNames(), propertyDefaultValueByName);
127127
CypherLoadingUtils.consume(subscription);
128+
subscriber.error().ifPresent(e -> {
129+
throw e;
130+
});
128131
progressTracker.endSubTask("Relationships");
129132
return new BatchLoadResult(subscriber.rows(), -1L);
130133
}

core/src/main/java/org/neo4j/gds/core/loading/RelationshipSubscriber.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.neo4j.gds.core.loading.construction.RelationshipsBuilder;
2626
import org.neo4j.gds.core.utils.progress.tasks.ProgressTracker;
2727
import org.neo4j.graphdb.QueryStatistics;
28+
import org.neo4j.kernel.impl.query.QueryExecutionKernelException;
2829
import org.neo4j.kernel.impl.query.QuerySubscriber;
2930
import org.neo4j.values.AnyValue;
3031
import org.neo4j.values.storable.NumberValue;
3132
import org.neo4j.values.storable.TextValue;
3233

34+
import java.util.Optional;
3335
import java.util.Set;
3436

3537
import static org.neo4j.gds.RelationshipType.ALL_RELATIONSHIPS;
@@ -65,6 +67,8 @@ class RelationshipSubscriber implements QuerySubscriber {
6567

6668
private RelationshipsBuilder allRelationshipsBuilder;
6769

70+
private Optional<RuntimeException> error = Optional.empty();
71+
6872
RelationshipSubscriber(
6973
IdMap idMap,
7074
CypherRelationshipLoader.Context loaderContext,
@@ -105,6 +109,10 @@ void initialize(String[] fieldNames, ObjectDoubleMap<String> propertyDefaultValu
105109
this.propertyValueBuffer = new double[propertyCount];
106110
}
107111

112+
Optional<RuntimeException> error() {
113+
return error;
114+
}
115+
108116
public long rows() {
109117
return rows;
110118
}
@@ -183,7 +191,13 @@ public void onRecordCompleted() {
183191
}
184192
@Override
185193
public void onError(Throwable throwable) {
186-
throw new RuntimeException(throwable);
194+
if (throwable instanceof RuntimeException) {
195+
this.error = Optional.of((RuntimeException) throwable);
196+
} else if (throwable instanceof QueryExecutionKernelException) {
197+
this.error = Optional.of(((QueryExecutionKernelException) throwable).asUserException());
198+
} else {
199+
this.error = Optional.of(new RuntimeException(throwable));
200+
}
187201
}
188202

189203
@Override

0 commit comments

Comments
 (0)