foundryts.functions.periodic_aggregate
foundryts.functions.periodic_aggregate(aggregate, window, window_type='end', alignment_timestamp=0)
Returns a function that aggregates values over discrete, periodic windows for a single time series.
A periodic window divides the time series into windows of fixed durations. For each window, an aggregate function is applied to the points within that window. The result is a time series with values representing the aggregate for each window. Windows with no data points are not included in the output.
This method is useful for summarizing data over regular periods, such as generating hourly, daily, or weekly summaries from a continuous stream of data.
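To make the windowing concrete, here is a minimal plain-Python sketch. It is not the foundryts implementation. It assumes integer timestamps, the default end-type window with an alignment of 0, and a sum aggregate. `periodic_sum` is a hypothetical helper name used only for illustration.

```python
from collections import defaultdict


def periodic_sum(points, window):
    """Sum values per window (start, end], labelling each window by its end.

    `points` is an iterable of (timestamp, value) pairs with integer
    timestamps. Hypothetical helper; it only mimics the documented behavior.
    """
    buckets = defaultdict(float)
    for timestamp, value in points:
        # Ceiling division: a point exactly on a boundary closes its window.
        window_end = -(-timestamp // window) * window
        buckets[window_end] += value
    # Only windows that received at least one point exist in `buckets`,
    # so empty windows are naturally absent from the result.
    return sorted(buckets.items())


points = [(1, 1.0), (101, 2.0), (200, 4.0), (1000, 64.0)]
result = periodic_sum(points, 100)
print(result)  # [(100, 1.0), (200, 6.0), (1000, 64.0)]
```

The windows ending at 300 through 900 contain no points, so they produce no output rows, which mirrors the rule that empty windows are omitted.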
Aggregation functions supported:
| Aggregation function | Description |
|---|---|
| min | Smallest value within each periodic window. |
| max | Largest value within each periodic window. |
| count | Count of points within each periodic window. |
| sum | Sum of values within each periodic window. |
| product | Product of values within each periodic window. |
| mean | Average of values within each periodic window. |
| standard_deviation | Standard deviation of values within each periodic window. |
| difference | Difference between the last point’s value and the first point’s value in each periodic window (last minus first), providing the change over the fixed window. |
| percent_change | Percent change from the first point’s value to the last point’s value within each periodic window, providing the relative rate of change within the fixed window. |
| first | First value within each periodic window. |
| last | Last value within each periodic window. |
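As a rough illustration of the two first-to-last aggregates, the sketch below computes them on the values of a single window. It is inferred from the examples on this page, where a window holding 8.0, 16.0, 32.0 yields a difference of 24.0 and a percent change of 3.0, which suggests that the percent change is reported as a fraction. The function names are hypothetical, and the behavior when the first value is zero is not documented here, so the sketch does not handle it.

```python
def difference(values):
    """Last value minus first value of one window (hypothetical helper)."""
    return values[-1] - values[0]


def percent_change(values):
    """Fractional change from the first to the last value of one window.

    Assumption: the result is a fraction (1.0 means +100%), as the examples
    on this page suggest. A first value of 0 raises ZeroDivisionError here.
    """
    first, last = values[0], values[-1]
    return (last - first) / first


window_values = [8.0, 16.0, 32.0]
print(difference(window_values))      # 24.0
print(percent_change(window_values))  # 3.0
print(percent_change([64.0]))         # 0.0, a single point has no change
```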
- Parameters:
  - aggregate (str) – Aggregation function to apply. Use a valid option from the aggregation function table above.
  - window (int, datetime.timedelta, str) – Duration of each periodic window, e.g., 5ms or 5e6.
  - window_type (str, optional) – Type of window to apply (default is "end"):
    - start: Window is inclusive at the start and exclusive at the end. The timestamp of the aggregation is the start of the window.
    - end: Window is exclusive at the start and inclusive at the end. The timestamp of the aggregation is the end of the window.
  - alignment_timestamp (str | float | datetime.datetime, optional) – The timestamp used to align the result, such that ticks in the result time series lie at integer multiples of the window duration from the alignment timestamp (default is 0).
- Returns: A function that takes a single time series as input and computes the specified aggregate for each periodic window.
- Return type: (FunctionNode) -> FunctionNode
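The interaction between window_type and alignment_timestamp can be sketched in plain Python as follows. This is an illustrative model only, assuming integer nanosecond timestamps. `window_label` is a hypothetical helper, not part of foundryts. It returns the output timestamp of the window that a given point falls into.

```python
def window_label(timestamp, window, window_type="end", alignment_timestamp=0):
    """Return the output timestamp of the window containing `timestamp`.

    Hypothetical helper that models the documented semantics:
    - "start": window is [start, end), labelled by its start.
    - "end":   window is (start, end], labelled by its end.
    """
    offset = timestamp - alignment_timestamp
    if window_type == "start":
        index = offset // window        # floor: boundary point opens a window
    elif window_type == "end":
        index = -(-offset // window)    # ceiling: boundary point closes a window
    else:
        raise ValueError(f"unknown window_type: {window_type!r}")
    return alignment_timestamp + index * window


timestamps = [1, 101, 200, 201, 299, 300, 1000, 12345]

end_labels = [window_label(t, 100) for t in timestamps]
start_labels = [window_label(t, 100, "start") for t in timestamps]
aligned_labels = [window_label(t, 100, alignment_timestamp=50) for t in timestamps]

print(end_labels)      # [100, 200, 200, 300, 300, 300, 1000, 12400]
print(start_labels)    # [0, 100, 200, 200, 200, 300, 1000, 12300]
print(aligned_labels)  # [50, 150, 250, 250, 350, 350, 1050, 12350]
```

Points that share a label belong to the same window. Note how the point at 200 closes a window under "end" but opens one under "start".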
Dataframe schema
| Column name | Type | Description |
|---|---|---|
| timestamp | pandas.Timestamp | Timestamp of the point |
| value | Union[float, str] | Value of the point |
:::callout{theme="success" title="See Also"}
cumulative_aggregate(), rolling_aggregate()
:::
:::callout{theme="warning" title="Note"}
This function is only applicable to numeric series.
:::
Examples
>>> import foundryts.functions as F
>>> series = F.points(
... (1, 1.0),
... (101, 2.0),
... (200, 4.0),
... (201, 8.0),
... (299, 16.0),
... (300, 32.0),
... (1000, 64.0),
... (12345, 128.0),
... name="series",
... )
>>> series.to_pandas()
timestamp value
0 1970-01-01 00:00:00.000000001 1.0
1 1970-01-01 00:00:00.000000101 2.0
2 1970-01-01 00:00:00.000000200 4.0
3 1970-01-01 00:00:00.000000201 8.0
4 1970-01-01 00:00:00.000000299 16.0
5 1970-01-01 00:00:00.000000300 32.0
6 1970-01-01 00:00:00.000001000 64.0
7 1970-01-01 00:00:00.000012345 128.0
>>> periodic_difference = F.periodic_aggregate("difference", "100ns")(series) # window_type defaults to end
# 5 windows with (1,2,3,1,1) points:
# window 1: [(1, 1.0)]
# window 2: [
# (101, 2.0),
# (200, 4.0)
# ]
# window 3: [
# (201, 8.0),
# (299, 16.0),
# (300, 32.0)
# ]
# window 4: [(1000, 64.0)]
# window 5: [(12345, 128.0)]
>>> periodic_difference.to_pandas()
timestamp value
0 1970-01-01 00:00:00.000000100 0.0
1 1970-01-01 00:00:00.000000200 2.0
2 1970-01-01 00:00:00.000000300 24.0
3 1970-01-01 00:00:00.000001000 0.0
4 1970-01-01 00:00:00.000012400 0.0
>>> periodic_percentage_change = F.periodic_aggregate("percent_change", "100ns")(series) # window_type defaults to end
# 5 windows with (1,2,3,1,1) points:
# window 1: [(1, 1.0)]
# window 2: [
# (101, 2.0),
# (200, 4.0)
# ]
# window 3: [
# (201, 8.0),
# (299, 16.0),
# (300, 32.0)
# ]
# window 4: [(1000, 64.0)]
# window 5: [(12345, 128.0)]
>>> periodic_percentage_change.to_pandas()
timestamp value
0 1970-01-01 00:00:00.000000100 0.0
1 1970-01-01 00:00:00.000000200 1.0
2 1970-01-01 00:00:00.000000300 3.0
3 1970-01-01 00:00:00.000001000 0.0
4 1970-01-01 00:00:00.000012400 0.0
>>> periodic_percentage_change_window_start = F.periodic_aggregate(
... "percent_change", "100ns", "start"
... )(series)
# 6 windows with (1,1,3,1,1,1) points:
# window 0: [(1, 1.0)]
# window 1: [(101, 2.0)]
# window 2: [
# (200, 4.0),
# (201, 8.0),
# (299, 16.0)
# ]
# window 3: [(300, 32.0)]
# window 4: [(1000, 64.0)]
# window 5: [(12345, 128.0)]
>>> periodic_percentage_change_window_start.to_pandas()
timestamp value
0 1970-01-01 00:00:00.000000000 0.0
1 1970-01-01 00:00:00.000000100 0.0
2 1970-01-01 00:00:00.000000200 3.0
3 1970-01-01 00:00:00.000000300 0.0
4 1970-01-01 00:00:00.000001000 0.0
5 1970-01-01 00:00:00.000012300 0.0
>>> periodic_percentage_change_with_alignment = F.periodic_aggregate(
... "percent_change", "100ns", alignment_timestamp=50
... )(series)
# 6 windows with (1,1,2,2,1,1) points:
# window 0: [(1, 1.0)]
# window 1: [(101, 2.0)]
# window 2: [
# (200, 4.0),
# (201, 8.0)
# ]
# window 3: [
# (299, 16.0),
# (300, 32.0)
# ]
# window 4: [(1000, 64.0)]
# window 5: [(12345, 128.0)]
>>> periodic_percentage_change_with_alignment.to_pandas()
timestamp value
0 1970-01-01 00:00:00.000000050 0.0
1 1970-01-01 00:00:00.000000150 0.0
2 1970-01-01 00:00:00.000000250 1.0
3 1970-01-01 00:00:00.000000350 1.0
4 1970-01-01 00:00:00.000001050 0.0
5 1970-01-01 00:00:00.000012350 0.0