Getting started

This guide will show you how to set up data expectations in a Python transforms repository. For a high-level overview of data expectations, refer to the documentation.

Repository setup: transforms-expectations library

Open the library search panel on the left side of your Code Repository. The transforms-expectations library should already be installed. Validate this by checking the list of installed libraries. If transforms-expectations is not already installed, search for and install it now.

Transform setup

Import expectations and Check into your transforms file:

```python
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E
```

For some common schema and column expectations you may want to import types as well:

```python
from pyspark.sql import types as T
```

Create checks

The basic structure of a single check:

```python
Check(expectation, 'Check unique name', on_error='WARN/FAIL')
```

  • expectation - a single expectation, which can be a composite expectation (e.g. using an any/all operator) of multiple sub-expectations
  • Check unique name - This must be unique in the transform (the same name cannot be shared among outputs and inputs) and will identify the check across apps (e.g. Data Health, Builds application)
  • on_error - Defines the behavior of the job when expectations are not met:
    • FAIL (default) - Job will be aborted if check fails
    • WARN - Job will continue and a warning will be generated and handled by Data Health

:::callout{theme="warning" title="Warning"}
Setting on_error='FAIL' is not supported for virtual table outputs in single-node transforms.
:::
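
The difference between the two on_error modes can be pictured with a small plain-Python model. This is only an illustrative sketch and not how transforms-expectations is implemented: `SimpleCheck`, `CheckFailed`, and `run_checks` are hypothetical names, and a real check evaluates an expectation against a dataset rather than a Python callable against a list of rows.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Row = Dict[str, object]


class CheckFailed(Exception):
    """Raised by this sketch when a check with on_error='FAIL' is not met."""


@dataclass
class SimpleCheck:
    # Hypothetical stand-in for Check(expectation, name, on_error=...).
    predicate: Callable[[List[Row]], bool]
    name: str
    on_error: str = "FAIL"  # FAIL is the default, as described above


def run_checks(rows: List[Row], checks: List[SimpleCheck]) -> List[str]:
    """Evaluate checks in order; return the names of checks that only warned."""
    warnings = []
    for check in checks:
        if check.predicate(rows):
            continue
        if check.on_error == "WARN":
            # The job continues; the warning is recorded for later reporting.
            warnings.append(check.name)
        else:
            # The job is aborted.
            raise CheckFailed(check.name)
    return warnings


def ids_are_unique(rows: List[Row]) -> bool:
    ids = [row["id"] for row in rows]
    return len(ids) == len(set(ids))


def ages_present(rows: List[Row]) -> bool:
    return all(row.get("age") is not None for row in rows)


rows = [{"id": 1, "age": 30}, {"id": 2, "age": None}]
checks = [
    SimpleCheck(ids_are_unique, "Primary Key", on_error="FAIL"),
    SimpleCheck(ages_present, "Age present", on_error="WARN"),
]
warned = run_checks(rows, checks)
```

In this model the missing age only produces a warning, so the run completes, whereas a duplicated id would raise `CheckFailed` and stop the run.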

Assign checks to dataset

Each check should be passed to a single input or output. Pass a single check as checks=check1, or pass multiple checks as a list: checks=[check1, check2, ...].

:::callout{title="Multiple checks"}
Use multiple checks to create a more legible expectations structure and to control the behavior of each meaningful check separately.
:::
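
Because a check name must be unique across all inputs and outputs of a transform, it can help to list the planned names before committing. The helper below is a hypothetical plain-Python sketch that does not use or inspect the transforms library; `find_duplicate_check_names` and the mapping from dataset paths to check names are assumptions made for illustration.

```python
from collections import Counter
from typing import Dict, List


def find_duplicate_check_names(names_by_dataset: Dict[str, List[str]]) -> List[str]:
    """Return check names that appear more than once across all datasets."""
    counts = Counter(
        name for names in names_by_dataset.values() for name in names
    )
    return sorted(name for name, count in counts.items() if count > 1)


# Check names as they would be attached to one transform's input and output.
planned = {
    "/path/input": ["Valid age"],
    "/path/dataset": ["Primary Key", "Valid age"],
}
duplicates = find_duplicate_check_names(planned)
```

Here `duplicates` contains 'Valid age', because the same name is used on both the input and the output. Giving each check its own name, such as 'Valid age on input' and 'Valid age on output', resolves the clash.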

An example for a simple primary key check on the output:

```python tab="Polars"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
import polars as pl


@transform.using(
    output=Output(
        "/path/dataset",
        checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
    ),
    input=Input("/path/input"),
)
def compute(output, input):
    input_df = input.polars()
    return output.write_table(input_df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E


@transform.using(
    output=Output(
        "/path/dataset",
        checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
    ),
    input=Input("/path/input"),
)
def compute(ctx, output, input):
    query = ctx.duckdb().conn.sql("SELECT * FROM input")
    return output.write_table(query)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
import pandas as pd


@transform.using(
    output=Output(
        "/path/dataset",
        checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
    ),
    input=Input("/path/input"),
)
def compute(output, input):
    input_df = input.pandas()
    return output.write_table(input_df)
```

```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E


@transform_df(
    Output(
        "/path/dataset",
        checks=Check(E.primary_key('id'), 'Primary Key', on_error='FAIL')
    ),
    input=Input("/path/input")
)
def my_compute_function(input):
    return input
```

Complex checks

You can also add more complex checks using composite expectations. For example, you can check that the age column is not null and falls within a given range. A composite expectation can be defined once and reused in multiple checks within the transform, each with its own on_error behavior.

:::callout{theme="neutral"}
A check is monitored as a whole even when it consists of a composite expectation. If you want to monitor (that is, watch and get notifications for) specific parts of the composite expectation, split it into several separate checks.
:::
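
The examples in this section combine non_null(), gte(0), and lt(200) with E.all. Read row by row, and assuming gte means "greater than or equal to" and lt means "strictly less than" as the names suggest, the combined condition behaves like the plain-Python predicate below. This is only a sketch of the logic, with the hypothetical name `is_valid_age`; the library itself evaluates expectations over dataset columns.

```python
from typing import Optional


def is_valid_age(age: Optional[float]) -> bool:
    """Plain-Python reading of: non_null AND gte(0) AND lt(200)."""
    return age is not None and 0 <= age < 200


samples = [None, -1, 0, 42, 199, 200]
results = {sample: is_valid_age(sample) for sample in samples}
```

Under these assumptions the lower bound is inclusive and the upper bound is exclusive: 0 and 199 pass, while None, -1, and 200 do not.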

```python tab="Polars"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
import polars as pl


# We assume an age is valid if it is at least 0 and below 200.
expect_valid_age = E.all(
    E.col('age').non_null(),
    E.col('age').gte(0),
    E.col('age').lt(200)
)


@transform.using(
    output=Output(
        "/path/dataset",
        checks=[
            Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
            Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
        ]
    ),
    input=Input(
        "Users/data/input",
        checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
    )
)
def compute(output, input):
    input_df = input.polars()
    return output.write_table(input_df)
```

```python tab="DuckDB"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E


# We assume an age is valid if it is at least 0 and below 200.
expect_valid_age = E.all(
    E.col('age').non_null(),
    E.col('age').gte(0),
    E.col('age').lt(200)
)


@transform.using(
    output=Output(
        "/path/dataset",
        checks=[
            Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
            Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
        ]
    ),
    input=Input(
        "Users/data/input",
        checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
    )
)
def compute(ctx, output, input):
    conn = ctx.duckdb().conn
    query = conn.sql("SELECT * FROM input")
    return output.write_table(query)
```

```python tab="Pandas"
from transforms.api import transform, Input, Output, Check
from transforms import expectations as E
import pandas as pd


# We assume an age is valid if it is at least 0 and below 200.
expect_valid_age = E.all(
    E.col('age').non_null(),
    E.col('age').gte(0),
    E.col('age').lt(200)
)


@transform.using(
    output=Output(
        "/path/dataset",
        checks=[
            Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
            Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
        ]
    ),
    input=Input(
        "Users/data/input",
        checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
    )
)
def compute(output, input):
    input_df = input.pandas()
    return output.write_table(input_df)
```

```python tab="PySpark"
from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E


# We assume an age is valid if it is at least 0 and below 200.
expect_valid_age = E.all(
    E.col('age').non_null(),
    E.col('age').gte(0),
    E.col('age').lt(200)
)


@transform_df(
    Output(
        "/Users/data/dataset",
        checks=[
            Check(E.primary_key('id'), 'Primary Key', on_error='FAIL'),
            Check(expect_valid_age, 'Valid age on output', on_error='FAIL')
        ]
    ),
    input=Input(
        "Users/data/input",
        checks=Check(expect_valid_age, 'Valid age on input', on_error='WARN')
    )
)
def my_compute_function(input):
    return input
```

