Skip to content

Commit aa2b808

Browse files
committed
Fix Cypher projections under parallel runtime
1 parent e2c5f5d commit aa2b808

File tree

7 files changed

+39
-16
lines changed

7 files changed

+39
-16
lines changed

compatibility/4.4/neo4j-kernel-adapter/src/main/java/org/neo4j/gds/compat/_44/Neo4jProxyImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,11 @@ public long transactionId(KernelTransactionHandle kernelTransactionHandle) {
836836
return kernelTransactionHandle.lastTransactionTimestampWhenStarted();
837837
}
838838

839+
@Override
840+
public long transactionId(KernelTransaction kernelTransaction) {
841+
return kernelTransaction.lastTransactionTimestampWhenStarted();
842+
}
843+
839844
@Override
840845
public void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
841846
IdGenerator idGenerator = generatorFactory.get(RecordIdType.NODE);

compatibility/5-common/neo4j-kernel-adapter/src/main/java17/org/neo4j/gds/compat/_5x/CommonNeo4jProxyImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -784,6 +784,11 @@ public long transactionId(KernelTransactionHandle kernelTransactionHandle) {
784784
return kernelTransactionHandle.getTransactionSequenceNumber();
785785
}
786786

787+
@Override
788+
public long transactionId(KernelTransaction kernelTransaction) {
789+
return kernelTransaction.getTransactionSequenceNumber();
790+
}
791+
787792
@Override
788793
public void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
789794
IdGenerator idGenerator = generatorFactory.get(RecordIdType.NODE);

compatibility/api/neo4j-kernel-adapter/src/main/java/org/neo4j/gds/compat/Neo4jProxyApi.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,8 @@ UserFunctionSignature userFunctionSignature(
322322

323323
long transactionId(KernelTransactionHandle kernelTransactionHandle);
324324

325+
long transactionId(KernelTransaction kernelTransaction);
326+
325327
void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext);
326328

327329
TransactionalContext newQueryContext(

compatibility/common/neo4j-kernel-adapter/src/main/java/org/neo4j/gds/compat/Neo4jProxy.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ public static long transactionId(KernelTransactionHandle kernelTransactionHandle
440440
return IMPL.transactionId(kernelTransactionHandle);
441441
}
442442

443+
public static long transactionId(KernelTransaction kernelTransaction) {
444+
return IMPL.transactionId(kernelTransaction);
445+
}
446+
443447
public static void reserveNeo4jIds(IdGeneratorFactory generatorFactory, int size, CursorContext cursorContext) {
444448
IMPL.reserveNeo4jIds(generatorFactory, size, cursorContext);
445449
}

cypher-aggregation/src/main/java/org/neo4j/gds/projection/AlphaCypherAggregation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.gds.api.DatabaseId;
2424
import org.neo4j.gds.compat.CompatUserAggregationFunction;
2525
import org.neo4j.gds.compat.CompatUserAggregator;
26+
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
2627
import org.neo4j.gds.compat.Neo4jProxy;
2728
import org.neo4j.gds.core.Username;
2829
import org.neo4j.gds.core.loading.Capabilities.WriteMode;
@@ -35,6 +36,7 @@
3536
import org.neo4j.internal.kernel.api.procs.QualifiedName;
3637
import org.neo4j.internal.kernel.api.procs.UserFunctionSignature;
3738
import org.neo4j.kernel.api.procedure.Context;
39+
import org.neo4j.kernel.impl.api.KernelTransactions;
3840
import org.neo4j.procedure.Name;
3941
import org.neo4j.values.AnyValue;
4042
import org.neo4j.values.storable.TextValue;
@@ -108,7 +110,8 @@ public CompatUserAggregator create(Context ctx) throws ProcedureException {
108110
var metricsFacade = Neo4jProxy.lookupComponentProvider(ctx, MetricsFacade.class, true);
109111
var username = Neo4jProxy.lookupComponentProvider(ctx, Username.class, true);
110112
var transaction = Neo4jProxy.lookupComponentProvider(ctx, Transaction.class, true);
111-
var queryProvider = ExecutingQueryProvider.fromTransaction(transaction);
113+
var ktxs = GraphDatabaseApiProxy.resolveDependency(databaseService, KernelTransactions.class);
114+
var queryProvider = ExecutingQueryProvider.fromTransaction(ktxs, transaction);
112115

113116
var runsOnCompositeDatabase = Neo4jProxy.isCompositeDatabase(databaseService);
114117
var writeMode = runsOnCompositeDatabase

cypher-aggregation/src/main/java/org/neo4j/gds/projection/CypherAggregation.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.neo4j.gds.api.DatabaseId;
2424
import org.neo4j.gds.compat.CompatUserAggregationFunction;
2525
import org.neo4j.gds.compat.CompatUserAggregator;
26+
import org.neo4j.gds.compat.GraphDatabaseApiProxy;
2627
import org.neo4j.gds.compat.Neo4jProxy;
2728
import org.neo4j.gds.core.Username;
2829
import org.neo4j.gds.core.loading.Capabilities.WriteMode;
@@ -35,6 +36,7 @@
3536
import org.neo4j.internal.kernel.api.procs.QualifiedName;
3637
import org.neo4j.internal.kernel.api.procs.UserFunctionSignature;
3738
import org.neo4j.kernel.api.procedure.Context;
39+
import org.neo4j.kernel.impl.api.KernelTransactions;
3840
import org.neo4j.procedure.Name;
3941
import org.neo4j.values.AnyValue;
4042
import org.neo4j.values.storable.TextValue;
@@ -107,7 +109,8 @@ public CompatUserAggregator create(Context ctx) throws ProcedureException {
107109
var metricsFacade = Neo4jProxy.lookupComponentProvider(ctx, MetricsFacade.class, true);
108110
var username = Neo4jProxy.lookupComponentProvider(ctx, Username.class, true);
109111
var transaction = Neo4jProxy.lookupComponentProvider(ctx, Transaction.class, true);
110-
var queryProvider = ExecutingQueryProvider.fromTransaction(transaction);
112+
var ktxs = GraphDatabaseApiProxy.resolveDependency(databaseService, KernelTransactions.class);
113+
var queryProvider = ExecutingQueryProvider.fromTransaction(ktxs, transaction);
111114

112115
var runsOnCompositeDatabase = Neo4jProxy.isCompositeDatabase(databaseService);
113116
var writeMode = runsOnCompositeDatabase

cypher-aggregation/src/main/java/org/neo4j/gds/projection/ExecutingQueryProvider.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@
1919
*/
2020
package org.neo4j.gds.projection;
2121

22+
import org.neo4j.gds.compat.Neo4jProxy;
2223
import org.neo4j.graphdb.Transaction;
23-
import org.neo4j.kernel.impl.api.KernelStatement;
24+
import org.neo4j.kernel.impl.api.KernelTransactions;
2425
import org.neo4j.kernel.impl.coreapi.InternalTransaction;
2526

2627
import java.util.Optional;
2728

2829
interface ExecutingQueryProvider {
2930
Optional<String> executingQuery();
3031

31-
static ExecutingQueryProvider fromTransaction(Transaction transaction) {
32-
return new TxQuery(transaction);
32+
static ExecutingQueryProvider fromTransaction(KernelTransactions ktxs, Transaction transaction) {
33+
return new TxQuery(ktxs, transaction);
3334
}
3435

3536
static ExecutingQueryProvider empty() {
@@ -39,9 +40,11 @@ static ExecutingQueryProvider empty() {
3940

4041

4142
final class TxQuery implements ExecutingQueryProvider {
43+
private final KernelTransactions ktxs;
4244
private final Transaction transaction;
4345

44-
TxQuery(Transaction transaction) {
46+
TxQuery(KernelTransactions ktxs, Transaction transaction) {
47+
this.ktxs = ktxs;
4548
this.transaction = transaction;
4649
}
4750

@@ -51,15 +54,13 @@ public Optional<String> executingQuery() {
5154
return Optional.empty();
5255
}
5356

54-
try (var statement = ((InternalTransaction) this.transaction).kernelTransaction().acquireStatement()) {
55-
if (!(statement instanceof KernelStatement)) {
56-
return Optional.empty();
57-
}
58-
59-
return ((KernelStatement) statement).queryRegistry().executingQuery().flatMap(eq ->
60-
eq.snapshot()
61-
.obfuscatedQueryText()
62-
.or(() -> Optional.ofNullable(eq.rawQueryText())));
63-
}
57+
var txId = Neo4jProxy.transactionId(((InternalTransaction) this.transaction).kernelTransaction());
58+
return this.ktxs.activeTransactions().stream()
59+
.filter(handle -> Neo4jProxy.transactionId(handle) == txId)
60+
.flatMap(handle -> handle.executingQuery().stream())
61+
.flatMap(eq -> eq.snapshot()
62+
.obfuscatedQueryText()
63+
.or(() -> Optional.ofNullable(eq.rawQueryText())).stream())
64+
.findAny();
6465
}
6566
}

0 commit comments

Comments
 (0)