Skip to content

Commit b3c2b74

Browse files
committed
Internal responses
1 parent 0a235a0 commit b3c2b74

File tree

8 files changed

+116
-139
lines changed

8 files changed

+116
-139
lines changed

neo4j/__init__.py

Lines changed: 84 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
STATEMENT_TYPE_SCHEMA_WRITE = "s"
9090

9191

92+
# TODO: remove in 2.0
9293
_warned_about_transaction_bookmarks = False
9394

9495

@@ -426,7 +427,10 @@ def run(self, statement, parameters=None, **kwparameters):
426427
if not self.has_transaction():
427428
self._connect()
428429

429-
result = self.__run__(statement, dict(parameters or {}, **kwparameters))
430+
statement = ustr(statement)
431+
parameters = fix_parameters(dict(parameters or {}, **kwparameters), self._connection.protocol_version,
432+
supports_bytes=self._connection.server.supports_bytes())
433+
result = self._run(statement, parameters)
430434

431435
if not self.has_transaction():
432436
try:
@@ -477,25 +481,28 @@ def sync(self):
477481
return detail_count
478482
return 0
479483

480-
def detach(self, result):
484+
def detach(self, result, sync=True):
481485
""" Detach a result from this session by fetching and buffering any
482486
remaining records.
483487
484488
:param result:
489+
:param sync:
485490
:returns: number of records fetched
486491
"""
487492
count = 0
488493

489-
self.send()
490-
fetch = self.fetch
491-
while result.attached():
492-
count += fetch()
494+
if sync and result.attached():
495+
self.send()
496+
fetch = self.fetch
497+
while result.attached():
498+
count += fetch()
493499

494500
if self._last_result is result:
495501
self._last_result = None
496502
if not self.has_transaction():
497503
self._disconnect(sync=False)
498504

505+
result._session = None
499506
return count
500507

501508
def last_bookmark(self):
@@ -530,6 +537,7 @@ def begin_transaction(self, bookmark=None):
530537
if self.has_transaction():
531538
raise TransactionError("Explicit transaction already open")
532539

540+
# TODO: remove in 2.0
533541
if bookmark is not None:
534542
global _warned_about_transaction_bookmarks
535543
if not _warned_about_transaction_bookmarks:
@@ -540,7 +548,7 @@ def begin_transaction(self, bookmark=None):
540548

541549
self._create_transaction()
542550
self._connect()
543-
self.__begin__()
551+
self._begin()
544552
return self._transaction
545553

546554
def commit_transaction(self):
@@ -552,9 +560,7 @@ def commit_transaction(self):
552560
if not self.has_transaction():
553561
raise TransactionError("No transaction to commit")
554562
self._transaction = None
555-
result = self.__commit__()
556-
result.consume()
557-
bookmark = self.__bookmark__(result)
563+
bookmark = self._commit()
558564
self._bookmarks = [bookmark]
559565
return bookmark
560566

@@ -567,11 +573,7 @@ def rollback_transaction(self):
567573
if not self.has_transaction():
568574
raise TransactionError("No transaction to rollback")
569575
self._destroy_transaction()
570-
rollback_result = self.__rollback__()
571-
try:
572-
rollback_result.consume()
573-
except ServiceUnavailable:
574-
pass
576+
self._rollback()
575577

576578
def reset(self):
577579
""" Reset the session.
@@ -593,7 +595,7 @@ def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):
593595
try:
594596
self._create_transaction()
595597
self._connect(access_mode)
596-
self.__begin__()
598+
self._begin()
597599
tx = self._transaction
598600
try:
599601
result = unit_of_work(tx, *args, **kwargs)
@@ -630,43 +632,61 @@ def read_transaction(self, unit_of_work, *args, **kwargs):
630632
def write_transaction(self, unit_of_work, *args, **kwargs):
631633
return self._run_transaction(WRITE_ACCESS, unit_of_work, *args, **kwargs)
632634

633-
def _run(self, statement, parameters):
634-
from neobolt.bolt.connection import RUN, PULL_ALL
635-
from neobolt.bolt.response import Response
635+
def _assert_open(self):
636636
if self.closed():
637637
raise SessionError("Session closed")
638638

639-
run_response = Response(self._connection)
640-
pull_all_response = Response(self._connection)
641-
self._last_result = result = BoltStatementResult(self, run_response, pull_all_response)
642-
result.statement = ustr(statement)
643-
result.parameters = fix_parameters(parameters, self._connection.protocol_version,
644-
supports_bytes=self._connection.server.supports_bytes())
645-
646-
self._connection.append(RUN, (result.statement, result.parameters), response=run_response)
647-
self._connection.append(PULL_ALL, response=pull_all_response)
648-
639+
def _run(self, statement, parameters):
640+
self._assert_open()
641+
cx = self._connection
642+
hydrant = PackStreamHydrator(cx.protocol_version)
643+
metadata = {
644+
"statement": statement,
645+
"parameters": parameters,
646+
"server": cx.server,
647+
"protocol_version": cx.protocol_version,
648+
}
649+
self._last_result = result = BoltStatementResult(self, hydrant, metadata)
650+
cx.run(statement, parameters, metadata)
651+
cx.pull_all(
652+
metadata,
653+
on_records=lambda records: result._records.extend(
654+
hydrant.hydrate_records(result.keys(), records)),
655+
on_summary=lambda: result.detach(sync=False),
656+
)
649657
return result
650658

651-
def __run__(self, statement, parameters):
652-
return self._run(statement, parameters)
653-
654-
def __begin__(self):
659+
def _begin(self):
660+
self._assert_open()
661+
metadata = {}
662+
parameters = {}
655663
if self._bookmarks:
656-
parameters = {"bookmark": self.last_bookmark(), "bookmarks": self._bookmarks}
657-
else:
658-
parameters = {}
659-
return self.__run__(u"BEGIN", parameters)
660-
661-
def __commit__(self):
662-
return self.__run__(u"COMMIT", {})
663-
664-
def __rollback__(self):
665-
return self.__run__(u"ROLLBACK", {})
664+
parameters["bookmark"] = self.last_bookmark() # TODO: remove in 2.0
665+
parameters["bookmarks"] = self._bookmarks
666+
cx = self._connection
667+
cx.run(u"BEGIN", parameters, metadata)
668+
cx.pull_all(metadata)
669+
670+
def _commit(self):
671+
self._assert_open()
672+
metadata = {}
673+
try:
674+
cx = self._connection
675+
cx.run(u"COMMIT", {}, metadata)
676+
cx.pull_all(metadata)
677+
finally:
678+
self._disconnect(sync=True)
679+
return metadata.get("bookmark")
666680

667-
def __bookmark__(self, result):
668-
summary = result.summary()
669-
return summary.metadata.get("bookmark")
681+
def _rollback(self):
682+
self._assert_open()
683+
metadata = {}
684+
try:
685+
cx = self._connection
686+
cx.run(u"ROLLBACK", {}, metadata)
687+
cx.pull_all(metadata)
688+
finally:
689+
self._disconnect(sync=True)
670690

671691

672692
class Transaction(object):
@@ -796,18 +816,10 @@ class StatementResult(object):
796816
:meth:`.Session.run` and :meth:`.Transaction.run`.
797817
"""
798818

799-
#: The statement text that was executed to produce this result.
800-
statement = None
801-
802-
#: Dictionary of parameters passed with the statement.
803-
parameters = None
804-
805-
zipper = zip
806-
807-
def __init__(self, session, hydrant):
819+
def __init__(self, session, hydrant, metadata):
808820
self._session = session
809821
self._hydrant = hydrant
810-
self._keys = None
822+
self._metadata = metadata
811823
self._records = deque()
812824
self._summary = None
813825

@@ -826,14 +838,14 @@ def attached(self):
826838
"""
827839
return self._session and not self._session.closed()
828840

829-
def detach(self):
841+
def detach(self, sync=True):
830842
""" Detach this result from its parent session by fetching the
831843
remainder of this result from the network into the buffer.
832844
833845
:returns: number of records fetched
834846
"""
835847
if self.attached():
836-
return self._session.detach(self)
848+
return self._session.detach(self, sync=sync)
837849
else:
838850
return 0
839851

@@ -842,13 +854,14 @@ def keys(self):
842854
843855
:returns: tuple of key names
844856
"""
845-
if self._keys is not None:
846-
return self._keys
847-
if self.attached():
848-
self._session.send()
849-
while self.attached() and self._keys is None:
850-
self._session.fetch()
851-
return self._keys
857+
try:
858+
return self._metadata["fields"]
859+
except KeyError:
860+
if self.attached():
861+
self._session.send()
862+
while self.attached() and "fields" not in self._metadata:
863+
self._session.fetch()
864+
return self._metadata.get("fields")
852865

853866
def records(self):
854867
""" Generator for records obtained from this result.
@@ -873,6 +886,8 @@ def summary(self):
873886
:returns: The :class:`.ResultSummary` for this result
874887
"""
875888
self.detach()
889+
if self._summary is None:
890+
self._summary = BoltStatementResultSummary(**self._metadata)
876891
return self._summary
877892

878893
def consume(self):
@@ -908,22 +923,17 @@ def peek(self):
908923
909924
:returns: the next :class:`.Record` or :const:`None` if none remain
910925
"""
911-
hydrate = self._hydrant.hydrate
912-
zipper = self.zipper
913-
keys = self.keys()
914926
records = self._records
915927
if records:
916-
values = records[0]
917-
return zipper(keys, hydrate(values))
928+
return records[0]
918929
if not self.attached():
919930
return None
920931
if self.attached():
921932
self._session.send()
922933
while self.attached() and not records:
923934
self._session.fetch()
924935
if records:
925-
values = records[0]
926-
return zipper(keys, hydrate(values))
936+
return records[0]
927937
return None
928938

929939
def graph(self):
@@ -941,47 +951,8 @@ class BoltStatementResult(StatementResult):
941951
""" A handler for the result of Cypher statement execution.
942952
"""
943953

944-
@classmethod
945-
def zipper(cls, k, v):
946-
return Record(zip(k, v))
947-
948-
def __init__(self, session, run_response, pull_all_response):
949-
from neobolt.exceptions import CypherError
950-
951-
super(BoltStatementResult, self).__init__(session, PackStreamHydrator(session._connection.protocol_version))
952-
953-
all_metadata = {}
954-
955-
def on_header(metadata):
956-
# Called on receipt of the result header.
957-
all_metadata.update(metadata)
958-
self._keys = tuple(metadata.get("fields", ()))
959-
960-
def on_records(records):
961-
# Called on receipt of one or more result records.
962-
self._records.extend(map(lambda record: self.zipper(self.keys(), self._hydrant.hydrate(record)), records))
963-
964-
def on_footer(metadata):
965-
# Called on receipt of the result footer.
966-
connection = self.session._connection
967-
all_metadata.update(metadata, statement=self.statement, parameters=self.parameters,
968-
server=connection.server, protocol_version=connection.protocol_version)
969-
self._summary = BoltStatementResultSummary(**all_metadata)
970-
self._session, session_ = None, self._session
971-
session_.detach(self)
972-
973-
def on_failure(metadata):
974-
# Called on execution failure.
975-
self.session.reset()
976-
on_footer(metadata)
977-
raise CypherError.hydrate(**metadata)
978-
979-
run_response.on_success = on_header
980-
run_response.on_failure = on_failure
981-
982-
pull_all_response.on_records = on_records
983-
pull_all_response.on_success = on_footer
984-
pull_all_response.on_failure = on_failure
954+
def __init__(self, session, hydrant, metadata):
955+
super(BoltStatementResult, self).__init__(session, hydrant, metadata)
985956

986957
def value(self, item=0, default=None):
987958
""" Return the remainder of the result as a list of values.

neo4j/compat/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,25 @@ def perf_counter():
126126
from urllib.parse import urlparse, parse_qs
127127
except ImportError:
128128
from urlparse import urlparse, parse_qs
129+
130+
131+
def deprecated(message):
132+
""" Decorator for deprecating functions and methods.
133+
134+
::
135+
136+
@deprecated("'foo' has been deprecated in favour of 'bar'")
137+
def foo(x):
138+
pass
139+
140+
"""
141+
def f__(f):
142+
def f_(*args, **kwargs):
143+
from warnings import warn
144+
warn(message, category=DeprecationWarning, stacklevel=2)
145+
return f(*args, **kwargs)
146+
f_.__name__ = f.__name__
147+
f_.__doc__ = f.__doc__
148+
f_.__dict__.update(f.__dict__)
149+
return f_
150+
return f__

neo4j/meta.py

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,3 @@
2121

2222
# This version number will be automatically set in release-prepare build.
2323
version = "1.7.0.dev0"
24-
25-
26-
def deprecated(message):
27-
""" Decorator for deprecating functions and methods.
28-
29-
::
30-
31-
@deprecated("'foo' has been deprecated in favour of 'bar'")
32-
def foo(x):
33-
pass
34-
35-
"""
36-
def f__(f):
37-
def f_(*args, **kwargs):
38-
from warnings import warn
39-
warn(message, category=DeprecationWarning, stacklevel=2)
40-
return f(*args, **kwargs)
41-
f_.__name__ = f.__name__
42-
f_.__doc__ = f.__doc__
43-
f_.__dict__.update(f.__dict__)
44-
return f_
45-
return f__

0 commit comments

Comments
 (0)