Skip to content

Commit a68bf59

Browse files
committed
Cleanup on CypherAggregation::close in failure case
1 parent 93c9a08 commit a68bf59

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

cypher-aggregation/src/integrationTest/java/org/neo4j/gds/projection/ProductGraphAggregatorTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private static Stream<Arguments> emptyGraphNames() {
124124
}
125125

126126
@Test
127-
void shouldFailTaskOnFailure() {
127+
void shouldFailTaskOnFailure() throws Exception {
128128
TestTaskStore taskStore = new TestTaskStore();
129129
var aggregator = new ProductGraphAggregator(
130130
DatabaseId.random(),
@@ -149,6 +149,9 @@ void shouldFailTaskOnFailure() {
149149
.hasCauseInstanceOf(IllegalArgumentException.class)
150150
.hasMessageContaining("The node has to be either a NODE or an INTEGER, but got String");
151151

152+
// assuming this gets called by Neo4j
153+
aggregator.close();
154+
152155
assertThat(taskStore.query())
153156
.map(i -> i.task().status())
154157
.containsExactly(Status.FAILED);

cypher-aggregation/src/main/java/org/neo4j/gds/projection/AlphaGraphAggregator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public void update(AnyValue[] input) throws ProcedureException {
7272
NoValue.NO_VALUE
7373
);
7474
} catch (Exception e) {
75-
super.onFailure();
7675
throw new ProcedureException(
7776
Status.Procedure.ProcedureCallFailed,
7877
e,

cypher-aggregation/src/main/java/org/neo4j/gds/projection/CypherAggregation.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public UserAggregationReducer createReducer(Context ctx) throws ProcedureExcepti
9898

9999
var queryEstimator = QueryEstimator.fromTransaction(DatabaseTransactionContext.of(databaseService, transaction));
100100

101-
return new ProductGraphAggregator(
101+
ProductGraphAggregator productGraphAggregator = new ProductGraphAggregator(
102102
DatabaseId.of(databaseService.databaseName()),
103103
username,
104104
writeMode,
@@ -108,5 +108,9 @@ public UserAggregationReducer createReducer(Context ctx) throws ProcedureExcepti
108108
taskStore,
109109
new LogAdapter(log)
110110
);
111+
112+
ctx.internalTransaction().registerCloseableResource(productGraphAggregator);
113+
114+
return productGraphAggregator;
111115
}
112116
}

cypher-aggregation/src/main/java/org/neo4j/gds/projection/GraphAggregator.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
import static org.neo4j.gds.projection.GraphImporter.NO_TARGET_NODE;
7171
import static org.neo4j.gds.utils.StringFormatting.formatWithLocale;
7272

73-
abstract class GraphAggregator implements UserAggregationReducer, UserAggregationUpdater {
73+
abstract class GraphAggregator implements UserAggregationReducer, UserAggregationUpdater, AutoCloseable {
7474

7575
static final String SOURCE_NODE_PROPERTIES = "sourceNodeProperties";
7676
static final String SOURCE_NODE_LABELS = "sourceNodeLabels";
@@ -95,6 +95,7 @@ abstract class GraphAggregator implements UserAggregationReducer, UserAggregatio
9595
// Used for initializing the data and rel importers
9696
private final Lock lock;
9797
private final ExtractNodeId extractNodeId;
98+
private final AtomicBoolean completedSuccessfully;
9899
private volatile @Nullable GraphImporter importer;
99100

100101
// #result() may be called twice, we cache the result of the first call to return it again in the second invocation
@@ -123,6 +124,7 @@ abstract class GraphAggregator implements UserAggregationReducer, UserAggregatio
123124
this.lock = new ReentrantLock();
124125
this.configValidator = new ConfigValidator();
125126
this.extractNodeId = new ExtractNodeId();
127+
this.completedSuccessfully = new AtomicBoolean(false);
126128
}
127129

128130
void projectNextRelationship(
@@ -284,7 +286,6 @@ public AnyValue result() throws ProcedureException {
284286
projectionMetric.start();
285287
result = buildGraph();
286288
} catch (Exception e) {
287-
this.onFailure();
288289
projectionMetric.failed(e);
289290
throw new ProcedureException(
290291
Status.Procedure.ProcedureCallFailed,
@@ -306,13 +307,11 @@ public AnyValue result() throws ProcedureException {
306307
builder.add("projectMillis", Values.longValue(result.projectMillis()));
307308
builder.add("configuration", ValueUtils.asAnyValue(result.configuration()));
308309
builder.add("query", ValueUtils.asAnyValue(result.query()));
309-
return builder.build();
310-
}
310+
MapValue projectResult = builder.build();
311311

312-
void onFailure() {
313-
if (progressTracker != null) {
314-
this.progressTracker.endSubTaskWithFailure();
315-
}
312+
this.completedSuccessfully.set(true);
313+
314+
return projectResult;
316315
}
317316

318317
public @Nullable AggregationResult buildGraph() {
@@ -390,6 +389,13 @@ private static RelationshipType typeConfig(
390389
);
391390
}
392391

392+
@Override
393+
public void close() throws Exception {
394+
if (!completedSuccessfully.get() && progressTracker != null) {
395+
this.progressTracker.endSubTaskWithFailure();
396+
}
397+
}
398+
393399
public static final class ConfigValidator {
394400
private static final Set<String> DATA_CONFIG_KEYS = Set.of(
395401
SOURCE_NODE_PROPERTIES,

cypher-aggregation/src/main/java/org/neo4j/gds/projection/ProductGraphAggregator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ public void update(AnyValue[] input) throws ProcedureException {
5858
input[5]
5959
);
6060
} catch (Exception e) {
61-
super.onFailure();
6261
throw new ProcedureException(
6362
Status.Procedure.ProcedureCallFailed,
6463
e,

0 commit comments

Comments
 (0)