Data expectations reference(数据期望参考)¶
Below is the categorized list of all available data expectations.
Operators¶
from transforms import expectations as E
E.true() # Always passes
E.false() # Always fails
E.all(e1,e2,...) # Passes when all sub-expectations pass
E.any(e1,e2,...) # Passes when any of the sub-expectations pass
E.negate(e1) # Passes when the sub-expectation fails
For example:
from transforms import expectations as E
E.all(
E.col('a').gt(0),
E.col('a').lt(100),
E.any(
E.col('b').gt(100),
E.col('b').lt(0)
)
)
Column expectations¶
Column expectations begin with E.col('column_name').
When using the any operator each row would be checked individually for any of the column expectations.
For example, to validate that the value of column c1 should be greater than 10 or lower than 0:
from transforms import expectations as E
E.any(
E.col("c1").lt(0),
E.col("c1").gt(10)
)
Greater or less than¶
from transforms import expectations as E
E.col('c').gt(number|string)
E.col('c').gte(number|string)
E.col('c').lt(number|string)
E.col('c').lte(number|string)
For example:
from transforms import expectations as E
E.col(‘age').lt(120)
:::callout{theme="neutral"}
This expectation ignores null values (meaning that null values will automatically pass). To check for nulls, use E.col('col').non_null().
:::
Column comparison¶
from transforms import expectations as E
E.col('c1').equals_col('c2') # Column c1 value is equal to column c2 value
E.col('c1').not_equals_col('c2') # Column c1 value is not equal to column c2 value
E.col('c1').gt_col('c2') # Column c1 value is greater than column c2 value
E.col('c1').gte_col('c2') # c1 value is greater than or equal c2 value
E.col('c1').lt_col('c2') # Column c1 value is lower than column c2 value
E.col('c1').lte_col('c2') # c1 value is lower than or equal c2 value
Property comparison¶
from transforms import expectations as E
E.col('c').null_percentage().lt(value)
E.col('c').null_count().gt(value)
E.col('c').distinct_count().equals(value)
E.col('c').approx_distinct_count().equals(value) # Faster version of distinct_count which guarantees a relative standard deviation of the error of max 5%
E.col('c').sum().gt(value) # Only works for numeric columns
E.col('c').standard_deviation_sample().gt(value) # Only works for numeric columns
E.col('c').standard_deviation_population().gt(value) # Only works for numeric columns
For example:
from transforms import expectations as E
E.col("myCol").null_percentage().lt(0.1) # myCol is less than 10% null values
E.col("myCol").null_count().gt(100) # myCol has more than 100 null values
E.col("myCol").distinct_count().equals(5) # myCol has 5 distinct values (since version 0.11.0)
E.col('myCol').approx_distinct_count().equals(5). # myCol has approximately 5 distinct values
E.col('myCol').sum().equals(5) # myCol values sum to 5
E.col('myCol').standard_deviation_sample().gt(5) # myCol has sample standard deviation greater than 5
E.col('myCol').standard_deviation_population().gt(5) # myCol has population standard deviation greater than 5
Equals¶
from transforms import expectations as E
E.col('c').equals(value) # Column value equals input
For example:
from transforms import expectations as E
E.col('test_column').equals("success")
:::callout{theme="neutral"}
This expectation ignores null values (meaning that null values will automatically pass). To check for nulls, use E.col('col').non_null().
:::
Not equals¶
from transforms import expectations as E
E.col('c').not_equals(value) # Column value does not equal input
For example:
from transforms import expectations as E
E.col('test_column').not_equals("failure")
:::callout{theme="neutral"}
This expectation ignores null values (meaning that null values will automatically pass). To check for nulls, use E.col('col').non_null().
:::
Null¶
from transforms import expectations as E
E.col('c').non_null() # Column value is not null
E.col('c').is_null() # Column value is null
Is in¶
This expectation verifies that the column value is within a list of approved values. For Array columns, see is in (array).
from transforms import expectations as E
E.col('c').is_in(a, b, ...) # Column value is in given list
:::callout{theme="neutral"}
This expectation fails on null values unless you add None to the allowed values.
:::
rlike (regex)¶
Regex partial match, similar to pyspark.sql.functions.rlike.
from transforms import expectations as E
E.col('c').rlike(regex expression) # Column value matches a regex expression (partial match)
For example:
from transforms import expectations as E
E.col('flight_number').rlike(r"^\D{2}\d{2,4}$")
:::callout{theme="neutral"}
This expectation ignores null values (meaning that null values will automatically pass). To check for nulls, use E.col('col').non_null().
:::
Has type¶
from transforms import expectations as E
E.col('c').has_type(Type) # Column 'c' is of type Type
:::callout{theme="neutral"} Type expectations leverage Polars types ↗ for single-node compute, regardless of engine, and Spark SQL types ↗ for PySpark compute. You must import the relevant library and reference types appropriately, as in the below example. :::
For example:
```python tab="Single-node (Lightweight)" from transforms import expectations as E import polars as pl
E.col('age').has_type(pl.Int32)
```python tab="PySpark"
from transforms import expectations as E
from pyspark.sql import types as T
E.col('age').has_type(T.LongType())
Exists¶
from transforms import expectations as E
E.col('c').exists() # Column 'c' exists in the output dataframe
This expectation checks whether a column with the provided name exists in the output dataframe. The column may be of any type.
Timestamp expectations¶
Timestamp expectations will only work on columns of type Timestamp. Date columns are not currently supported.
Static timestamp comparison¶
Compare the values in a timestamp column with a static timestamp. The static timestamp can be provided either as an ISO8601 formatted string, or as a Python datetime object. All timestamps have to be timezone aware to avoid ambiguity.
:::callout{theme="warning" title="Warning"}
Never use the static timestamp expectation with a timestamp derived from `datetime.now()`. While this might initially appear to yield correct results, this behavior is unsupported and could result in incorrect results without warning. Additionally, Data Health and messages across Foundry will not reference the correct timestamp if you use a static timestamp expectation with a timestamp derived from datetime.now(). Instead, use relative timestamp comparison expectations.
:::
from transforms import expectations as E
E.col("timestamp").is_after("2020-12-14T11:32:23+0000")
E.col("timestamp").is_before(datetime(2017, 11, 28, 23, 55, 59, 342380))
E.col("timestamp").is_on_or_after("2020-12-14T11:32:23+0000")
E.col("timestamp").is_on_or_before("2020-12-14T11:32:23+0000")
Column timestamp comparison¶
Compare the values in a timestamp column against the values in another timestamp column. An optional offset (integer number of seconds) can be provided, and will be added to the values of the other column.
The comparison will be:
first_column ($OPERATOR) second_column + offset_in_seconds
from transforms import expectations as E
E.col("timestamp").is_after_col("second_timestamp")
E.col("timestamp").is_on_or_after_col("second_timestamp")
# Operators accept an optional offset_in_second argument
# Check `second_timestamp` is less than an hour after `timestamp`
E.col("timestamp").is_before_col("second_timestamp", 3600)
# Check `second_timestamp` is more than 2 hours before `timestamp`
E.col("timestamp").is_on_or_before_col("second_timestamp", -7200)
Relative timestamp comparison¶
Compare the values of a timestamp column against the time at which the check is run (such as when the build happens) plus a user-specified offset. The offset can be provided as an integer number of seconds, or as a timedelta Python object.
:::callout{theme="warning" title="Relative timestamp comparison precision"} We expect the relative timestamp comparison to be precise up to a few minutes. This is due to the imprecision for the time at which the check is instantiated or run. The exact timestamps used will be available after the check runs and presented in the Expectations check result. :::
Two main methods are provided: timestamp_offset_from_current_time and timestamp_offset_to_current_time. We provide two different methods to help with reasonning against relative time offsets in a natural way. Therefore, we only support positive time offsets as arguments. If you need to use a negative offset, consider using the other method instead.
timestamp_offset_from_current_time¶
This method is intended for use relative times in the future, where timestamp - now() is a positive value. This value will then be compared to the provided offset. All regular comparison operators are available for comparison.
from datetime import timedelta
from transforms import expectations as E
# Timestamp values are less than 1 hour in the future
A = E.col("timestamp").timestamp_offset_from_current_time().lt(3600)
# Timestamp values are more than 2 hours in the future
B = E.col("timestamp").timestamp_offset_from_current_time().gt(timedelta(hours=2))
timestamp_offset_to_current_time¶
This method is intended for use with relative times in the past, where now() - timestamp is a positive value. This value will then be compared to the provided offset. All regular comparison operators are available for comparison.
from datetime import timedelta
from transforms import expectations as E
# Timestamp values are less than 90 minutes in the past
C = E.col("timestamp").timestamp_offset_to_current_time().lt(5400)
# Timestamp values are more than 2 hours in the past
D = E.col("timestamp").timestamp_offset_to_current_time().gt(timedelta(hours=2))
Example: Expected results¶
Assuming the check is running at 4pm on January 1st, here are the results we expect for the checks above for different values of timestamp.
< ------- PAST --------------------- NOW -------------------- FUTURE ------>
| 1pm | 2pm | 3pm | 4pm | 5pm | 6pm | 7pm |
---+----------+----------+----------+----------+----------+----------+----------+
A | PASS | PASS | PASS | PASS | FAIL | FAIL | FAIL |
---+----------+----------+----------+----------+----------+----------+----------+
B | FAIL | FAIL | FAIL | FAIL | FAIL | FAIL* | PASS |
---+----------+----------+----------+----------+----------+----------+----------+
C | FAIL | FAIL | PASS | PASS | PASS | PASS | PASS |
---+----------+----------+----------+----------+----------+----------+----------+
D | PASS | FAIL* | FAIL | FAIL | FAIL | FAIL | FAIL |
---+----------+----------+----------+----------+----------+----------+----------+
The comparisons in checks B and D are strict comparison. Use ge and le for non-strict comparison.
Group and property timestamp comparison¶
Most timestamp comparisons are also available on derived properties of regular dataframes or grouped dataframes.
from datetime import timedelta
from transforms import expectations as E
# Check that the highest timestamp is after a given static date
E.col("timestamp").max_value().is_after("2020-12-14T12:23:50+0000")
# Check that the oldest timestamp is less than 1 day in the past
E.col("timestamp").min_value().timestamp_offset_to_current_time().lt(timedelta(days=1))
# Check that the last date in each category is more than 2 month in the future
E.group_by("category")
.col("timestamp")
.max_value()
.timestamp_offset_from_current_time()
.gt(timedelta(months=2))
Array expectations¶
Not all expectations work for array-type columns. Array-type columns can only use the specific expectations described below.
Array is in¶
The is_in expectation also works on columns of array type.
For arrays, this expectation tests that arrays only contain the values specified in the is_in clause.
from transforms import expectations as E
E.col('array_col').is_in('a', 'b', 'c') # Any array in the 'array_col' can only contain values 'a', 'b' or 'c'.
Array contains¶
The array_contains expectation allows you to check that each row of the array column contains a specific value.
from transforms import expectations as E
E.col('array_col').array_contains('a') # All rows must contain value 'a' in 'array_col'.
Array size¶
The size expectation allows you to check that each row of the array has a specific size.
from transforms import expectations as E
E.col('array_col').size().gt(1) # 'array_col' must have length greater than 1.
E.col('array_col').size().equals(2) # 'array_col' must have length equal to 2.
Group-by expectations¶
Group-by expectations begin with E.group_by('column_1', 'column_2', ...). Group-by expectations allow setting expectations on a combination of columns.
Is unique¶
from transforms import expectations as E
E.group_by('col1', 'col2').is_unique() # When combined together, the values of the combined columns are unique within the dataset
Row count¶
Row count expectation tests the row count for each group. If the group_by is empty, this tests against the row count of the entire dataset.
from transforms import expectations as E
E.group_by('col1', 'col2').count().gt(100) # For each group by 'col1', 'col2', the row count must be greater than 100
E.group_by().count().lt(100) # The row count of the dataset is less than 100.
E.count().equals(0) # Shorthand for an empty group_by. Assert row count of the dataset is equal to 0.
Group-by property expectations¶
All property comparison expectations can also be used as grouped expectations.
from transforms import expectations as E
E.group_by('col1').col('value_col').distinct_count().equals(3) # For each group by 'col1', the distinct count of 'value_col' must be equal to 3.
E.group_by('col1').col('value_col').null_percentage().lt(0.5) # For each group by 'col1', the null percentage of 'value_col' must be less than 50%.
Primary key¶
Primary key expectations take one or more column names and verify:
- Each column has no null values
- The combination of columns is unique
from transforms import expectations as E
E.primary_key('c1') # Column `c1` is unique and not null.
E.primary_key('c1', 'c2',...) # Columns {'c1', 'c2',...} are each not null and together are unique
| Expectation | Description | Example |
|---|---|---|
| E.primary_key('c1') | Column c1 is unique and not null |
E.primary_key('object_id') |
| E.primary_key('c1', 'c2',...) | Columns {'c1', 'c2',...} are each not null and together are unique | E.primary_key('time', 'event') |
For example:
from transforms import expectations as E
E.primary_key('time', 'event')
Schema expectations¶
All schema expectations start with E.schema().
Use the appropriate data type checks based on the compute engine used for your transform. For the default single-node (lightweight) transforms, use checks with Polars data types ↗. For distributed (Spark) transforms, use checks with PySpark SQL types ↗.
from transforms import expectations as E
E.schema().contains({'col_name':type}) # Dataset columns must contain the listed columns.
E.schema().equals({'col_name':type}) # Dataset columns match exactly the listed columns (no additions)
E.schema().is_subset_of({'col_name':type}) # Dataset columns must be a subset of the listed columns.
# All columns in the dataset must be defined in the check.
# Columns can be defined in the check without being present in the dataset.
For example:
```python tab="Lightweight" from transforms import expectations as E import polars as pl
E.schema().contains( { 'name': pl.String(), 'int_list': pl.List(pl.Int32) } )
```python tab="Distributed"
from transforms import expectations as E
from pyspark.sql import types as T
E.schema().contains(
{
'id': T.IntegerType(),
'name': T.StringType()
}
)
Conditional¶
Conditional expectations take three expectations and verify:
- Rows passing the when-expectation also pass the then-expectation
- Rows failing the when-expectation pass the otherwise-expectations
from transforms import expectations as E
E.when(
when_exp,
then_exp
).otherwise(
otherwise_exp
)
For example, when "myCol" is greater than 0, then "myOtherCol" must be in ["a"], otherwise "myOtherCol" must be in ["b"].
from transforms import expectations as E
E.when(
E.col("myCol").gt(0),
E.col("myOtherCol").is_in("a")
).otherwise(
E.col("myOtherCol").is_in("b")
)
:::callout{title="Always true / always false expectations"}
Use E.true() and E.false() to set simple defaults for the otherwise branch of the conditional expectation.
:::
Foreign value expectations [Experimental]¶
:::callout{theme="warning" title="Experimental"} Foreign value expectations are in the experimental phase of development and may not be available on your enrollment. Functionality may change during active development. :::
Foreign value expectations verify relationships between data in different datasets. These expectations involve joins and can be very expensive to evaluate, so use cautiously.
Referential integrity¶
This expectation verifies that all values in the selected column of an expected dataset are present in a specified column of a foreign dataset. Null values are ignored.
The foreign column to match against is qualified by a dataset reference created using the name of the other dataset: E.dataset_ref('other_dataset_name').col('f_col').
The foreign datataset must be an input to your transform (you cannot simply pass in a RID or a path), and the reference in E.dataset_ref should be the name of the variable to which it is assigned.
Using the column reference is similar to the usage of is_in():
E.col('pk').is_in_foreign_col(E.dataset_ref('other_dataset').col('fk'))
Cross-dataset row count comparisons¶
Cross-dataset row count comparisons can be used to compare the number of rows in one dataset with the number of rows in another dataset.
For example, we can check that an output row count is the same as the row count of an input dataset:
E.count().equals(E.dataset_ref('input_dataset_name').count())
The dataset to compare against is qualified with a dataset reference created using the name of the other dataset.
You can use the following operators for dataset row count comparisons:
from transforms import expectations as E
E.count().equals(E.dataset_ref('input_dataset_name').count()) # Equal to
E.count().lt(E.dataset_ref('input_dataset_name').count()) # Less than
E.count().lte(E.dataset_ref('input_dataset_name').count()) # Less than or equal to
E.count().gte(E.dataset_ref('input_dataset_name').count()) # Greater than or equal to
E.count().gt(E.dataset_ref('input_dataset_name').count()) # Greater than
中文翻译¶
数据期望参考¶
以下是所有可用数据期望的分类列表。
运算符¶
from transforms import expectations as E
E.true() # 始终通过
E.false() # 始终失败
E.all(e1,e2,...) # 当所有子期望都通过时通过
E.any(e1,e2,...) # 当任一子期望通过时通过
E.negate(e1) # 当子期望失败时通过
例如:
from transforms import expectations as E
E.all(
E.col('a').gt(0),
E.col('a').lt(100),
E.any(
E.col('b').gt(100),
E.col('b').lt(0)
)
)
列期望¶
列期望以 E.col('column_name') 开头。
使用 any 运算符时,每一行将单独检查是否满足任一列期望。
例如,验证列 c1 的值应大于 10 或小于 0:
from transforms import expectations as E
E.any(
E.col("c1").lt(0),
E.col("c1").gt(10)
)
大于或小于¶
from transforms import expectations as E
E.col('c').gt(number|string)
E.col('c').gte(number|string)
E.col('c').lt(number|string)
E.col('c').lte(number|string)
例如:
from transforms import expectations as E
E.col(‘age').lt(120)
:::callout{theme="neutral"}
此期望会忽略空值(即空值将自动通过)。要检查空值,请使用 E.col('col').non_null()。
:::
列比较¶
from transforms import expectations as E
E.col('c1').equals_col('c2') # 列 c1 的值等于列 c2 的值
E.col('c1').not_equals_col('c2') # 列 c1 的值不等于列 c2 的值
E.col('c1').gt_col('c2') # 列 c1 的值大于列 c2 的值
E.col('c1').gte_col('c2') # c1 的值大于或等于 c2 的值
E.col('c1').lt_col('c2') # 列 c1 的值小于列 c2 的值
E.col('c1').lte_col('c2') # c1 的值小于或等于 c2 的值
属性比较¶
from transforms import expectations as E
E.col('c').null_percentage().lt(value)
E.col('c').null_count().gt(value)
E.col('c').distinct_count().equals(value)
E.col('c').approx_distinct_count().equals(value) # distinct_count 的更快版本,保证误差的相对标准差不超过 5%
E.col('c').sum().gt(value) # 仅适用于数值列
E.col('c').standard_deviation_sample().gt(value) # 仅适用于数值列
E.col('c').standard_deviation_population().gt(value) # 仅适用于数值列
例如:
from transforms import expectations as E
E.col("myCol").null_percentage().lt(0.1) # myCol 的空值比例小于 10%
E.col("myCol").null_count().gt(100) # myCol 有超过 100 个空值
E.col("myCol").distinct_count().equals(5) # myCol 有 5 个不同的值(自版本 0.11.0 起)
E.col('myCol').approx_distinct_count().equals(5) # myCol 大约有 5 个不同的值
E.col('myCol').sum().equals(5) # myCol 的值总和为 5
E.col('myCol').standard_deviation_sample().gt(5) # myCol 的样本标准差大于 5
E.col('myCol').standard_deviation_population().gt(5) # myCol 的总体标准差大于 5
等于¶
from transforms import expectations as E
E.col('c').equals(value) # 列值等于输入值
例如:
from transforms import expectations as E
E.col('test_column').equals("success")
:::callout{theme="neutral"}
此期望会忽略空值(即空值将自动通过)。要检查空值,请使用 E.col('col').non_null()。
:::
不等于¶
from transforms import expectations as E
E.col('c').not_equals(value) # 列值不等于输入值
例如:
from transforms import expectations as E
E.col('test_column').not_equals("failure")
:::callout{theme="neutral"}
此期望会忽略空值(即空值将自动通过)。要检查空值,请使用 E.col('col').non_null()。
:::
空值¶
from transforms import expectations as E
E.col('c').non_null() # 列值不为空
E.col('c').is_null() # 列值为空
属于¶
此期望验证列值是否在允许值的列表中。 对于数组列,请参见 属于(数组)。
from transforms import expectations as E
E.col('c').is_in(a, b, ...) # 列值在给定的列表中
:::callout{theme="neutral"}
此期望在遇到空值时失败,除非您将 None 添加到允许值中。
:::
rlike(正则表达式)¶
正则表达式部分匹配,类似于 pyspark.sql.functions.rlike。
from transforms import expectations as E
E.col('c').rlike(regex expression) # 列值匹配正则表达式(部分匹配)
例如:
from transforms import expectations as E
E.col('flight_number').rlike(r"^\D{2}\d{2,4}$")
:::callout{theme="neutral"}
此期望会忽略空值(即空值将自动通过)。要检查空值,请使用 E.col('col').non_null()。
:::
具有类型¶
from transforms import expectations as E
E.col('c').has_type(Type) # 列 'c' 的类型为 Type
:::callout{theme="neutral"} 类型期望在单节点计算中使用 Polars 类型 ↗,无论使用何种引擎;在 PySpark 计算中使用 Spark SQL 类型 ↗。您必须导入相关库并正确引用类型,如下例所示。 :::
例如:
```python tab="单节点(轻量级)" from transforms import expectations as E import polars as pl
E.col('age').has_type(pl.Int32)
```python tab="PySpark"
from transforms import expectations as E
from pyspark.sql import types as T
E.col('age').has_type(T.LongType())
存在¶
from transforms import expectations as E
E.col('c').exists() # 列 'c' 存在于输出数据框中
此期望检查具有指定名称的列是否存在于输出数据框中。 该列可以是任何类型。
时间戳期望¶
时间戳期望仅适用于 Timestamp 类型的列。目前不支持 Date 列。
静态时间戳比较¶
将时间戳列中的值与静态时间戳进行比较。静态时间戳可以以 ISO8601 格式的字符串或 Python datetime 对象的形式提供。 所有时间戳必须具有时区信息以避免歧义。
:::callout{theme="warning" title="警告"}
切勿将静态时间戳期望与从 datetime.now() 派生的时间戳一起使用。虽然这最初可能看起来能产生正确的结果,但此行为不受支持,并且可能导致在无警告的情况下产生错误结果。此外,如果您将静态时间戳期望与从 datetime.now() 派生的时间戳一起使用,数据健康(Data Health)和 Foundry 中的消息将不会引用正确的时间戳。请改用相对时间戳比较期望。
:::
from transforms import expectations as E
E.col("timestamp").is_after("2020-12-14T11:32:23+0000")
E.col("timestamp").is_before(datetime(2017, 11, 28, 23, 55, 59, 342380))
E.col("timestamp").is_on_or_after("2020-12-14T11:32:23+0000")
E.col("timestamp").is_on_or_before("2020-12-14T11:32:23+0000")
列时间戳比较¶
将一个时间戳列中的值与另一个时间戳列中的值进行比较。可以提供可选的偏移量(整数秒),该偏移量将添加到另一列的值中。
比较方式为:
第一列 ($运算符) 第二列 + 偏移量(秒)
from transforms import expectations as E
E.col("timestamp").is_after_col("second_timestamp")
E.col("timestamp").is_on_or_after_col("second_timestamp")
# 运算符接受可选的 offset_in_second 参数
# 检查 `second_timestamp` 是否在 `timestamp` 之后不到一小时
E.col("timestamp").is_before_col("second_timestamp", 3600)
# 检查 `second_timestamp` 是否在 `timestamp` 之前超过 2 小时
E.col("timestamp").is_on_or_before_col("second_timestamp", -7200)
相对时间戳比较¶
将时间戳列的值与检查运行的时间(例如构建发生的时间)加上用户指定的偏移量进行比较。偏移量可以以整数秒或 timedelta Python 对象的形式提供。
:::callout{theme="warning" title="相对时间戳比较精度"} 我们预计相对时间戳比较的精度可达几分钟。这是由于检查实例化或运行的时间存在不精确性。检查运行后,将提供所使用的确切时间戳,并显示在期望检查结果中。 :::
提供了两种主要方法:timestamp_offset_from_current_time 和 timestamp_offset_to_current_time。我们提供两种不同的方法,以帮助以自然的方式推理相对时间偏移。因此,我们只支持正的时间偏移作为参数。如果您需要使用负偏移,请考虑改用另一种方法。
timestamp_offset_from_current_time¶
此方法用于未来的相对时间,其中 timestamp - now() 为正值。然后将此值与提供的偏移量进行比较。所有常规比较运算符都可用于比较。
from datetime import timedelta
from transforms import expectations as E
# 时间戳值在未来不到 1 小时
A = E.col("timestamp").timestamp_offset_from_current_time().lt(3600)
# 时间戳值在未来超过 2 小时
B = E.col("timestamp").timestamp_offset_from_current_time().gt(timedelta(hours=2))
timestamp_offset_to_current_time¶
此方法用于过去的相对时间,其中 now() - timestamp 为正值。然后将此值与提供的偏移量进行比较。所有常规比较运算符都可用于比较。
from datetime import timedelta
from transforms import expectations as E
# 时间戳值在过去不到 90 分钟
C = E.col("timestamp").timestamp_offset_to_current_time().lt(5400)
# 时间戳值在过去超过 2 小时
D = E.col("timestamp").timestamp_offset_to_current_time().gt(timedelta(hours=2))
示例:预期结果¶
假设检查在 1 月 1 日下午 4 点运行,以下是对于不同时间戳值,上述检查的预期结果。
< ------- 过去 --------------------- 现在 -------------------- 未来 ------>
| 下午1点 | 下午2点 | 下午3点 | 下午4点 | 下午5点 | 下午6点 | 下午7点 |
---+----------+----------+----------+----------+----------+----------+----------+
A | 通过 | 通过 | 通过 | 通过 | 失败 | 失败 | 失败 |
---+----------+----------+----------+----------+----------+----------+----------+
B | 失败 | 失败 | 失败 | 失败 | 失败 | 失败* | 通过 |
---+----------+----------+----------+----------+----------+----------+----------+
C | 失败 | 失败 | 通过 | 通过 | 通过 | 通过 | 通过 |
---+----------+----------+----------+----------+----------+----------+----------+
D | 通过 | 失败* | 失败 | 失败 | 失败 | 失败 | 失败 |
---+----------+----------+----------+----------+----------+----------+----------+
检查 B 和 D 中的比较是严格比较。对于非严格比较,请使用 ge 和 le。
分组和属性时间戳比较¶
大多数时间戳比较也可用于常规数据框或分组数据框的派生属性。
from datetime import timedelta
from transforms import expectations as E
# 检查最高时间戳是否在给定的静态日期之后
E.col("timestamp").max_value().is_after("2020-12-14T12:23:50+0000")
# 检查最早的时间戳是否在过去不到 1 天
E.col("timestamp").min_value().timestamp_offset_to_current_time().lt(timedelta(days=1))
# 检查每个类别中的最后日期是否在未来超过 2 个月
E.group_by("category")
.col("timestamp")
.max_value()
.timestamp_offset_from_current_time()
.gt(timedelta(months=2))
数组期望¶
并非所有期望都适用于数组类型的列。数组类型的列只能使用下面描述的特定期望。
数组属于¶
is_in 期望也适用于数组类型的列。
对于数组,此期望测试数组是否仅包含 is_in 子句中指定的值。
from transforms import expectations as E
E.col('array_col').is_in('a', 'b', 'c') # 'array_col' 中的任何数组只能包含值 'a'、'b' 或 'c'。
数组包含¶
array_contains 期望允许您检查数组列的每一行是否包含特定值。
from transforms import expectations as E
E.col('array_col').array_contains('a') # 所有行必须在 'array_col' 中包含值 'a'。
数组大小¶
size 期望允许您检查数组的每一行是否具有特定大小。
from transforms import expectations as E
E.col('array_col').size().gt(1) # 'array_col' 的长度必须大于 1。
E.col('array_col').size().equals(2) # 'array_col' 的长度必须等于 2。
分组期望¶
分组期望以 E.group_by('column_1', 'column_2', ...) 开头。分组期望允许对列的组合设置期望。
唯一¶
from transforms import expectations as E
E.group_by('col1', 'col2').is_unique() # 当组合在一起时,组合列的值在数据集中是唯一的
行数¶
行数期望测试每个组的行数。如果 group_by 为空,则测试整个数据集的行数。
from transforms import expectations as E
E.group_by('col1', 'col2').count().gt(100) # 对于每个按 'col1'、'col2' 分组的组,行数必须大于 100
E.group_by().count().lt(100) # 数据集的行数小于 100。
E.count().equals(0) # 空 group_by 的简写。断言数据集的行数等于 0。
分组属性期望¶
所有属性比较期望也可以用作分组期望。
from transforms import expectations as E
E.group_by('col1').col('value_col').distinct_count().equals(3) # 对于每个按 'col1' 分组的组,'value_col' 的不同值计数必须等于 3。
E.group_by('col1').col('value_col').null_percentage().lt(0.5) # 对于每个按 'col1' 分组的组,'value_col' 的空值比例必须小于 50%。
主键¶
主键期望接受一个或多个列名并验证:
- 每个列没有空值
- 列的组合是唯一的
from transforms import expectations as E
E.primary_key('c1') # 列 `c1` 是唯一的且不为空。
E.primary_key('c1', 'c2',...) # 列 {'c1', 'c2',...} 各自不为空且组合起来是唯一的
| 期望 | 描述 | 示例 |
|---|---|---|
| E.primary_key('c1') | 列 c1 是唯一的且不为空 |
E.primary_key('object_id') |
| E.primary_key('c1', 'c2',...) | 列 {'c1', 'c2',...} 各自不为空且组合起来是唯一的 | E.primary_key('time', 'event') |
例如:
from transforms import expectations as E
E.primary_key('time', 'event')
模式期望¶
所有模式期望都以 E.schema() 开头。
根据您的转换所使用的计算引擎,使用适当的数据类型检查。对于默认的单节点(轻量级)转换,使用 Polars 数据类型 ↗ 进行检查。对于分布式(Spark)转换,使用 PySpark SQL 类型 ↗ 进行检查。
from transforms import expectations as E
E.schema().contains({'col_name':type}) # 数据集列必须包含列出的列。
E.schema().equals({'col_name':type}) # 数据集列必须与列出的列完全匹配(无额外列)
E.schema().is_subset_of({'col_name':type}) # 数据集列必须是列出列的子集。
# 数据集中的所有列都必须在检查中定义。
# 检查中可以定义数据集中不存在的列。
例如:
```python tab="轻量级" from transforms import expectations as E import polars as pl
E.schema().contains( { 'name': pl.String(), 'int_list': pl.List(pl.Int32) } )
```python tab="分布式"
from transforms import expectations as E
from pyspark.sql import types as T
E.schema().contains(
{
'id': T.IntegerType(),
'name': T.StringType()
}
)
条件期望¶
条件期望接受三个期望并验证:
- 通过 when 期望的行也必须通过 then 期望
- 未通过 when 期望的行必须通过 otherwise 期望
from transforms import expectations as E
E.when(
when_exp,
then_exp
).otherwise(
otherwise_exp
)
例如,当 "myCol" 大于 0 时,则 "myOtherCol" 必须在 ["a"] 中,否则 "myOtherCol" 必须在 ["b"] 中。
from transforms import expectations as E
E.when(
E.col("myCol").gt(0),
E.col("myOtherCol").is_in("a")
).otherwise(
E.col("myOtherCol").is_in("b")
)
:::callout{title="始终为真 / 始终为假期望"}
使用 E.true() 和 E.false() 为条件期望的 otherwise 分支设置简单的默认值。
:::
外键值期望 [实验性]¶
:::callout{theme="warning" title="实验性"} 外键值期望处于实验性开发阶段,可能无法在您的环境中使用。功能在活跃开发期间可能会发生变化。 :::
外键值期望验证不同数据集中数据之间的关系。这些期望涉及连接操作,评估成本可能非常高,因此请谨慎使用。
参照完整性¶
此期望验证期望数据集选定列中的所有值是否都存在于外部数据集指定列中。空值将被忽略。
要匹配的外部列通过使用另一个数据集的名称创建的数据集引用来限定:E.dataset_ref('other_dataset_name').col('f_col')。
外部数据集必须是您转换的输入(您不能简单地传入 RID 或路径),并且 E.dataset_ref 中的引用应该是分配给该数据集的变量名称。
使用列引用类似于使用 is_in():
E.col('pk').is_in_foreign_col(E.dataset_ref('other_dataset').col('fk'))
跨数据集行数比较¶
跨数据集行数比较可用于比较一个数据集中的行数与另一个数据集中的行数。
例如,我们可以检查输出行数是否与输入数据集的行数相同:
E.count().equals(E.dataset_ref('input_dataset_name').count())
要比较的数据集通过使用另一个数据集的名称创建的数据集引用来限定。
您可以使用以下运算符进行数据集行数比较:
from transforms import expectations as E
E.count().equals(E.dataset_ref('input_dataset_name').count()) # 等于
E.count().lt(E.dataset_ref('input_dataset_name').count()) # 小于
E.count().lte(E.dataset_ref('input_dataset_name').count()) # 小于或等于
E.count().gte(E.dataset_ref('input_dataset_name').count()) # 大于或等于
E.count().gt(E.dataset_ref('input_dataset_name').count()) # 大于