Skip to content

Commit 27a705c

Browse files
committed
Merge branch 'main' into ma/examples-only
2 parents ccb34d5 + 4c877ef commit 27a705c

File tree

4 files changed

+125
-26
lines changed

4 files changed

+125
-26
lines changed

TODO.rst

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ TODO
55

66
Build Tooling
77
=============
8-
* **[HIGH]** Integrate tooling to build binaries for a matrix of operating
9-
systems, architectures and Python versions. This will also be our CI.
10-
This should help: https://github.com/pypa/cibuildwheel
8+
* **[MEDIUM]** Transition to Azure, move Linux arm to ARM pipeline without QEMU.
9+
10+
* **[MEDIUM]** Automate Apple Silicon.
1111

1212

1313
Docs
@@ -18,19 +18,14 @@ Docs
1818
See: https://www.sphinx-doc.org/en/master/usage/restructuredtext/directives.html#directive-literalinclude
1919
The examples should be in the ``examples/`` directory in the repo.
2020

21-
* **[MEDIUM]** Document on a per-version basis.
22-
2321
* **[MEDIUM]** These examples should be tested as part of the unit tests (as they
2422
are in the C client). This is to ensure they don't "bit rot" as the code
2523
changes.
2624

25+
* **[LOW]** Document on a per-version basis.
2726

2827
Development
2928
===========
30-
* **[HIGH]** Review API naming!
31-
32-
* **[HIGH]** Implement the auto-commit logic based on a watermark.
33-
3429
* **[MEDIUM]** Once we're done with them, merge in changes in the ``py_client_tweaks`` branch
3530
of the C client.
3631

docs/conf.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,18 @@
4949
napoleon_use_ivar = True
5050
napoleon_use_rtype = False
5151
napoleon_use_param = False
52+
53+
autodoc_default_options = {
54+
'special-members': '__init__ , __str__ , __enter__ , __exit__',
55+
'undoc-members': True
56+
}
57+
58+
59+
# def do_not_skip_dunder_members(_app, _what, name, _obj, would_skip, _options):
60+
# if name in ('__init__', '__call__', '__str__', '__enter__', '__exit__'):
61+
# return False
62+
# return would_skip
63+
64+
65+
# def setup(app):
66+
# app.connect('autodoc-skip-member', do_not_skip_dunder_members)

src/questdb/ingress.pyx

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class IngressErrorCode(Enum):
9191
AuthError = line_sender_error_auth_error
9292
TlsError = line_sender_error_tls_error
9393

94-
def __str__(self):
94+
def __str__(self) -> str:
9595
return self.name
9696

9797

@@ -266,14 +266,13 @@ cdef class TimestampNanos:
266266

267267

268268
cdef class Sender
269+
cdef class Buffer
269270

270271

271-
cdef int may_flush_on_row_complete(
272-
line_sender_buffer* buf,
273-
Sender sender) except -1:
272+
cdef int may_flush_on_row_complete(Buffer buffer, Sender sender) except -1:
274273
if sender._auto_flush_enabled:
275-
if line_sender_buffer_size(buf) >= sender._auto_flush_watermark:
276-
print('w00t')
274+
if len(buffer) >= sender._auto_flush_watermark:
275+
sender.flush(buffer)
277276

278277

279278
cdef class Buffer:
@@ -307,6 +306,8 @@ cdef class Buffer:
307306
* To see the contents, call ``str(buffer)``.
308307
"""
309308
cdef line_sender_buffer* _impl
309+
cdef size_t _init_capacity
310+
cdef size_t _max_name_len
310311
cdef object _row_complete_sender
311312

312313
def __cinit__(self, init_capacity: int=65536, max_name_len: int=127):
@@ -320,12 +321,33 @@ cdef class Buffer:
320321
cdef inline _cinit_impl(self, size_t init_capacity, size_t max_name_len):
321322
self._impl = line_sender_buffer_with_max_name_len(max_name_len)
322323
line_sender_buffer_reserve(self._impl, init_capacity)
324+
self._init_capacity = init_capacity
325+
self._max_name_len = max_name_len
323326
self._row_complete_sender = None
324327

325328
def __dealloc__(self):
326329
self._row_complete_sender = None
327330
line_sender_buffer_free(self._impl)
328331

332+
@property
333+
def init_capacity(self) -> int:
334+
"""
335+
The initial capacity of the buffer when first created.
336+
337+
This may grow over time, see ``capacity()``.
338+
"""
339+
return self._init_capacity
340+
341+
@property
342+
def max_name_len(self) -> int:
343+
"""Maximum length of a table or column name."""
344+
return self._max_name_len
345+
346+
@property
347+
def max_name_len(self) -> int:
348+
"""Maximum length of a table or column name."""
349+
return self._max_name_len
350+
329351
def reserve(self, additional: int):
330352
"""
331353
Ensure the buffer has at least `additional` bytes of future capacity.
@@ -476,9 +498,7 @@ cdef class Buffer:
476498
if self._row_complete_sender != None:
477499
sender = PyWeakref_GetObject(self._row_complete_sender)
478500
if sender != NULL:
479-
may_flush_on_row_complete(
480-
self._impl,
481-
<Sender><object>sender)
501+
may_flush_on_row_complete(self, <Sender><object>sender)
482502

483503
cdef inline int _at_ts(self, TimestampNanos ts) except -1:
484504
cdef line_sender_error* err = NULL
@@ -772,9 +792,9 @@ cdef class Sender:
772792
cdef line_sender* _impl
773793
cdef Buffer _buffer
774794
cdef bint _auto_flush_enabled
775-
cdef size_t _auto_flush_watermark
776-
cdef object _init_capacity
777-
cdef object _max_name_len
795+
cdef ssize_t _auto_flush_watermark
796+
cdef size_t _init_capacity
797+
cdef size_t _max_name_len
778798

779799
def __cinit__(
780800
self,
@@ -785,9 +805,9 @@ cdef class Sender:
785805
tuple auth=None,
786806
object tls=False,
787807
object read_timeout=None,
788-
int init_capacity=65536,
808+
int init_capacity=65536, # 64KiB
789809
int max_name_len=127,
790-
object auto_flush=32768):
810+
object auto_flush=64512): # 63KiB
791811
cdef line_sender_error* err = NULL
792812

793813
cdef line_sender_utf8 host_utf8
@@ -887,6 +907,10 @@ cdef class Sender:
887907
self._auto_flush_enabled = not not auto_flush
888908
self._auto_flush_watermark = int(auto_flush) \
889909
if self._auto_flush_enabled else 0
910+
if self._auto_flush_watermark < 0:
911+
raise ValueError(
912+
'auto_flush_watermark must be >= 0, '
913+
f'not {self._auto_flush_watermark}')
890914

891915
def new_buffer(self):
892916
"""
@@ -895,10 +919,20 @@ cdef class Sender:
895919
The buffer is set up with the configured `init_capacity` and
896920
`max_name_len`.
897921
"""
898-
self._buffer = Buffer(
922+
return Buffer(
899923
init_capacity=self._init_capacity,
900924
max_name_len=self._max_name_len)
901925

926+
@property
927+
def init_capacity(self) -> int:
928+
"""The initial capacity of the sender's internal buffer."""
929+
return self._init_capacity
930+
931+
@property
932+
def max_name_len(self) -> int:
933+
"""Maximum length of a table or column name."""
934+
return self._max_name_len
935+
902936
def connect(self):
903937
cdef line_sender_error* err = NULL
904938
if self._opts == NULL:
@@ -929,6 +963,9 @@ cdef class Sender:
929963
self._buffer.row(*args, **kwargs)
930964

931965
cpdef flush(self, Buffer buffer=None, bint clear=True):
966+
if buffer is None and not clear:
967+
raise ValueError('The internal buffer must always be cleared.')
968+
932969
cdef line_sender_error* err = NULL
933970
cdef line_sender_buffer* c_buf = NULL
934971
if self._impl == NULL:

test/test.py

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,16 @@ def test_flush_3(self):
176176
sender.row('tbl1', symbols={'a': 'b'})
177177
sender.flush()
178178

179+
def test_flush_4(self):
180+
# Clearing of the internal buffer is not allowed.
181+
with Server() as server:
182+
with self.assertRaises(ValueError):
183+
with qi.Sender('localhost', server.port) as sender:
184+
server.accept()
185+
sender.row('tbl1', symbols={'a': 'b'})
186+
sender.flush(buffer=None, clear=False)
187+
188+
179189
def test_independent_buffer(self):
180190
buf = qi.Buffer()
181191
buf.row('tbl1', symbols={'sym1': 'val1'})
@@ -211,8 +221,50 @@ def test_auto_flush(self):
211221
with qi.Sender('localhost', server.port, auto_flush=4) as sender:
212222
server.accept()
213223
sender.row('tbl1', symbols={'sym1': 'val1'})
214-
# msgs = server.recv()
215-
# self.assertEqual(msgs, [b'tbl1,sym1=val1', b'tbl1,sym2=val2'])
224+
self.assertEqual(len(sender), 0) # auto-flushed buffer.
225+
msgs = server.recv()
226+
self.assertEqual(msgs, [b'tbl1,sym1=val1'])
227+
228+
def test_immediate_auto_flush(self):
229+
with Server() as server:
230+
with qi.Sender('localhost', server.port, auto_flush=True) as sender:
231+
server.accept()
232+
sender.row('tbl1', symbols={'sym1': 'val1'})
233+
self.assertEqual(len(sender), 0) # auto-flushed buffer.
234+
msgs = server.recv()
235+
self.assertEqual(msgs, [b'tbl1,sym1=val1'])
236+
237+
def test_dont_auto_flush(self):
238+
with Server() as server:
239+
with qi.Sender('localhost', server.port, auto_flush=0) as sender:
240+
server.accept()
241+
while len(sender) < 131072: # 128KiB
242+
sender.row('tbl1', symbols={'sym1': 'val1'})
243+
msgs = server.recv()
244+
self.assertEqual(msgs, [])
245+
246+
def test_dont_flush_on_exception(self):
247+
with Server() as server:
248+
with self.assertRaises(RuntimeError):
249+
with qi.Sender('localhost', server.port) as sender:
250+
server.accept()
251+
sender.row('tbl1', symbols={'sym1': 'val1'})
252+
self.assertEqual(str(sender), 'tbl1,sym1=val1\n')
253+
raise RuntimeError('Test exception')
254+
msgs = server.recv()
255+
self.assertEqual(msgs, [])
256+
257+
def test_new_buffer(self):
258+
sender = qi.Sender(
259+
host='localhost',
260+
port=9009,
261+
init_capacity=1024,
262+
max_name_len=10)
263+
buffer = sender.new_buffer()
264+
self.assertEqual(buffer.init_capacity, 1024)
265+
self.assertEqual(buffer.max_name_len, 10)
266+
self.assertEqual(buffer.init_capacity, sender.init_capacity)
267+
self.assertEqual(buffer.max_name_len, sender.max_name_len)
216268

217269

218270
if __name__ == '__main__':

0 commit comments

Comments
 (0)