Write incremental transforms with media sets

Media sets can be read from and written to incrementally. For an overview of incremental transforms and when to use them, see the incremental overview and incremental reference.

To make your media transforms incremental, use the incremental decorator and set v2_semantics=True. If v2_semantics is not set, then media sets cannot be used incrementally.

from transforms.api import transform, incremental
from transforms.mediasets import MediaSetInput, MediaSetOutput

@incremental(v2_semantics=True)
@transform(
    input_PNGs=MediaSetInput('/examples/input_PNGs'),
    output_PNGs=MediaSetOutput('/examples/output_PNGs'),
)
def upload_pngs(input_PNGs, output_PNGs):

    # Returns a dataframe that only includes the media items added since the last build
    listed_pngs = input_PNGs.dataframe()

    def fast_copy_media_item(row):
        output_PNGs.fast_copy_media_item(input_PNGs, row.media_item_rid, row.path)

    # Fast copies all of the items in `listed_pngs` into the output media set
    # These items will be appended to the output if this transform is running incrementally, or they will replace the
    # output if the transform is not running incrementally
    listed_pngs.foreach(fast_copy_media_item)

In the example above, the transform will write to output_PNGs using the modify write mode. Only the media items that have been added to the input media set since the last build will be processed. If the transform cannot run incrementally, the output will be written with the replace write mode and the entire input will be read. See below for requirements.

When v2_semantics is set to True, incremental media sets can be used in combination with any number of other incremental inputs and outputs. This includes datasets and virtual tables.

Requirements for incremental computation

Every incremental input and output contributes to determining whether a transform can run incrementally. Refer to the incremental transforms reference for more information on when a dataset will prevent a transform from running incrementally.

A media set output can prevent a transform from running incrementally when:

  • It was most recently built in a different transform than the other outputs in a multi-output build.
  • It is a transactional media set and was modified since the most recent build. This includes user uploads and deletions.

A media set input can prevent a transform from running incrementally when:

  • The contents of the media set were replaced, for example, by a write that used the replace write mode.

If the media set input is included as a snapshot_input, then it will not prevent the build from running incrementally, even if its contents are replaced. See the documentation on snapshot inputs.

Unlike datasets, path overwrites and media item deletions will not prevent a transform from running incrementally.
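The conditions above can be summarized as a small decision function. This is only a conceptual sketch in plain Python, not part of the transforms API: the `InputState` and `OutputState` classes, their field names, and `can_run_incrementally` are hypothetical, and the sketch covers only the media set conditions listed in this section (not the dataset conditions in the incremental transforms reference).

```python
from dataclasses import dataclass


@dataclass
class OutputState:
    """Hypothetical summary of a media set output since the last build."""
    built_by_other_transform: bool = False   # last built in a different transform than sibling outputs
    transactional: bool = True
    modified_since_last_build: bool = False  # for example, user uploads or deletions


@dataclass
class InputState:
    """Hypothetical summary of a media set input since the last build."""
    contents_replaced: bool = False
    snapshot_input: bool = False
    paths_overwritten: bool = False
    items_deleted: bool = False


def output_blocks_incremental(out):
    return out.built_by_other_transform or (
        out.transactional and out.modified_since_last_build
    )


def input_blocks_incremental(inp):
    # Path overwrites and item deletions are deliberately ignored here:
    # for media sets they do not prevent an incremental run.
    return inp.contents_replaced and not inp.snapshot_input


def can_run_incrementally(inputs, outputs):
    """True only if no input and no output blocks an incremental run."""
    return not any(input_blocks_incremental(i) for i in inputs) and not any(
        output_blocks_incremental(o) for o in outputs
    )
```

In this model, a replaced input blocks the incremental run unless it is marked as a snapshot input, while overwritten paths and deleted items never do.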

Incremental read modes

In an incremental transform, media set inputs can be listed using one of three modes:

  • added: Only the items added to the branch since the last build will be included.
  • previous: Only the items in the branch that existed when the last build ran will be included.
  • current: All items in the media set branch will be included.

The union of added and previous is always equal to current.
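The relationship between the three modes can be pictured with plain Python sets. This is a conceptual sketch rather than the media set API: the `list_items` helper and the item names are hypothetical, and the sketch assumes that no items were deleted since the last build. In a real transform, the listing is produced by the listing methods of the media set input.

```python
def list_items(items_at_last_build, items_now, mode):
    """Toy model of a media set listing; NOT the real transforms API.

    items_at_last_build: ids of items present when the last build ran
    items_now: ids of items currently in the branch
    """
    if mode == "previous":
        return set(items_at_last_build)
    if mode == "added":
        return set(items_now) - set(items_at_last_build)
    if mode == "current":
        return set(items_now)
    raise ValueError(f"unknown mode: {mode}")


# Hypothetical item ids, for illustration only
at_last_build = {"item-1", "item-2"}
now = {"item-1", "item-2", "item-3"}

added = list_items(at_last_build, now, "added")
previous = list_items(at_last_build, now, "previous")
current = list_items(at_last_build, now, "current")

# The union of `added` and `previous` equals `current`
union_matches_current = (added | previous) == current
```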

:::callout{theme="warning"} If the transform is not running incrementally, for example, if the contents of the input were replaced since the last build, then a listing using the previous mode will be empty. The listing will not include the items that were present in the previous build. :::

The default read mode is added when running incrementally, and current when not. However, the read mode can be specified using the mode parameter in any listing method:

from transforms.api import transform, incremental
from transforms.mediasets import MediaSetInput, MediaSetOutput

@incremental(v2_semantics=True)
@transform(
    input_PNGs=MediaSetInput('/examples/input_PNGs'),
    output_PNGs=MediaSetOutput('/examples/output_PNGs'),
)
def upload_pngs(input_PNGs, output_PNGs):
    # Will use `added` if running incrementally, or `current` if not
    listed_pngs = input_PNGs.dataframe(deduplicate_by_path=False)

    # Will always read in `previous` mode
    previous_listed_pngs = input_PNGs.dataframe(deduplicate_by_path=False, mode="previous")

If a path is overwritten and the listing deduplicates by path, only the most recent item will be included. If you want to process all input items at a given path, then you must always specify deduplicate_by_path=False.
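To illustrate what deduplicating by path means, the sketch below models a listing as a list of rows ordered from oldest to newest. It is plain Python, not the media set API: the `Row` tuple and the `deduplicate_by_path` function are hypothetical stand-ins, and the ordering assumption is ours.

```python
from collections import namedtuple

# Hypothetical listing row with the two columns used in the examples above
Row = namedtuple("Row", ["path", "media_item_rid"])


def deduplicate_by_path(rows):
    """Keep only the newest row per path (rows are assumed oldest to newest)."""
    latest = {}
    for row in rows:
        latest[row.path] = row  # a later row replaces an earlier one at the same path
    return list(latest.values())


rows = [
    Row("a.png", "rid-1"),
    Row("b.png", "rid-2"),
    Row("a.png", "rid-3"),  # overwrites the item at a.png
]

deduplicated = deduplicate_by_path(rows)
```

Here `deduplicated` holds two rows, and the item `rid-1` that was overwritten at `a.png` is no longer visible. A transform that needs to process `rid-1` as well has to list without deduplication.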

Incremental write modes

When writing to an incremental media set output, the write mode can be set at runtime. This is useful if the transform contains custom logic that determines whether to run the build incrementally. In the example below, the build will not run incrementally if any paths were overwritten since the previous build:

from transforms.api import transform, incremental
from transforms.mediasets import MediaSetInput, MediaSetOutput

@incremental(v2_semantics=True)
@transform(
    input_PNGs=MediaSetInput('/examples/input_PNGs'),
    output_PNGs=MediaSetOutput('/examples/output_PNGs'),
)
def upload_pngs(input_PNGs, output_PNGs):
    previous_dataframe = input_PNGs.dataframe(deduplicate_by_path=False, mode="previous")
    added_dataframe = input_PNGs.dataframe(deduplicate_by_path=False, mode="added")

    # Calculates if any paths have been overwritten in the `input_PNGs` media set since
    # the most recent run of this transform
    paths_overwritten = previous_dataframe.join(added_dataframe, on="path", how="inner").count() > 0

    if paths_overwritten:
        # The full input media set will be read and the output media set will be replaced
        # with the items written in this transform
        read_mode = "current"
        output_PNGs.set_write_mode("replace")
    else:
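        # Only the newly added items in the input media set will be read and the items written in this transform will
        # be appended to the output media set
        read_mode = "added"
        output_PNGs.set_write_mode("modify")

    # List the input using the mode chosen above, then fast copy each listed item
    # into the output, as in the first example
    listed_pngs = input_PNGs.dataframe(deduplicate_by_path=False, mode=read_mode)
    listed_pngs.foreach(
        lambda row: output_PNGs.fast_copy_media_item(input_PNGs, row.media_item_rid, row.path)
    )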

Incremental transforms and branches

Media sets do not support incremental fallback branches. When an incremental transform runs on a new branch, the output on that branch is empty, so the incremental decorator will recommend a snapshot. The same build on the main branch will not necessarily run as a snapshot, so a build on a new branch can behave differently from the same build on main.

Incremental transforms and transactionless media sets

Transactionless media sets use the modify write mode and cannot use the replace write mode. This means that a transactionless media set cannot be a snapshot. If a transactionless media set is an output of an incremental transform, but the transform can't run incrementally, the build will fail. In this case, you should investigate why the build cannot run incrementally.
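The rule above can be written as a short decision function. This is a minimal sketch in plain Python under our own naming: `resolve_write_mode` is hypothetical and is not how the build system is implemented. It only mirrors the behavior described in this section.

```python
def resolve_write_mode(runs_incrementally, transactionless):
    """Toy model of which write mode a media set output ends up with."""
    if runs_incrementally:
        # Incremental runs append to the output
        return "modify"
    if transactionless:
        # A non-incremental run would need `replace`, which is not available
        raise RuntimeError(
            "transactionless media set output cannot be written with 'replace'"
        )
    return "replace"


incremental_mode = resolve_write_mode(runs_incrementally=True, transactionless=True)
snapshot_mode = resolve_write_mode(runs_incrementally=False, transactionless=False)
```

Calling `resolve_write_mode(runs_incrementally=False, transactionless=True)` raises an error in this model, which corresponds to the failed build described above.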

Abort incremental transforms

:::callout{theme="warning"} It can be risky to abort outputs during an incremental build. For more information, see the documentation on aborted transactions. :::

Individual media set outputs cannot be aborted during a build. Instead, use the .abort_job() method on the TransformContext to abort the entire job. This allows subsequent runs to be incremental.

Limit batch size of incremental inputs

Typically for incremental transforms, all unprocessed media items of each input media set are processed in the same job. However, in some situations, the number of media items processed by a job can vary significantly:

  • An incremental transform is built in SNAPSHOT mode and the entire input is read from the beginning. For example, this happens when the semantic version of the transform is increased.
  • An input media set has accumulated many media items, so a downstream incremental transform now has to process many media items in a single job.

You can configure a batch limit on each incremental input media set of a transform to constrain the quantity of media items read in each job. This concept is analogous to a transaction limit of an input dataset.

A batch limit is an upper bound on the number of media items processed per job. This limit is imposed before any media item path deduplication. For example, if the batch limit is 100 and the transform deduplicates by media item path, it may appear that fewer than 100 media items have been processed.
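The effect of applying the limit before deduplication can be seen in a small model. The sketch below is plain Python and not the media set API: `read_batch` and the backlog of paths are hypothetical, and the sketch tracks only paths rather than full media items.

```python
def read_batch(unprocessed_paths, batch_limit=None, deduplicate_by_path=True):
    """Toy model: apply the batch limit first, then optionally deduplicate by path."""
    if batch_limit is None:
        batch = list(unprocessed_paths)
    else:
        batch = list(unprocessed_paths[:batch_limit])
    if not deduplicate_by_path:
        return batch
    # Drop repeated paths while keeping first-seen order
    return list(dict.fromkeys(batch))


# Hypothetical backlog of 150 unprocessed items in which every path was uploaded twice
backlog = [f"img_{i // 2}.png" for i in range(150)]

limited = read_batch(backlog, batch_limit=100)
limited_no_dedup = read_batch(backlog, batch_limit=100, deduplicate_by_path=False)
```

In this model the limit of 100 is consumed by 100 raw items, but only 50 distinct paths remain after deduplication, so `limited` has 50 entries while `limited_no_dedup` has 100.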

The example below configures an incremental transform to use the following:

  • Two incremental media set inputs, each with a different batch limit.
  • An incremental media set input that does not use a batch limit.

from transforms.api import incremental, transform
from transforms.mediasets import MediaSetInput, MediaSetOutput

@incremental(
    v2_semantics=True,
    strict_append=True,
)
@transform(
    input_media_set_1=MediaSetInput("/examples/media-set-input_1", batch_limit=100),
    input_media_set_2=MediaSetInput("/examples/media-set-input_2", batch_limit=500),
    input_media_set_3=MediaSetInput("/examples/media-set-input_3"),
    output_media_set=MediaSetOutput("/examples/media-set-output"),
)
def compute(ctx, input_media_set_1, input_media_set_2, input_media_set_3, output_media_set):
    ...

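Because each job reads at most the batch limit from each limited input, a single build may not process every unprocessed media item. The expected usage is to create a build schedule so that repeated builds keep the output up to date.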

