2626# cython: language_level=3
2727# cython: binding=True
2828
29+ """
30+ API for fast data ingestion into QuestDB.
31+ """
32+
2933from libc.stdint cimport uint8_t, int64_t
3034from cpython.datetime cimport datetime
3135from cpython.bool cimport bool , PyBool_Check
36+ from cpython.weakref cimport PyWeakref_NewRef, PyWeakref_GetObject
37+ from cpython.object cimport PyObject
3238
3339from .line_sender cimport *
3440
@@ -42,9 +48,12 @@ cdef extern from "Python.h":
4248 PyUnicode_2BYTE_KIND
4349 PyUnicode_4BYTE_KIND
4450
51+ # Note: Returning an `object` rather than `PyObject` as the function
52+ # returns a new reference rather than borrowing an existing one.
4553 object PyUnicode_FromKindAndData(
4654 int kind, const void * buffer , Py_ssize_t size)
4755
56+ # Ditto, see comment on why not returning a `PyObject` above.
4857 str PyUnicode_FromStringAndSize(
4958 const char * u, Py_ssize_t size)
5059
@@ -71,7 +80,7 @@ import pathlib
7180
7281import sys
7382
74- class IlpErrorCode (Enum ):
83+ class IngressErrorCode (Enum ):
7584 """ Category of Error."""
7685 CouldNotResolveAddr = line_sender_error_could_not_resolve_addr
7786 InvalidApiCall = line_sender_error_invalid_api_call
@@ -86,36 +95,36 @@ class IlpErrorCode(Enum):
8695 return self .name
8796
8897
89- class IlpError (Exception ):
98+ class IngressError (Exception ):
9099 """
91- An error whilst using the line sender or constructing its buffer .
100+ An error whilst using the ``Sender`` or constructing its ``Buffer`` .
92101 """
93102 def __init__ (self , code , msg ):
94103 super ().__init__(msg)
95104 self ._code = code
96105
97106 @property
98- def code (self ) -> IlpErrorCode :
107+ def code (self ) -> IngressErrorCode :
99108 return self._code
100109
101110
102111cdef inline object c_err_code_to_py(line_sender_error_code code ):
103112 if code == line_sender_error_could_not_resolve_addr:
104- return IlpErrorCode .CouldNotResolveAddr
113+ return IngressErrorCode .CouldNotResolveAddr
105114 elif code == line_sender_error_invalid_api_call:
106- return IlpErrorCode .InvalidApiCall
115+ return IngressErrorCode .InvalidApiCall
107116 elif code == line_sender_error_socket_error:
108- return IlpErrorCode .SocketError
117+ return IngressErrorCode .SocketError
109118 elif code == line_sender_error_invalid_utf8:
110- return IlpErrorCode .InvalidUtf8
119+ return IngressErrorCode .InvalidUtf8
111120 elif code == line_sender_error_invalid_name:
112- return IlpErrorCode .InvalidName
121+ return IngressErrorCode .InvalidName
113122 elif code == line_sender_error_invalid_timestamp:
114- return IlpErrorCode .InvalidTimestamp
123+ return IngressErrorCode .InvalidTimestamp
115124 elif code == line_sender_error_auth_error:
116- return IlpErrorCode .AuthError
125+ return IngressErrorCode .AuthError
117126 elif code == line_sender_error_tls_error:
118- return IlpErrorCode .TlsError
127+ return IngressErrorCode .TlsError
119128 else :
120129 raise ValueError (' Internal error converting error code.' )
121130
@@ -134,7 +143,7 @@ cdef inline object c_err_to_py(line_sender_error* err):
134143 PyUnicode_1BYTE_KIND,
135144 c_msg,
136145 < Py_ssize_t> c_len)
137- return IlpError (py_code, py_msg)
146+ return IngressError (py_code, py_msg)
138147 finally :
139148 line_sender_error_free(err)
140149
@@ -256,10 +265,15 @@ cdef class TimestampNanos:
256265 return self._value
257266
258267
259- ctypedef bint (*row_complete_cb )(
260- line_sender_buffer*,
261- void* ctx ,
262- line_sender_error**)
268+ cdef class Sender
269+
270+
271+ cdef int may_flush_on_row_complete(
272+ line_sender_buffer* buf ,
273+ Sender sender ) except -1:
274+ if sender._auto_flush_enabled:
275+ if line_sender_buffer_size(buf ) >= sender._auto_flush_watermark:
276+ print('w00t')
263277
264278
265279cdef class Buffer:
@@ -268,7 +282,7 @@ cdef class Buffer:
268282
269283 .. code-block:: python
270284
271- from questdb.ilp import Buffer
285+ from questdb.ing import Buffer
272286
273287 buf = Buffer()
274288 buf.row(
@@ -293,8 +307,7 @@ cdef class Buffer:
293307 * To see the contents , call ``str(buffer )``.
294308 """
295309 cdef line_sender_buffer* _impl
296- cdef row_complete_cb _row_complete_cb
297- cdef void* _row_complete_ctx
310+ cdef object _row_complete_sender
298311
299312 def __cinit__(self , init_capacity: int = 65536 , max_name_len: int = 127 ):
300313 """
@@ -307,12 +320,10 @@ cdef class Buffer:
307320 cdef inline _cinit_impl(self , size_t init_capacity, size_t max_name_len):
308321 self ._impl = line_sender_buffer_with_max_name_len(max_name_len)
309322 line_sender_buffer_reserve(self ._impl, init_capacity)
310- self ._row_complete_cb = NULL
311- self ._row_complete_ctx = NULL
323+ self ._row_complete_sender = None
312324
313325 def __dealloc__ (self ):
314- self ._row_complete_cb = NULL
315- self ._row_complete_ctx = NULL
326+ self ._row_complete_sender = None
316327 line_sender_buffer_free(self ._impl)
317328
318329 def reserve (self , additional: int ):
@@ -461,12 +472,13 @@ cdef class Buffer:
461472
462473 cdef inline int _may_trigger_row_complete(self ) except - 1 :
463474 cdef line_sender_error* err = NULL
464- if self ._row_complete_cb != NULL :
465- if not self ._row_complete_cb(
475+ cdef PyObject* sender = NULL
476+ if self ._row_complete_sender != None :
477+ sender = PyWeakref_GetObject(self ._row_complete_sender)
478+ if sender != NULL :
479+ may_flush_on_row_complete(
466480 self ._impl,
467- self ._row_complete_ctx,
468- & err):
469- raise c_err_to_py(err)
481+ < Sender>< object > sender)
470482
471483 cdef inline int _at_ts(self , TimestampNanos ts) except - 1 :
472484 cdef line_sender_error* err = NULL
@@ -515,8 +527,8 @@ cdef class Buffer:
515527 try :
516528 self ._table(table_name)
517529 if not (symbols or columns):
518- raise IlpError (
519- IlpErrorCode .InvalidApiCall,
530+ raise IngressError (
531+ IngressErrorCode .InvalidApiCall,
520532 ' Must specify at least one symbol or column' )
521533 if symbols is not None :
522534 for name, value in symbols.items():
@@ -752,6 +764,10 @@ cdef class Buffer:
752764
753765
754766cdef class Sender:
767+ # We need the Buffer held by a Sender can hold a weakref to its Sender.
768+ # This avoids a circular reference that requires the GC to clean up.
769+ cdef object __weakref__
770+
755771 cdef line_sender_opts* _opts
756772 cdef line_sender* _impl
757773 cdef Buffer _buffer
@@ -868,7 +884,7 @@ cdef class Sender:
868884 init_capacity = init_capacity,
869885 max_name_len = max_name_len)
870886
871- self ._auto_flush_enabled = auto_flush is not None
887+ self ._auto_flush_enabled = not not auto_flush
872888 self ._auto_flush_watermark = int (auto_flush) \
873889 if self ._auto_flush_enabled else 0
874890
@@ -886,16 +902,18 @@ cdef class Sender:
886902 def connect (self ):
887903 cdef line_sender_error* err = NULL
888904 if self ._opts == NULL :
889- raise IlpError (
890- IlpErrorCode .InvalidApiCall,
905+ raise IngressError (
906+ IngressErrorCode .InvalidApiCall,
891907 ' connect() can\' t be called after close().' )
892908 self ._impl = line_sender_connect(self ._opts, & err)
893909 if self ._impl == NULL :
894910 raise c_err_to_py(err)
895911 line_sender_opts_free(self ._opts)
896912 self ._opts = NULL
897- # self._buffer._row_complete_cb = may_flush_on_row_complete
898- # self._buffer._row_complete_ctx = self
913+
914+ # Request callbacks when rows are complete.
915+ if self ._buffer is not None :
916+ self ._buffer._row_complete_sender = PyWeakref_NewRef(self , None )
899917
900918 def __enter__ (self ):
901919 self .connect()
@@ -914,8 +932,8 @@ cdef class Sender:
914932 cdef line_sender_error* err = NULL
915933 cdef line_sender_buffer* c_buf = NULL
916934 if self ._impl == NULL :
917- raise IlpError (
918- IlpErrorCode .InvalidApiCall,
935+ raise IngressError (
936+ IngressErrorCode .InvalidApiCall,
919937 ' flush() can\' t be called: Not connected.' )
920938 if buffer is not None :
921939 c_buf = buffer ._impl
@@ -959,9 +977,3 @@ cdef class Sender:
959977
960978 def __dealloc__ (self ):
961979 self ._close()
962-
963-
964- # cdef int may_flush_on_row_complete(
965- # line_sender_buffer* buf,
966- # void* ctx) except -1:
967- # pass
0 commit comments