Getting started(入门指南)¶
This guide will show you how to set up data expectations in a Python transforms repository. For a high-level overview of data expectations, refer to the documentation.
Repository setup: transforms-expectations library¶
Open the library search panel on the left side of your Code Repository. The transforms-expectations library should already be installed. Validate this by checking the list of installed libraries. If transforms-expectations is not already installed, search for and install it now.
Transform setup¶
Import the expectations and Check into your transforms file:
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
For some common schema and column expectations you may want to import types as well:
from pyspark.sql import types as T
Create checks¶
The basic structure of a single check:
Check(expectation, 'Check unique name', on_error='WARN/FAIL')
- expectation - a single expectation, which can be a composite expectation (e.g. using an any/all operator) of multiple sub-expectations
- Check unique name - This must be unique in the transform (the same name cannot be shared among outputs and inputs) and will identify the check across apps (e.g. Data Health, Builds application)
- on_error - Defines the behavior of the job when expectations are not met:
FAIL(default) - Job will be aborted if check failsWARN- Job will continue and a warning will be generated and handled by Data Health
:::callout{theme="warning" title="Warning"}
Setting on_error='FAIL' is not supported for virtual table outputs in single-node transforms.
:::
Assign checks to dataset
Each check should be passed to a single input or output. Pass a single check as checks=check1 or multiple checks in an array: checks=[check1, check2, ...]
:::callout{title="Multiple checks"} Use multiple checks to create more legible Expectations structure and control the behavior of each meaningful check separately. :::
An example for a simple primary key check on the output:
```python tab="Polars" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import polars as pl
@transform.using( output=Output( "/path/dataset", checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("/path/input"), ) def compute(output, input): input_df = input.polars() return output.write_table(input_df)
```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
@transform.using(
output=Output(
"/path/dataset",
checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
),
input=Input("/path/input"),
)
def compute(ctx, output, input):
query = ctx.duckdb().conn.sql("SELECT * FROM input")
return output.write_table(query)
```python tab="Pandas" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import pandas as pd
@transform.using( output=Output( "/path/dataset", checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("/path/input"), ) def compute(output, input): input_df = input.pandas() return output.write_table(input_df)
```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
@transform_df(
Output(
"/path/dataset",
checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
),
input=Input("/path/input")
)
def my_compute_function(input):
return input
Complex checks¶
You can also add more complex checks using composite expectations. For example, let us check that column age is not null in a given range. Notice that we can define the composite expectation and use it in multiple checks within the transform, applying different behavior on errors.
:::callout{theme="neutral"} A check is monitored as a whole even when it consists of a composite expectation. If you want to monitor (that is, watch and get notifications) specific parts of the composite expectation, it is recommended that you split it to several different checks. :::
```python tab="Polars" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import polars as pl
We assume an age is valid if it is between 0 and 200.¶
expect_valid_age = E.all( E.col('age').non_null(), E.col('age').gte(0), E.col('age').lt(200) )
@transform.using( output=Output( "/path/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def compute(output, input): input_df = input.polars() return output.write_table(input_df)
```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
# We assume an age is valid if it is between 0 and 200.
expect_valid_age = E.all(
E.col('age').non_null(),
E.col('age').gte(0),
E.col('age').lt(200)
)
@transform.using(
output=Output(
"/path/dataset",
checks=[
Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
]
),
input=Input(
"Users/data/input",
checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
)
)
def compute(ctx, output, input):
conn = ctx.duckdb().conn
query = conn.sql("SELECT * FROM input")
return output.write_table(query)
```python tab="Pandas" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import pandas as pd
We assume an age is valid if it is between 0 and 200.¶
expect_valid_age = E.all( E.col('age').non_null(), E.col('age').gte(0), E.col('age').lt(200) )
@transform.using( output=Output( "/path/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def compute(output, input): input_df = input.pandas() return output.write_table(input_df)
```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
# We assume an age is valid if it is between 0 and 200.
expect_valid_age = E.all(
E.col('age').non_null(),
E.col('age').gte(0),
E.col('age').lt(200)
)
@transform_df(
Output(
"/Users/data/dataset",
checks=[
Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
]
),
input=Input(
"Users/data/input",
checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
)
)
def my_compute_function(input):
return input
中文翻译¶
入门指南¶
本指南将演示如何在Python转换仓库中设置数据期望(Data Expectations)。有关数据期望的概述,请参阅文档。
仓库设置:transforms-expectations库¶
打开代码仓库左侧的库搜索面板。transforms-expectations库应已安装。请检查已安装库列表以确认。如果尚未安装,请立即搜索并安装。
转换设置¶
在转换文件中导入expectations和Check:
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
如需使用常见的模式(schema)和列(column)期望,可同时导入types:
from pyspark.sql import types as T
创建检查¶
单个检查的基本结构:
Check(expectation, 'Check unique name', on_error='WARN/FAIL')
- expectation - 单个期望,可以是多个子期望的组合期望(例如使用any/all运算符)
- Check unique name - 在转换中必须唯一(输出和输入间不能共享相同名称),用于跨应用(如Data Health、Builds应用)标识该检查
- on_error - 定义期望未满足时的作业行为:
FAIL(默认)- 检查失败时作业将中止WARN- 作业继续执行,生成警告并由Data Health处理
:::callout{theme="warning" title="警告"}
在单节点转换中,不支持对虚拟表(Virtual Table)输出设置on_error='FAIL'。
:::
为数据集分配检查
每个检查应分配给单个输入或输出。单个检查使用checks=check1传递,多个检查使用数组:checks=[check1, check2, ...]
:::callout{title="多重检查"} 使用多个检查可以构建更清晰的期望结构,并分别控制每个有意义检查的行为。 :::
以下是对输出进行简单主键检查的示例:
```python tab="Polars" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import polars as pl
@transform.using( output=Output( "/path/dataset", checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("/path/input"), ) def compute(output, input): input_df = input.polars() return output.write_table(input_df)
```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
@transform.using(
output=Output(
"/path/dataset",
checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
),
input=Input("/path/input"),
)
def compute(ctx, output, input):
query = ctx.duckdb().conn.sql("SELECT * FROM input")
return output.write_table(query)
```python tab="Pandas" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import pandas as pd
@transform.using( output=Output( "/path/dataset", checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL') ), input=Input("/path/input"), ) def compute(output, input): input_df = input.pandas() return output.write_table(input_df)
```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
@transform_df(
Output(
"/path/dataset",
checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
),
input=Input("/path/input")
)
def my_compute_function(input):
return input
复杂检查¶
您还可以使用组合期望添加更复杂的检查。例如,检查age列在指定范围内不为空。请注意,我们可以定义组合期望并在转换中的多个检查中使用它,对错误应用不同的行为。
:::callout{theme="neutral"} 即使检查由组合期望构成,它也会作为一个整体被监控。如果您希望监控(即观察并接收通知)组合期望的特定部分,建议将其拆分为多个不同的检查。 :::
```python tab="Polars" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import polars as pl
我们假设年龄在0到200之间为有效¶
expect_valid_age = E.all( E.col('age').non_null(), E.col('age').gte(0), E.col('age').lt(200) )
@transform.using( output=Output( "/path/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def compute(output, input): input_df = input.polars() return output.write_table(input_df)
```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
# 我们假设年龄在0到200之间为有效
expect_valid_age = E.all(
E.col('age').non_null(),
E.col('age').gte(0),
E.col('age').lt(200)
)
@transform.using(
output=Output(
"/path/dataset",
checks=[
Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
]
),
input=Input(
"Users/data/input",
checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
)
)
def compute(ctx, output, input):
conn = ctx.duckdb().conn
query = conn.sql("SELECT * FROM input")
return output.write_table(query)
```python tab="Pandas" from transforms.api import transform, Input, Output, Check from transforms import expectations as E import pandas as pd
我们假设年龄在0到200之间为有效¶
expect_valid_age = E.all( E.col('age').non_null(), E.col('age').gte(0), E.col('age').lt(200) )
@transform.using( output=Output( "/path/dataset", checks=[ Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'), Check(expect_valid_age, 'Valid age on output', on_error='FAIL') ] ), input=Input( "Users/data/input", checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN') ) ) def compute(output, input): input_df = input.pandas() return output.write_table(input_df)
```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
# 我们假设年龄在0到200之间为有效
expect_valid_age = E.all(
E.col('age').non_null(),
E.col('age').gte(0),
E.col('age').lt(200)
)
@transform_df(
Output(
"/Users/data/dataset",
checks=[
Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
]
),
input=Input(
"Users/data/input",
checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
)
)
def my_compute_function(input):
return input