Amazon Kinesis¶
Connect Foundry to Amazon Kinesis to read data from a Kinesis stream into a Foundry stream in real time.
Supported capabilities¶
| Capability | Status |
|---|---|
| Exploration | 🟢 Generally available |
| Streaming syncs | 🟢 Generally available |
| Streaming exports | 🟢 Generally available |
Data model¶
| partition_key (string) | data (string) | kinesis_ingestion_timestamp (timestamp) | foundry_ingestion_timestamp (timestamp) |
|---|---|---|---|
| London | {"firstName": "John", "lastName": "Doe"} | 2023-07-12T15:12:42.371Z | 2023-07-12T15:12:42.512Z |
| Paris | {"firstName": "Jean", "lastName": "DuPont"} | 2023-07-12T15:12:42.418Z | 2023-07-12T15:12:42.512Z |
The Kinesis connector parses message contents into Unicode strings. To parse structured data, use a downstream streaming transform (for example, parse_json in Pipeline Builder).
- The partition_key column contains the partition key that was used to post the message to Kinesis.
- The kinesis_ingestion_timestamp column contains the timestamp when the message was posted to Kinesis.
- The foundry_ingestion_timestamp column contains the timestamp when the message was ingested by Foundry.
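As a minimal sketch of what the downstream parsing step does, the Python snippet below decodes the data column of the sample rows above with json.loads. In Foundry this would normally happen in a streaming transform such as parse_json; the row dictionaries and the parse_row helper are hypothetical and only illustrate the shape of the data.

```python
import json

# Sample rows copied from the data model table above. The dict layout is an
# assumption made for illustration, not the connector's wire format.
rows = [
    {
        "partition_key": "London",
        "data": '{"firstName": "John", "lastName": "Doe"}',
        "kinesis_ingestion_timestamp": "2023-07-12T15:12:42.371Z",
        "foundry_ingestion_timestamp": "2023-07-12T15:12:42.512Z",
    },
    {
        "partition_key": "Paris",
        "data": '{"firstName": "Jean", "lastName": "DuPont"}',
        "kinesis_ingestion_timestamp": "2023-07-12T15:12:42.418Z",
        "foundry_ingestion_timestamp": "2023-07-12T15:12:42.512Z",
    },
]


def parse_row(row):
    """Return a copy of the row with `data` decoded from a JSON string."""
    parsed = dict(row)
    try:
        parsed["data"] = json.loads(row["data"])
    except json.JSONDecodeError:
        # Leave malformed payloads as raw strings instead of failing the batch.
        pass
    return parsed


parsed_rows = [parse_row(r) for r in rows]
print(parsed_rows[0]["data"]["firstName"])
```

Keeping malformed payloads as strings is one possible design choice; a real pipeline might instead route them to a separate stream for inspection.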
Performance and limitations¶
The connector always uses a single consumer thread per active shard on the source Kinesis stream.
Streaming syncs are meant to be consistent, long-running jobs. Depending on how downstream consumers use the data, any interruption to a streaming sync can amount to an outage.
Currently, streaming syncs have the following limitations:
- Jobs on a Foundry worker restart at least once every 48 hours. Expected downtime is single-digit minutes (assuming resource availability allows jobs to restart immediately).
- For legacy agent worker sources, jobs restart during agent maintenance windows (typically once a week) to pick up upgrades. Expected downtime is less than five minutes. For high availability, connect through two agents with non-overlapping maintenance windows.
Message ordering¶
The Kinesis connector guarantees message delivery order for messages with the same partition_key. Messages with different partition_key values may be processed in any order.
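To make the ordering guarantee concrete, the following sketch checks whether a received sequence preserves per-key order relative to what was sent. The helper names and the sample messages are hypothetical; the point is only that interleaving across keys may change while each key's own sequence may not.

```python
from collections import defaultdict


def per_key_order(messages):
    """Group (partition_key, payload) pairs by key, keeping arrival order."""
    grouped = defaultdict(list)
    for key, payload in messages:
        grouped[key].append(payload)
    return dict(grouped)


def preserves_per_key_order(sent, received):
    """True if every key's payloads arrive in the order they were sent."""
    return per_key_order(sent) == per_key_order(received)


sent = [("London", 1), ("Paris", 1), ("London", 2), ("Paris", 2)]

# Keys interleave differently, but each key's own sequence is intact.
received_ok = [("Paris", 1), ("Paris", 2), ("London", 1), ("London", 2)]

# London's messages are swapped, which would violate the per-key guarantee.
received_bad = [("London", 2), ("London", 1), ("Paris", 1), ("Paris", 2)]

print(preserves_per_key_order(sent, received_ok))   # True
print(preserves_per_key_order(sent, received_bad))  # False
```

If downstream logic needs a global order across keys, it has to derive one itself, for example from the timestamp columns.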
Setup¶
- Open the Data Connection application and select + New Source in the upper right corner of the screen.
- Select Kinesis from the available connector types.
- Follow the additional configuration prompts to continue setting up your connector, using the information in the sections below.
Learn more about setting up a connector in Foundry.
Connection settings¶
| Parameter | Required? | Default | Description |
|---|---|---|---|
| AWS Region | Yes | us-east-1 | The AWS region your Kinesis stream is in. |
Authentication¶
Select an authentication method for your Kinesis connection: AWS Instance or Static Credentials.
Below is a sample IAM policy showing the permissions required to read from and write to the specified Kinesis streams.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadKinesisStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:ListShards",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/read-stream-name"
    },
    {
      "Sid": "WriteKinesisStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/write-stream-name"
    }
  ]
}
```
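If you maintain several streams, it can help to generate this policy rather than edit it by hand. The sketch below rebuilds the same document from stream ARNs; build_policy and stream_arn are hypothetical helpers, and the account ID and stream names are the placeholder values from the sample policy.

```python
import json

# Actions taken from the sample policy above.
READ_ACTIONS = [
    "kinesis:ListShards",
    "kinesis:GetShardIterator",
    "kinesis:GetRecords",
    "kinesis:DescribeStream",
]
WRITE_ACTIONS = ["kinesis:PutRecords"]


def stream_arn(region, account_id, stream_name):
    """Format a Kinesis stream ARN."""
    return f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"


def build_policy(read_arn=None, write_arn=None):
    """Build an IAM policy dict with read and/or write statements."""
    statements = []
    if read_arn:
        statements.append({
            "Sid": "ReadKinesisStream",
            "Effect": "Allow",
            "Action": READ_ACTIONS,
            "Resource": read_arn,
        })
    if write_arn:
        statements.append({
            "Sid": "WriteKinesisStream",
            "Effect": "Allow",
            "Action": WRITE_ACTIONS,
            "Resource": write_arn,
        })
    return {"Version": "2012-10-17", "Statement": statements}


policy = build_policy(
    read_arn=stream_arn("us-east-1", "123456789012", "read-stream-name"),
    write_arn=stream_arn("us-east-1", "123456789012", "write-stream-name"),
)
print(json.dumps(policy, indent=2))
```

Omitting write_arn yields a read-only policy, which is sufficient for a source that is only used for streaming syncs and not for exports.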
AWS Instance¶
:::callout{theme="warning" title="Legacy"}
AWS instance authentication is only available for legacy agent worker sources. Foundry worker sources cannot use AWS instance authentication; use IAM access keys instead.
:::
When your Foundry agent is running on an AWS resource with a provisioned IAM role (e.g. an EC2 instance), the Kinesis connector will use the provisioned IAM role to connect to Kinesis streams. No additional configuration is required.
Static Credentials¶
Static Credentials refers to standard AWS authentication with an Access Key ID and Secret Access Key tied to an IAM user.
| Parameter | Required? | Default |
|---|---|---|
| Access Key ID | Yes | None |
| Secret Access Key | Yes | None |
STS Role¶
The Kinesis connector can optionally assume an STS role before connecting to a Kinesis stream. Refer to the AWS documentation ↗ for details about these parameters.
| Parameter | Required? | Default |
|---|---|---|
| Role ARN | Yes | None |
| Role session name | Yes | None |
| Role session duration | Yes | 900 (seconds) |
| External ID | No | None |
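For orientation, the parameters in this table correspond to arguments of the AWS STS AssumeRole API. The sketch below shows how the same values could be passed with boto3 outside of Foundry, for example to verify that a role can be assumed at all. It is not the connector's internal implementation; build_assume_role_args and assume_role are hypothetical helpers, and boto3 is imported lazily so the argument builder can be used without it.

```python
def build_assume_role_args(role_arn, session_name, duration_seconds=900,
                           external_id=None):
    """Map the table's parameters onto AssumeRole keyword arguments."""
    if duration_seconds < 900:
        # AWS STS rejects session durations shorter than 900 seconds.
        raise ValueError("Role session duration must be at least 900 seconds")
    args = {
        "RoleArn": role_arn,
        "RoleSessionName": session_name,
        "DurationSeconds": duration_seconds,
    }
    if external_id is not None:
        # External ID is optional and only sent when configured.
        args["ExternalId"] = external_id
    return args


def assume_role(args, region="us-east-1"):
    """Call STS AssumeRole and return the temporary credentials.

    Requires boto3 and valid AWS credentials in the environment.
    """
    import boto3  # imported here so the rest of the sketch runs without it

    sts = boto3.client("sts", region_name=region)
    return sts.assume_role(**args)["Credentials"]


# The role ARN and session name below are placeholders.
example_args = build_assume_role_args(
    role_arn="arn:aws:iam::123456789012:role/example-role",
    session_name="foundry-kinesis",
)
print(example_args)
```

Calling assume_role(example_args) with real values returns a dictionary containing AccessKeyId, SecretAccessKey, SessionToken, and Expiration.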
Networking¶
The connector must have access to the AWS Kinesis API and, if using an STS role, the AWS STS API.
- Kinesis API: https://kinesis.<region>.amazonaws.com
- STS API: https://sts.<region>.amazonaws.com
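When preparing egress or firewall rules, it can be useful to derive the exact hostnames for your region and test reachability from the machine that will run the connector. The sketch below is one way to do that with the Python standard library; required_endpoints and can_connect are hypothetical helpers, and a successful TCP connection on port 443 only shows that the network path is open, not that credentials are valid.

```python
import socket
from urllib.parse import urlparse


def required_endpoints(region, uses_sts_role=False):
    """Return the AWS API endpoints the connector needs for a region."""
    endpoints = [f"https://kinesis.{region}.amazonaws.com"]
    if uses_sts_role:
        endpoints.append(f"https://sts.{region}.amazonaws.com")
    return endpoints


def can_connect(url, timeout=5.0):
    """Return True if a TCP connection to the URL's host on port 443 opens."""
    host = urlparse(url).hostname
    try:
        with socket.create_connection((host, 443), timeout=timeout):
            return True
    except OSError:
        return False


endpoints = required_endpoints("us-east-1", uses_sts_role=True)
print(endpoints)
```

To run the reachability check, loop over the result, for example `for url in endpoints: print(url, can_connect(url))`.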
Sync data from Kinesis¶
Learn how to set up a sync with Kinesis in the Set up a streaming sync tutorial.
Export data to Kinesis¶
The connector supports exporting to external Kinesis streams in Data Connection.
To export to Kinesis, first enable exports for your Kinesis connector. Then, create a new export.
Export configuration options¶
| Option | Required? | Default | Description |
|---|---|---|---|
| Output stream ARN | Yes | N/A | The ARN of the Kinesis stream to which you want to export. |
| Partition column | Yes | First string column | The column that will be used to determine which shard a data record will belong to within the stream. This must be a string value, usually the primary key. Review the AWS documentation ↗ for more information. |
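To illustrate why the partition column matters, the sketch below mimics how Kinesis assigns a record to a shard: the partition key is hashed with MD5 into a 128-bit integer, and the record goes to the shard whose hash key range contains that value. The even split of the hash space across shards is a simplifying assumption; the real ranges of a stream are reported by the ListShards API and may be uneven after resharding. The function names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit value


def hash_key(partition_key):
    """Map a partition key to its 128-bit integer hash key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key, shard_count):
    """Pick a shard index, ASSUMING the hash space is split evenly."""
    range_size = HASH_SPACE // shard_count
    # Clamp so values in the remainder at the top of the space map to the
    # last shard when HASH_SPACE is not divisible by shard_count.
    return min(hash_key(partition_key) // range_size, shard_count - 1)


for key in ["London", "Paris"]:
    print(key, shard_index(key, shard_count=4))
```

Because the mapping is deterministic, all rows with the same partition column value land on the same shard. A column with few distinct values can therefore concentrate traffic on a small number of shards.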