
Use media sets with Python transforms

You can use media sets in Python transforms for PDF text extraction, optical character recognition (OCR), image tiling, metadata parsing, and more. The following sections explain how to set up media sets in your Python repository and how to read from and write to media sets with Python transforms.

Here is a step-by-step example of how to create a media set batch pipeline with Code Repositories.

You can also learn how to write incremental transforms with media sets.

:::callout{theme="neutral"} Media transformations are currently not supported in Code Repository's Preview functionality. Any transforms utilizing media sets can be built but not previewed. :::

Import the transforms-media library into your repository

To use decorators specific to media sets, you first need to import the transforms-media library into your repository. You can do this by navigating to the Libraries file drawer on the left side of the Code Repositories interface. Search for transforms-media, then install the library.

Add a dependency on transforms-media in your code repository.

You must use the @transform decorator when working with media sets. Media set inputs and outputs can be passed in using transforms.mediasets.MediaSetInput and transforms.mediasets.MediaSetOutput specifications. During a build, these specifications are resolved into transforms.mediasets.MediaSetInputParam and transforms.mediasets.MediaSetOutputParam objects, respectively. These MediaSetInputParam and MediaSetOutputParam objects provide access to the media set within the compute function. Any number of media set inputs or outputs can be used in combination with any other valid transform inputs and outputs (such as tabular datasets). For example:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def translate_images(images, output_images):
    ...

Read from media sets

You can access individual media items either by the file path or RID:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def translate_images(images, output_images):
    image1 = images.get_media_item_by_path("image1")
    image2 = images.get_media_item("ri.mio.main.media-item.123")
    ...

However, you will likely want to transform all the items in your media set. To do this, you must first pull the items into a dataframe using a listing method. In the example below, we list all items in the input media set and write the resulting dataframe to a tabular output:

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    listing_output=Output('/examples/listed_images')
)
def translate_images(ctx, images, listing_output):
    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)

    # You can perform regular PySpark transformations on media_items_listing

    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    listing_output.write_dataframe(media_items_listing, column_typeclasses=column_typeclasses)

If multiple items in the media set are at a particular path, only the most recent will be included in the listing. The listing will have the following schema:

+--------------------------+-----------+-------------------+
|        mediaItemRid      |    path   |  mediaReference   |
+--------------------------+-----------+-------------------+
| ri.mio.main.media-item.1 | item1.jpg |  {{reference1}}   |
| ri.mio.main.media-item.2 | item2.jpg |  {{reference2}}   |
| ri.mio.main.media-item.3 | item3.jpg |  {{reference3}}   |
+--------------------------+-----------+-------------------+

Note that the above example only shows the top three rows of the listing.

Setting the typeclass of the mediaReference column allows the column to be read as a media reference.

Calls to get_media_item(), get_media_item_by_path(), and so on return a Python file-like stream object. All options accepted by io.open() are also supported. Note that items are read as streams, meaning that random access is not supported.
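Because items arrive as forward-only streams, libraries that need to seek (for example, to read a trailer before the body) cannot use them directly. A minimal sketch of one workaround, assuming the item fits in memory: read the stream to the end and wrap the bytes in `io.BytesIO`. `ForwardOnlyStream` below is a local stand-in for a media item stream, not part of the media set API.

```python
import io


class ForwardOnlyStream(io.RawIOBase):
    """Stand-in for a media item stream: readable, but not seekable."""

    def __init__(self, payload: bytes):
        self._inner = io.BytesIO(payload)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def readinto(self, buffer) -> int:
        # Copy the next chunk into the caller's buffer; 0 signals end of stream.
        chunk = self._inner.read(len(buffer))
        buffer[:len(chunk)] = chunk
        return len(chunk)


def buffer_for_random_access(stream) -> io.BytesIO:
    """Read a forward-only stream to the end and return a seekable in-memory copy."""
    return io.BytesIO(stream.read())


item = ForwardOnlyStream(b"header|body|trailer")
buffered = buffer_for_random_access(item)
buffered.seek(7)          # jump straight to the middle of the payload
middle = buffered.read(4)
```

For large items, buffering the whole payload may not be practical; in that case, processing the stream sequentially is the safer choice.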

You can also return metadata about individual media items without downloading the full item. The metadata will include information such as the dimensions for images, length for audio, and more. For a full reference of available metadata, see the appendix below. The example below adds a column to the media item listing with the metadata for each image.

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from conjure_python_client import ConjureEncoder


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    listing_output_with_metadata=Output('/examples/listed_images_with_metadata')
)
def translate_images(ctx, images, listing_output_with_metadata):

    def get_metadata(media_item_rid):
        metadata = images.get_media_item_metadata(media_item_rid)
        return ConjureEncoder().default(metadata)

    metadata_udf = F.udf(get_metadata, StringType())

    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)
    listing_with_metadata = media_items_listing.withColumn('metadata', metadata_udf(F.col('mediaItemRid')))
    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    listing_output_with_metadata.write_dataframe(listing_with_metadata, column_typeclasses=column_typeclasses)

Media sets support a number of built-in transformations. See the appendix below for the API and list of supported transformations. Calls to these transformations also return a Python file-like stream object. To use a built-in transformation, call the appropriate method on the media set input. For example:

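from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


@transform.spark.using(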
    images=MediaSetInput('/examples/images'),
    image_text_output=Output('/examples/listed_images_with_text')
)
def translate_images(ctx, images, image_text_output):

    def get_ocr_on_image(media_item_rid):
        return images.transform_image_to_text_ocr_output_text(media_item_rid).read().decode('utf-8')

    ocr_on_image_udf = F.udf(get_ocr_on_image, StringType())

    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)
    listing_with_ocr = media_items_listing.withColumn('text', ocr_on_image_udf(F.col('mediaItemRid')))
    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    image_text_output.write_dataframe(listing_with_ocr, column_typeclasses=column_typeclasses)

Create a media set

In addition to creating a media set within a project or folder using the media set creation dialog, you can also create a media set directly within Code Repositories to be used as an output of your Python transform.

First, choose an output location and a name for the new media set in the MediaSetOutput specification.

Select the path you have just defined, which at this point should be underlined in red; on hover, you should see an error message indicating the media set does not exist.

From the lightbulb icon on the left side of the line, select Create media set.

Create media set output prompt

Go through the dialog steps to choose the desired media set schema and complete any other required configuration on your new media set.

Create media set dialog

After selecting Create, the MediaSetOutput specification will be populated with the details you've provided. These annotation fields define how the media set will be created.

Create media set annotations

The new media set will be created after the Python transform has been built for the first time, after which the annotation fields should not be edited.

Write to media sets

Media sets can be used as outputs to transformations by using the MediaSetOutput specification.

To upload an item, call the put_media_item() endpoint on the output media set. This endpoint accepts any file-like object and a path which will be used to identify the item in the output media set. The following is a basic example:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def upload_images(images, output_images):
    with images.get_media_item_by_path("image1.jpg") as input_image:
        output_images.put_media_item(input_image, "copied_image1.jpg")

When copying items from one media set to another, you can use the fast_copy_media_item() method on the output. This is a faster and more efficient option than downloading and re-uploading the media item:

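from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(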
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def upload_images(images, output_images):
    origin_media_item_rid = images.get_media_item_rid_by_path("image1.jpg").item
    output_images.fast_copy_media_item(images, origin_media_item_rid, "fast_copied_image1.jpg")

Items can be uploaded to media sets in user-defined functions (UDFs) for higher parallelism. In the example below, we transform the PDFs in the input media set into JPEGs using the built-in PDF to JPEG transformation and upload those JPEGs to a new output media set. We then write out a tabular dataset containing the media references of those uploaded JPEGs:

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput, MediaSetOutput
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


@transform.spark.using(
    pdfs=MediaSetInput('/examples/PDFs'),
    output_images=MediaSetOutput('/examples/JPEGs'),
    output_references=Output('/examples/JPEG listing')
)
def upload_images(ctx, pdfs, output_images, output_references):

    def upload_jpg(media_item_rid, path):
        with pdfs.transform_document_to_jpg(media_item_rid, 0) as jpeg:
            response = output_images.put_media_item(jpeg, path)
        return response.media_item_rid

    upload_udf = F.udf(upload_jpg, StringType())

    listed_pdfs = pdfs.list_media_items_by_path(ctx)
    media_reference_template = output_images.media_reference_template()
    uploaded_jpegs = listed_pdfs\
        .withColumn('uploaded_media_item_rid', upload_udf(F.col('mediaItemRid'), F.col('path')))\
        .select('path', 'uploaded_media_item_rid')\
        .withColumn("mediaReference", F.format_string(media_reference_template,
                                                      'uploaded_media_item_rid'))

    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    output_references.write_dataframe(uploaded_jpegs, column_typeclasses=column_typeclasses)
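The final step of the example relies on printf-style substitution: `F.format_string` fills the placeholder in the template with each uploaded RID, row by row. The sketch below reproduces that substitution in plain Python. The template string is a made-up stand-in; the real value returned by `media_reference_template()` is not shown in this document and will look different.

```python
# Hypothetical stand-in for the value returned by media_reference_template();
# the real template is produced by the platform and will look different.
EXAMPLE_TEMPLATE = '{"exampleMediaSet": "ri.example.media-set.0", "exampleItem": "%s"}'


def fill_media_reference(template: str, media_item_rid: str) -> str:
    """Substitute one RID into a printf-style template, one row at a time."""
    return template % (media_item_rid,)


uploaded_rids = ["ri.mio.main.media-item.1", "ri.mio.main.media-item.2"]
references = [fill_media_reference(EXAMPLE_TEMPLATE, rid) for rid in uploaded_rids]
```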

Media set write modes

Media sets can be written to using one of two write modes:

  • modify: Uploaded items are added alongside the existing items in the media set branch.
  • replace: Uploaded items will fully replace the media set branch.

The default write mode depends on the transaction policy of the media set. Transactional media sets default to replace. Transactionless media sets always use the modify write mode; this cannot be changed, because branches in transactionless media sets cannot be reset to empty.

The write mode of a media set output can be changed dynamically at runtime. This can be helpful in scenarios where the decision to fully replace an output is based on custom criteria in your pipeline.

To change the write mode of a media set, you can use the .set_write_mode() method on the media set output. The write mode can be changed at any point up until an item is uploaded to the output. For example:

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput

@transform.spark.using(
    input_PNGs=Input('/examples/input_PNGs'),
    output_PNGs=MediaSetOutput('/examples/output_PNGs'),
)
def upload_pngs(input_PNGs, output_PNGs):
    # should_replace is a user-defined function holding your custom criteria
    if should_replace(input_PNGs):
        output_PNGs.set_write_mode("replace")
    else:
        output_PNGs.set_write_mode("modify")

    output_PNGs.put_dataset_files(input_PNGs)
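The example above calls `should_replace`, which the document does not define. One possible shape for it, offered as a sketch only: treat the presence of a marker file in the input dataset as a request for a full rebuild. The marker name and the `FakeDataset` class are hypothetical; the helper itself uses only the `filesystem().ls()` call and `.path` attribute that appear elsewhere on this page.

```python
import os
from types import SimpleNamespace

# Hypothetical convention: an upstream job drops this marker file to request a full rebuild.
FULL_REFRESH_MARKER = "_FULL_REFRESH"


def should_replace(input_dataset, marker_name=FULL_REFRESH_MARKER):
    """Return True when the input dataset contains the marker file."""
    return any(
        os.path.basename(file_status.path) == marker_name
        for file_status in input_dataset.filesystem().ls()
    )


class FakeDataset:
    """Local stand-in exposing only filesystem().ls(), for trying the helper outside a build."""

    def __init__(self, paths):
        self._paths = paths

    def filesystem(self):
        return self

    def ls(self):
        return [SimpleNamespace(path=p) for p in self._paths]


incremental_batch = FakeDataset(["2024/a.png", "2024/b.png"])
full_refresh_batch = FakeDataset(["2024/a.png", "_FULL_REFRESH"])
decisions = (should_replace(incremental_batch), should_replace(full_refresh_batch))
```

Whatever criteria you choose, the decision has to be made before the first item is uploaded, since the write mode cannot change after that point.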

Upload from a filesystem (Catalog) dataset

Using put_dataset_files

The Python media set SDK has built-in tooling to upload the files from a conventional dataset in the Palantir filesystem (known as the Catalog) into a media set. For example:

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput


@transform.spark.using(
    pdfs_dataset=Input('/examples/PDFs'),
    pdfs_media_set=MediaSetOutput('/examples/PDFs media set')
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(pdfs_dataset, ignore_items_not_matching_schema=False)

This transform uploads all items from the dataset into the media set. If any items do not match the schema of the media set (for example, if there is a JPEG in the dataset), the build fails. Setting ignore_items_not_matching_schema=True skips any such mismatched items instead.

Using put_media_item

Files can alternatively be uploaded one by one. For example:

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput
import os


@transform.spark.using(
    output_media_set=MediaSetOutput("/path/media_set_output", should_snapshot=False),
    input_dataset=Input("/path/dataset_of_raw_files"),
)
def compute(input_dataset, output_media_set):
    all_files = list(input_dataset.filesystem().ls())
    for current_file in all_files:
        with input_dataset.filesystem().open(current_file.path, 'rb') as f:
            filename = os.path.basename(current_file.path)
            output_media_set.put_media_item(f, filename)
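Note that `os.path.basename` drops the directory part of each file path, so files such as `2023/scan.pdf` and `2024/scan.pdf` would both be uploaded to the item path `scan.pdf`. As described earlier, only the most recent item at a given path appears in listings. If the input dataset has nested folders, one option is to fold the directory into the item path. The helper below is a hypothetical sketch of that idea, not part of the SDK:

```python
import posixpath


def flattened_item_path(dataset_file_path: str, separator: str = "__") -> str:
    """Turn 'a/b/scan.pdf' into 'a__b__scan.pdf' so items from different folders stay distinct."""
    normalized = posixpath.normpath(dataset_file_path).lstrip("/")
    return normalized.replace("/", separator)


paths = ["2023/scan.pdf", "2024/scan.pdf", "notes.pdf"]
item_paths = [flattened_item_path(p) for p in paths]
```

This stays unique only as long as the separator does not already occur in folder or file names, so pick one that your data does not use.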

Lightweight support

Media sets can be transformed using lightweight transforms. The API is the same as standard Python transforms, but uses single-node libraries like pandas for dataframe operations when listing media items. The following example shows a lightweight transform that lists all items in the input media set and writes the resulting dataframe to a tabular output:

from transforms.api import transform, Output, LightweightOutput
from transforms.mediasets import MediaSetInput, LightweightMediaSetInputParam

@transform.using(
    images=MediaSetInput('/examples/images'),
    listing_output=Output('/examples/listed_images')
)
def translate_images(images: LightweightMediaSetInputParam, listing_output: LightweightOutput):
    media_items_listing = images.list_media_items_by_path_with_media_reference().pandas()

    # You can perform regular pandas transformations on media_items_listing

    listing_output.write_table(media_items_listing)

:::callout{theme="neutral"} Some operations are not supported in lightweight transforms. In particular, put_dataset_files is not supported as it specifically relies on Spark's distributed processing to upload files in parallel. :::

Extract layout-aware content from a document

When working with media sets, you can use a transform to extract content from a document, such as paragraphs, headers, and tables, along with additional metadata about the layout of this content. This extraction transform can be run on both PDF and image media sets.

For particularly complex or obscure documents, using the model to extract bounding boxes and then passing them to a vision model may yield better results.

:::callout{theme="neutral"} To run this extraction in your transform, the Document Information Extraction model must be available on your enrollment. You can check whether the Document Information Extraction model is available by searching for it in the Model Catalog. Contact a Palantir representative if the Document Information Extraction model is unavailable and you would like to use it. :::

The output will be an array of "block" structs, which correspond to areas of the document. Each "block" will have a type, confidence, ID, bounding box, extracted text, extracted table in HTML (if applicable), the page number, and language information.

The following is an example Python transform that extracts layout-aware content from a PDF media set:

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput, MediaSetInputParam
from pyspark.sql.functions import udf


@transform.spark.using(
    output=Output('ri.foundry.main.dataset.0-1-2-3-4'),
    media_input=MediaSetInput('/examples/input_pdfs'),
)
def compute(media_input: MediaSetInputParam, output):

    def extract_all_text(media_item_rid):
        metadata = media_input.get_media_item_metadata(media_item_rid)
        pages = metadata.document.pages
        if pages is None:
            return ""

        text = ""
        for page in range(pages):
            response = media_input.transform_media_item(media_item_rid, str(page), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            text += str(response.json())
        return text

    extract_text_udf = udf(extract_all_text)

    result = media_input.dataframe().withColumn("text", extract_text_udf("mediaItemRid"))

    column_typeclasses = {
        "mediaReference": [{"kind": "reference", "name": "media_reference"}]
    }

    output.write_dataframe(result, column_typeclasses=column_typeclasses)
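The transform above stores the raw response as a string. If you only need the readable text, you could filter the blocks before concatenating them. The sketch below shows one way to do that on a list of block dictionaries. The key names (`type`, `text`, `confidence`, `pageNumber`) and the sample values are assumptions made for illustration; check the actual response for the real field names.

```python
# Hypothetical sample: key names and values are assumptions, not the documented schema.
SAMPLE_BLOCKS = [
    {"type": "HEADER", "text": "Quarterly report", "pageNumber": 0, "confidence": 0.98},
    {"type": "PARAGRAPH", "text": "Revenue grew.", "pageNumber": 0, "confidence": 0.91},
    {"type": "PARAGRAPH", "text": "Blurry footnote", "pageNumber": 0, "confidence": 0.32},
]


def text_from_blocks(blocks, min_confidence: float = 0.5) -> str:
    """Join the text of blocks that meet a confidence threshold, in their original order."""
    kept = [
        block["text"]
        for block in blocks
        if block.get("confidence", 0.0) >= min_confidence and block.get("text")
    ]
    return "\n".join(kept)


page_text = text_from_blocks(SAMPLE_BLOCKS)
```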

We recommend using parallelism for optimal performance if you are running this extraction transform on many documents or on documents with many pages.
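As a rough illustration of page-level parallelism, the sketch below fans per-page work out over a thread pool and collects results in page order. `extract_page` is a stand-in for the real per-page request. Whether the media set input object can safely be shared across threads is not stated here, so treat that as something to verify.

```python
from concurrent.futures import ThreadPoolExecutor


def extract_page(page_index: int) -> str:
    """Stand-in for one per-page extraction call; replace with the real request."""
    return f"<content of page {page_index}>"


def extract_pages_in_parallel(page_count: int, max_workers: int = 4) -> list:
    """Run per-page extraction concurrently; map() returns results in page order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(extract_page, range(page_count)))


pages = extract_pages_in_parallel(3)
```

Keeping `max_workers` modest is a reasonable default, since the extraction is subject to rate limits.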

More information on rate limits, regional availability, and usage rates can be found in the documentation.

Reference: Built-in transformations

For a complete reference of all available transformation methods, parameters, and examples, refer to the media set transforms API documentation.

Convert a PDF document to JPEG images

Converts document pages to images with the specified dimensions.

Parameters:

  • media_item_rid (Optional[str]): If specified, will run the transformation on the specified item instead of the entire media set. Defaults to None.
  • start_page (Optional[int]): The zero-indexed start page for conversion. Defaults to 0 (the first page).
  • end_page (Optional[int]): The zero-indexed end page for conversion (exclusive). Defaults to None (the last page).
  • width (Optional[int]): The width of the output images. Defaults to 1024.
  • height (Optional[int]): The height of the output images. Defaults to 1024.
  • output_format (str): The format of the output images, for example PNG or JPEG. Defaults to PNG.

Returns:

  • An instance of MediaSetInputTransform containing the document to images transformation, allowing for further transformations.

Example:

transform = pdf_input.transform().convert_document_to_images("ri.mio.main.media-item.1", 0, 5, 600, 900, output_format="jpg")
image_output.write(transform)
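Since start_page is zero-indexed and end_page is exclusive, the call above with 0 and 5 covers pages 0 through 4. When converting a long document in batches, the same convention makes consecutive ranges line up without gaps or overlap. A small sketch (the helper name is hypothetical):

```python
def page_batches(total_pages: int, batch_size: int):
    """Yield (start_page, end_page) pairs: zero-indexed start, exclusive end."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, total_pages, batch_size):
        yield start, min(start + batch_size, total_pages)


batches = list(page_batches(total_pages=12, batch_size=5))
```

Each pair can then be passed as the start_page and end_page arguments of a conversion call.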

Transform a PDF document set into text with OCR

This transform uses traditional optical character recognition (OCR) to extract text from PDF documents. Note that this is not AI-powered OCR, which uses a vision language model to perform the extraction. Learn more about using a vision language model to extract PDF document content.

Parameters:

  • languages (list[str]): List of languages to be used for OCR. Defaults to English. All valid codes can be found in the Tesseract documentation ↗ under languages.
  • scripts (Optional[list[str]]): List of scripts to be used for OCR. Defaults to None. All valid codes can be found in the Tesseract documentation ↗ under scripts.
  • media_item_rid (Optional[str]): If specified, will run the transformation on the specified item instead of the entire media set. Defaults to None.
  • start_page (Optional[int]): The zero-indexed start page for OCR. Only applicable for PDF media sets. Defaults to 0 (the first page).
  • end_page (Optional[int]): The zero-indexed end page for OCR (exclusive). Only applicable for PDF media sets. Defaults to None (the final page).
  • return_structure (str): item_per_row or page_per_row. Only applicable to transformations on an entire PDF media set. Defaults to item_per_row.
  • suppress_errors (bool): Specifies error handling behavior. If True, errors are caught and the error message will be returned in the output. If False, any errors will not be caught and the build will fail. Defaults to True. Only applicable to transformations on the entire media set.

Returns:

  • A DataFrame, list of strings, or a single string.
  • str: Transformations on a single image.
  • list[str]: Transformations on a single PDF.
  • DataFrame: Transformations on the entire media set.
    • For PDF (item_per_row): Columns are media_item_rid, path, media_reference, extracted_text (list[str]).
    • For PDF (page_per_row) or image sets: Columns are media_item_rid, path, media_reference, page_number, extracted_text (str).

Example:

df = media_set.transform().ocr()
dataset_output.write_dataframe(df)
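To make the difference between the two return structures concrete, the sketch below converts item_per_row style records (one row per PDF, with a list of page texts) into page_per_row style records (one row per page). It uses plain dictionaries rather than a DataFrame, omits the media_reference column for brevity, and assumes page_number is zero-indexed like start_page; that last point is an assumption.

```python
def explode_pages(item_rows):
    """Convert item_per_row style rows into page_per_row style rows."""
    page_rows = []
    for row in item_rows:
        # Assumption: page numbers start at 0, matching the start_page convention.
        for page_number, text in enumerate(row["extracted_text"]):
            page_rows.append({
                "media_item_rid": row["media_item_rid"],
                "path": row["path"],
                "page_number": page_number,
                "extracted_text": text,
            })
    return page_rows


item_rows = [
    {"media_item_rid": "ri.mio.main.media-item.1", "path": "a.pdf",
     "extracted_text": ["first page", "second page"]},
]
page_rows = explode_pages(item_rows)
```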

Transcribe audio to text

Transcribe an audio file that contains speech into text.

Parameters:

  • media_item_rid (Optional[str]): If specified, will run the transformation on the specified item instead of the entire media set. Defaults to None.
  • language (Optional[str]): The language to use for transcription. Defaults to None, in which case it will be auto-detected. Valid languages can be found in the Whisper GitHub repo ↗ under LANGUAGES.
  • performance_mode (Literal["more_economical", "more_performant"]): The performance mode to use for transcription. Defaults to more_economical.
  • output_format (Literal["text", "segments"]): The format of the output. Defaults to text.
  • add_timestamps (Optional[bool]): Control whether timestamps are added to the transcription. Defaults to False. Only applicable when output_format is text.
  • suppress_errors (bool): Specifies error handling behavior. If True, errors are caught and the error message will be returned in the output. If False, any errors will not be caught and the build will fail. Defaults to True. Only applicable to transformations on the entire media set.

Returns:

  • A DataFrame or a single string.
  • str: Applicable for transformations on a single item.
    • text: The transcribed text.
    • segments: JSON object containing the transcribed segments including timestamps, segment confidence and more details.
  • DataFrame: Columns are media_item_rid, path, media_reference, transcription (str). Applicable for transformations on the entire media set.

Example:

df = media_set.transform().transcribe("ri.mio.main.media-item.1", language="english")
dataset_output.write_dataframe(df)

中文翻译

在 Python 转换中使用媒体集(Media Sets)

您可以在 Python 转换中使用媒体集(Media Sets)进行 PDF 文本提取、光学字符识别(OCR)、图像分割、元数据解析等操作。以下章节将说明如何在 Python 仓库中设置媒体集,以及如何使用 Python 转换读写媒体集。

这里有一个逐步示例,展示如何使用代码仓库创建媒体集批处理管道

您还可以了解如何编写带有媒体集的增量转换

:::callout{theme="neutral"} 媒体转换目前不支持代码仓库的预览(Preview)功能。任何使用媒体集的转换都可以构建,但无法预览。 :::

将 transforms-media 库导入到您的仓库中

要使用特定于媒体集的装饰器,您首先需要将 transforms-media 库导入到您的仓库中。您可以通过导航到代码仓库界面左侧的库(Libraries)文件抽屉来完成此操作。搜索 transforms-media,然后安装该库。

在代码仓库中添加对 transforms-media 的依赖

在使用媒体集时,您必须使用 @transform 装饰器。媒体集输入和输出可以使用 transforms.mediasets.MediaSetInputtransforms.mediasets.MediaSetOutput 规范传入。在构建过程中,这些规范分别解析为 transforms.mediasets.MediaSetInputParamtransforms.mediasets.MediaSetOutputParam 对象。这些 MediaSetInputParamMediaSetOutputParam 对象提供了在计算函数中访问媒体集的能力。任意数量的媒体集输入或输出都可以与任何其他有效的转换输入和输出(例如表格数据集)组合使用。例如:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def translate_images(images, output_images):
    ...

从媒体集中读取数据

您可以通过文件路径或 RID 访问单个媒体项:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def translate_images(images, output_images):
    image1 = images.get_media_item_by_path("image1")
    image2 = images.get_media_item("ri.mio.main.media-item.123")
    ...

然而,您可能希望转换媒体集中的所有项。为此,您必须首先使用列表(listing)方法将项拉取到数据框中。在下面的示例中,我们列出输入媒体集中的所有项,并将结果数据框写入表格输出:

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    listing_output=Output('/examples/listed_images')
)
def translate_images(ctx, images, listing_output):
    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)

    # 您可以对 media_items_listing 执行常规的 PySpark 转换

    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    listing_output.write_dataframe(media_items_listing, column_typeclasses=column_typeclasses)

如果媒体集中的多个项位于同一路径,则列表中只会包含最新的项。列表将具有以下模式:

+--------------------------+-----------+-------------------+
|        mediaItemRid      |    path   |  mediaReference   |
+--------------------------+-----------+-------------------+
| ri.mio.main.media-item.1 | item1.jpg |  {{reference1}}   |
| ri.mio.main.media-item.2 | item2.jpg |  {{reference2}}   |
| ri.mio.main.media-item.3 | item3.jpg |  {{reference3}}   |
+--------------------------+-----------+-------------------+

请注意,上述示例仅显示了列表的前三行。

设置 mediaReference 列的类型类(typeclass)允许该列被读取为媒体引用(Media Reference)

get_media_item()get_media_item_by_path() 等的调用返回一个类似 Python 文件流的对象。所有 io.open() 接受的选项也都受支持。请注意,项是以流的形式读取的,这意味着不支持随机访问。

您还可以返回单个媒体项的元数据,而无需下载整个项。元数据将包括图像尺寸、音频长度等信息。有关可用元数据的完整参考,请参见下面的附录。下面的示例向媒体项列表中添加了一个包含每个图像元数据的列。

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from conjure_python_client import ConjureEncoder


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    listing_output_with_metadata=Output('/examples/listed_images_with_metadata')
)
def translate_images(ctx, images, listing_output_with_metadata):

    def get_metadata(media_item_rid):
        metadata = images.get_media_item_metadata(media_item_rid)
        return ConjureEncoder().default(metadata)

    metadata_udf = F.udf(get_metadata, StringType())

    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)
    listing_with_metadata = media_items_listing.withColumn('metadata', metadata_udf(F.col('mediaItemRid')))
    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    listing_output_with_metadata.write_dataframe(listing_with_metadata, column_typeclasses=column_typeclasses)

媒体集支持一定数量的内置转换。请参见下面的附录以了解 API 和受支持的转换列表。对这些转换的调用也将返回一个类似 Python 文件流的对象。要使用这些内置转换,请在媒体集输入上调用相应的方法。例如:

@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    image_text_output=Output('/examples/listed_images_with_text')
)
def translate_images(ctx, images, image_text_output):

    def get_ocr_on_image(media_item_rid):
        return images.transform_image_to_text_ocr_output_text(media_item_rid).read().decode('utf-8')

    ocr_on_image_udf = F.udf(get_ocr_on_image, StringType())

    media_items_listing = images.list_media_items_by_path_with_media_reference(ctx)
    listing_with_ocr = media_items_listing.withColumn('text', ocr_on_image_udf(F.col('mediaItemRid')))
    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    image_text_output.write_dataframe(listing_with_ocr, column_typeclasses=column_typeclasses)

创建媒体集

除了使用媒体集创建对话框在项目或文件夹中创建媒体集外,您还可以直接在代码仓库中创建媒体集,用作 Python 转换的输出。

首先,在 MediaSetOutput 规范中选择新媒体集的输出位置和名称。

选择刚刚定义的路径,此时该路径应显示为红色下划线;悬停时,您应该会看到一条错误消息,指示该媒体集不存在。

从该行左侧的灯泡图标中,选择创建媒体集(Create media set)

创建媒体集输出提示

按照对话框步骤选择所需的媒体集模式(Media Set Schema),并完成新媒体集的任何其他必要配置。

创建媒体集对话框

选择创建(Create)后,MediaSetOutput 规范将填充您提供的详细信息。这些注释字段定义了媒体集的创建方式。

创建媒体集注释

新媒体集将在 Python 转换首次构建后创建,之后不应编辑注释字段。

写入媒体集

通过使用 MediaSetOutput 规范,媒体集可以用作转换的输出。

要上传项,请调用输出媒体集上的 put_media_item() 端点。此端点接受任何类似文件的对象和一个路径,该路径将用于标识输出媒体集中的项。以下是一个基本示例:

from transforms.api import transform
from transforms.mediasets import MediaSetInput, MediaSetOutput


@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def upload_images(images, output_images):
    with images.get_media_item_by_path("image1.jpg") as input_image:
        output_images.put_media_item(input_image, "copied_image1.jpg")

在将项从一个媒体集复制到另一个媒体集时,您可以在输出上使用 fast_copy_media_item() 方法。这比下载并重新上传媒体项更快、更高效:

@transform.spark.using(
    images=MediaSetInput('/examples/images'),
    output_images=MediaSetOutput('/examples/output_images')
)
def upload_images(images, output_images):
    origin_media_item_rid = images.get_media_item_rid_by_path("image1.jpg").item
    output_images.fast_copy_media_item(images, origin_media_item_rid, "fast_copied_image1.jpg")

项可以在用户自定义函数(UDF)中上传到媒体集,以实现更高的并行度。在下面的示例中,我们使用内置的 PDF 到 JPEG 转换将输入媒体集中的 PDF 转换为 JPEG,并将这些 JPEG 上传到新的输出媒体集。然后,我们写出一个包含这些上传 JPEG 的媒体引用的表格数据集:

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput, MediaSetOutput
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


@transform.spark.using(
    pdfs=MediaSetInput('/examples/PDFs'),
    output_images=MediaSetOutput('/examples/JPEGs'),
    output_references=Output('/examples/JPEG listing')
)
def upload_images(ctx, pdfs, output_images, output_references):

    def upload_jpg(media_item_rid, path):
        with pdfs.transform_document_to_jpg(media_item_rid, 0) as jpeg:
            response = output_images.put_media_item(jpeg, path)
        return response.media_item_rid

    upload_udf = F.udf(upload_jpg, StringType())

    listed_pdfs = pdfs.list_media_items_by_path(ctx)
    media_reference_template = output_images.media_reference_template()
    uploaded_jpegs = listed_pdfs\
        .withColumn('uploaded_media_item_rid', upload_udf(F.col('mediaItemRid'), F.col('path')))\
        .select('path', 'uploaded_media_item_rid')\
        .withColumn("mediaReference", F.format_string(media_reference_template,
                                                      'uploaded_media_item_rid'))

    column_typeclasses = {'mediaReference': [{'kind': 'reference', 'name': 'media_reference'}]}
    output_references.write_dataframe(uploaded_jpegs, column_typeclasses=column_typeclasses)

媒体集写入模式

可以使用以下两种写入模式之一写入媒体集:

  • modify:上传的项将添加到媒体集分支中的现有项之外。
  • replace:上传的项将完全替换媒体集分支。

默认写入模式取决于媒体集的事务策略(Transaction Policy)。事务性媒体集默认为 replace。无事务媒体集使用 modify 写入模式,并且无法更改,因为无事务媒体集中的分支无法重置为空。

媒体集输出的写入模式可以在运行时动态更改。这在根据管道中的自定义条件决定是否完全替换输出时非常有用。

要更改媒体集的写入模式,您可以在媒体集输出上使用 .set_write_mode() 方法。写入模式可以在项上传到输出之前的任何时间点更改。例如:

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput

@transform.spark.using(
    input_PNGs=Input('/examples/input_PNGs'),
    output_PNGs=MediaSetOutput('/examples/output_PNGs'),
)
def upload_pngs(input_PNGs, output_PNGs):
    if should_replace(input_PNGs):
        output_PNGs.set_write_mode("replace")
    else:
        output_PNGs.set_write_mode("modify")

    output_PNGs.put_dataset_files(input_PNGs)

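Upload from file system (Catalog) datasets

Using put_dataset_files

The Python media sets SDK has built-in tooling for uploading files from regular datasets in the Palantir file system (known as Catalog) to a media set. For example: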

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput


@transform.spark.using(
    pdfs_dataset=Input('/examples/PDFs'),
    pdfs_media_set=MediaSetOutput('/examples/PDFs media set')
)
def upload_to_media_set(pdfs_dataset, pdfs_media_set):
    pdfs_media_set.put_dataset_files(pdfs_dataset, ignore_items_not_matching_schema=False)

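This transform uploads all items in the dataset to the media set. If any item does not match the media set's schema (for example, the dataset contains a JPEG), the build fails. Setting ignore_items_not_matching_schema=True causes such mismatched items to be ignored instead.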
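
Before choosing a value for ignore_items_not_matching_schema, it can help to know which files would be affected. The sketch below partitions a list of file paths by extension, assuming a PDF media set. The extension check is only a rough stand-in for the schema validation that the media set itself performs, and the names ALLOWED_EXTENSIONS and split_by_extension are hypothetical.

```python
import os
from typing import Iterable, List, Tuple

# Assumption: the target is a PDF media set, so only ".pdf" files are expected to match.
ALLOWED_EXTENSIONS = {".pdf"}


def split_by_extension(paths: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Partition paths into (likely matching, likely mismatched) by file extension."""
    matching: List[str] = []
    mismatched: List[str] = []
    for path in paths:
        extension = os.path.splitext(path)[1].lower()
        if extension in ALLOWED_EXTENSIONS:
            matching.append(path)
        else:
            mismatched.append(path)
    return matching, mismatched
```

If the mismatched list is non-empty, a build with ignore_items_not_matching_schema=False would be expected to fail, provided the extensions reflect the actual file contents.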

Using put_media_item

Files can also be uploaded one at a time. For example:

from transforms.api import transform, Input
from transforms.mediasets import MediaSetOutput
import os


@transform.spark.using(
    output_media_set=MediaSetOutput("/path/media_set_output", should_snapshot=False),
    input_dataset=Input("/path/dataset_of_raw_files"),
)
def compute(input_dataset, output_media_set):
    all_files = list(input_dataset.filesystem().ls())
    for current_file in all_files:
        with input_dataset.filesystem().open(current_file.path, 'rb') as f:
            filename = os.path.basename(current_file.path)
            output_media_set.put_media_item(f, filename)
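
The loop above names each media item with os.path.basename, so two files with the same name in different dataset folders end up with the same media item path. This section does not say how a media set handles repeated paths, so the sketch below shows one way to keep the folder components in the name. The helper media_item_path and the "__" separator are hypothetical choices.

```python
import posixpath


def media_item_path(dataset_file_path: str) -> str:
    """Flatten a dataset file path into one name that keeps its directory components."""
    # Collapse duplicate slashes and drop any leading slash before flattening.
    normalized = posixpath.normpath(dataset_file_path).lstrip("/")
    return normalized.replace("/", "__")
```

In the transform above, media_item_path(current_file.path) could be passed as the second argument to put_media_item in place of the base filename.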

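Lightweight support

Media sets can be transformed using Lightweight Transforms. The API is the same as for standard Python transforms, except that listing media items uses a single-node library (such as pandas) for dataframe operations. The following example shows a lightweight transform that lists all items in an input media set and writes the resulting dataframe to a tabular output: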

from transforms.api import transform, Output, lightweight, LightweightOutput
from transforms.mediasets import MediaSetInput, LightweightMediaSetInputParam

@transform.using(
    images=MediaSetInput('/examples/images'),
    listing_output=Output('/examples/listed_images')
)
def translate_images(images: LightweightMediaSetInputParam, listing_output: LightweightOutput):
    media_items_listing = images.list_media_items_by_path_with_media_reference().pandas()

    # You can apply regular pandas transformations to media_items_listing

    listing_output.write_table(media_items_listing)

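:::callout{theme="neutral"} Some operations are not supported in lightweight transforms. In particular, put_dataset_files is not supported because it relies specifically on Spark's distributed processing to upload files in parallel. :::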

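Extract layout-aware content from documents

When working with media sets, you can use a transformation to extract content from documents, such as paragraphs, headings, and tables, along with additional metadata about the layout of that content. This extraction transformation can run on PDF and image media sets.

Using a model to extract bounding boxes and passing them to a vision model may produce better results for particularly complex or ambiguous documents.

:::callout{theme="neutral"} To run this extraction in a transform, a document information extraction model must be available in your enrollment. You can check whether one is available by searching for it in the model catalog. If the document information extraction model is not available and you would like to use it, contact your Palantir representative. :::

The output is an array of "block" structs, each corresponding to a region of the document. Each block has a type, confidence, ID, bounding box, extracted text, extracted HTML-formatted table (where applicable), page number, and language information.

The following is an example Python transform that extracts layout-aware content from a PDF media set: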

from transforms.api import transform, Output
from transforms.mediasets import MediaSetInput, MediaSetInputParam
from pyspark.sql.functions import udf


@transform.spark.using(
    output=Output('ri.foundry.main.dataset.0-1-2-3-4'),
    media_input=MediaSetInput('/examples/input_pdfs'),
)
def compute(media_input: MediaSetInputParam, output):

    def extract_all_text(media_item_rid):
        metadata = media_input.get_media_item_metadata(media_item_rid)
        pages = metadata.document.pages
        if pages is None:
            return ""

        text = ""
        for page in range(pages):
            response = media_input.transform_media_item(media_item_rid, str(page), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            text += str(response.json())
        return text

    extract_text_udf = udf(extract_all_text)

    result = media_input.dataframe().withColumn("text", extract_text_udf("mediaItemRid"))

    column_typeclasses = {
        "mediaReference": [{"kind": "reference", "name": "media_reference"}]
    }

    output.write_dataframe(result, column_typeclasses=column_typeclasses)
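
The transform above stores the raw JSON of each response as a string. As a rough sketch of post-processing, the helper below joins the extracted text of blocks that meet a confidence threshold. It assumes each block has been parsed into a dict with "text" and "confidence" keys. Those key names are assumptions made for illustration and are not confirmed by this documentation, so inspect an actual response before relying on them.

```python
from typing import Any, Dict, List


def join_block_text(blocks: List[Dict[str, Any]], min_confidence: float = 0.0) -> str:
    """Concatenate the text of blocks whose confidence meets the threshold."""
    kept = [
        block["text"]  # hypothetical key name for the extracted text
        for block in blocks
        # Skip blocks with no text; treat a missing confidence as 0.0.
        if block.get("text") and block.get("confidence", 0.0) >= min_confidence
    ]
    return "\n".join(kept)
```

Filtering on confidence is a design choice for noisy scans. Setting min_confidence to 0.0 keeps every block that has text.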

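If you run this extraction transformation on multiple documents or on documents with many pages, we recommend using parallel processing for best performance.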
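
One possible way to parallelize the per-page requests for a single document is a thread pool, since each request mostly waits on the network. The sketch below is a generic pattern, not a transforms-media feature. The extract_page callable is a hypothetical stand-in for a function that wraps the per-page transform_media_item call.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def extract_pages_in_parallel(
    extract_page: Callable[[int], str], page_count: int, max_workers: int = 4
) -> List[str]:
    """Run extract_page for every page index concurrently, preserving page order."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order, regardless of completion order.
        return list(executor.map(extract_page, range(page_count)))
```

Keep max_workers modest so that the combined request rate stays within any applicable rate limits. Across documents, Spark already distributes the UDF calls over its executors.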

For more information on rate limits, regional availability, and usage rates, see the documentation.

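Reference: Built-in transformations

For a complete reference of all available transformation methods, parameters, and examples, see the media set transformations API documentation.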

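Convert PDF documents to JPEG images

Converts document pages to images with the specified dimensions.

Parameters:

  • media_item_rid (Optional[str]): If specified, the transformation runs on the specified item rather than on the entire media set. Defaults to None.
  • start_page (Optional[int]): The start page of the conversion (zero-indexed). Defaults to 0 (the first page).
  • end_page (Optional[int]): The end page of the conversion (zero-indexed, exclusive). Defaults to None (the last page).
  • width (Optional[int]): The width of the output images. Defaults to 1024.
  • height (Optional[int]): The height of the output images. Defaults to 1024.
  • output_format (str): The format of the output images, such as PNG or JPEG. Defaults to PNG.

Returns:

  • A MediaSetInputTransform instance containing the document-to-image transformation, which allows further transformations to be chained.

Example: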

transform = pdf_input.transform().convert_document_to_images("ri.mio.main.media-item.1", 0, 5, 600, 900, output_format="jpg")
image_output.write(transform)
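
The start_page and end_page semantics described above (zero-indexed, end exclusive, None meaning through the last page) can be sketched in plain Python as follows. The helper resolve_page_range is hypothetical, and its bounds validation is an assumption. The library may handle out-of-range values differently.

```python
from typing import Optional


def resolve_page_range(
    total_pages: int, start_page: int = 0, end_page: Optional[int] = None
) -> range:
    """Return the zero-indexed pages selected by start_page (inclusive) and end_page (exclusive)."""
    if end_page is None:
        # None means "through the last page".
        end_page = total_pages
    if not 0 <= start_page <= end_page <= total_pages:
        raise ValueError("expected 0 <= start_page <= end_page <= total_pages")
    return range(start_page, end_page)
```

Under these semantics, the example call above with start and end values of 0 and 5 selects the first five pages, that is, page indices 0 through 4.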

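Convert a set of PDF documents to text using OCR

This transformation uses traditional optical character recognition (OCR) to extract text from PDF documents. Note that this is not AI-powered OCR, which uses a vision language model to perform the extraction. Learn more about extracting PDF document content using vision language models.

Parameters:

  • languages (list[str]): The list of languages to use for OCR. Defaults to English. All valid codes can be found in the languages section of the Tesseract documentation ↗.
  • scripts (Optional[list[str]]): The list of scripts to use for OCR. Defaults to None. All valid codes can be found in the scripts section of the Tesseract documentation ↗.
  • media_item_rid (Optional[str]): If specified, the transformation runs on the specified item rather than on the entire media set. Defaults to None.
  • start_page (Optional[int]): The start page for OCR (zero-indexed). Only applies to PDF media sets. Defaults to 0 (the first page).
  • end_page (Optional[int]): The end page for OCR (zero-indexed, exclusive). Only applies to PDF media sets. Defaults to None (the last page).
  • return_structure (str): Either item_per_row or page_per_row. Only applies to transformations over an entire PDF media set. Defaults to item_per_row.
  • suppress_errors (bool): Specifies the error handling behavior. If True, errors are caught and the error message is returned in the output. If False, no errors are caught and the build fails. Defaults to True. Only applies to transformations over an entire media set.

Returns:

  • A DataFrame, a list of strings, or a single string.
  • str: For a transformation on a single image.
  • list[str]: For a transformation on a single PDF.
  • DataFrame: For a transformation on an entire media set.
    • For PDFs (item_per_row): the columns are media_item_rid, path, media_reference, and extracted_text (list[str]).
    • For PDFs (page_per_row) or image sets: the columns are media_item_rid, path, media_reference, page_number, and extracted_text (str).

Example: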

df = media_set.transform().ocr()
dataset_output.write_dataframe(df)
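
To illustrate how the two return structures relate, the sketch below converts rows in the item_per_row layout (one row per item, with a list of page texts) into the page_per_row layout (one row per page). It operates on plain dicts using the column names listed above. The helper explode_pages is hypothetical, and numbering pages from zero is an assumption.

```python
from typing import Any, Dict, List


def explode_pages(item_rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Turn one row per item (extracted_text: list[str]) into one row per page."""
    page_rows: List[Dict[str, Any]] = []
    for row in item_rows:
        # Assumption: pages are numbered from zero, in list order.
        for page_number, page_text in enumerate(row["extracted_text"]):
            page_rows.append({
                "media_item_rid": row["media_item_rid"],
                "path": row["path"],
                "media_reference": row["media_reference"],
                "page_number": page_number,
                "extracted_text": page_text,
            })
    return page_rows
```

On a Spark DataFrame, a comparable reshaping could be done with pyspark.sql.functions.posexplode on the extracted_text column.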

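Transcribe audio to text

Transcribes audio files containing speech into text.

Parameters:

  • media_item_rid (Optional[str]): If specified, the transformation runs on the specified item rather than on the entire media set. Defaults to None.
  • language (Optional[str]): The language to use for transcription. Defaults to None, in which case the language is detected automatically. Valid languages can be found under LANGUAGES in the Whisper GitHub repository ↗.
  • performance_mode (Literal["more_economical", "more_performant"]): The performance mode to use for transcription. Defaults to more_economical.
  • output_format (Literal["text", "segments"]): The format of the output. Defaults to text.
  • add_timestamps (Optional[bool]): Controls whether timestamps are added to the transcription. Defaults to False. Only applies when output_format is text.
  • suppress_errors (bool): Specifies the error handling behavior. If True, errors are caught and the error message is returned in the output. If False, no errors are caught and the build fails. Defaults to True. Only applies to transformations over an entire media set.

Returns:

  • A DataFrame or a single string.
  • str: For a transformation on a single item.
    • text: The transcribed text.
    • segments: A JSON object containing the transcription segments, with further details such as timestamps and segment confidence.
  • DataFrame: The columns are media_item_rid, path, media_reference, and transcription (str). For a transformation on an entire media set.

Example: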

df = media_set.transform().transcribe("ri.mio.main.media-item.1", language="english")
dataset_output.write_dataframe(df)
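
The suppress_errors behavior described above can be pictured with a small sketch: with suppression on, a failing item produces a row holding the error message and the remaining items still run. With suppression off, the first error propagates and would fail the build. The helper apply_to_items, the result key, and the ERROR: prefix are all hypothetical. This section does not specify how the library formats error messages in its output.

```python
from typing import Callable, Dict, Iterable, List


def apply_to_items(
    item_rids: Iterable[str],
    operation: Callable[[str], str],
    suppress_errors: bool = True,
) -> List[Dict[str, str]]:
    """Apply operation to each item, optionally recording errors instead of raising."""
    results: List[Dict[str, str]] = []
    for rid in item_rids:
        try:
            results.append({"media_item_rid": rid, "result": operation(rid)})
        except Exception as error:
            if not suppress_errors:
                # Mirrors suppress_errors=False: the error is not caught.
                raise
            # Hypothetical error format, used only for illustration.
            results.append({"media_item_rid": rid, "result": f"ERROR: {error}"})
    return results
```

When errors are suppressed, check the output for error rows before passing the results to downstream steps.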