Skip to content

Commit dfe3d2e

Browse files
DarthMaxMats-SX
authored andcommitted
Enable jobId based write back
1 parent 26cc360 commit dfe3d2e

File tree

1 file changed

+6
-61
lines changed

1 file changed

+6
-61
lines changed

graphdatascience/query_runner/aura_db_query_runner.py

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import time
22
from typing import Any, Dict, List, Optional
3+
from uuid import uuid4
34

45
from pandas import DataFrame
56

@@ -120,20 +121,22 @@ def _remote_write_back(
120121
params["config"] = {}
121122

122123
# we pop these out so that they are not retained for the GDS proc call
123-
db_write_config = params["config"].pop("writeConfiguration", {}) # type: ignore
124124
db_arrow_config = params["config"].pop("arrowConfiguration", {}) # type: ignore
125-
self._inject_write_config(endpoint, params, db_write_config)
126125
self._inject_arrow_config(db_arrow_config)
127126

127+
job_id = params["config"]["jobId"] if "jobId" in params["config"] else str(uuid4())
128+
params["config"]["jobId"] = job_id
129+
128130
params["config"]["writeToResultStore"] = True # type: ignore
131+
129132
gds_write_result = self._gds_query_runner.call_procedure(
130133
endpoint, params, yields, database, logging, custom_error
131134
)
132135

133136
db_write_proc_params = {
134137
"graphName": params["graph_name"],
135138
"databaseName": self._gds_query_runner.database(),
136-
"writeConfiguration": db_write_config,
139+
"writeConfiguration": {"jobId": job_id},
137140
"arrowConfiguration": db_arrow_config,
138141
}
139142

@@ -165,61 +168,3 @@ def _inject_arrow_config(self, params: Dict[str, Any]) -> None:
165168
params["port"] = port
166169
params["token"] = token
167170
params["encrypted"] = self._encrypted
168-
169-
@staticmethod
170-
def _inject_write_config(proc_name: str, proc_params: Dict[str, Any], write_config: Dict[str, Any]) -> None:
171-
config = proc_params.get("config", {})
172-
173-
if "writeConcurrency" in config:
174-
write_config["concurrency"] = config["writeConcurrency"]
175-
elif "concurrency" in config:
176-
write_config["concurrency"] = config["concurrency"]
177-
178-
if "gds.shortestPath" in proc_name or "gds.allShortestPaths" in proc_name:
179-
write_config["relationshipType"] = config["writeRelationshipType"]
180-
181-
write_node_ids = config.get("writeNodeIds")
182-
write_costs = config.get("writeCosts")
183-
184-
if write_node_ids and write_costs:
185-
write_config["relationshipProperties"] = ["totalCost", "nodeIds", "costs"]
186-
elif write_node_ids:
187-
write_config["relationshipProperties"] = ["totalCost", "nodeIds"]
188-
elif write_costs:
189-
write_config["relationshipProperties"] = ["totalCost", "costs"]
190-
else:
191-
write_config["relationshipProperties"] = ["totalCost"]
192-
193-
elif "gds.graph." in proc_name:
194-
if "gds.graph.nodeProperties.write" == proc_name:
195-
properties = proc_params["properties"]
196-
write_config["nodeProperties"] = properties if isinstance(properties, list) else [properties]
197-
write_config["nodeLabels"] = proc_params["entities"]
198-
199-
elif "gds.graph.nodeLabel.write" == proc_name:
200-
write_config["nodeLabels"] = [proc_params["node_label"]]
201-
202-
elif "gds.graph.relationshipProperties.write" == proc_name:
203-
write_config["relationshipProperties"] = proc_params["relationship_properties"]
204-
write_config["relationshipType"] = proc_params["relationship_type"]
205-
206-
elif "gds.graph.relationship.write" == proc_name:
207-
if "relationship_property" in proc_params and proc_params["relationship_property"] != "":
208-
write_config["relationshipProperties"] = [proc_params["relationship_property"]]
209-
write_config["relationshipType"] = proc_params["relationship_type"]
210-
211-
else:
212-
raise ValueError(f"Unsupported procedure name: {proc_name}")
213-
214-
else:
215-
if "writeRelationshipType" in config:
216-
write_config["relationshipType"] = config["writeRelationshipType"]
217-
if "writeProperty" in config:
218-
write_config["relationshipProperties"] = [config["writeProperty"]]
219-
else:
220-
if "writeProperty" in config:
221-
write_config["nodeProperties"] = [config["writeProperty"]]
222-
if "nodeLabels" in proc_params:
223-
write_config["nodeLabels"] = proc_params["nodeLabels"]
224-
else:
225-
write_config["nodeLabels"] = ["*"]

0 commit comments

Comments
 (0)