foundryts.NodeCollection

class foundryts.NodeCollection(*nodes, **kwargs)

A collection of FunctionNode or SummarizerNode.

A NodeCollection is an iterable that can be passed to functions expecting multiple time series, or for mapping each node in the collection to a function.

For raw series and point sets, the dataframe of a NodeCollection contains an extra column called series denoting the series that the points belong to. The series value will be the series ID either set in the time series sync or via an alias for foundryts.functions.points().

:::callout{theme="warning" title="Note"} Do not pass a NodeCollection directly to a function that expects a single input time series; the operation will fail. To apply the same function to every element of the collection, use NodeCollection.map(), which maps the transformed points or summary to each series in the final dataframe. Refer to the FoundryTS functions documentation for the number of inputs each function expects. :::

Examples

>>> import foundryts.functions as F
>>> from foundryts import NodeCollection
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
>>> scatter_plt = F.scatter()(nc) # scatter() uses exactly 2 input time series
>>> scatter_plt.to_pandas()
   is_truncated  points.first_value  points.second_value              points.timestamp
0         False               100.0                200.0 1970-01-01 00:00:00.000000001
1         False               200.0                400.0 1970-01-01 00:00:00.000000002
2         False               300.0                600.0 1970-01-01 00:00:00.000000003
>>> scaled_nc = F.scale(10)(nc) # error - scale() works on a single input series
>>> scaled_nc = nc.map(F.scale(10)) # ok - we're mapping each series in the collection to the result of scale()
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
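
The difference between passing a collection to a single-input function and mapping that function over the collection can be pictured with a small stand-alone model. This is only a sketch in plain Python: `ToyCollection` and the local `scale` helper are hypothetical stand-ins written for illustration, not the foundryts classes or their implementation.

```python
from typing import Callable, Dict, List, Tuple

Point = Tuple[int, float]  # (timestamp, value)


class ToyCollection:
    """Hypothetical stand-in for a collection of named series (not the foundryts class)."""

    def __init__(self, series: Dict[str, List[Point]]):
        self.series = series

    def __iter__(self):
        # Iterating yields the individual series, as a multi-input function would consume them.
        return iter(self.series.values())

    def map(self, func: Callable[[List[Point]], List[Point]]) -> "ToyCollection":
        # Apply a single-input function to every series, keeping each series name.
        return ToyCollection({name: func(points) for name, points in self.series.items()})

    def to_rows(self) -> List[Tuple[str, int, float]]:
        # Concatenate all series, tagging each row with the series it came from.
        return [
            (name, ts, value)
            for name, points in self.series.items()
            for ts, value in points
        ]


def scale(factor: float) -> Callable[[List[Point]], List[Point]]:
    # Single-input function: takes one series and returns one series.
    def apply(points: List[Point]) -> List[Point]:
        return [(ts, value * factor) for ts, value in points]

    return apply


nc = ToyCollection({
    "series-1": [(1, 100.0), (2, 200.0), (3, 300.0)],
    "series-2": [(1, 200.0), (2, 400.0), (3, 600.0)],
})
scaled_rows = nc.map(scale(10)).to_rows()
```

In this model, calling `scale(10)` on `nc` itself would fail because the helper expects a list of points, whereas `nc.map(scale(10))` applies it to each series in turn and keeps the `series` label on every row.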

columns()

Returns a tuple of strings representing the column names of the pandas.DataFrame that would be produced by evaluating this collection to a dataframe.

:::callout{theme="warning" title="Note"} Keys of nested objects will be flattened into a tuple, with nested keys joined by a period (.).

Non-uniform collections will contain the union of all columns of each element in the collection. :::

  • Returns: Tuple containing names of the columns in the resulting dataframe which the collection gets evaluated to.
  • Return type: Tuple[str]

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.columns() # note the additional series column
('series', 'timestamp', 'value')
>>> dist = F.distribution()(nc)
>>> dist.columns()
('delta', 'distribution_values.count', 'distribution_values.end', 'distribution_values.start',
'end', 'end_timestamp', 'start', 'start_timestamp')
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> mixed_nc.columns()
('series', 'delta', 'distribution_values.count', 'distribution_values.end',
'distribution_values.start', 'end', 'end_timestamp', 'start', 'start_timestamp',
'count', 'earliest_point.timestamp', 'earliest_point.value', 'largest_point.timestamp',
'largest_point.value', 'latest_point.timestamp', 'latest_point.value', 'mean', 'smallest_point.timestamp',
'smallest_point.value')
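
The two rules in the note (nested keys joined with a period, and the union of columns for non-uniform collections) can be sketched in plain Python. The helpers `flatten_keys` and `union_columns` and the two sample records are hypothetical, written only for illustration; the sketch keeps first-seen order, which may differ from the ordering the library produces.

```python
from typing import Any, Dict, List, Tuple


def flatten_keys(record: Dict[str, Any], prefix: str = "") -> List[str]:
    """Return column names for a nested record, joining nested keys with '.'."""
    columns: List[str] = []
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            # Recurse into nested objects, carrying the parent key as a prefix.
            columns.extend(flatten_keys(value, name))
        else:
            columns.append(name)
    return columns


def union_columns(records: List[Dict[str, Any]]) -> Tuple[str, ...]:
    """Union of flattened columns across records, keeping first-seen order."""
    seen: Dict[str, None] = {}
    for record in records:
        for column in flatten_keys(record):
            seen.setdefault(column, None)
    return tuple(seen)


# Hypothetical summary shapes, loosely modelled on the outputs shown above.
distribution_like = {
    "series": "series-1",
    "delta": 20.0,
    "distribution_values": {"count": 1, "start": 100.0},
}
statistics_like = {
    "series": "series-2",
    "count": 3,
    "mean": 400.0,
    "earliest_point": {"timestamp": 1, "value": 200.0},
}

columns = union_columns([distribution_like, statistics_like])
```

Here the shared `series` key appears once, and each nested key such as `earliest_point.timestamp` becomes its own column name.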

map(func)

Maps each time series in the collection to the output of the input function.

This is useful for applying the same set of queries to every time series in a collection at once.

  • Parameters: func (() -> FunctionNode | SummarizerNode) – FoundryTS function to apply to each time series in the collection.
  • Returns: The updated NodeCollection with each item mapped to the corresponding output of applying the func.
  • Return type: Iterable[NodeCollection]

:::callout{theme="success" title="See Also"} NodeCollection.map_intervals() :::

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> scaled_nc = nc.map(F.scale(10))
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
>>> mapped_summary_nc = nc.map(F.distribution())
>>> mapped_summary_nc.to_pandas()
     series  delta  distribution_values.count  distribution_values.end  distribution_values.start    end end_timestamp  start               start_timestamp
0  series-1   20.0                          1                    120.0                      100.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
1  series-1   20.0                          1                    200.0                      180.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
2  series-1   20.0                          1                    300.0                      280.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
3  series-2   40.0                          1                    240.0                      200.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216
4  series-2   40.0                          1                    400.0                      360.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216
5  series-2   40.0                          1                    600.0                      560.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216

map_intervals(intervals, interval_name=None)

Creates a time range on every time series in the collection for each of the given intervals.

Each interval is used to create a foundryts.functions.time_range() on the input time series, which can be used for further transformations and analysis. This works best with Interval objects created either manually or by converting the result of foundryts.functions.time_range() to Interval.

The resulting dataframe has additional columns for interval.start and interval.end.

  • Parameters:
  • intervals (Interval | List[Interval]) – One or more intervals to create time ranges for all time series in the collection.
  • interval_name (str, optional) – Optional alias for the intervals column in the dataframe.
  • Returns: The updated NodeCollection with each item mapped to the time ranges created from the given intervals.
  • Return type: Iterable[NodeCollection]

:::callout{theme="success" title="See Also"} foundryts.functions.time_series_search() :::

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> from foundryts.core.interval import Interval
>>> intervals = [Interval(1, 2), Interval(2, 3), Interval(3, 4), Interval(1, 3)]
>>> nc = nc.map_intervals(intervals, interval_name="interval")
>>> nc.to_pandas()
     series interval  interval.start  interval.end                     timestamp  value
0  series-1                        1             2 1970-01-01 00:00:00.000000001  100.0
1  series-1                        2             3 1970-01-01 00:00:00.000000002  200.0
2  series-1                        3             4 1970-01-01 00:00:00.000000003  300.0
3  series-1                        1             3 1970-01-01 00:00:00.000000001  100.0
4  series-1                        1             3 1970-01-01 00:00:00.000000002  200.0
5  series-2                        1             2 1970-01-01 00:00:00.000000001  200.0
6  series-2                        2             3 1970-01-01 00:00:00.000000002  400.0
7  series-2                        3             4 1970-01-01 00:00:00.000000003  600.0
8  series-2                        1             3 1970-01-01 00:00:00.000000001  200.0
9  series-2                        1             3 1970-01-01 00:00:00.000000002  400.0
>>> scaled_nc = nc.map(F.scale(1000))
>>> scaled_nc.to_pandas() # applying the scale() function on each time range created from the intervals
     series interval  interval.start  interval.end                     timestamp     value
0  series-1                        1             2 1970-01-01 00:00:00.000000001  100000.0
1  series-1                        2             3 1970-01-01 00:00:00.000000002  200000.0
2  series-1                        3             4 1970-01-01 00:00:00.000000003  300000.0
3  series-1                        1             3 1970-01-01 00:00:00.000000001  100000.0
4  series-1                        1             3 1970-01-01 00:00:00.000000002  200000.0
5  series-2                        1             2 1970-01-01 00:00:00.000000001  200000.0
6  series-2                        2             3 1970-01-01 00:00:00.000000002  400000.0
7  series-2                        3             4 1970-01-01 00:00:00.000000003  600000.0
8  series-2                        1             3 1970-01-01 00:00:00.000000001  200000.0
9  series-2                        1             3 1970-01-01 00:00:00.000000002  400000.0
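
The row pattern in the output above can be reproduced with a small plain-Python sketch. It assumes each interval keeps points with start <= timestamp < end, which is inferred from the example output rather than from a documented guarantee; `slice_by_intervals` and the `(start, end)` tuples standing in for Interval are hypothetical.

```python
from typing import Dict, List, Tuple

Point = Tuple[int, float]  # (timestamp, value)
Span = Tuple[int, int]     # (start, end), a hypothetical stand-in for Interval


def slice_by_intervals(series: Dict[str, List[Point]], intervals: List[Span]) -> List[dict]:
    """For each series and each interval, keep points with start <= timestamp < end."""
    rows = []
    for name, points in series.items():
        for start, end in intervals:
            for ts, value in points:
                # Assumption: the end of the interval is exclusive.
                if start <= ts < end:
                    rows.append({
                        "series": name,
                        "interval.start": start,
                        "interval.end": end,
                        "timestamp": ts,
                        "value": value,
                    })
    return rows


series = {
    "series-1": [(1, 100.0), (2, 200.0), (3, 300.0)],
    "series-2": [(1, 200.0), (2, 400.0), (3, 600.0)],
}
rows = slice_by_intervals(series, [(1, 2), (2, 3), (3, 4), (1, 3)])
```

Each series contributes one row for each of the three single-point intervals and two rows for the interval from 1 to 3, giving ten rows in total, the same count as the dataframe shown above.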

to_dataframe(numPartitions=16)

Evaluates all time series in the collection and concatenates the results to a pyspark.sql.DataFrame.

PySpark DataFrames enable distributed data processing and parallelized transformations. They can be useful when working with dataframes with a large number of rows, for example loading all the points in a raw series or the result of a FunctionNode, or evaluating the results of multiple SummarizerNode or FunctionNode together.

An additional series column will note which series the results belong to using the series ID or foundryts.functions.points() alias.

  • Parameters: numPartitions (int , optional) – Specifies the number of partitions for distributing the time series data in the collection across Spark executors, optimizing parallel data processing. Higher values can improve performance when the executor count is high, whereas lower values may be more efficient with fewer executors. Adjust this based on the size of the time series collection and your Spark configuration (e.g., number of executors and executor memory) (default is 16).
  • Returns: Output of all time series and operations in the collection evaluated to a PySpark dataframe.
  • Return type: pyspark.sql.DataFrame

:::callout{theme="success" title="See Also"} NodeCollection.to_pandas() :::

:::callout{theme="warning" title="Note"} Set numPartitions to an appropriate value for a large collection. :::

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_dataframe()
+---------+-----------------------------+-----+
|  series |                 timestamp   |value|
+---------+-----------------------------+-----+
|series-1 |1970-01-01 00:00:00.000000001|100.0|
|series-1 |1970-01-01 00:00:00.000000002|200.0|
|series-1 |1970-01-01 00:00:00.000000003|300.0|
|series-2 |1970-01-01 00:00:00.000000001|200.0|
|series-2 |1970-01-01 00:00:00.000000002|400.0|
|series-2 |1970-01-01 00:00:00.000000003|600.0|
+---------+-----------------------------+-----+

to_pandas(parallel=4, mode='thread')

Evaluates the time series queries in this collection and concatenates the results to a pandas.DataFrame.

Refer to FunctionNode.to_pandas() and SummarizerNode.to_pandas() for details on the shape of the dataframe.

An additional series column will note which series the results belong to using the series ID or foundryts.functions.points() alias.

  • Parameters:
  • parallel (int , optional) – Number of parallel threads or processes used to evaluate the time series queries. If set to 1, no multiprocessing is done (default is 4).
  • mode (str, optional) – Either process or thread; selects whether the queries are evaluated in a process pool or a thread pool (default is thread).
  • Returns: Output of the time series queries in the collection evaluated to a Pandas dataframe.
  • Return type: pd.DataFrame

:::callout{theme="success" title="See Also"} NodeCollection.to_dataframe() :::

:::callout{theme="warning" title="Note"} The result of each time series query in the collection is stored and concatenated to the final dataframe locally, so results must fit in memory.

Parallel execution will increase memory usage and resource consumption. Using process mode may introduce overhead. Excessive parallelism can degrade performance. :::

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
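
The effect of the parallel parameter can be pictured with the standard library's thread pool. This is a sketch under stated assumptions, not the library's implementation: `evaluate_all` is a hypothetical helper, and each "query" is modelled as a plain callable that returns a list of points.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

Point = Tuple[int, float]     # (timestamp, value)
Row = Tuple[str, int, float]  # (series, timestamp, value)


def evaluate_all(queries: Dict[str, Callable[[], List[Point]]], parallel: int = 4) -> List[Row]:
    """Run every query, then concatenate the results in the original query order."""
    names = list(queries)
    if parallel <= 1:
        # With a single worker, evaluate sequentially without a pool.
        results = [queries[name]() for name in names]
    else:
        with ThreadPoolExecutor(max_workers=parallel) as pool:
            # Executor.map returns results in input order, whatever the completion order.
            results = list(pool.map(lambda name: queries[name](), names))
    rows: List[Row] = []
    for name, points in zip(names, results):
        # Every result is held in local memory before concatenation.
        rows.extend((name, ts, value) for ts, value in points)
    return rows


queries = {
    "series-1": lambda: [(1, 100.0), (2, 200.0), (3, 300.0)],
    "series-2": lambda: [(1, 200.0), (2, 400.0), (3, 600.0)],
}
rows_threaded = evaluate_all(queries, parallel=4)
rows_serial = evaluate_all(queries, parallel=1)
```

Both calls yield the same rows; only the way the individual queries are scheduled differs. All results are collected locally before being combined, which is why the note above says they must fit in memory.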

to_rdd(numPartitions=16)

Deprecated

Deprecated since version 1.0.0.

Return an RDD of (key, object) by evaluating the nodes in this collection. If the nodes in this NodeCollection resulted from grouping or windowing operations, the respective key will contain the given metadata keys or intervals. Otherwise it will be the node’s series identifier.

  • Parameters: numPartitions (int) – number of partitions to use for spark execution
  • Return type: pyspark.RDD

types()

Returns a tuple of types for the columns of the pandas.DataFrame that would be produced by evaluating the collection to a dataframe.

  • Returns: Tuple containing types of the columns in the resulting dataframe which the collection gets evaluated to.
  • Return type: Tuple[Type]

:::callout{theme="warning" title="Note"} Non-uniform collections will contain the union of all types of each element in the collection. :::

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.types()
(<class 'str'>, <class 'int'>, <class 'float'>)
>>> dist = F.distribution()(nc)
>>> dist.types()
(<class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>, <class 'float'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>)
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> print(mixed_nc.types())
(<class 'str'>, <class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>,
<class 'float'>, <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'int'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
<class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
<class 'float'>, <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>)
