
Streaming functions

Python and TypeScript v2 functions can stream the result of an execution in chunks. This is useful when data becomes available over time and lets consuming applications begin processing the result before it is complete.

Write a function with a streamed response

To stream a response, Python functions must be annotated with an `Iterable[T]` return type and use the `yield` keyword to produce values as they become available. In TypeScript, you must declare an async generator function by adding a `*` character after the `function` keyword, specify a return type of `AsyncIterable<T>`, and use the `yield` keyword to produce values as they become available.

The following example demonstrates how a function can return a stream of integers where one second elapses between each integer:

```python tab="Python"
from functions.api import function
from typing import Iterable
import time

@function
def my_lazy_number_generator(n: int) -> Iterable[int]:
    for i in range(n):
        time.sleep(1)
        yield i
```

```typescript tab="TypeScript v2"
import { Integer } from "@osdk/functions";

export default async function* myLazyNumberGenerator(n: Integer): AsyncIterable<Integer> {
    for (let i = 0; i < n; i++) {
        await new Promise(resolve => setTimeout(resolve, 1_000));
        yield i;
    }
}
```
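To make the "begin processing before it is complete" behavior concrete, the following is a minimal sketch in plain Python, without the `@function` decorator or any platform imports. The names `lazy_numbers`, `consume`, and `events` are hypothetical and exist only for this illustration. The shared log records the order in which values are produced and consumed.

```python
from typing import Iterable, List


def lazy_numbers(n: int, log: List[str]) -> Iterable[int]:
    """Plain generator standing in for the body of a streaming function."""
    for i in range(n):
        log.append(f"produced {i}")
        yield i  # execution pauses here until the consumer asks for the next value
    log.append("generator finished")


def consume(stream: Iterable[int], log: List[str]) -> None:
    """Hypothetical consumer that handles each value as soon as it arrives."""
    for value in stream:
        log.append(f"consumed {value}")


events: List[str] = []
consume(lazy_numbers(3, events), events)
print(events)
```

In the resulting log each value is consumed before the next one is produced, and "generator finished" is recorded only after the last value has been handled. This interleaving is what allows a caller to act on partial results.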

Functions with streamed responses are useful when working with language models that may take time to generate the entire output. By yielding each chunk of the response as it is produced by the model, you can provide a real-time experience that does not block on the full completion of the response. Refer to Language models in TypeScript v2 and Python functions for more information on calling language models in functions.

The following example uses the openai SDK to call a language model and stream the response back by enabling the stream flag in the request:

```python tab="Python"
from openai import OpenAI
from functions.api import function
from functions.aliases import model
from foundry_sdk.v2.language_models import (
    get_openai_base_url,
    get_foundry_token,
    get_http_client,
)
from typing import Iterable

@function
def create_chat_completion(prompt: str) -> Iterable[str]:
    client = OpenAI(
        api_key=get_foundry_token(preview=True),
        base_url=get_openai_base_url(preview=True),
        http_client=get_http_client(preview=True),
    )

    stream = client.chat.completions.create(
        model=model("gpt55").rid,
        messages=[
            {
                "role": "user",
                "content": prompt,
            },
        ],
        stream=True
    )

    # Yield each non-empty text delta as soon as the model produces it.
    for event in stream:
        if event.choices:
            content = event.choices[0].delta.content
            if content:
                yield content
```

```typescript tab="TypeScript v2"
import { PlatformClient } from "@osdk/client";
import OpenAI from "openai";
import { Aliases } from "@osdk/functions";
import { getFoundryToken, getOpenAiBaseUrl, createFetch } from "@osdk/language-models";

export default async function* createChatCompletion(client: PlatformClient, prompt: string): AsyncIterable<string> {
    const oaiClient = new OpenAI({
        apiKey: await getFoundryToken(client),
        baseURL: getOpenAiBaseUrl(client),
        fetch: createFetch(client),
    });

    const stream = await oaiClient.chat.completions.create({
        model: Aliases.model("gpt55").rid,
        messages: [
            { role: 'user', content: prompt },
        ],
        stream: true
    });

    // Yield each non-empty text delta as soon as the model produces it.
    for await (const event of stream) {
        const content = event.choices[0]?.delta?.content;
        if (content) {
            yield content;
        }
    }
}
```
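The filtering loop in the example above can be exercised without a model call. The sketch below is an illustration only: `make_event` and `text_chunks` are hypothetical helpers, and the fake events merely mimic the `event.choices[0].delta.content` shape used in the example rather than real SDK objects. It shows that events with no choices, or with empty content, are skipped so that only text reaches the caller.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator, Optional


def make_event(content: Optional[str]) -> SimpleNamespace:
    """Build a fake event shaped like event.choices[0].delta.content (assumption)."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


def text_chunks(events: Iterable[SimpleNamespace]) -> Iterator[str]:
    """Yield only the non-empty text deltas, mirroring the loop in the example."""
    for event in events:
        if not event.choices:
            continue  # some events carry no choices at all
        content = event.choices[0].delta.content
        if content:
            yield content


fake_stream = [
    make_event("Mount "),
    SimpleNamespace(choices=[]),  # event without choices
    make_event(None),             # event with no text content
    make_event("Everest"),
]
chunks = list(text_chunks(fake_stream))
print(chunks)
```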

Call streaming functions through the Ontology SDK

Once you have tagged and published your streaming function, you can execute it through the Ontology SDK in a React application, a custom widget in Workshop, or another function.

:::callout{theme="neutral" title="Beta"}
Executing functions with streamed responses through the Ontology SDK is in the beta phase of development. Functionality may change during active development.
:::

```python tab="Python"
from foundry_sdk_runtime import AllowBetaFeatures

with AllowBetaFeatures():
    with client.ontology.queries.create_chat_completion_streaming(prompt="Where is Mount Everest?") as stream:
        for text in stream:
            # ...
```

```typescript tab="TypeScript"
import { __EXPERIMENTAL__NOT_SUPPORTED_YET__executeStreamingFunction } from "@osdk/api/unstable";

const stream = client(__EXPERIMENTAL__NOT_SUPPORTED_YET__executeStreamingFunction).executeStreamingFunction(
    createChatCompletion,
    {
        prompt: "Where is Mount Everest?",
    }
);

for await (const text of stream) {
    // ...
}
```
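What goes inside the consuming loop depends on the application. One common pattern is to hand each chunk to the UI as it arrives while also accumulating the full text. The sketch below assumes only that the stream is an iterable of strings exposed through a context manager, as in the Python snippet above. `collect_stream` and `fake_streaming_query` are hypothetical names, and the hard-coded chunks stand in for a real Ontology SDK call.

```python
from contextlib import contextmanager
from typing import Callable, Iterable, Iterator, List


def collect_stream(stream: Iterable[str], on_chunk: Callable[[str], None]) -> str:
    """Pass each chunk to on_chunk as it arrives and return the joined text."""
    parts: List[str] = []
    for text in stream:
        on_chunk(text)  # for example, append to a UI element
        parts.append(text)
    return "".join(parts)


@contextmanager
def fake_streaming_query(prompt: str) -> Iterator[Iterable[str]]:
    """Stand-in for a streaming query; yields fixed chunks (assumption)."""
    yield iter(["Mount Everest ", "is in ", "the Himalayas."])


seen: List[str] = []
with fake_streaming_query("Where is Mount Everest?") as stream:
    answer = collect_stream(stream, seen.append)
print(answer)
```

Separating the per-chunk callback from the accumulation keeps the rendering logic independent of how the stream is obtained.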

