Deploy extraction strategies to Python transforms¶
After validating your extraction strategy in AIP Document Intelligence, you can deploy it as a Python transform to run batch extraction across all pages of all documents in a media set. The deployed template produces the same results as the corresponding configuration in AIP Document Intelligence.
Using the deployed template¶
Once the template is created in Code Repositories:
- Specify your output dataset in the @transform.using decorator in the src/myproject/document_extraction/my_extraction.py file.
- Trigger the build.
The template uses lightweight transforms for optimal performance. Legacy versions used Spark-based transforms, which are much slower due to Spark overhead. We recommend migrating to lightweight transforms if you have not already.
:::callout{theme="warning"} Preview mode is not yet supported in this template. Errors are expected when using preview, but the actual build will work correctly. :::
Incremental processing¶
By default, the document extraction transform is non-incremental, meaning that all documents are processed on every run. You can configure the transform to run incrementally by uncommenting the @incremental(...) decorator line. For an incremental transform, when new documents are added to the input media set, re-running the transform will only process the new documents and append results to the output dataset.
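To make the incremental behavior concrete, here is a minimal pure-Python sketch of the semantics described above. It is not the transforms API: it assumes the already-processed media item RIDs can be read from the previous output, uses one row per media item for brevity (the real output has one row per page), and the helpers select_new_items and run_incremental are hypothetical.

```python
from typing import Dict, List, Set


def select_new_items(input_rids: List[str], processed_rids: Set[str]) -> List[str]:
    """Return the media item RIDs that have not been processed yet, preserving input order."""
    return [rid for rid in input_rids if rid not in processed_rids]


def run_incremental(
    input_rids: List[str],
    previous_output: List[Dict[str, str]],
) -> List[Dict[str, str]]:
    """Hypothetical model of an incremental run: extract only new items and append them."""
    processed = {row["media_item_rid"] for row in previous_output}
    new_rows = [
        # Placeholder for the real per-item extraction call.
        {"media_item_rid": rid, "extraction_result": f"extracted:{rid}"}
        for rid in select_new_items(input_rids, processed)
    ]
    # Append semantics: rows from earlier runs are kept, new rows are added at the end.
    return previous_output + new_rows


first_run = run_incremental(["doc-1", "doc-2"], [])
second_run = run_incremental(["doc-1", "doc-2", "doc-3"], first_run)
print([row["media_item_rid"] for row in second_run])  # ['doc-1', 'doc-2', 'doc-3']
```

Only doc-3 is extracted in the second run; a non-incremental run would extract all three documents again.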
Customizing the prompt¶
For generative AI configurations, the template inherits the prompt you specified in AIP Document Intelligence. You can view the prompts in src/myproject/document_extraction/prompts.py.
We do not recommend editing prompts directly in the template, as this causes discrepancies between Document Intelligence results and batch job results. Instead, make prompt adjustments in Document Intelligence, verify the results there, then redeploy to create a new template.
| Transform input type | Customizable prompt |
|---|---|
| VisionLLMDocumentsExtractorInput | User prompt only (system prompt is fixed) |
| VisionLLMLayoutDocumentsExtractorInput (layout-aware extraction) | System prompt only (user prompt is fixed) |
For layout-aware extraction configurations, the user prompt must remain fixed because it contains a special JSON schema that preserves layout structure information. Modifying this prompt will significantly reduce extraction success rates.
Custom image preprocessing¶
For documents that require image transformations before extraction, such as rotated text content, you should:
- Create a separate transform pipeline to apply the image transformations.
- Save the processed results to a new media set.
- Use Document Intelligence on the processed media set for extraction.
Run on a subset of media items¶
For layout-aware generative AI configurations, the vision LLM must generate valid JSON adhering to a specific schema. If the response is invalid JSON or does not follow the schema, the extraction fails with an ERROR_RESPONSE_JSON_PARSING error.
In practice, approximately 5% of extractions may fail with top-tier models. For failed rows, you still get valid layoutInfo with extraction results from the layout model only.
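The failure condition above can be illustrated with a small sketch: a response counts as a parsing failure if it is not valid JSON, or if it is valid JSON that lacks the keys the schema requires. REQUIRED_KEYS and classify_response are hypothetical; the real layout schema is embedded in the template's fixed user prompt and is not documented here.

```python
import json
from typing import Optional

# Hypothetical stand-in for the layout schema; the real schema is not shown in this document.
REQUIRED_KEYS = {"blocks"}


def classify_response(raw: str) -> Optional[str]:
    """Return an error code if the LLM response fails parsing, otherwise None."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return "ERROR_RESPONSE_JSON_PARSING"
    # Valid JSON that does not match the expected shape is also treated as a failure.
    if not isinstance(parsed, dict) or not REQUIRED_KEYS.issubset(parsed):
        return "ERROR_RESPONSE_JSON_PARSING"
    return None


print(classify_response('{"blocks": []}'))  # None
print(classify_response('{"blocks": [}'))   # ERROR_RESPONSE_JSON_PARSING
print(classify_response('["blocks"]'))      # ERROR_RESPONSE_JSON_PARSING
```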
To re-run extraction on failed rows:
- Use the filter_on_media_items argument with a list of media item IDs to process only those items.
- Remove the @incremental decorator so that these rows are re-processed instead of being detected as already finished.
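A minimal sketch for building the list you pass to filter_on_media_items, assuming the output rows are available as dictionaries with a media_item_rid key and an error column. The column name error and the helper failed_media_item_ids are hypothetical, so check the actual schema of your extraction output. Because the output has one row per page, the IDs are de-duplicated so each media item appears once.

```python
from typing import Dict, List, Optional

PARSE_ERROR = "ERROR_RESPONSE_JSON_PARSING"


def failed_media_item_ids(
    rows: List[Dict[str, Optional[str]]], error_column: str = "error"
) -> List[str]:
    """Collect unique media item IDs with at least one page that failed JSON parsing."""
    seen = set()
    failed = []
    for row in rows:
        rid = row["media_item_rid"]
        if row.get(error_column) == PARSE_ERROR and rid not in seen:
            seen.add(rid)
            failed.append(rid)
    return failed


rows = [
    {"media_item_rid": "item-a", "error": None},
    {"media_item_rid": "item-b", "error": PARSE_ERROR},
    {"media_item_rid": "item-b", "error": PARSE_ERROR},  # second failed page of the same item
    {"media_item_rid": "item-c", "error": PARSE_ERROR},
]
print(failed_media_item_ids(rows))  # ['item-b', 'item-c']

# Rough sizing from the ~5% failure rate above: 10,000 pages -> about 500 failed rows.
print(round(10_000 * 0.05))  # 500
```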
Improve runtime performance¶
The THREAD_NUMBER parameter controls the number of concurrent threads; each thread extracts data from one document page at a time. Higher values generally shorten job duration, as long as the Vision LLM has enough capacity to serve the additional requests.
| Setting | Value | Notes |
|---|---|---|
| Default | 20 | Conservative setting suitable for most environments |
| Maximum tested | 300 | Achievable in development environments with abundant Vision LLM capacity |
:::callout{theme="warning"}
Setting the THREAD_NUMBER value too high in capacity-restricted environments will cause rate limit errors. The retry loop then consumes significant capacity, affecting other jobs using the same model. You should monitor usage when adjusting this parameter.
:::
Find logs¶
To view build logs, select Telemetry on the build details page. To show only document extraction logs, filter the message column for values that start with aip_workflows.
Row-level vs. document-level chunking¶
The extraction output contains one row per page. By default, DocumentChunker.create_chunks_per_document combines all pages from the same document into a single Markdown string before chunking.
To chunk each row independently without combining pages, use DocumentChunker.create_chunks_per_row instead:
```python
chunking_result = chunker.create_chunks_per_row(
    extraction_df,
    chunk_mode="markdown",  # "recursive" for plain text, "markdown" for markdown text
    content_column="extractionResult",
    id_column="media_item_rid",  # used as prefix for chunk_id
    chunk_size=8192,
    chunk_overlap=0,
    thread_number=20,
    strip_markdown=False,  # set True to remove ```markdown and ``` wrappers before chunking
)
```
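The difference between the two methods can be sketched in pure Python: document-level chunking first joins each document's pages in page order, while row-level chunking treats every page row as its own unit. This illustrates only the grouping step, not the DocumentChunker implementation; the "\n\n" page separator and the helper names units_per_document and units_per_row are assumptions.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def units_per_document(rows: List[Row]) -> List[Tuple[str, str]]:
    """Document-level: join each document's pages, ordered by page_num, into one text."""
    pages: Dict[str, List[Tuple[int, str]]] = defaultdict(list)
    for row in rows:
        pages[str(row["media_item_rid"])].append(
            (int(row["page_num"]), str(row["extractionResult"]))
        )
    return [
        (doc_id, "\n\n".join(text for _, text in sorted(doc_pages)))
        for doc_id, doc_pages in pages.items()
    ]


def units_per_row(rows: List[Row]) -> List[Tuple[str, str]]:
    """Row-level: every page row stays a separate unit, keyed by its media item RID."""
    return [(str(row["media_item_rid"]), str(row["extractionResult"])) for row in rows]


rows = [
    {"media_item_rid": "doc-1", "page_num": 1, "extractionResult": "Page two"},
    {"media_item_rid": "doc-1", "page_num": 0, "extractionResult": "Page one"},
    {"media_item_rid": "doc-2", "page_num": 0, "extractionResult": "Only page"},
]
print(units_per_document(rows))  # [('doc-1', 'Page one\n\nPage two'), ('doc-2', 'Only page')]
print(len(units_per_row(rows)))  # 3
```

Document-level grouping lets a paragraph or table that crosses a page break land in the same text before splitting, while row-level grouping keeps every chunk within a single page.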
Create embeddings without chunking¶
Chunking is recommended before creating embeddings because embedding models have context limits. To skip chunking while preserving the pipeline structure, use create_chunks_per_row with chunk_size set to a very large value, such as sys.maxsize (add import sys to the transform file).
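To see why a very large chunk_size effectively disables chunking, here is a naive fixed-size character splitter with overlap. It is a simplified stand-in: the template's "markdown" and "recursive" modes split on text structure rather than raw character offsets, and split_text is a hypothetical helper.

```python
import sys
from typing import List


def split_text(text: str, chunk_size: int, chunk_overlap: int = 0) -> List[str]:
    """Split text into windows of at most chunk_size characters, overlapping by chunk_overlap."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks


page = "abcdefghij"
print(split_text(page, chunk_size=4))                   # ['abcd', 'efgh', 'ij']
print(split_text(page, chunk_size=4, chunk_overlap=2))  # ['abcd', 'cdef', 'efgh', 'ghij']
print(split_text(page, chunk_size=sys.maxsize))         # ['abcdefghij']
```

With chunk_size=sys.maxsize, every input text fits in the first window, so each row passes through as a single chunk.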
Template examples¶
The following examples show the transform code generated for each extraction configuration. These are for reference only. You should create the transform using the deploy tool in Document Intelligence instead of manually writing transform code for document extraction.
Traditional extraction: Raw text¶
Extracts text by reading document metadata. Only available for electronically generated PDFs.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with raw text extraction
    """
    # One row per media item: RID, media reference, and path
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        # Fan out: one (row, page_num) task per page of each document
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_raw(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))
        # Extract pages concurrently; executor.map keeps results in task order
        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))
        return pl.DataFrame(results)

    # Apply process_batch to each batch of media items (one output row per page)
    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```
Traditional extraction: OCR¶
Uses traditional Optical Character Recognition (OCR) to extract text without preserving layout information.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with OCR text extraction
    """
    # One row per media item: RID, media reference, and path
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        # Fan out: one (row, page_num) task per page of each document
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            extraction_result = media_input.transform_document_to_text_ocr_output_text(
                media_item_rid, page_num
            ).read().decode("utf-8")
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))
        # Extract pages concurrently; executor.map keeps results in task order
        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))
        return pl.DataFrame(results)

    # Apply process_batch to each batch of media items (one output row per page)
    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```
Traditional extraction: Layout-aware OCR¶
Uses advanced OCR with bounding boxes to preserve document layout and structure.
```python
import polars as pl
from concurrent.futures import ThreadPoolExecutor
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from transforms.mediasets.utils._constants import MEDIA_ITEM_RID, MEDIA_REFERENCE, PATH

THREAD_NUMBER = 20


# @incremental(v2_semantics=True)  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
)
def extract(media_input, output):
    """
    Extracts content from pdf documents with layout-aware OCR extraction
    """
    # One row per media item: RID, media reference, and path
    media_refs = pl.from_pandas(
        media_input.list_media_items_by_path_with_media_reference().pandas(),
        schema_overrides={MEDIA_ITEM_RID: pl.String, MEDIA_REFERENCE: pl.String, PATH: pl.String},
    )

    def process_batch(batch_df: pl.DataFrame) -> pl.DataFrame:
        # Fan out: one (row, page_num) task per page of each document
        def create_page_tasks(row):
            media_item_rid = row[MEDIA_ITEM_RID]
            metadata = media_input.get_media_item_metadata(media_item_rid).document
            if metadata is None:
                raise ValueError(f"Media item {media_item_rid} is not a document")
            if metadata.pages is None:
                raise ValueError(f"Media item {media_item_rid} has no page count")
            return [(row, page_num) for page_num in range(metadata.pages)]

        def process_single_page(task):
            row, page_num = task
            media_item_rid = row[MEDIA_ITEM_RID]
            media_reference = row[MEDIA_REFERENCE]
            # Layout-aware OCR transformation of a single page
            extraction_result = media_input.transform_media_item(media_item_rid, str(page_num), {
                "type": "documentToText",
                "documentToText": {
                    "operation": {
                        "type": "extractLayoutAwareContent",
                        "extractLayoutAwareContent": {
                            "parameters": {
                                "languages": ["ENG"]
                            }
                        }
                    }
                }
            })
            extraction_result = str(extraction_result.json())
            return {
                "media_item_rid": media_item_rid,
                "media_reference": media_reference,
                "page_num": page_num,
                "extraction_result": extraction_result
            }

        all_tasks = []
        for row in batch_df.iter_rows(named=True):
            all_tasks.extend(create_page_tasks(row))
        # Extract pages concurrently; executor.map keeps results in task order
        with ThreadPoolExecutor(max_workers=THREAD_NUMBER) as executor:
            results = list(executor.map(process_single_page, all_tasks))
        return pl.DataFrame(results)

    # Apply process_batch to each batch of media items (one output row per page)
    extracted_data = media_refs.lazy().map_batches(
        process_batch,
        schema={
            "media_item_rid": pl.String,
            "media_reference": pl.String,
            "page_num": pl.Int64,
            "extraction_result": pl.String,
        },
        streamable=True,
    )
    output.write_dataframe(extracted_data)
```
Generative AI extraction: Basic¶
Uses a vision language model to extract content as Markdown without preprocessing.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=False, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Generative AI extraction: With OCR preprocessing¶
Uses a vision language model with OCR preprocessing for improved extraction on complex documents.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMDocumentsExtractorInput

from .prompts import USER_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input, with_ocr=True, prompt=USER_PROMPT, thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Generative AI extraction: Layout-aware¶
Uses a vision language model with layout-aware OCR preprocessing, returning layout information alongside extracted content.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="no_overlay",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Generative AI extraction: Layout-aware with table cropping¶
Uses a vision language model with layout-aware OCR preprocessing and table cropping for improved table extraction accuracy.
```python
from transforms.api import Output, incremental, transform
from transforms.mediasets import MediaSetInput
from aip_workflows.document_intelligence.transforms import VisionLLMLayoutDocumentsExtractorInput

from .prompts import SYSTEM_PROMPT

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["extractor"])  # uncomment this line if incremental is needed
@transform.using(
    output=Output("ri.foundry.main.dataset.abc"),
    media_input=MediaSetInput("ri.mio.main.media-set.abc"),
    extractor=VisionLLMLayoutDocumentsExtractorInput(
        "ri.language-model-service..language-model.anthropic-claude-xxx-sonnet"
    ),
)
def extract(media_input, output, extractor):
    """
    Extracts content from pdf documents as markdown.
    """
    extracted_data = extractor.create_extraction(
        media_input,
        include_layout_info="crop_tables",
        system_prompt=SYSTEM_PROMPT,
        thread_number=THREAD_NUMBER
    )
    output.write_dataframe(extracted_data)
```
Chunk extracted text and generate embeddings¶
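If you do not need embeddings, remove the embedder argument from the @transform.using decorator and from the function signature, delete the embedding_result assignment, and write chunking_result to the output instead. If you enabled incremental processing, also remove "embedder" from snapshot_inputs.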
```python
from transforms.api import Input, Output, incremental, transform
from aip_workflows.document_intelligence.transforms import DocumentChunker, DocumentEmbedderInput

THREAD_NUMBER = 20


# @incremental(v2_semantics=True, snapshot_inputs=["embedder"])  # uncomment this line if incremental is needed
@transform.using(
    extraction_input=Input("ri.foundry.main.dataset.abc"),  # typically the output dataset from the extraction transform
    output=Output("ri.foundry.main.dataset.xyz"),
    embedder=DocumentEmbedderInput("ri.language-model-service..language-model.text-embedding-3-large"),
)
def chunk_and_embed(extraction_input, output, embedder):
    extraction_df = extraction_input.polars(lazy=True)
    chunker = DocumentChunker()
    chunking_result = chunker.create_chunks_per_document(
        extraction_df,
        chunk_mode="markdown",  # "recursive" for raw text, "markdown" for markdown text
        content_column="extractionResult",  # content column name; the traditional extraction templates above write "extraction_result"
        id_column="media_item_rid",  # id of the document, used to combine content (e.g. from different pages) of a single document
        page_column="page_num",  # page number column name
        chunk_size=8192,
        chunk_overlap=0,
        thread_number=THREAD_NUMBER,
        strip_markdown=True,  # when True, removes the ```markdown prefix and ``` suffix from the content before chunking
    )
    embedding_result = embedder.create_embeddings(
        chunking_result,
        content_column="chunk_content",
        thread_number=THREAD_NUMBER,
    )
    output.write_dataframe(embedding_result)
```