@@ -313,9 +313,12 @@ class Session(object):
313313 # The last result received.
314314 _last_result = None
315315
316- # The collection of bookmarks after which the next
316+ # The set of bookmarks after which the next
317317 # :class:`.Transaction` should be carried out.
318- _bookmarks = ()
318+ _bookmarks_in = None
319+
320+ # The bookmark returned from the last commit.
321+ _bookmark_out = None
319322
320323 # Default maximum time to keep retrying failed transactions.
321324 _max_retry_time = default_config ["max_retry_time" ]
@@ -327,9 +330,11 @@ def __init__(self, acquirer, access_mode, **parameters):
327330 self ._default_access_mode = access_mode
328331 for key , value in parameters .items ():
329332 if key == "bookmark" :
330- self ._bookmarks = [value ] if value else []
333+ if value :
334+ self ._bookmarks_in = tuple ([value ])
331335 elif key == "bookmarks" :
332- self ._bookmarks = value or []
336+ if value :
337+ self ._bookmarks_in = tuple (value )
333338 elif key == "max_retry_time" :
334339 self ._max_retry_time = value
335340 else :
@@ -417,18 +422,35 @@ def run(self, statement, parameters=None, **kwparameters):
417422 """
418423 from neobolt .exceptions import ConnectionExpired
419424
420- if self .closed ():
421- raise SessionError ("Session closed" )
425+ self ._assert_open ()
422426 if not statement :
423427 raise ValueError ("Cannot run an empty statement" )
424428
425- if not self .has_transaction () :
429+ if not self ._connection :
426430 self ._connect ()
431+ cx = self ._connection
432+ protocol_version = cx .protocol_version
433+ server = cx .server
427434
428435 statement = ustr (statement )
429- parameters = fix_parameters (dict (parameters or {}, ** kwparameters ), self ._connection .protocol_version ,
430- supports_bytes = self ._connection .server .supports_bytes ())
431- result = self ._run (statement , parameters )
436+ parameters = fix_parameters (dict (parameters or {}, ** kwparameters ), protocol_version ,
437+ supports_bytes = server .supports_bytes ())
438+
439+ hydrant = PackStreamHydrator (protocol_version )
440+ metadata = {
441+ "statement" : statement ,
442+ "parameters" : parameters ,
443+ "server" : server ,
444+ "protocol_version" : protocol_version ,
445+ }
446+ self ._last_result = result = BoltStatementResult (self , hydrant , metadata )
447+ cx .run (statement , parameters , metadata )
448+ cx .pull_all (
449+ metadata ,
450+ on_records = lambda records : result ._records .extend (
451+ hydrant .hydrate_records (result .keys (), records )),
452+ on_summary = lambda : result .detach (sync = False ),
453+ )
432454
433455 if not self .has_transaction ():
434456 try :
@@ -503,16 +525,16 @@ def detach(self, result, sync=True):
503525 result ._session = None
504526 return count
505527
528+ def next_bookmarks (self ):
529+ """ The set of bookmarks to be passed into the next
530+ :class:`.Transaction`.
531+ """
532+ return self ._bookmarks_in
533+
506534 def last_bookmark (self ):
507535 """ The bookmark returned by the last :class:`.Transaction`.
508536 """
509- last = None
510- for bookmark in self ._bookmarks :
511- if last is None :
512- last = bookmark
513- else :
514- last = last_bookmark (last , bookmark )
515- return last
537+ return self ._bookmark_out
516538
517539 def has_transaction (self ):
518540 return bool (self ._transaction )
@@ -529,6 +551,7 @@ def begin_transaction(self, bookmark=None):
529551 :returns: new :class:`.Transaction` instance.
530552 :raise: :class:`.TransactionError` if a transaction is already open
531553 """
554+ self ._assert_open ()
532555 if self .has_transaction ():
533556 raise TransactionError ("Explicit transaction already open" )
534557
@@ -539,9 +562,15 @@ def begin_transaction(self, bookmark=None):
539562 from warnings import warn
540563 warn ("Passing bookmarks at transaction level is deprecated" , category = DeprecationWarning , stacklevel = 2 )
541564 _warned_about_transaction_bookmarks = True
542- self ._bookmarks = [bookmark ]
565+ self ._bookmarks_in = tuple ( [bookmark ])
543566
544- return self ._open_transaction ()
567+ self ._open_transaction ()
568+ return self ._transaction
569+
570+ def _open_transaction (self , access_mode = None ):
571+ self ._transaction = Transaction (self , on_close = self ._close_transaction )
572+ self ._connect (access_mode )
573+ self ._connection .begin (self ._bookmarks_in , {})
545574
546575 def commit_transaction (self ):
547576 """ Commit the current transaction.
@@ -554,14 +583,13 @@ def commit_transaction(self):
554583 raise TransactionError ("No transaction to commit" )
555584 metadata = {}
556585 try :
557- cx = self ._connection
558- cx .run (u"COMMIT" , {}, metadata )
559- cx .pull_all (metadata )
586+ self ._connection .commit (metadata )
560587 finally :
561588 self ._disconnect (sync = True )
562589 self ._transaction = None
563590 bookmark = metadata .get ("bookmark" )
564- self ._bookmarks = [bookmark ]
591+ self ._bookmarks_in = tuple ([bookmark ])
592+ self ._bookmark_out = bookmark
565593 return bookmark
566594
567595 def rollback_transaction (self ):
@@ -574,9 +602,7 @@ def rollback_transaction(self):
574602 raise TransactionError ("No transaction to rollback" )
575603 metadata = {}
576604 try :
577- cx = self ._connection
578- cx .run (u"ROLLBACK" , {}, metadata )
579- cx .pull_all (metadata )
605+ self ._connection .rollback (metadata )
580606 finally :
581607 self ._disconnect (sync = True )
582608 self ._transaction = None
@@ -593,7 +619,8 @@ def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):
593619 t0 = perf_counter ()
594620 while True :
595621 try :
596- tx = self ._open_transaction (access_mode )
622+ self ._open_transaction (access_mode )
623+ tx = self ._transaction
597624 try :
598625 result = unit_of_work (tx , * args , ** kwargs )
599626 except :
@@ -624,49 +651,17 @@ def _run_transaction(self, access_mode, unit_of_work, *args, **kwargs):
624651 raise ServiceUnavailable ("Transaction failed" )
625652
626653 def read_transaction (self , unit_of_work , * args , ** kwargs ):
654+ self ._assert_open ()
627655 return self ._run_transaction (READ_ACCESS , unit_of_work , * args , ** kwargs )
628656
629657 def write_transaction (self , unit_of_work , * args , ** kwargs ):
658+ self ._assert_open ()
630659 return self ._run_transaction (WRITE_ACCESS , unit_of_work , * args , ** kwargs )
631660
632661 def _assert_open (self ):
633662 if self .closed ():
634663 raise SessionError ("Session closed" )
635664
636- def _run (self , statement , parameters ):
637- self ._assert_open ()
638- cx = self ._connection
639- hydrant = PackStreamHydrator (cx .protocol_version )
640- metadata = {
641- "statement" : statement ,
642- "parameters" : parameters ,
643- "server" : cx .server ,
644- "protocol_version" : cx .protocol_version ,
645- }
646- self ._last_result = result = BoltStatementResult (self , hydrant , metadata )
647- cx .run (statement , parameters , metadata )
648- cx .pull_all (
649- metadata ,
650- on_records = lambda records : result ._records .extend (
651- hydrant .hydrate_records (result .keys (), records )),
652- on_summary = lambda : result .detach (sync = False ),
653- )
654- return result
655-
656- def _open_transaction (self , access_mode = None ):
657- self ._transaction = Transaction (self , on_close = self ._close_transaction )
658- self ._connect (access_mode )
659- self ._assert_open ()
660- metadata = {}
661- parameters = {}
662- if self ._bookmarks :
663- parameters ["bookmark" ] = self .last_bookmark () # TODO: remove in 2.0
664- parameters ["bookmarks" ] = self ._bookmarks
665- cx = self ._connection
666- cx .run (u"BEGIN" , parameters , metadata )
667- cx .pull_all (metadata )
668- return self ._transaction
669-
670665
671666class Transaction (object ):
672667 """ Container for multiple Cypher queries to be executed within
@@ -1401,21 +1396,6 @@ def iter_items(iterable):
14011396 yield key , value
14021397
14031398
1404- def last_bookmark (b0 , b1 ):
1405- """ Return the latest of two bookmarks by looking for the maximum
1406- integer value following the last colon in the bookmark string.
1407- """
1408- n = [None , None ]
1409- _ , _ , n [0 ] = b0 .rpartition (":" )
1410- _ , _ , n [1 ] = b1 .rpartition (":" )
1411- for i in range (2 ):
1412- try :
1413- n [i ] = int (n [i ])
1414- except ValueError :
1415- raise ValueError ("Invalid bookmark: {}" .format (b0 ))
1416- return b0 if n [0 ] > n [1 ] else b1
1417-
1418-
14191399def retry_delay_generator (initial_delay , multiplier , jitter_factor ):
14201400 delay = initial_delay
14211401 while True :
0 commit comments