Skip to content

Commit af1b187

Browse files
authored
docs: Add ticking data example with timer-based flushing. (#7)
1 parent 7112da1 commit af1b187

File tree

5 files changed

+67
-9
lines changed

5 files changed

+67
-9
lines changed

docs/examples.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,14 @@ all data is sent.
4646
.. literalinclude:: ../examples/buffer.py
4747
:language: python
4848

49+
50+
Ticking Random Data and Timer-based Flush
51+
=========================================
52+
53+
The following example somewhat mimics the behavior of a loop in an application.
54+
55+
It creates random ticking data at a random interval and flushes it explicitly
56+
based on a timer if the auto-flushing logic was not triggered recently.
57+
58+
.. literalinclude:: ../examples/random_data.py
59+
:language: python

examples/auth_and_tls.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from questdb.ingress import Sender
22

33

4-
def example():
4+
def example(host: str = 'localhost', port: int = 9009):
55
# See: https://questdb.io/docs/reference/api/ilp/authenticate
66
auth = (
77
"testUser1", # kid
88
"5UjEMuA0Pj5pjK8a-fa24dyIf-Es5mYny3oE_Wmus48", # d
99
"fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU", # x
1010
"Dt5tbS1dEDMSYfym3fgMv0B99szno-dFc1rYF9t0aac") # y
11-
with Sender('localhost', 9009, auth=auth, tls=True) as sender:
11+
with Sender(host, port, auth=auth, tls=True) as sender:
1212
sender.row(
1313
'line_sender_example',
1414
symbols={'id': 'OMEGA'},
@@ -21,4 +21,4 @@ def example():
2121

2222

2323
if __name__ == '__main__':
24-
example()
24+
example()

examples/basic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from questdb.ingress import Sender
22

33

4-
def example():
5-
with Sender('localhost', 9009) as sender:
4+
def example(host: str = 'localhost', port: int = 9009):
5+
with Sender(host, port) as sender:
66
sender.row(
77
'line_sender_example',
88
symbols={'id': 'OMEGA'},
@@ -15,4 +15,4 @@ def example():
1515

1616

1717
if __name__ == '__main__':
18-
example()
18+
example()

examples/buffer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from questdb.ingress import Sender, TimestampNanos
22

33

4-
def example():
5-
with Sender('localhost', 9009) as sender:
4+
def example(host: str = 'localhost', port: int = 9009):
5+
with Sender(host, port) as sender:
66
buffer = sender.new_buffer()
77
buffer.row(
88
'line_sender_buffer_example',
@@ -18,4 +18,4 @@ def example():
1818

1919

2020
if __name__ == '__main__':
21-
example()
21+
example()

examples/random_data.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from questdb.ingress import Sender
2+
import random
3+
import uuid
4+
import time
5+
6+
7+
def example(host: str = 'localhost', port: int = 9009):
8+
table_name: str = str(uuid.uuid1())
9+
watermark = 1024 # Flush if the internal buffer exceeds 1KiB
10+
with Sender(host=host, port=port, auto_flush=watermark) as sender:
11+
total_rows = 0
12+
last_flush = time.monotonic()
13+
try:
14+
print("Ctrl^C to terminate...")
15+
while True:
16+
time.sleep(random.randint(0, 750) / 1000) # sleep up to 750 ms
17+
18+
print('Inserting row...')
19+
sender.row(
20+
table_name,
21+
symbols={
22+
'src': random.choice(('ALPHA', 'BETA', 'OMEGA')),
23+
'dst': random.choice(('ALPHA', 'BETA', 'OMEGA'))},
24+
columns={
25+
'price': random.randint(200, 500),
26+
'qty': random.randint(1, 5)})
27+
total_rows += 1
28+
29+
# If the internal buffer is empty, then auto-flush triggered.
30+
if len(sender) == 0:
31+
print('Auto-flush triggered.')
32+
last_flush = time.monotonic()
33+
34+
# Flush at least once every five seconds.
35+
if time.monotonic() - last_flush > 5:
36+
print('Timer-flushing triggered.')
37+
sender.flush()
38+
last_flush = time.monotonic()
39+
40+
except KeyboardInterrupt:
41+
print(f"table: {table_name}, total rows sent: {total_rows}")
42+
print("(wait commitLag for all rows to be available)")
43+
print("bye!")
44+
45+
46+
if __name__ == '__main__':
47+
example()

0 commit comments

Comments
 (0)