Skip to content

Commit 6cec56e

Browse files
committed
feat: Sender(auto_flush) with watermark.
1 parent c42e2da commit 6cec56e

File tree

2 files changed

+44
-14
lines changed

2 files changed

+44
-14
lines changed

src/questdb/ingress.pyx

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -266,14 +266,13 @@ cdef class TimestampNanos:
266266

267267

268268
cdef class Sender
269+
cdef class Buffer
269270

270271

271-
cdef int may_flush_on_row_complete(
272-
line_sender_buffer* buf,
273-
Sender sender) except -1:
272+
cdef int may_flush_on_row_complete(Buffer buffer, Sender sender) except -1:
274273
if sender._auto_flush_enabled:
275-
if line_sender_buffer_size(buf) >= sender._auto_flush_watermark:
276-
print('w00t')
274+
if len(buffer) >= sender._auto_flush_watermark:
275+
sender.flush(buffer)
277276

278277

279278
cdef class Buffer:
@@ -476,9 +475,7 @@ cdef class Buffer:
476475
if self._row_complete_sender != None:
477476
sender = PyWeakref_GetObject(self._row_complete_sender)
478477
if sender != NULL:
479-
may_flush_on_row_complete(
480-
self._impl,
481-
<Sender><object>sender)
478+
may_flush_on_row_complete(self, <Sender><object>sender)
482479

483480
cdef inline int _at_ts(self, TimestampNanos ts) except -1:
484481
cdef line_sender_error* err = NULL
@@ -772,7 +769,7 @@ cdef class Sender:
772769
cdef line_sender* _impl
773770
cdef Buffer _buffer
774771
cdef bint _auto_flush_enabled
775-
cdef size_t _auto_flush_watermark
772+
cdef ssize_t _auto_flush_watermark
776773
cdef object _init_capacity
777774
cdef object _max_name_len
778775

@@ -785,9 +782,9 @@ cdef class Sender:
785782
tuple auth=None,
786783
object tls=False,
787784
object read_timeout=None,
788-
int init_capacity=65536,
785+
int init_capacity=65536, # 64KiB
789786
int max_name_len=127,
790-
object auto_flush=32768):
787+
object auto_flush=64512): # 63KiB
791788
cdef line_sender_error* err = NULL
792789

793790
cdef line_sender_utf8 host_utf8
@@ -887,6 +884,10 @@ cdef class Sender:
887884
self._auto_flush_enabled = not not auto_flush
888885
self._auto_flush_watermark = int(auto_flush) \
889886
if self._auto_flush_enabled else 0
887+
if self._auto_flush_watermark < 0:
888+
raise ValueError(
889+
'auto_flush_watermark must be >= 0, '
890+
f'not {self._auto_flush_watermark}')
890891

891892
def new_buffer(self):
892893
"""

test/test.py

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#!/usr/bin/env python3
22

33
import sys
4-
from turtle import clear
54
sys.dont_write_bytecode = True
65
import os
76
import unittest
@@ -222,8 +221,38 @@ def test_auto_flush(self):
222221
with qi.Sender('localhost', server.port, auto_flush=4) as sender:
223222
server.accept()
224223
sender.row('tbl1', symbols={'sym1': 'val1'})
225-
# msgs = server.recv()
226-
# self.assertEqual(msgs, [b'tbl1,sym1=val1', b'tbl1,sym2=val2'])
224+
self.assertEqual(len(sender), 0) # auto-flushed buffer.
225+
msgs = server.recv()
226+
self.assertEqual(msgs, [b'tbl1,sym1=val1'])
227+
228+
def test_immediate_auto_flush(self):
229+
with Server() as server:
230+
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
231+
server.accept()
232+
sender.row('tbl1', symbols={'sym1': 'val1'})
233+
self.assertEqual(len(sender), 0) # auto-flushed buffer.
234+
msgs = server.recv()
235+
self.assertEqual(msgs, [b'tbl1,sym1=val1'])
236+
237+
def test_dont_auto_flush(self):
238+
with Server() as server:
239+
with qi.Sender('localhost', server.port, auto_flush=0) as sender:
240+
server.accept()
241+
while len(sender) < 131072: # 128KiB
242+
sender.row('tbl1', symbols={'sym1': 'val1'})
243+
msgs = server.recv()
244+
self.assertEqual(msgs, [])
245+
246+
def test_dont_flush_on_exception(self):
247+
with Server() as server:
248+
with self.assertRaises(RuntimeError):
249+
with qi.Sender('localhost', server.port) as sender:
250+
server.accept()
251+
sender.row('tbl1', symbols={'sym1': 'val1'})
252+
self.assertEqual(str(sender), 'tbl1,sym1=val1\n')
253+
raise RuntimeError('Test exception')
254+
msgs = server.recv()
255+
self.assertEqual(msgs, [])
227256

228257

229258
if __name__ == '__main__':

0 commit comments

Comments
 (0)