
Push data into a stream

Most data ingestion in Foundry focuses on pulling data from a source system and syncing it into a dataset or stream.

For streams, Foundry also supports push-based ingestion, which enables event-based workflows.

Push-based record ingestion in Foundry follows the same principles as typical REST services. Through a series of REST endpoints, we expose a push-based API that can consume records and write them into streams and datasets. The following high-level information is required to push into a stream:

  • The dataset resource identifier of the stream.
  • The name of the branch.
  • A token to authenticate the request.
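As a rough illustration, the three inputs above can be bundled and sanity-checked before any request is sent. The following Python sketch is hypothetical: the PushTarget class, the assumption that dataset RIDs start with ri.foundry.main.dataset., and the assumption that the token is sent as an OAuth2 Bearer header are not taken from a Foundry SDK.

```python
from dataclasses import dataclass

# Assumed prefix of a dataset resource identifier (RID); verify against your stream.
DATASET_RID_PREFIX = "ri.foundry.main.dataset."


@dataclass(frozen=True)
class PushTarget:
    """Hypothetical container for the inputs a push request needs."""

    dataset_rid: str  # resource identifier of the stream's dataset
    branch: str       # branch to push to, for example "master"
    token: str        # OAuth2 access token or personal token

    def __post_init__(self) -> None:
        # Fail early on obviously malformed input instead of at request time.
        if not self.dataset_rid.startswith(DATASET_RID_PREFIX):
            raise ValueError(f"unexpected dataset RID: {self.dataset_rid!r}")
        if not self.branch:
            raise ValueError("branch name must not be empty")
        if not self.token:
            raise ValueError("token must not be empty")

    def auth_header(self) -> dict[str, str]:
        # Assumes the token is sent as a standard OAuth2 Bearer token.
        return {"Authorization": f"Bearer {self.token}"}


if __name__ == "__main__":
    target = PushTarget("ri.foundry.main.dataset.1234", "master", "example-token")
    print(target.auth_header())
```

Validating in the constructor means a typo in the RID or an empty token surfaces before any network call is attempted.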

:::callout{theme="neutral"} If you already have a Foundry source configured for your data, you may want to connect with that source instead. Learn how to set up a streaming sync with an existing source, or learn how to set up a Kafka source.

If you need to receive inbound webhooks from systems that cannot properly authenticate with Foundry or conform to standard stream schemas, consider using listeners. :::

Below, we will discuss the steps required to push data into a stream:

  1. Set up a new stream.
  2. Push records into the stream.
  3. Share the stream.
  4. Test the stream.

Part 1: Initial setup

First, begin the stream creation workflow.

  1. Log in to Foundry.
  2. Navigate to a Project.
  3. Select + New in the top right corner.
  4. Scroll down and select Stream.

Create stream

Next, we must define the schema, throughput, and keys for the stream.

  • Schema defines the structure and types of your stream data.
  • Throughput represents the data processing rate and will impact the number of partitions used in your stream. Learn more in our throughput and partitions documentation.
  • Keys are used to guarantee ordering for unique IDs when using multiple partitions. Learn more in our streaming keys documentation.
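To see why keys preserve ordering, consider a generic key-hash partitioner of the kind used by Kafka-style logs: every record with the same key lands in the same partition, and order is only preserved within a partition. This Python sketch illustrates that general technique under stated assumptions; it is not Foundry's actual partitioning function, and the CRC32 hash and the function names are illustrative choices.

```python
import zlib
from collections import defaultdict


def partition_for(key: str, num_partitions: int) -> int:
    # Deterministic hash so the same key always maps to the same partition.
    # Python's built-in hash() is salted per process, so CRC32 is used instead.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign(records: list[dict], key_field: str, num_partitions: int) -> dict[int, list[dict]]:
    # Records are assumed to arrive in the order they were sent.
    partitions: dict[int, list[dict]] = defaultdict(list)
    for record in records:
        partitions[partition_for(record[key_field], num_partitions)].append(record)
    return dict(partitions)


if __name__ == "__main__":
    events = [
        {"sensor_id": "sensor1", "temperature": 20.0},
        {"sensor_id": "sensor2", "temperature": 18.5},
        {"sensor_id": "sensor1", "temperature": 20.4},
    ]
    for partition, recs in sorted(assign(events, "sensor_id", 4).items()):
        print(partition, [r["temperature"] for r in recs])
```

Both sensor1 readings end up in the same partition in send order, while sensor2 may land elsewhere; without a key, the two sensor1 readings could be split across partitions and consumed out of order.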

For this tutorial, we will create a simple single-partition stream.

  • Set the schema to sensor_id: String, temperature: Double, and created_at: Timestamp.
  • Set the throughput to Normal.
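Before pushing, it can help to check records against this schema on the client side so that malformed records are caught early. The following Python sketch mirrors the three tutorial fields as Python types. The mapping, treating every field as nullable (the sample record later in this tutorial omits created_at), and accepting created_at as either an integer or a string are assumptions, since the exact timestamp wire format is not covered here; confirm it, for example with the Test with JSON card.

```python
from typing import Any

# Hypothetical client-side mirror of the tutorial schema.
SCHEMA: dict[str, tuple[type, ...]] = {
    "sensor_id": (str,),          # String
    "temperature": (float, int),  # Double (integers are accepted and widened)
    "created_at": (int, str),     # Timestamp (wire format is an assumption)
}


def validate(record: dict[str, Any]) -> list[str]:
    """Return a list of problems; an empty list means the record looks valid."""
    problems = []
    for name in record:
        if name not in SCHEMA:
            problems.append(f"unknown field: {name}")
    for name, allowed in SCHEMA.items():
        if record.get(name) is None:
            continue  # fields are treated as nullable in this sketch
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, allowed):
            problems.append(f"{name}: unexpected type {type(value).__name__}")
    return problems


if __name__ == "__main__":
    print(validate({"sensor_id": "sensor4", "temperature": 4.132}))
    print(validate({"sensor_id": 4, "humidity": 0.5}))
```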

Then, configure your stream as needed:

  • Generate the schema automatically from an existing JSON blob by selecting the Generate from JSON sample... button and pasting in the JSON.
  • Set up change data capture if you are streaming real-time updates from a relational database.
  • Consider the parallelism requirements of your stream to determine if you need higher throughput settings. Review our partitions documentation for more information.
  • Consider setting a key for ordering guarantees when using multiple partitions.
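The Generate from JSON sample... button derives a schema from an example record. The exact rules Foundry applies are not described here, so the following Python sketch only illustrates the general idea of mapping JSON value types to column types. The type names follow those used in this tutorial (String, Double) plus Long and Boolean, and the mapping itself is an assumption. Note that a JSON sample cannot distinguish a timestamp from an ordinary string, so a field such as created_at would likely still need adjusting by hand.

```python
import json


def infer_type(value) -> str:
    # Illustrative mapping only; not Foundry's documented inference logic.
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "Boolean"
    if isinstance(value, int):
        return "Long"
    if isinstance(value, float):
        return "Double"
    if isinstance(value, str):
        return "String"
    # null, arrays, and nested objects are not handled in this sketch.
    raise ValueError(f"no flat column type for {type(value).__name__}")


def infer_schema(sample: str) -> dict[str, str]:
    record = json.loads(sample)
    if not isinstance(record, dict):
        raise ValueError("sample must be a JSON object")
    return {name: infer_type(value) for name, value in record.items()}


if __name__ == "__main__":
    sample = '{"sensor_id": "sensor4", "temperature": 4.132, "created_at": "2024-01-01T00:00:00Z"}'
    print(infer_schema(sample))
```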

Define stream

:::callout{theme="neutral"} All validation errors must be addressed before selecting Create stream. Hover over the tooltips at the bottom of the page for more details about each error. :::

Select the Create stream button in the bottom right corner to navigate to the Connect page. Here, you can specify how to connect to the streaming data.

Part 2: Push records into the stream

We are now ready to connect our stream.

  1. For push-based ingestion, select one of the options under the Connect via API section.

Any language or technology that can make HTTP requests can be used to push records. We provide examples for cURL, Python, JavaScript (Node.js), and Java. If you want to ingest data via a Foundry sync instead of pushing the data into Foundry, review how to set up a streaming sync.

For this example, we will select cURL.

<img alt="Stream connection" src="./media/stream-connect@2x.png">

You will then land on the Push data workflow page.

  1. Next, select an authentication mechanism. There are two ways to authenticate the request:

      • Push with a third-party application (recommended): This method uses an OAuth2 workflow to create a secure token that can be used to push records into your stream.
      • Push with a personal token: This method uses a user-generated token and is intended for testing purposes only.

    Stream push options

  2. For this tutorial, choose the Push with a third-party application method. Follow the steps on the screen to set up a third-party application and create your client secret. When configuring the application's operation restrictions, ensure that the api:use-streams-write operation is included so that the third-party application can push records into the stream.

    Stream push third party auth

  3. Now, select Go to third-party applications to open the third-party application management page in your Foundry platform settings.

  4. Select Register new application at the top right of your screen.

    Third party new app

  5. Choose a name, and set the client type to Server application.

    Third party create app

  6. Select Create. Your client ID and client secret will be displayed.

:::callout{theme="warning"} The client secret will not be accessible once you leave this page. Be sure to store it in a secure location. :::

  7. Now, add the client ID and secret to the Push workflow page.

    Add client secret

  8. Scroll down to the Configuring the Application section of the workflow, then select Manage application to open the Third-party applications management page in your Foundry platform settings.

    Configure third party app

  9. Next, enable the Client credentials grant setting.

    Configure grant

  10. Finally, select Save in the upper right corner.
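With the client credentials grant enabled, the application can exchange its client ID and secret for an access token. The generated command in Part 4 does this for you; the Python sketch below builds the same kind of standard OAuth2 client credentials request (RFC 6749, section 4.4) using only the standard library. The token path /multipass/api/oauth2/token, the FOUNDRY_HOSTNAME, CLIENT_ID, and CLIENT_SECRET environment variable names, and the function names are assumptions; copy the exact URL from the command shown on the Push data page. Reading the secret from the environment avoids hard-coding it in source files.

```python
import os
import urllib.parse
import urllib.request

# Assumed token path; verify it against the generated command.
TOKEN_PATH = "/multipass/api/oauth2/token"


def build_token_request(hostname: str, client_id: str, client_secret: str) -> urllib.request.Request:
    # Standard OAuth2 client credentials grant, sent as a form-encoded body.
    body = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }).encode("utf-8")
    return urllib.request.Request(
        f"https://{hostname}{TOKEN_PATH}",
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def request_from_env() -> urllib.request.Request:
    # Keep credentials out of source code; these variable names are hypothetical.
    return build_token_request(
        os.environ["FOUNDRY_HOSTNAME"],
        os.environ["CLIENT_ID"],
        os.environ["CLIENT_SECRET"],
    )
```

Passing the result of request_from_env() to urllib.request.urlopen sends the request; on success, the JSON response body contains the access token.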

Part 3: Share the stream

Now, we need to share the stream with the application we created.

  1. First, return to the Push workflow page. Under the Using Your New Application section in the workflow, find the client ID that you generated.

    Client ID

  2. Next, select Share in the top right to open the Roles tab of the stream's Details sidebar on the right side of your screen.

    Share third party app

  3. Copy and paste the client ID into the Roles search field to find the application you created. Select +, then search for and choose the Editor role.

  4. Choose Save at the bottom of the side panel to share the stream.

  5. Return to the push workflow and select Next step.

Part 4: Test the stream

You will now be presented with code examples that can be used to push test records into the stream. The code examples differ depending on what language you selected in the previous steps. For this example, we are using cURL.

:::callout{theme="neutral"} Before running cURL commands, be sure to install jq for command line JSON parsing. :::

  1. First, copy the command from the first box. This command calls Foundry’s OAuth2 token endpoint and returns an access token that you can use to push records.

Push records with cURL

  2. Execute the command in a bash terminal on your Mac, Windows, or Linux machine.

Once the command has run, the shell variable $ACCESS_TOKEN holds your access token. You will use this variable in the next command to push records, so run that command in the same terminal session.
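The generated command relies on jq to pull the token out of the JSON response from the token endpoint. A standard OAuth2 token response (RFC 6749, section 5.1) carries the token in an access_token field alongside token_type and expires_in, and an error response carries an error field instead (section 5.2). This Python sketch performs the same extraction; the function name is hypothetical and the example response is made up for illustration.

```python
import json


def extract_access_token(response_body: str) -> str:
    # Comparable to: jq -r '.access_token'
    payload = json.loads(response_body)
    token = payload.get("access_token")
    if not token:
        # OAuth2 error responses carry an "error" field instead of a token.
        raise RuntimeError(f"no access token in response: {payload.get('error', 'unknown error')}")
    # token_type is case-insensitive per RFC 6749.
    if payload.get("token_type", "bearer").lower() != "bearer":
        raise RuntimeError(f"unexpected token type: {payload['token_type']}")
    return token


if __name__ == "__main__":
    example = '{"access_token": "abc123", "token_type": "bearer", "expires_in": 3600}'
    print(extract_access_token(example))
```

Raising on a missing token gives a clear error message, whereas an unchecked extraction would leave the variable empty and the next push request would fail with a less obvious authentication error.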

The second command uses cURL to send a POST request to a Foundry endpoint; the request body contains the records to insert into the stream. The command is prepopulated with a dummy record, but you can send any data that adheres to the stream's schema.

  3. Copy and paste the second command into your terminal and execute it.

If the command is successful, you will see records appear in the stream.

<img alt="View records" src="./media/stream-view-records@2x.png">

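To change the data pushed into the stream, modify the data parameter of the POST request. The payload is a JSON array of records, with each record's fields wrapped in a value object; the quotes are escaped with backslashes because the JSON is embedded in a shell string: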

[{ \"value\": {\"sensor_id\":\"sensor4\",\"temperature\":4.132} }]
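The same push can be made from any language that can send HTTP requests. This Python sketch builds the POST request with the payload shape shown above, wrapping each record in a value object. The endpoint path /stream-proxy/api/streams/{rid}/branches/{branch}/jsonRecords and the Authorization: Bearer header are assumptions and may not match your environment; copy the exact URL and headers from the generated cURL command. The hostname, RID, and token in the usage below are placeholders.

```python
import json
import urllib.parse
import urllib.request

# Assumed endpoint path; replace it with the URL from the generated command.
PUSH_PATH = "/stream-proxy/api/streams/{rid}/branches/{branch}/jsonRecords"


def build_push_request(hostname: str, dataset_rid: str, branch: str,
                       token: str, records: list[dict]) -> urllib.request.Request:
    # Each record's fields are wrapped in a "value" object, as in the sample payload.
    body = json.dumps([{"value": record} for record in records]).encode("utf-8")
    # Percent-encode path segments so unusual branch names cannot break the URL.
    path = PUSH_PATH.format(
        rid=urllib.parse.quote(dataset_rid, safe=""),
        branch=urllib.parse.quote(branch, safe=""),
    )
    return urllib.request.Request(
        f"https://{hostname}{path}",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

For example, build_push_request("example.palantirfoundry.com", "ri.foundry.main.dataset.1234", "master", token, [{"sensor_id": "sensor4", "temperature": 4.132}]) produces a request that urllib.request.urlopen can send. Because json.dumps handles quoting, no backslash escaping is needed, unlike in the shell command.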

You can also send test records into the stream from the user interface by expanding the Test with JSON card.

Test with JSON

Next steps

Now that you have successfully pushed data into a stream, you are ready to start transforming your data. Select Start pipelining to navigate to the Pipeline Builder application, where you will build your streaming pipeline. Learn how to transform your data, and learn more about the different transforms available in Pipeline Builder.
