Skip to content

Bucket

A bucket of data in Reduct Storage

Source code in reduct/bucket.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class Bucket:
    """A bucket of data in Reduct Storage"""

    # Minimum server API version (major, minor) that provides the batched
    # read endpoint ``GET /b/:bucket/:entry/batch``.
    _BATCH_API_VERSION = (1, 5)

    def __init__(self, name: str, http: HttpClient):
        """
        Args:
            name: name of the bucket on the server
            http: configured HTTP client used for all requests
        """
        self._http = http
        self.name = name

    def _supports_batched_read(self) -> bool:
        """Return True if the server supports the batched read endpoint.

        Version components are compared numerically: a plain string
        comparison mis-orders multi-digit minors (e.g. "1.10" < "1.5"
        lexicographically, although 1.10 is the newer release).
        """
        version = self._http.api_version
        if not version:
            return False
        try:
            parts = tuple(int(item) for item in str(version).split(".")[:2])
        except ValueError:
            # Unrecognized version format: fall back to the per-record API.
            return False
        return parts >= self._BATCH_API_VERSION

    async def get_settings(self) -> BucketSettings:
        """
        Get current bucket settings
        Returns:
             BucketSettings:
        Raises:
            ReductError: if there is an HTTP error
        """
        return (await self.get_full_info()).settings

    async def set_settings(self, settings: BucketSettings):
        """
        Update bucket settings
        Args:
            settings: new settings
        Raises:
            ReductError: if there is an HTTP error
        """
        await self._http.request_all("PUT", f"/b/{self.name}", data=settings.json())

    async def info(self) -> BucketInfo:
        """
        Get statistics about bucket
        Returns:
           BucketInfo:
        Raises:
            ReductError: if there is an HTTP error
        """
        return (await self.get_full_info()).info

    async def get_entry_list(self) -> List[EntryInfo]:
        """
        Get list of entries with their stats
        Returns:
            List[EntryInfo]
        Raises:
            ReductError: if there is an HTTP error
        """
        return (await self.get_full_info()).entries

    async def remove(self):
        """
        Remove bucket
        Raises:
            ReductError: if there is an HTTP error
        """
        await self._http.request_all("DELETE", f"/b/{self.name}")

    async def remove_entry(self, entry_name: str):
        """
        Remove entry from bucket
        Args:
            entry_name: name of entry
        Raises:
            ReductError: if there is an HTTP error
        """
        await self._http.request_all("DELETE", f"/b/{self.name}/{entry_name}")

    @asynccontextmanager
    async def read(
        self,
        entry_name: str,
        timestamp: Optional[Union[int, datetime, float, str]] = None,
        head: bool = False,
    ) -> Record:
        """
        Read a record from entry
        Args:
            entry_name: name of entry in the bucket
            timestamp: timestamp of the record to read. It can be
                int (UNIX timestamp in microseconds), datetime,
                float (UNIX timestamp in seconds), or str (ISO 8601 string).
                If None: get the latest record.
            head: if True: get only the header of a record with metadata
        Returns:
            async context with a record
        Raises:
            ReductError: if there is an HTTP error
        Examples:
            >>> async def reader():
            >>>     async with bucket.read("entry", timestamp=123456789) as record:
            >>>         data = await record.read_all()
        """
        # `is not None` (not truthiness) so that an explicit timestamp of 0
        # (the UNIX epoch) is sent instead of silently fetching the latest.
        params = (
            {"ts": unix_timestamp_from_any(timestamp)}
            if timestamp is not None
            else None
        )
        method = "HEAD" if head else "GET"
        async with self._http.request(
            method, f"/b/{self.name}/{entry_name}", params=params
        ) as resp:
            yield parse_record(resp)

    async def write(
        self,
        entry_name: str,
        data: Union[bytes, AsyncIterator[bytes]],
        timestamp: Optional[Union[int, datetime, float, str]] = None,
        content_length: Optional[int] = None,
        **kwargs,
    ):
        """
        Write a record to entry

        Args:
            entry_name: name of entry in the bucket
            data: bytes to write or async iterator
            timestamp: timestamp of record. int (UNIX timestamp in microseconds),
                datetime, float (UNIX timestamp in seconds), str (ISO 8601 string).
                If None: current time
            content_length: content size in bytes,
                needed only when the data is an iterator
        Keyword Args:
            labels (dict): labels as key-values
            content_type (str): content type of data
        Raises:
            ReductError: if there is an HTTP error

        Examples:
            >>> await bucket.write("entry-1", b"some_data",
            >>>    timestamp="2021-09-10T10:30:00")
            >>>
            >>> # You can write data chunk-wise using an asynchronous iterator and the
            >>> # size of the content:
            >>>
            >>> async def sender():
            >>>     for chunk in [b"part1", b"part2", b"part3"]:
            >>>         yield chunk
            >>> await bucket.write("entry-1", sender(), content_length=15)

        """
        # `is not None` so an explicit timestamp of 0 (UNIX epoch) is honored
        # rather than replaced with the current time.
        timestamp = unix_timestamp_from_any(
            timestamp if timestamp is not None else int(time.time_ns() / 1000)
        )
        params = {"ts": timestamp}
        await self._http.request_all(
            "POST",
            f"/b/{self.name}/{entry_name}",
            params=params,
            data=data,
            # len(data) is only valid for bytes; iterator callers must
            # supply content_length (as documented above).
            content_length=content_length if content_length is not None else len(data),
            **kwargs,
        )

    async def write_batch(
        self, entry_name: str, batch: Batch
    ) -> Dict[int, ReductError]:
        """
        Write a batch of records to entries in a sole request

        Args:
            entry_name: name of entry in the bucket
            batch: list of records
        Returns:
            dict of errors with timestamps as keys
        Raises:
            ReductError: if there is an HTTP or communication error
        """

        record_headers = {}
        content_length = 0
        for time_stamp, record in batch.items():
            content_length += record.size
            # Header format per record: "<size>,<content_type>[,label=value...]"
            header = f"{record.size},{record.content_type}"
            for label, value in record.labels.items():
                # Quote the value when it contains the protocol's delimiters,
                # otherwise the server would parse it as extra labels.
                if "," in str(value) or "=" in str(value):
                    header += f',{label}="{value}"'
                else:
                    header += f",{label}={value}"

            record_headers[f"{TIME_PREFIX}{time_stamp}"] = header

        async def iter_body():
            # Stream the record payloads in the same order as the headers.
            for _, rec in batch.items():
                yield await rec.read_all()

        _, headers = await self._http.request_all(
            "POST",
            f"/b/{self.name}/{entry_name}/batch",
            data=iter_body(),
            extra_headers=record_headers,
            content_length=content_length,
        )

        # Per-record failures come back as "x-reduct-error-<ts>" headers.
        errors = {}
        for key, value in headers.items():
            if key.startswith(ERROR_PREFIX):
                errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)

        return errors

    async def query(
        self,
        entry_name: str,
        start: Optional[Union[int, datetime, float, str]] = None,
        stop: Optional[Union[int, datetime, float, str]] = None,
        ttl: Optional[int] = None,
        **kwargs,
    ) -> AsyncIterator[Record]:
        """
        Query data for a time interval
        The time interval is defined by the start and stop parameters that can be:
        int (UNIX timestamp in microseconds), datetime,
        float (UNIX timestamp in seconds) or str (ISO 8601 string).

        Args:
            entry_name: name of entry in the bucket
            start: the beginning of the time interval.
                If None, then from the first record
            stop: the end of the time interval. If None, then to the latest record
            ttl: Time To Live of the request in seconds
        Keyword Args:
            include (dict): query records which have all labels from this dict
            exclude (dict): query records which don't have all labels from this dict
            head (bool): if True: get only the header of a record with metadata
            limit (int): limit the number of records
        Returns:
             AsyncIterator[Record]: iterator to the records

        Examples:
            >>> async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
            >>>     data: bytes = record.read_all()
            >>>     # or
            >>>     async for chunk in record.read(n=1024):
            >>>         print(chunk)
        """
        # `is not None` so a start/stop of exactly 0 (UNIX epoch) is kept.
        start = unix_timestamp_from_any(start) if start is not None else None
        stop = unix_timestamp_from_any(stop) if stop is not None else None

        query_id = await self._query(entry_name, start, stop, ttl, **kwargs)
        last = False
        method = "HEAD" if kwargs.get("head") else "GET"

        if self._supports_batched_read():
            # Newer servers: fetch many records per round trip.
            while not last:
                async with self._http.request(
                    method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
                ) as resp:
                    if resp.status == 204:
                        # 204 means the query is exhausted.
                        return
                    async for record in parse_batched_records(resp):
                        last = record.last
                        yield record
        else:
            # Older servers: one record per request.
            while not last:
                async with self._http.request(
                    method, f"/b/{self.name}/{entry_name}?q={query_id}"
                ) as resp:
                    if resp.status == 204:
                        return
                    last = int(resp.headers["x-reduct-last"]) != 0
                    yield parse_record(resp, last)

    async def get_full_info(self) -> BucketFullInfo:
        """
        Get full information about bucket (settings, statistics, entries)

        Returns:
            BucketFullInfo: parsed response body
        Raises:
            ReductError: if there is an HTTP error
        """
        body, _ = await self._http.request_all("GET", f"/b/{self.name}")
        return BucketFullInfo.model_validate_json(body)

    async def subscribe(
        self,
        entry_name: str,
        start: Optional[Union[int, datetime, float, str]] = None,
        poll_interval=1.0,
        **kwargs,
    ) -> AsyncIterator[Record]:
        """
        Query records from the start timestamp and wait for new records
        The time interval is defined by the start and stop parameters
        that can be: int (UNIX timestamp in microseconds), datetime,
        float (UNIX timestamp in seconds) or str (ISO 8601 string).

        Args:
            entry_name: name of entry in the bucket
            start: the beginning timestamp to read records.
                If None, then from the first record.
            poll_interval: interval to ask new records in seconds
        Keyword Args:
            include (dict): query records which have all labels from this dict
            exclude (dict): query records which don't have all labels from this dict
            head (bool): if True: get only the header of a record with metadata
        Returns:
             AsyncIterator[Record]: iterator to the records

        Examples:
            >>> async for record in bucket.subscribe("entry-1"):
            >>>     data: bytes = record.read_all()
            >>>     # or
            >>>     async for chunk in record.read(n=1024):
            >>>         print(chunk)
        """
        # The TTL outlives two poll cycles so the continuous query stays
        # alive on the server between polls.
        query_id = await self._query(
            entry_name, start, None, poll_interval * 2 + 1, continuous=True, **kwargs
        )

        method = "HEAD" if kwargs.get("head") else "GET"
        if self._supports_batched_read():
            while True:
                async with self._http.request(
                    method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
                ) as resp:
                    if resp.status == 204:
                        # No new data yet: back off and poll again.
                        await asyncio.sleep(poll_interval)
                        continue

                    async for record in parse_batched_records(resp):
                        yield record
        else:
            while True:
                async with self._http.request(
                    method, f"/b/{self.name}/{entry_name}?q={query_id}"
                ) as resp:
                    if resp.status == 204:
                        await asyncio.sleep(poll_interval)
                        continue

                    yield parse_record(resp, False)

    async def _query(self, entry_name, start, stop, ttl, **kwargs):
        """Start a server-side query and return its id.

        Args:
            entry_name: name of entry in the bucket
            start: beginning of the interval (microseconds) or None
            stop: end of the interval (microseconds) or None
            ttl: Time To Live of the query in seconds
        Raises:
            ReductError: if there is an HTTP error
        """
        params = {}
        # `is not None` keeps an explicit 0 (UNIX epoch) in the request.
        if start is not None:
            params["start"] = int(start)
        if stop is not None:
            params["stop"] = int(stop)
        if ttl:
            params["ttl"] = int(ttl)

        if "include" in kwargs:
            for name, value in kwargs["include"].items():
                params[f"include-{name}"] = str(value)
        if "exclude" in kwargs:
            for name, value in kwargs["exclude"].items():
                params[f"exclude-{name}"] = str(value)

        if "continuous" in kwargs:
            params["continuous"] = "true" if kwargs["continuous"] else "false"

        if "limit" in kwargs:
            params["limit"] = kwargs["limit"]

        url = f"/b/{self.name}/{entry_name}"
        data, _ = await self._http.request_all(
            "GET",
            f"{url}/q",
            params=params,
        )
        query_id = json.loads(data)["id"]
        return query_id

get_entry_list() async

Get list of entries with their stats Returns: List[EntryInfo] Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
149
150
151
152
153
154
155
156
157
async def get_entry_list(self) -> List[EntryInfo]:
    """
    Get list of entries with their stats
    Returns:
        List[EntryInfo]
    Raises:
        ReductError: if there is an HTTP error
    """
    # Entry stats are delivered as part of the bucket's full-info payload.
    return (await self.get_full_info()).entries

get_full_info() async

Get full information about bucket (settings, statistics, entries)

Source code in reduct/bucket.py
368
369
370
371
372
373
async def get_full_info(self) -> BucketFullInfo:
    """
    Get full information about bucket (settings, statistics, entries)

    Returns:
        BucketFullInfo: parsed from the JSON response body
    Raises:
        ReductError: if there is an HTTP error
    """
    body, _ = await self._http.request_all("GET", f"/b/{self.name}")
    return BucketFullInfo.model_validate_json(body)

get_settings() async

Get current bucket settings Returns: BucketSettings: Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
119
120
121
122
123
124
125
126
127
async def get_settings(self) -> BucketSettings:
    """
    Get current bucket settings
    Returns:
         BucketSettings:
    Raises:
        ReductError: if there is an HTTP error
    """
    # Settings are a field of the combined full-info response.
    return (await self.get_full_info()).settings

info() async

Get statistics about bucket Returns: BucketInfo: Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
139
140
141
142
143
144
145
146
147
async def info(self) -> BucketInfo:
    """
    Get statistics about bucket
    Returns:
       BucketInfo:
    Raises:
        ReductError: if there is an HTTP error
    """
    # Statistics are a field of the combined full-info response.
    return (await self.get_full_info()).info

query(entry_name, start=None, stop=None, ttl=None, **kwargs) async

Query data for a time interval The time interval is defined by the start and stop parameters that can be: int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds) or str (ISO 8601 string).

Parameters:

Name Type Description Default
entry_name str

name of entry in the bucket

required
start Optional[Union[int, datetime, float, str]]

the beginning of the time interval. If None, then from the first record

None
stop Optional[Union[int, datetime, float, str]]

the end of the time interval. If None, then to the latest record

None
ttl Optional[int]

Time To Live of the request in seconds

None

Keyword Args: include (dict): query records which have all labels from this dict exclude (dict): query records which don't have all labels from this dict head (bool): if True: get only the header of a record with metadata limit (int): limit the number of records Returns: AsyncIterator[Record]: iterator to the records

Examples:

>>> async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
>>>     data: bytes = record.read_all()
>>>     # or
>>>     async for chunk in record.read(n=1024):
>>>         print(chunk)
Source code in reduct/bucket.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
async def query(
    self,
    entry_name: str,
    start: Optional[Union[int, datetime, float, str]] = None,
    stop: Optional[Union[int, datetime, float, str]] = None,
    ttl: Optional[int] = None,
    **kwargs,
) -> AsyncIterator[Record]:
    """
    Query data for a time interval
    The time interval is defined by the start and stop parameters that can be:
    int (UNIX timestamp in microseconds), datetime,
    float (UNIX timestamp in seconds) or str (ISO 8601 string).

    Args:
        entry_name: name of entry in the bucket
        start: the beginning of the time interval.
            If None, then from the first record
        stop: the end of the time interval. If None, then to the latest record
        ttl: Time To Live of the request in seconds
    Keyword Args:
        include (dict): query records which have all labels from this dict
        exclude (dict): query records which don't have all labels from this dict
        head (bool): if True: get only the header of a record with metadata
        limit (int): limit the number of records
    Returns:
         AsyncIterator[Record]: iterator to the records

    Examples:
        >>> async for record in bucket.query("entry-1", stop=time.time_ns() / 1000):
        >>>     data: bytes = record.read_all()
        >>>     # or
        >>>     async for chunk in record.read(n=1024):
        >>>         print(chunk)
    """
    # NOTE(review): truthiness drops a valid start/stop of 0 (UNIX epoch).
    start = unix_timestamp_from_any(start) if start else None
    stop = unix_timestamp_from_any(stop) if stop else None

    query_id = await self._query(entry_name, start, stop, ttl, **kwargs)
    last = False
    method = "HEAD" if "head" in kwargs and kwargs["head"] else "GET"

    # NOTE(review): string comparison mis-orders versions like "1.10" < "1.5".
    if self._http.api_version and self._http.api_version >= "1.5":
        # Newer servers: batched endpoint returns many records per request.
        while not last:
            async with self._http.request(
                method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
            ) as resp:
                if resp.status == 204:
                    # 204 means the query is exhausted.
                    return
                async for record in parse_batched_records(resp):
                    last = record.last
                    yield record
    else:
        # Older servers: one record per request.
        while not last:
            async with self._http.request(
                method, f"/b/{self.name}/{entry_name}?q={query_id}"
            ) as resp:
                if resp.status == 204:
                    return
                last = int(resp.headers["x-reduct-last"]) != 0
                yield parse_record(resp, last)

read(entry_name, timestamp=None, head=False) async

Read a record from entry Args: entry_name: name of entry in the bucket. timestamp: timestamp of the record to read; it can be int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), or str (ISO 8601 string). If None: get the latest record. head: if True: get only the header of a record with metadata Returns: async context with a record Raises: ReductError: if there is an HTTP error Examples: >>> async def reader(): >>> async with bucket.read("entry", timestamp=123456789) as record: >>> data = await record.read_all()

Source code in reduct/bucket.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
@asynccontextmanager
async def read(
    self,
    entry_name: str,
    timestamp: Optional[Union[int, datetime, float, str]] = None,
    head: bool = False,
) -> Record:
    """
    Read a record from entry
    Args:
        entry_name: name of entry in the bucket
        timestamp: timestamp of the record to read. It can be
            int (UNIX timestamp in microseconds), datetime,
            float (UNIX timestamp in seconds), or str (ISO 8601 string).
            If None: get the latest record.
        head: if True: get only the header of a record with metadata
    Returns:
        async context with a record
    Raises:
        ReductError: if there is an HTTP error
    Examples:
        >>> async def reader():
        >>>     async with bucket.read("entry", timestamp=123456789) as record:
        >>>         data = await record.read_all()
    """
    # NOTE(review): truthiness drops a valid timestamp of 0 (UNIX epoch).
    params = {"ts": unix_timestamp_from_any(timestamp)} if timestamp else None
    method = "HEAD" if head else "GET"
    async with self._http.request(
        method, f"/b/{self.name}/{entry_name}", params=params
    ) as resp:
        yield parse_record(resp)

remove() async

Remove bucket Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
159
160
161
162
163
164
165
async def remove(self):
    """
    Remove bucket
    Raises:
        ReductError: if there is an HTTP error
    """
    # Single DELETE on the bucket resource; failures surface as ReductError.
    await self._http.request_all("DELETE", f"/b/{self.name}")

remove_entry(entry_name) async

Remove entry from bucket Args: entry_name: name of entry Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
167
168
169
170
171
172
173
174
175
async def remove_entry(self, entry_name: str):
    """
    Remove entry from bucket
    Args:
        entry_name: name of entry
    Raises:
        ReductError: if there is an HTTP error
    """
    # DELETE on the entry path within this bucket.
    await self._http.request_all("DELETE", f"/b/{self.name}/{entry_name}")

set_settings(settings) async

Update bucket settings Args: settings: new settings Raises: ReductError: if there is an HTTP error

Source code in reduct/bucket.py
129
130
131
132
133
134
135
136
137
async def set_settings(self, settings: BucketSettings):
    """
    Update bucket settings
    Args:
        settings: new settings
    Raises:
        ReductError: if there is an HTTP error
    """
    # The settings model is serialized to JSON via pydantic's .json().
    await self._http.request_all("PUT", f"/b/{self.name}", data=settings.json())

subscribe(entry_name, start=None, poll_interval=1.0, **kwargs) async

Query records from the start timestamp and wait for new records The time interval is defined by the start and stop parameters that can be: int (UNIX timestamp in microseconds) datetime, float (UNIX timestamp in seconds) or str (ISO 8601 string).

Parameters:

Name Type Description Default
entry_name str

name of entry in the bucket

required
start Optional[Union[int, datetime, float, str]]

the beginning timestamp to read records. If None, then from the first record.

None
poll_interval

interval to ask new records in seconds

1.0

Keyword Args: include (dict): query records which have all labels from this dict exclude (dict): query records which don't have all labels from this dict head (bool): if True: get only the header of a record with metadata Returns: AsyncIterator[Record]: iterator to the records

Examples:

>>> async for record in bucket.subscribe("entry-1"):
>>>     data: bytes = record.read_all()
>>>     # or
>>>     async for chunk in record.read(n=1024):
>>>         print(chunk)
Source code in reduct/bucket.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
async def subscribe(
    self,
    entry_name: str,
    start: Optional[Union[int, datetime, float, str]] = None,
    poll_interval=1.0,
    **kwargs,
) -> AsyncIterator[Record]:
    """
    Query records from the start timestamp and wait for new records
    The time interval is defined by the start and stop parameters
    that can be: int (UNIX timestamp in microseconds), datetime,
    float (UNIX timestamp in seconds) or str (ISO 8601 string).

    Args:
        entry_name: name of entry in the bucket
        start: the beginning timestamp to read records.
            If None, then from the first record.
        poll_interval: interval to ask new records in seconds
    Keyword Args:
        include (dict): query records which have all labels from this dict
        exclude (dict): query records which don't have all labels from this dict
        head (bool): if True: get only the header of a record with metadata
    Returns:
         AsyncIterator[Record]: iterator to the records

    Examples:
        >>> async for record in bucket.subscribe("entry-1"):
        >>>     data: bytes = record.read_all()
        >>>     # or
        >>>     async for chunk in record.read(n=1024):
        >>>         print(chunk)
    """
    # The TTL covers two poll cycles so the continuous query stays alive
    # on the server between polls.
    query_id = await self._query(
        entry_name, start, None, poll_interval * 2 + 1, continuous=True, **kwargs
    )

    method = "HEAD" if "head" in kwargs and kwargs["head"] else "GET"
    # NOTE(review): string comparison mis-orders versions like "1.10" < "1.5".
    if self._http.api_version and self._http.api_version >= "1.5":
        while True:
            async with self._http.request(
                method, f"/b/{self.name}/{entry_name}/batch?q={query_id}"
            ) as resp:
                if resp.status == 204:
                    # No new data yet: back off and poll again.
                    await asyncio.sleep(poll_interval)
                    continue

                async for record in parse_batched_records(resp):
                    yield record
    else:
        while True:
            async with self._http.request(
                method, f"/b/{self.name}/{entry_name}?q={query_id}"
            ) as resp:
                if resp.status == 204:
                    await asyncio.sleep(poll_interval)
                    continue

                yield parse_record(resp, False)

write(entry_name, data, timestamp=None, content_length=None, **kwargs) async

Write a record to entry

Parameters:

Name Type Description Default
entry_name str

name of entry in the bucket

required
data Union[bytes, AsyncIterator[bytes]]

bytes to write or async iterator

required
timestamp Optional[Union[int, datetime, float, str]]

timestamp of record. int (UNIX timestamp in microseconds), datetime, float (UNIX timestamp in seconds), str (ISO 8601 string). If None: current time

None
content_length Optional[int]

content size in bytes, needed only when the data is an iterator

None

Keyword Args: labels (dict): labels as key-values content_type (str): content type of data Raises: ReductError: if there is an HTTP error

Examples:

>>> await bucket.write("entry-1", b"some_data",
>>>    timestamp="2021-09-10T10:30:00")
>>>
>>> # You can write data chunk-wise using an asynchronous iterator and the
>>> # size of the content:
>>>
>>> async def sender():
>>>     for chunk in [b"part1", b"part2", b"part3"]:
>>>         yield chunk
>>> await bucket.write("entry-1", sender(), content_length=15)
Source code in reduct/bucket.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
async def write(
    self,
    entry_name: str,
    data: Union[bytes, AsyncIterator[bytes]],
    timestamp: Optional[Union[int, datetime, float, str]] = None,
    content_length: Optional[int] = None,
    **kwargs,
):
    """
    Write a record to entry

    Args:
        entry_name: name of entry in the bucket
        data: bytes to write or async iterator
        timestamp: timestamp of record. int (UNIX timestamp in microseconds),
            datetime, float (UNIX timestamp in seconds), str (ISO 8601 string).
            If None: current time
        content_length: content size in bytes,
            needed only when the data is an iterator
    Keyword Args:
        labels (dict): labels as key-values
        content_type (str): content type of data
    Raises:
        ReductError: if there is an HTTP error

    Examples:
        >>> await bucket.write("entry-1", b"some_data",
        >>>    timestamp="2021-09-10T10:30:00")
        >>>
        >>> # You can write data chunk-wise using an asynchronous iterator and the
        >>> # size of the content:
        >>>
        >>> async def sender():
        >>>     for chunk in [b"part1", b"part2", b"part3"]:
        >>>         yield chunk
        >>> await bucket.write("entry-1", sender(), content_length=15)

    """
    # NOTE(review): truthiness drops an explicit timestamp of 0 (UNIX epoch).
    timestamp = unix_timestamp_from_any(
        timestamp if timestamp else int(time.time_ns() / 1000)
    )
    params = {"ts": timestamp}
    await self._http.request_all(
        "POST",
        f"/b/{self.name}/{entry_name}",
        params=params,
        data=data,
        # NOTE(review): len(data) raises TypeError when data is an async
        # iterator and content_length was omitted (as documented above).
        content_length=content_length if content_length is not None else len(data),
        **kwargs,
    )

write_batch(entry_name, batch) async

Write a batch of records to entries in a sole request

Parameters:

Name Type Description Default
entry_name str

name of entry in the bucket

required
batch Batch

list of records

required

Returns: dict of errors with timestamps as keys Raises: ReductError: if there is an HTTP or communication error

Source code in reduct/bucket.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
async def write_batch(
    self, entry_name: str, batch: Batch
) -> Dict[int, ReductError]:
    """
    Write a batch of records to entries in a sole request

    Args:
        entry_name: name of entry in the bucket
        batch: list of records
    Returns:
        dict of errors with timestamps as keys
    Raises:
        ReductError: if there is an HTTP  or communication error
    """

    record_headers = {}
    content_length = 0
    for time_stamp, record in batch.items():
        content_length += record.size
        # Header format per record: "<size>,<content_type>[,label=value...]"
        header = f"{record.size},{record.content_type}"
        for label, value in record.labels.items():
            # NOTE(review): quoting is triggered by special chars in the
            # label, yet it is the value that gets quoted — verify against
            # the batch protocol (values containing "," or "=" need quotes).
            if "," in label or "=" in label:
                header += f',{label}="{value}"'
            else:
                header += f",{label}={value}"

        record_headers[f"{TIME_PREFIX}{time_stamp}"] = header

    async def iter_body():
        # Stream payloads in the same order as the per-record headers.
        for _, rec in batch.items():
            yield await rec.read_all()

    _, headers = await self._http.request_all(
        "POST",
        f"/b/{self.name}/{entry_name}/batch",
        data=iter_body(),
        extra_headers=record_headers,
        content_length=content_length,
    )

    # Per-record failures come back as error-prefixed response headers.
    errors = {}
    for key, value in headers.items():
        if key.startswith(ERROR_PREFIX):
            errors[int(key[len(ERROR_PREFIX) :])] = ReductError.from_header(value)

    return errors