Subscribing to Data
This guide shows you how to use the ReductStore Python SDK to subscribe to new records in a bucket.
Prerequisites
If you don't have a ReductStore instance running, you can easily spin one up as a Docker container. To do this, run the following command:
docker run -p 8383:8383 reduct/store:latest
Also, if you haven't already installed the SDK, you can use the pip package manager to install the reduct-py package:
pip install reduct-py
Example
For demonstration purposes, we will create a script with two coroutines:
- writer writes data to an entry in a bucket once per second, using the current timestamp and toggling the label good between True and False.
- subscriber subscribes to the entry and receives only the records whose label good equals True.
Once the subscriber has received 10 records, it stops the subscription. The writer exits on its own after writing 21 records.
This is the whole script:
import asyncio
from time import time_ns

from reduct import Client, Bucket

client = Client("http://127.0.0.1:8383")


async def writer():
    """Write a blob with toggling good flag"""
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    good = True
    for _ in range(21):
        data = b"Some blob of data"
        ts = time_ns() // 1_000  # UNIX timestamp in microseconds
        await bucket.write("entry-1", data, ts, labels=dict(good=good))
        print(f"Writer: Record written: ts={ts}, good={good}")
        good = not good  # every second record is labelled good=True
        await asyncio.sleep(1)


async def subscriber():
    """Subscribe on good records and exit after ten received"""
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    counter = 0
    async for record in bucket.subscribe(
        "entry-1",
        start=time_ns() // 1_000,  # only records from now on
        poll_interval=0.2,
        include=dict(good=True),
    ):
        print(
            f"Subscriber: Good record received: ts={record.timestamp}, labels={record.labels}"
        )
        counter += 1
        if counter == 10:
            break  # leaving the loop ends the subscription


async def main():
    # Run the writer and the subscriber concurrently
    await asyncio.gather(writer(), subscriber())


if __name__ == "__main__":
    asyncio.run(main())

The most important part of the script is the subscriber coroutine:

async def subscriber():
    """Subscribe on good records and exit after ten received"""
    bucket: Bucket = await client.create_bucket("bucket", exist_ok=True)
    counter = 0
    async for record in bucket.subscribe(
        "entry-1",
        start=time_ns() // 1_000,  # only records from now on
        poll_interval=0.2,
        include=dict(good=True),
    ):
        print(
            f"Subscriber: Good record received: ts={record.timestamp}, labels={record.labels}"
        )
        counter += 1
        if counter == 10:
            break  # leaving the loop ends the subscription

The subscribe method queries records from the entry-1 entry, starting from the current time, and keeps returning new records whose label good equals True. Since ReductStore provides an HTTP API, the subscribe method polls the entry every poll_interval seconds.
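
The polling behaviour described above can be illustrated without a running server. The sketch below is not the SDK's implementation: poll_subscribe, the in-memory store list, and the choice to treat start as inclusive are all assumptions made for illustration. It only shows the general idea of re-querying from a moving cursor every poll_interval seconds and filtering by labels.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple

# Hypothetical in-memory stand-in for an entry: (timestamp, labels) pairs.
Record = Tuple[int, Dict[str, bool]]


async def poll_subscribe(
    store: List[Record],
    start: int,
    poll_interval: float,
    include: Dict[str, bool],
) -> AsyncIterator[Record]:
    """Yield matching records at or after `start`, re-querying every `poll_interval` seconds."""
    cursor = start
    while True:
        # One "query": everything at or after the cursor, oldest first.
        batch = sorted((r for r in store if r[0] >= cursor), key=lambda r: r[0])
        for ts, labels in batch:
            cursor = ts + 1  # move past this record so it is never returned twice
            if all(labels.get(k) == v for k, v in include.items()):
                yield ts, labels
        # Nothing more to return for now: wait before polling again.
        await asyncio.sleep(poll_interval)


async def demo() -> List[int]:
    store: List[Record] = [(1, {"good": True}), (2, {"good": False})]

    async def late_writer() -> None:
        # A record that arrives after the subscription has started.
        await asyncio.sleep(0.05)
        store.append((3, {"good": True}))

    task = asyncio.create_task(late_writer())
    received: List[int] = []
    async for ts, _labels in poll_subscribe(
        store, start=0, poll_interval=0.01, include={"good": True}
    ):
        received.append(ts)
        if len(received) == 2:
            break
    await task
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints [1, 3]
```

A shorter poll_interval lowers the delay before a new record is seen, at the cost of more requests to the server.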
When to use subscribing
Subscribing is useful when you want your application to be notified when new records are added to an entry. Some possible use cases are:
- You can use ReductStore as a simple message broker with persistent storage.
- Replication of data from one ReductStore instance to another. For example, you can subscribe to records with certain labels and write them to another ReductStore instance for long-term storage.
- Ingesting data from a ReductStore instance to a data warehouse.
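
As a rough illustration of the replication use case, the sketch below copies labelled records from one bucket to another. It uses only the calls that appear in the script above (create_bucket, subscribe, write) plus record.read_all() to fetch the payload. The replicate helper, the second instance at port 8384, and the bucket name bucket-archive are hypothetical, and the limit parameter exists only so that the example terminates.

```python
import asyncio
from typing import Any


async def replicate(
    src_bucket: Any, dst_bucket: Any, entry: str, start: int, limit: int
) -> int:
    """Copy up to `limit` records labelled good=True from src_bucket to dst_bucket."""
    copied = 0
    async for record in src_bucket.subscribe(
        entry, start=start, poll_interval=0.2, include=dict(good=True)
    ):
        data = await record.read_all()  # fetch the whole payload of the record
        # Keep the original timestamp and labels on the copy.
        await dst_bucket.write(entry, data, record.timestamp, labels=record.labels)
        copied += 1
        if copied == limit:
            break
    return copied


async def main(start: int) -> None:
    """`start` is the timestamp of the first record to copy, in the writer's units."""
    # Imported here so that `replicate` itself does not depend on the SDK.
    from reduct import Client

    src = Client("http://127.0.0.1:8383")
    dst = Client("http://127.0.0.1:8384")  # hypothetical second instance
    src_bucket = await src.create_bucket("bucket", exist_ok=True)
    dst_bucket = await dst.create_bucket("bucket-archive", exist_ok=True)  # hypothetical name
    copied = await replicate(src_bucket, dst_bucket, "entry-1", start=start, limit=10)
    print(f"Replicated {copied} records")


# To try it against live instances, call asyncio.run(main(start=...)) with a
# start timestamp of your choice.
```

A long-running replicator would drop the limit and add error handling for lost connections, which this sketch leaves out.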