Streaming stateful transformations¶
Foundry pipelines provide stateful data streaming transformations that enable complex data transform behavior at fast streaming speeds.
What is state?¶
In data streaming, a data transformation is stateful when each output row may depend on information contained in previously processed rows. The information that persists between rows is called state. The state may be accessed and mutated by each row.
An example of a stateful transform is a sum aggregate transform. Consider the following sample table, where a sum aggregate transform can be used to calculate the number of sensor readings with the value hot on each day. More recently streamed rows are higher in the table.
| Day | SensorReading | Timestamp |
|---|---|---|
| Monday | hot | 3 |
| Monday | cold | 2 |
| Monday | hot | 1 |
A stateful sum aggregate transform that computes the live running number of hot readings for each day may have output that looks like the following:
| Day | SensorReading | Timestamp | State |
|---|---|---|---|
| Monday | hot | 3 | 2 |
| Monday | cold | 2 | 1 |
| Monday | hot | 1 | 1 |
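The following minimal Python sketch shows the running count concretely. It is not Pipeline Builder code, and `Row` and `running_hot_count` are hypothetical names. It processes the sample rows oldest first and keeps a single integer as state. Because every sample row has Day = Monday, one counter is enough here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    day: str
    sensor_reading: str
    timestamp: int


def running_hot_count(rows):
    """Yield (row, state) pairs, where state is the running count of 'hot' readings."""
    state = 0  # the information that persists between rows
    for row in rows:  # rows arrive one at a time, oldest first
        if row.sensor_reading == "hot":
            state += 1  # each row may read and mutate the state
        yield row, state


# The sample table in arrival order (oldest first, i.e. bottom to top).
stream = [
    Row("Monday", "hot", 1),
    Row("Monday", "cold", 2),
    Row("Monday", "hot", 3),
]

for row, state in running_hot_count(stream):
    print(row.timestamp, row.sensor_reading, state)
# prints:
# 1 hot 1
# 2 cold 1
# 3 hot 2
```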
Stateful transforms may store state of any serializable type, including entire rows, which enables them to achieve complex behavior. Stateful transforms in Pipeline Builder are pre-built and automatically handle the state type and how it evolves.
The content of the state is not always accessible to the user; state can also support backend processing behavior. For example, when a data pipeline Flink job reads a stream input source, it stores the last offset for each partition as state, which enables the job to recover from the last successful point after a failure or restart.
Why is state powerful in data streaming?¶
Foundry data streaming uses the Flink architecture to provide low-latency data pipelining; each row is computed and passed downstream to the next operation immediately after processing. Unlike batch transformations, streaming transformations determine the next output one row at a time, without a full view of the data.
For non-stateful (also known as stateless) streaming transformations, this architecture means that transform logic can only depend on one row at a time. For example, a stateless transform could always add 5 to an integer column.
By contrast, stateful streaming transforms have access to persistent data about previous rows while continuing to be able to process rows one at a time and immediately as they arrive.
Foundry stateful streaming offers exactly-once guarantees as an option, and this is the default pipeline configuration. When it is selected, each row that mutates the state is guaranteed to do so exactly once, and mutations are applied in order for each key. This enables precise and complex data streaming behaviors.
For instance, a sum remains accurate even if the stream restarts or fails. A sort always produces exactly one output per input, even if the job is restarted mid-sort after it has already produced partial sorted output that is not live.
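One common way to obtain this guarantee, and the general approach Flink takes, is to checkpoint source offsets and operator state together and, after a failure, restore both and re-read the input from the checkpointed offset. The toy model below is a simplified, hypothetical illustration of that idea (`ToySumJob` is not a Foundry or Flink API). It covers only the state, not delivery of output to sinks.

```python
class ToySumJob:
    """Hypothetical model: a stateful sum whose checkpoints include the source offset."""

    def __init__(self, log):
        self.log = log      # replayable input stream, addressed by offset
        self.offset = 0     # source state: next offset to read
        self.total = 0      # operator state: running sum
        self.checkpoint = {"offset": 0, "total": 0}

    def process(self, n):
        """Read the next n records and add them to the sum."""
        for _ in range(n):
            self.total += self.log[self.offset]
            self.offset += 1

    def take_checkpoint(self):
        # Offset and sum are captured together, so they always agree.
        self.checkpoint = {"offset": self.offset, "total": self.total}

    def fail_and_restore(self):
        # Work done after the last checkpoint is lost; both pieces of state roll back.
        self.offset = self.checkpoint["offset"]
        self.total = self.checkpoint["total"]

    def run_to_end(self):
        # Records after the restored offset are read again, exactly once.
        self.process(len(self.log) - self.offset)


job = ToySumJob([5, 3, 7, 1, 4])
job.process(2)          # total = 8
job.take_checkpoint()   # checkpoint: offset 2, total 8
job.process(2)          # total = 16, not yet checkpointed
job.fail_and_restore()  # back to offset 2, total 8
job.run_to_end()        # re-reads 7 and 1, then reads 4
print(job.total)        # 20, equal to sum([5, 3, 7, 1, 4])
```

Had the sum been kept while only the offset rolled back, 7 and 1 would have been added twice, giving 28. Snapshotting both together is what keeps the result exact.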
Keyed state¶
All Pipeline Builder stateful streaming transformations use keyed state and require the user to specify partition key columns. Rows with different values in the key columns are processed separately, which allows the backend to parallelize processing and scale to large data volumes.
For example, consider the stateful sum aggregate example, computing the live running count of hot readings each day. Note that in this example, the Day column is used as the partition key.
| Day | SensorReading | Timestamp | State |
|---|---|---|---|
| Tuesday | hot | 5 | 1 |
| Monday | hot | 4 | 2 |
| Tuesday | cold | 3 | 0 |
| Monday | cold | 2 | 1 |
| Monday | hot | 1 | 1 |
Notice how state is computed independently for rows with Day value Monday and rows with value Tuesday. Rows with key value Tuesday do not affect what is stored for Monday, and the state for both keys would be unaffected if more rows arrived with a different value for the Day column, such as Wednesday.
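Keyed state can be sketched in Python as a dictionary keyed by Day. This is a conceptual model only; `process` and `hot_count_by_day` are hypothetical names. Each row reads and updates only its own key's entry, and the sketch reproduces the State column of the table above.

```python
from collections import defaultdict

# Keyed state: one independent running count per value of the partition key (Day).
hot_count_by_day = defaultdict(int)


def process(day: str, reading: str) -> int:
    """Update and return the state for this row's key only."""
    if reading == "hot":
        hot_count_by_day[day] += 1
    return hot_count_by_day[day]


stream = [  # (Day, SensorReading, Timestamp), oldest first
    ("Monday", "hot", 1),
    ("Monday", "cold", 2),
    ("Tuesday", "cold", 3),
    ("Monday", "hot", 4),
    ("Tuesday", "hot", 5),
]

states = [process(day, reading) for day, reading, _ts in stream]
print(states)                  # [1, 1, 0, 2, 1]
print(dict(hot_count_by_day))  # {'Monday': 2, 'Tuesday': 1}
```

Because no row touches another key's entry, the entries for different keys can live on different parallel instances.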
Choose keys carefully: keys that distribute records unevenly can artificially increase load and limit throughput. See streaming keys best practices.
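A quick way to reason about key choice is to count how many records each parallel instance would receive. The sketch below assumes hash partitioning by key and uses `zlib.crc32` as a stand-in hash (Flink's actual key-to-instance assignment differs); `load_per_instance` is a hypothetical helper. With Day as the only key column, at most seven instances can ever receive records, however high the parallelism.

```python
import zlib
from collections import Counter


def load_per_instance(keys, parallelism):
    """Count records per parallel instance when each record is routed by hashing its key."""
    # A stable hash, so the same key always maps to the same instance.
    load = Counter(zlib.crc32(k.encode("utf-8")) % parallelism for k in keys)
    return [load.get(i, 0) for i in range(parallelism)]


days = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
load = load_per_instance(days * 100, parallelism=16)
busy = sum(1 for n in load if n > 0)

print(sum(load))           # 700: every record lands somewhere
print(busy <= len(days))   # True: at least 9 of the 16 instances stay empty
```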
Event time and watermarks¶
Stateful streaming transforms often rely on timing information. Because streams are ongoing and may receive a new real-time row at any moment, it often makes sense to group together rows that are close in time. For instance, the Outer Caching Join Transform joins rows from two input streams only when they share values for the join columns and their timestamps are within an expiry limit. Pipeline Builder streaming uses Flink event time so that stateful transformations behave in a close-to-deterministic way, producing the same or very similar output upon replay.
Stateful transforms in Pipeline Builder that perform a time-based operation require the Assign Timestamps And Watermarks transform upstream and will produce a validation error if it is missing from your pipeline graph. Assign Timestamps And Watermarks assigns each row an "event time," usually taken from a timestamp column in the row. The watermark is each transform operator's mostly deterministic sense of "what the current time is." It is a monotonically increasing value that closely trails the maximum event time seen in any input row to that operator. For example, when an Outer Caching Join determines whether an entry in its cache has expired, it checks whether the watermark is greater than or equal to the expiry time, which will be true only once the join has received an input row with at least that event time.
For a transform operator with a single stream input, the watermark is the minimum, across parallel instances, of the maximum event time seen by each instance. For a transform with more than one stream input, the watermark is the minimum of the input watermarks.
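The two rules above can be modeled in a few lines of Python. This is a simplified sketch with hypothetical names: it uses each instance's maximum event time directly as that instance's watermark (real watermarks usually trail it by a configured out-of-orderness bound), and it treats an instance that has seen no rows as producing no watermark.

```python
from typing import List, Optional


class SingleInputWatermark:
    """Hypothetical model of a single-input operator with several parallel instances."""

    def __init__(self, parallelism: int):
        # Maximum event time seen so far by each instance (None = no rows yet).
        self.max_seen: List[Optional[int]] = [None] * parallelism

    def observe(self, instance: int, event_time: int) -> None:
        prev = self.max_seen[instance]
        self.max_seen[instance] = event_time if prev is None else max(prev, event_time)

    def watermark(self) -> Optional[int]:
        # An instance without rows produces no watermark, so the operator's
        # watermark cannot advance until every instance has reported.
        if any(t is None for t in self.max_seen):
            return None
        return min(self.max_seen)


def multi_input_watermark(input_watermarks: List[Optional[int]]) -> Optional[int]:
    """Operator with several stream inputs: the minimum of the inputs' watermarks."""
    if any(w is None for w in input_watermarks):
        return None
    return min(input_watermarks)


left = SingleInputWatermark(parallelism=3)
for instance, ts in [(0, 10), (1, 12), (0, 17), (2, 20)]:
    left.observe(instance, ts)
print(left.watermark())  # 12: instance 1 is furthest behind

right = SingleInputWatermark(parallelism=2)
right.observe(0, 15)
right.observe(1, 18)
print(multi_input_watermark([left.watermark(), right.watermark()]))  # 12
```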
Replay will produce similar, but sometimes slightly different, outputs. This is because upon replay, partition keys may be assigned to different Flink parallel instances, and the inputs of operators with multiple inputs may be processed upstream at different rates. Flink processing time is unsupported and not recommended, since it can produce significantly different and potentially unintuitive results upon replay.
If a parallel instance is not receiving records, it will not produce a watermark. This keeps the overall watermark of the transform operator from advancing, which has important implications for state expiry and windows, and is often a symptom of poorly distributed keys. To resolve this, configure stream idleness in Pipeline Builder.
If a parallel instance does not receive records for the configured amount of processing time, its watermark is no longer considered when computing the overall transform operator watermark. While this configuration can resolve stalled watermarks, setting the timeout too short can cause slower instances to be erroneously marked as idle. Faster instances will then advance the overall transform operator watermark, which can lead to more dropped records.
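The effect of the idleness timeout can be sketched by extending the minimum-over-instances rule: instances whose last record arrived at least `idle_timeout` seconds ago (in processing time) are left out. This is a hypothetical model, similar in spirit to Flink's `WatermarkStrategy.withIdleness`; the function and parameter names are illustrative. The third call shows a timeout that is too short: instance 1 is merely slow, yet it is treated as idle and the watermark jumps to 105, so its rows with earlier event times now arrive late.

```python
from typing import List, Optional


def watermark_with_idleness(
    max_event_time: List[Optional[int]],
    last_record_at: List[float],
    now: float,
    idle_timeout: float,
) -> Optional[int]:
    """Min of per-instance max event times, skipping instances idle for >= idle_timeout.

    last_record_at and now are processing-time (wall-clock) values in seconds.
    Returns None when no watermark can be produced.
    """
    considered = []
    for event_time, seen_at in zip(max_event_time, last_record_at):
        if now - seen_at >= idle_timeout:
            continue  # idle: excluded from the operator watermark
        if event_time is None:
            return None  # an active instance without an event time holds the watermark back
        considered.append(event_time)
    return min(considered) if considered else None  # all idle: no watermark


max_seen = [105, 98, 40]            # per-instance max event time
last_seen = [999.0, 998.0, 700.0]   # processing time of each instance's last record
now = 1000.0

print(watermark_with_idleness(max_seen, last_seen, now, idle_timeout=600.0))  # 40
print(watermark_with_idleness(max_seen, last_seen, now, idle_timeout=60.0))   # 98
print(watermark_with_idleness(max_seen, last_seen, now, idle_timeout=1.5))    # 105
```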
State expiry¶
Storing large state can lead to performance bottlenecks that negatively impact throughput and latency, so Pipeline Builder requires the user to limit state size.
Typically, state is limited through a user-provided cache time expiry. For stateful transforms that require a cache time parameter, state for each key is usually kept in the state cache until the watermark passes the last event time seen for that key plus the expiry.
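A minimal model of watermark-driven expiry follows (hypothetical class and method names; not the Pipeline Builder implementation). It records the latest event time per key and evicts a key once the watermark reaches that time plus the expiry. The exact boundary is an assumption: the sketch uses greater-than-or-equal, matching the Outer Caching Join check described above. The last call shows why a stalled watermark stops eviction.

```python
class ExpiringKeyedCache:
    """Hypothetical model of keyed state evicted by watermark-based expiry."""

    def __init__(self, expiry: int):
        self.expiry = expiry
        self.state = {}            # key -> stored value
        self.last_event_time = {}  # key -> latest event time seen for that key

    def put(self, key, value, event_time: int) -> None:
        self.state[key] = value
        self.last_event_time[key] = max(event_time, self.last_event_time.get(key, event_time))

    def on_watermark(self, watermark: int) -> list:
        """Evict every key whose last event time plus expiry the watermark has reached."""
        expired = [k for k, t in self.last_event_time.items() if watermark >= t + self.expiry]
        for k in expired:
            del self.state[k]
            del self.last_event_time[k]
        return expired


cache = ExpiringKeyedCache(expiry=10)
cache.put("sensor-a", "row-a", event_time=100)
cache.put("sensor-b", "row-b", event_time=105)
print(cache.on_watermark(109))  # []
print(cache.on_watermark(112))  # ['sensor-a'], since 112 >= 100 + 10
print(cache.on_watermark(112))  # []: a watermark that does not move evicts nothing more
```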
In the case of a stalled watermark, stateful transforms will not evict state promptly. This can lead to unexpected outputs and unbounded state growth.
Windows and triggers¶
The Aggregate Over Window transform allows users to set windows, which are strategies for grouping rows and their state together, and triggers, which are strategies for deciding when the aggregate should produce output.
Windows¶
The currently supported windows are:
- Tumbling event time: Divides time into fixed-length, non-overlapping, continuous intervals. Rows with the same key and an event time that falls within the same interval are grouped together. For example, you can group together all rows on the same date with event time in the same hour of the day.
- Count: Given a user-specified count n, for each key, groups together the most recent n rows with that key.
- Session: Groups together rows that are part of the same session. Rows are in the same session if they share a key and consecutive rows with that key are never more than the user-specified session gap apart in event time. For example, in a dataset of user actions on a streaming platform, you can group together all rows for one user workflow until the user takes a break.
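The three grouping rules can be sketched for a single key in Python. The helper names are hypothetical, and the functions mirror the definitions above rather than any Foundry API. Event times are plain integers, for example epoch milliseconds.

```python
from collections import deque


def tumbling_window(event_time: int, size: int) -> tuple:
    """Return the [start, end) tumbling window that contains event_time."""
    start = event_time - (event_time % size)
    return start, start + size


def session_windows(event_times: list, gap: int) -> list:
    """Group one key's event times into sessions; a break longer than `gap` starts a new one."""
    sessions = []
    for t in sorted(event_times):
        if sessions and t - sessions[-1][-1] <= gap:
            sessions[-1].append(t)  # continues the current session
        else:
            sessions.append([t])    # gap exceeded: start a new session
    return sessions


class CountWindow:
    """Keeps the most recent n rows for one key."""

    def __init__(self, n: int):
        self.rows = deque(maxlen=n)  # the oldest row falls out automatically

    def add(self, row) -> list:
        self.rows.append(row)
        return list(self.rows)


HOUR = 3_600_000  # milliseconds
print(tumbling_window(5_400_000, HOUR))           # (3600000, 7200000)
print(session_windows([0, 4, 9, 30, 33], gap=5))  # [[0, 4, 9], [30, 33]]

w = CountWindow(3)
for r in ["r1", "r2", "r3", "r4"]:
    latest = w.add(r)
print(latest)  # ['r2', 'r3', 'r4']
```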
Windows that depend on time (such as the tumbling event time window and session window) will eventually close once the watermark advances far enough.
- If allowed lateness is not set or is zero, the window stays open until the watermark passes the end time of the window, at which point the window closes, may produce output, and deletes its state.
- If allowed lateness is specified, the window stays open until the watermark passes the end of the window plus the allowed lateness. This allows late-arriving or out-of-order records to still be part of a window even if the watermark is past the end of the window.
- Rows that arrive when the watermark is past the end of the window plus the allowed lateness will always be dropped because the window has already been closed and its state deleted.
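The open, late, and closed phases can be expressed as a small classification function. This sketch assumes that "passes" means the watermark is greater than or equal to the boundary; `window_status` and `is_dropped` are hypothetical names, and windows are tumbling windows of length `size`.

```python
def window_status(watermark: int, window_end: int, allowed_lateness: int = 0) -> str:
    """Classify a time window relative to the current watermark."""
    if watermark < window_end:
        return "open"                # watermark has not reached the end of the window
    if watermark < window_end + allowed_lateness:
        return "open (late period)"  # late or out-of-order rows are still accepted
    return "closed"                  # state deleted; further rows are dropped


def is_dropped(event_time: int, watermark: int, size: int, allowed_lateness: int = 0) -> bool:
    """Would a row with this event time be dropped on arrival?"""
    window_end = event_time - event_time % size + size
    return window_status(watermark, window_end, allowed_lateness) == "closed"


print(window_status(55, 60))              # open
print(window_status(65, 60))              # closed (no allowed lateness)
print(window_status(65, 60, 10))          # open (late period)
print(window_status(75, 60, 10))          # closed
print(is_dropped(50, 65, size=60))        # True
print(is_dropped(50, 65, 60, 10))         # False
```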
Windows that depend on time also allow specification of a custom trigger.
In the case of a stalled watermark, windows will stay open for much longer. This can lead to rows not being emitted, unbounded state growth, or missing trigger fires.
Triggers¶
The currently supported triggers are:
- After watermark trigger: Causes the window to output when the watermark passes the end of the window. Allows specification of additional custom triggers for when the watermark is before the end of the window, and for when the watermark is after the end of the window (while the window is still alive due to its allowed lateness). For example, a user may want no early outputs before the end of the window, but an output for every late-arriving record during the allowed lateness period.
- Count trigger: Given a user-specified count n, causes the window to output for each key each time the number of rows it has received for that key reaches a multiple of n.
- Window close trigger: Fires an output only as the window closes and deletes its state. It fires exactly once per window, at the end of the window.
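The sketch below replays one window's events against each trigger type and reports where outputs would fire. It is a conceptual model only, and `simulate` is a hypothetical helper. The window ends at 60 with an allowed lateness of 20, and the after watermark trigger is configured with no early outputs, one output when the watermark passes the end of the window, and one output per late row.

```python
def simulate(events, window_end, allowed_lateness, trigger, n=2):
    """Return the indices of events at which the trigger fires, for one window and key.

    events: list of ("row", event_time) or ("watermark", value), in arrival order.
    trigger: "count", "after_watermark", or "window_close".
    """
    fires = []
    rows = 0
    on_time_fired = False
    closed = False
    close_at = window_end + allowed_lateness
    for i, (kind, value) in enumerate(events):
        if closed:
            break  # state deleted: later rows for this window are dropped
        if kind == "row":
            rows += 1
            if trigger == "count" and rows % n == 0:
                fires.append(i)  # row count reached a multiple of n
            elif trigger == "after_watermark" and on_time_fired:
                fires.append(i)  # late firing: one output per late row
        else:
            watermark = value
            if trigger == "after_watermark" and not on_time_fired and watermark >= window_end:
                on_time_fired = True
                fires.append(i)  # on-time firing at the end of the window
            if watermark >= close_at:
                closed = True
                if trigger == "window_close":
                    fires.append(i)  # exactly once, as the window closes
    return fires


events = [
    ("row", 10),        # 0
    ("row", 25),        # 1
    ("watermark", 30),  # 2
    ("row", 40),        # 3
    ("watermark", 65),  # 4: passes the window end (60)
    ("row", 55),        # 5: late, within the allowed lateness
    ("watermark", 85),  # 6: passes 60 + 20, so the window closes
    ("row", 58),        # 7: dropped
]
print(simulate(events, 60, 20, "count", n=2))       # [1, 5]
print(simulate(events, 60, 20, "after_watermark"))  # [4, 5]
print(simulate(events, 60, 20, "window_close"))     # [6]
```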
Stateful streaming best practices¶
Large state can have negative performance implications, so when designing stateful pipelines, use as "tight" a state expiry policy as possible. This usually means not setting the cache time expiry larger than necessary, nor setting a larger count than necessary for count windows.
For pipelines that require large state, performance (including throughput, checkpoint duration, and latency) scales with the parallelism of the Flink job. Parallelism can be edited in streaming pipeline settings, where larger parallelism allows for increased data processing capacity and increased state read and write speed.
Choose appropriate keys for stateful transforms, because too many distinct key values or an uneven distribution of rows across keys can lead to bottlenecks or trouble scaling.