Skip to content

Commit 340f44f

Browse files
committed
Merge branch 'main' into ma/examples-only
2 parents cc5f8a4 + c054c55 commit 340f44f

File tree

7 files changed

+106
-86
lines changed

7 files changed

+106
-86
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
src/questdb/ilp.html
2-
src/questdb/ilp.c
1+
src/questdb/ingress.html
2+
src/questdb/ingress.c
33
rustup-init.exe
44

55
# Byte-compiled / optimized / DLL files

docs/api.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
API Reference
33
=============
44

5-
questdb.ilp
6-
===========
5+
questdb.ingress
6+
===============
77

88
.. testsetup::
99

10-
from questdb.ilp import *
10+
from questdb.ingress import *
1111

12-
.. automodule:: questdb.ilp
12+
.. automodule:: questdb.ingress
1313
:members:
1414
:undoc-members:
1515
:show-inheritance:

docs/installation.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ following statements from a ``python3`` interactive shell:
2828

2929
.. code-block:: python
3030
31-
>>> import questdb.ilp
32-
>>> buf = questdb.ilp.Buffer()
31+
>>> import questdb.ingress
32+
>>> buf = questdb.ingress.Buffer()
3333
>>> buf.row('test', symbols={'a': 'b'})
34-
<questdb.ilp.Buffer object at 0x104b68240>
34+
<questdb.ingress.Buffer object at 0x104b68240>
3535
>>> str(buf)
3636
'test,a=b\n'

setup.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
WIN_32BIT_CARGO_TARGET = 'i686-pc-windows-msvc'
2222

2323

24-
def ilp_extension():
24+
def ingress_extension():
2525
lib_name = None
2626
lib_paths = []
2727
libraries = []
@@ -52,8 +52,8 @@ def ilp_extension():
5252
raise NotImplementedError(f'Unsupported platform: {PLATFORM}')
5353

5454
return Extension(
55-
"questdb.ilp",
56-
["src/questdb/ilp.pyx"],
55+
"questdb.ingress",
56+
["src/questdb/ingress.pyx"],
5757
include_dirs=["c-questdb-client/include"],
5858
library_dirs=lib_paths,
5959
libraries=libraries,
@@ -124,7 +124,7 @@ def readme():
124124
platforms=['any'],
125125
python_requires='>=3.7',
126126
install_requires=[],
127-
ext_modules = cythonize([ilp_extension()], annotate=True),
127+
ext_modules = cythonize([ingress_extension()], annotate=True),
128128
cmdclass={'build_ext': questdb_build_ext},
129129
zip_safe = False,
130130
package_dir={'': 'src'},
Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@
2626
# cython: language_level=3
2727
# cython: binding=True
2828

29+
"""
30+
API for fast data ingestion into QuestDB.
31+
"""
32+
2933
from libc.stdint cimport uint8_t, int64_t
3034
from cpython.datetime cimport datetime
3135
from cpython.bool cimport bool, PyBool_Check
36+
from cpython.weakref cimport PyWeakref_NewRef, PyWeakref_GetObject
37+
from cpython.object cimport PyObject
3238

3339
from .line_sender cimport *
3440

@@ -42,9 +48,12 @@ cdef extern from "Python.h":
4248
PyUnicode_2BYTE_KIND
4349
PyUnicode_4BYTE_KIND
4450

51+
# Note: Returning an `object` rather than `PyObject` as the function
52+
# returns a new reference rather than borrowing an existing one.
4553
object PyUnicode_FromKindAndData(
4654
int kind, const void* buffer, Py_ssize_t size)
4755

56+
# Ditto, see comment on why not returning a `PyObject` above.
4857
str PyUnicode_FromStringAndSize(
4958
const char* u, Py_ssize_t size)
5059

@@ -71,7 +80,7 @@ import pathlib
7180

7281
import sys
7382

74-
class IlpErrorCode(Enum):
83+
class IngressErrorCode(Enum):
7584
"""Category of Error."""
7685
CouldNotResolveAddr = line_sender_error_could_not_resolve_addr
7786
InvalidApiCall = line_sender_error_invalid_api_call
@@ -86,36 +95,36 @@ class IlpErrorCode(Enum):
8695
return self.name
8796

8897

89-
class IlpError(Exception):
98+
class IngressError(Exception):
9099
"""
91-
An error whilst using the line sender or constructing its buffer.
100+
An error whilst using the ``Sender`` or constructing its ``Buffer``.
92101
"""
93102
def __init__(self, code, msg):
94103
super().__init__(msg)
95104
self._code = code
96105

97106
@property
98-
def code(self) -> IlpErrorCode:
107+
def code(self) -> IngressErrorCode:
99108
return self._code
100109

101110

102111
cdef inline object c_err_code_to_py(line_sender_error_code code):
103112
if code == line_sender_error_could_not_resolve_addr:
104-
return IlpErrorCode.CouldNotResolveAddr
113+
return IngressErrorCode.CouldNotResolveAddr
105114
elif code == line_sender_error_invalid_api_call:
106-
return IlpErrorCode.InvalidApiCall
115+
return IngressErrorCode.InvalidApiCall
107116
elif code == line_sender_error_socket_error:
108-
return IlpErrorCode.SocketError
117+
return IngressErrorCode.SocketError
109118
elif code == line_sender_error_invalid_utf8:
110-
return IlpErrorCode.InvalidUtf8
119+
return IngressErrorCode.InvalidUtf8
111120
elif code == line_sender_error_invalid_name:
112-
return IlpErrorCode.InvalidName
121+
return IngressErrorCode.InvalidName
113122
elif code == line_sender_error_invalid_timestamp:
114-
return IlpErrorCode.InvalidTimestamp
123+
return IngressErrorCode.InvalidTimestamp
115124
elif code == line_sender_error_auth_error:
116-
return IlpErrorCode.AuthError
125+
return IngressErrorCode.AuthError
117126
elif code == line_sender_error_tls_error:
118-
return IlpErrorCode.TlsError
127+
return IngressErrorCode.TlsError
119128
else:
120129
raise ValueError('Internal error converting error code.')
121130

@@ -134,7 +143,7 @@ cdef inline object c_err_to_py(line_sender_error* err):
134143
PyUnicode_1BYTE_KIND,
135144
c_msg,
136145
<Py_ssize_t>c_len)
137-
return IlpError(py_code, py_msg)
146+
return IngressError(py_code, py_msg)
138147
finally:
139148
line_sender_error_free(err)
140149

@@ -256,10 +265,15 @@ cdef class TimestampNanos:
256265
return self._value
257266

258267

259-
ctypedef bint (*row_complete_cb)(
260-
line_sender_buffer*,
261-
void* ctx,
262-
line_sender_error**)
268+
cdef class Sender
269+
270+
271+
cdef int may_flush_on_row_complete(
272+
line_sender_buffer* buf,
273+
Sender sender) except -1:
274+
if sender._auto_flush_enabled:
275+
if line_sender_buffer_size(buf) >= sender._auto_flush_watermark:
276+
print('w00t')
263277

264278

265279
cdef class Buffer:
@@ -268,7 +282,7 @@ cdef class Buffer:
268282

269283
.. code-block:: python
270284

271-
from questdb.ilp import Buffer
285+
from questdb.ing import Buffer
272286

273287
buf = Buffer()
274288
buf.row(
@@ -293,8 +307,7 @@ cdef class Buffer:
293307
* To see the contents, call ``str(buffer)``.
294308
"""
295309
cdef line_sender_buffer* _impl
296-
cdef row_complete_cb _row_complete_cb
297-
cdef void* _row_complete_ctx
310+
cdef object _row_complete_sender
298311

299312
def __cinit__(self, init_capacity: int=65536, max_name_len: int=127):
300313
"""
@@ -307,12 +320,10 @@ cdef class Buffer:
307320
cdef inline _cinit_impl(self, size_t init_capacity, size_t max_name_len):
308321
self._impl = line_sender_buffer_with_max_name_len(max_name_len)
309322
line_sender_buffer_reserve(self._impl, init_capacity)
310-
self._row_complete_cb = NULL
311-
self._row_complete_ctx = NULL
323+
self._row_complete_sender = None
312324

313325
def __dealloc__(self):
314-
self._row_complete_cb = NULL
315-
self._row_complete_ctx = NULL
326+
self._row_complete_sender = None
316327
line_sender_buffer_free(self._impl)
317328

318329
def reserve(self, additional: int):
@@ -461,12 +472,13 @@ cdef class Buffer:
461472

462473
cdef inline int _may_trigger_row_complete(self) except -1:
463474
cdef line_sender_error* err = NULL
464-
if self._row_complete_cb != NULL:
465-
if not self._row_complete_cb(
475+
cdef PyObject* sender = NULL
476+
if self._row_complete_sender != None:
477+
sender = PyWeakref_GetObject(self._row_complete_sender)
478+
if sender != NULL:
479+
may_flush_on_row_complete(
466480
self._impl,
467-
self._row_complete_ctx,
468-
&err):
469-
raise c_err_to_py(err)
481+
<Sender><object>sender)
470482

471483
cdef inline int _at_ts(self, TimestampNanos ts) except -1:
472484
cdef line_sender_error* err = NULL
@@ -515,8 +527,8 @@ cdef class Buffer:
515527
try:
516528
self._table(table_name)
517529
if not (symbols or columns):
518-
raise IlpError(
519-
IlpErrorCode.InvalidApiCall,
530+
raise IngressError(
531+
IngressErrorCode.InvalidApiCall,
520532
'Must specify at least one symbol or column')
521533
if symbols is not None:
522534
for name, value in symbols.items():
@@ -752,6 +764,10 @@ cdef class Buffer:
752764

753765

754766
cdef class Sender:
767+
# We need the Buffer held by a Sender can hold a weakref to its Sender.
768+
# This avoids a circular reference that requires the GC to clean up.
769+
cdef object __weakref__
770+
755771
cdef line_sender_opts* _opts
756772
cdef line_sender* _impl
757773
cdef Buffer _buffer
@@ -868,7 +884,7 @@ cdef class Sender:
868884
init_capacity=init_capacity,
869885
max_name_len=max_name_len)
870886

871-
self._auto_flush_enabled = auto_flush is not None
887+
self._auto_flush_enabled = not not auto_flush
872888
self._auto_flush_watermark = int(auto_flush) \
873889
if self._auto_flush_enabled else 0
874890

@@ -886,16 +902,18 @@ cdef class Sender:
886902
def connect(self):
887903
cdef line_sender_error* err = NULL
888904
if self._opts == NULL:
889-
raise IlpError(
890-
IlpErrorCode.InvalidApiCall,
905+
raise IngressError(
906+
IngressErrorCode.InvalidApiCall,
891907
'connect() can\'t be called after close().')
892908
self._impl = line_sender_connect(self._opts, &err)
893909
if self._impl == NULL:
894910
raise c_err_to_py(err)
895911
line_sender_opts_free(self._opts)
896912
self._opts = NULL
897-
# self._buffer._row_complete_cb = may_flush_on_row_complete
898-
# self._buffer._row_complete_ctx = self
913+
914+
# Request callbacks when rows are complete.
915+
if self._buffer is not None:
916+
self._buffer._row_complete_sender = PyWeakref_NewRef(self, None)
899917

900918
def __enter__(self):
901919
self.connect()
@@ -914,8 +932,8 @@ cdef class Sender:
914932
cdef line_sender_error* err = NULL
915933
cdef line_sender_buffer* c_buf = NULL
916934
if self._impl == NULL:
917-
raise IlpError(
918-
IlpErrorCode.InvalidApiCall,
935+
raise IngressError(
936+
IngressErrorCode.InvalidApiCall,
919937
'flush() can\'t be called: Not connected.')
920938
if buffer is not None:
921939
c_buf = buffer._impl
@@ -959,9 +977,3 @@ cdef class Sender:
959977

960978
def __dealloc__(self):
961979
self._close()
962-
963-
964-
# cdef int may_flush_on_row_complete(
965-
# line_sender_buffer* buf,
966-
# void* ctx) except -1:
967-
# pass

test/system_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from fixture import QuestDbFixture, install_questdb, CA_PATH, AUTH
1313

1414

15-
import questdb.ilp as ilp
15+
import questdb.ingress as qi
1616

1717

1818
QUESTDB_VERSION = '6.4.2'
@@ -69,7 +69,7 @@ def _test_scenario(self, qdb, auth, tls):
6969
port = qdb.tls_line_tcp_port if tls else qdb.line_tcp_port
7070
pending = None
7171
table_name = uuid.uuid4().hex
72-
with ilp.Sender('localhost', port, auth=auth, tls=tls) as sender:
72+
with qi.Sender('localhost', port, auth=auth, tls=tls) as sender:
7373
for _ in range(3):
7474
sender.row(
7575
table_name,

0 commit comments

Comments
 (0)