sgnts.pipeline

TSPipeline: a time-series-aware Pipeline for SGN-TS.

Subclasses sgn's Pipeline to expose time-series configuration — in particular the stride that all sources will use.

Example usage

from sgnts.pipeline import TSPipeline

# 0.5-second stride instead of the default 1 second
pipeline = TSPipeline(stride=0.5)
pipeline.connect(src, snk).run()

TSPipeline

Bases: Pipeline



A Pipeline with time-series stride configuration.

This subclass of sgn.apps.Pipeline provides a convenient way to set the application stride, that is, the amount of data (in seconds) that each source produces per iteration. Under the hood this sets the class attribute Offset.SAMPLE_STRIDE_AT_MAX_RATE, so the chosen stride applies to every user of Offset in the process, not only to this pipeline instance.
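
The relationship between a stride in seconds and a stride counted in samples at the maximum rate can be sketched with plain arithmetic. The constants below are stand-in values chosen for illustration (an assumed maximum rate of 16384 Hz and a matching default stride); the real values live on the Offset class, and the two helper functions are hypothetical, not part of sgnts.

```python
# Stand-in values for illustration only; the real ones are attributes of Offset.
MAX_RATE = 16384                 # assumed maximum sample rate, in Hz
DEFAULT_STRIDE_SAMPLES = 16384   # assumed SAMPLE_STRIDE_AT_MAX_RATE


def samples_to_seconds(stride_samples: float, max_rate: int) -> float:
    """Convert a stride counted in samples at max_rate into seconds."""
    return stride_samples / max_rate


def seconds_to_samples(stride_sec: float, max_rate: int) -> float:
    """Convert a stride in seconds into a sample count at max_rate."""
    return stride_sec * max_rate


# With the assumed values, the default stride works out to one second.
default_stride = samples_to_seconds(DEFAULT_STRIDE_SAMPLES, MAX_RATE)

# A 0.5-second stride corresponds to half as many samples at the same rate.
half_second_samples = seconds_to_samples(0.5, MAX_RATE)
```

With these assumed values the default stride is 1 second, which matches the "typically 1 second" wording in the parameter description.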

Parameters:

Name: stride
Type: float | None
Description: The stride in seconds that every source in the pipeline will produce per frame. Must be positive and must map to an integer number of samples at Offset.MAX_RATE. Defaults to Offset.SAMPLE_STRIDE_AT_MAX_RATE / Offset.MAX_RATE (typically 1 second).
Default: None

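
The two constraints on stride (positive, and an integer number of samples at the maximum rate) can be checked before a pipeline is constructed. The following is a minimal sketch, not the library's own validation: check_stride is a hypothetical helper, and the rate of 16384 Hz used in the example call is an assumed value for Offset.MAX_RATE.

```python
import math


def check_stride(stride: float, max_rate: int) -> int:
    """Return the stride as a whole number of samples at max_rate.

    Hypothetical helper: raises ValueError if the stride is not positive
    or does not land on an integer number of samples.
    """
    if stride <= 0:
        raise ValueError(f"stride must be positive, got {stride}")

    samples = stride * max_rate
    nearest = round(samples)
    # Reject strides that round to zero samples or fall between samples.
    if nearest <= 0 or not math.isclose(samples, nearest, rel_tol=0.0, abs_tol=1e-9):
        raise ValueError(
            f"stride={stride} seconds is not a whole number of samples "
            f"at max_rate={max_rate}"
        )
    return nearest


# 0.5 s at an assumed 16384 Hz is exactly 8192 samples.
samples = check_stride(0.5, 16384)
```

A stride such as 0.1 seconds would be rejected at this assumed rate, because 0.1 × 16384 = 1638.4 is not a whole number of samples.
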
Source code in src/sgnts/pipeline.py
class TSPipeline(Pipeline):
    """A Pipeline with time-series stride configuration.

    This subclass of :class:`sgn.apps.Pipeline` provides a convenient
    way to set the application stride — the amount of data (in seconds)
    that each source produces per iteration.  Under the hood this sets
    :py:attr:`Offset.SAMPLE_STRIDE_AT_MAX_RATE`.

    Args:
        stride:
            float, the stride in seconds that every source in the
            pipeline will produce per frame.  Must be positive and must
            map to an integer number of samples at ``Offset.MAX_RATE``.
            Defaults to ``Offset.SAMPLE_STRIDE_AT_MAX_RATE /
            Offset.MAX_RATE`` (typically 1 second).
    """

    def __init__(self, stride: float | None = None) -> None:
        super().__init__()

        if stride is not None:
            if stride <= 0:
                raise ValueError(f"stride must be positive, got {stride}")

            stride_in_offsets = Offset.fromsec(stride)
            if stride_in_offsets <= 0:
                raise ValueError(
                    f"stride={stride} seconds is too small to represent "
                    f"at MAX_RATE={Offset.MAX_RATE}"
                )

            Offset.SAMPLE_STRIDE_AT_MAX_RATE = stride_in_offsets
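
Because the constructor assigns to a class attribute, the stride persists after the pipeline object goes away. One way to limit that effect, for example in tests, is to save and restore the value around the code that changes it. The sketch below uses a stand-in Offset class with assumed values so that it runs on its own; temporary_stride is a hypothetical helper and not part of sgnts.

```python
from contextlib import contextmanager


class Offset:
    """Stand-in for the real Offset class; the values here are assumed."""

    MAX_RATE = 16384
    SAMPLE_STRIDE_AT_MAX_RATE = 16384


@contextmanager
def temporary_stride(stride_samples: int):
    """Set the class-level stride for the duration of a with-block."""
    previous = Offset.SAMPLE_STRIDE_AT_MAX_RATE
    Offset.SAMPLE_STRIDE_AT_MAX_RATE = stride_samples
    try:
        yield
    finally:
        # Restore the earlier value even if the block raised.
        Offset.SAMPLE_STRIDE_AT_MAX_RATE = previous


with temporary_stride(8192):
    inside = Offset.SAMPLE_STRIDE_AT_MAX_RATE

after = Offset.SAMPLE_STRIDE_AT_MAX_RATE
```

With the real class, the same pattern would wrap the construction and run of a TSPipeline, assuming nothing else in the process relies on the stride staying changed afterwards.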