跳转至

Subscribe to changes in object sets via WebSocket(通过WebSocket订阅对象集(object set)的变更)

The TypeScript OSDK's .subscribe method uses the WebSocket protocol to stream object updates to the client. You can also directly establish a connection to our Object Set Watcher endpoint (located at /api/v2/ontologySubscriptions/ontologies/{ontology}/streamSubscriptions) to subscribe to an object set.

Endpoint

wss://{foundryUrl}/api/v2/ontologySubscriptions/ontologies/{ontology}/streamSubscriptions

Replace {foundryUrl} with your Foundry instance's URL (without the https:// prefix) and {ontology} with your ontology's API name.

Authentication

Connections are authenticated by passing "Bearer-{token}" as a sub-protocol on the WebSocket. Note that the WebSocket version of our bearer token authentication uses a - before the token instead of the space that our REST APIs use.

Message format

Communication occurs through JSON-serialized messages. The client sends subscription requests. The server responds with subscription confirmations or errors, as well as object updates for subscribed object sets.

Subscribing to an object set

Send a message to establish subscriptions:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "requests": [
    {
      "objectSet": {
        "type": "base",
        "objectType": "Country"
      },
      "propertySet": ["population", "countryName"],
      "referenceSet": []
    }
  ]
}
Field Type Required Description
id UUID Yes A unique request ID to correlate with the response
requests array Yes List of object sets to subscribe to
requests[].objectSet object Yes The object set definition
requests[].propertySet string[] No Property API names to include in updates. If omitted, all properties are returned.
requests[].referenceSet string[] No Reference property API names to subscribe to (e.g., geotime series)
requests[].objectLoadingResponseOptions object No Optional response configuration
requests[].objectLoadingResponseOptions.shouldLoadObjectRids boolean No Whether to include object RIDs in responses (default: false). Including RIDs adds latency, so only enable this if necessary.

You can filter the object set using standard object set syntax:

{
  "id": "550e8400-e29b-41d4-a716-446655440001",
  "requests":[
    {
      "objectSet": {
        "type": "filter",
        "objectSet": {
          "type": "base",
          "objectType": "Country"
        },
        "where": {
          "type": "gte",
          "field": "population",
          "value": 1000000
        }
      },
      "propertySet": ["population"]
    }
  ]
}

Managing subscriptions

Sending a new subscription message updates your active subscriptions. The server compares your new request list against existing subscriptions. Matching subscriptions remain open, missing subscriptions are closed, and new subscriptions are opened.

To unsubscribe from all object sets, send an empty requests array:

{
  "id": "550e8400-e29b-41d4-a716-446655440002",
  "requests": []
}

Object set updates

The client will receive different message types throughout the subscription lifecycle.

subscribeResponses

Sent after receiving a subscribe request, containing a response for each requested subscription:

{
  "type": "subscribeResponses",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "responses": [
    {
      "type": "success",
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
    }
  ]
}

A successful response includes a subscription ID that identifies updates for that subscription. Error responses include diagnostic information:

{
  "type": "subscribeResponses",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "responses": [
    {
      "type": "error",
      "errors": [
        {
          "error": "INVALID_OBJECT_TYPE",
          "args": [{ "name": "objectType", "value": "InvalidType" }]
        }
      ]
    }
  ]
}

If you receive a subscription response with a type of "qos", the server is under heavy load. We recommend using exponential backoff with jitter to retry requests.

objectSetChanged

Sent when objects in your subscribed object set are added, updated, or removed:

{
  "type": "objectSetChanged",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "updates": [
    {
      "type": "object",
      "object": {
        "__apiName": "Country",
        "__primaryKey": "US",
        "countryName": "United States",
        "population": 331000000
      },
      "state": "ADDED_OR_UPDATED"
    }
  ]
}
Field Description
object The updated object with requested properties
object.__apiName The object type API name
object.__primaryKey The object's primary key value
state Either "ADDED_OR_UPDATED" or "REMOVED"

When state is "REMOVED", the object was either deleted or no longer matches the object set filter.

For subscriptions with reference properties (like geotime series), you may receive reference updates:

{
  "type": "objectSetChanged",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "updates": [
    {
      "type": "reference",
      "objectType": "Aircraft",
      "primaryKey": { "aircraftId": "AC-123" },
      "property": "currentLocation",
      "value": {
        "type": "geotimeSeriesValue",
        "position": { "type": "Point", "coordinates": [-122.4194, 37.7749] },
        "timestamp": "2024-01-15T10:30:00Z"
      }
    }
  ]
}

refreshObjectSet

Indicates that the server cannot provide incremental updates. You should reload the object set with a normal HTTP request to the standard load object set endpoint:

{
  "type": "refreshObjectSet",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "objectType": "Country"
}

subscriptionClosed

Indicates that a subscription has been closed. This can occur for several reasons, such as errors:

{
  "type": "subscriptionClosed",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cause": {
    "type": "error",
    "error": "SUBSCRIPTION_MEMORY_LIMIT_EXCEEDED",
    "args": []
  }
}

Subscriptions are also closed when you remove a subscription from your request list:

{
  "type": "subscriptionClosed",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cause": {
    "type": "reason",
    "reason": "USER_CLOSED"
  }
}

Limitations

  • Object sets constructed using joins are not fully supported. The server watches the selected portion and sends notifications when it changes, but cannot guarantee completeness.
  • Subscriptions do not persist across server restarts. Implement reconnection logic in your client.
  • Memory limits apply per subscription. Subscriptions tracking large numbers of objects may be closed if limits are exceeded.
  • Some advanced full-text search filters are not supported for subscription filtering.
  • When subscribing to an interface object set, object updates are returned as their underlying object types. You must remap object properties to interface properties when processing updates.

Interfaces

To handle interface object set subscriptions, you need to fetch mappings from object property types to their corresponding interface property types. This can be done via the Load Object Type Full Metadata endpoint. This beta endpoint returns the full metadata for the requested object type, including interface property mappings for all implemented interfaces.

The object type API name is accessible via the updated object's "__apiName" key. For an interface with API name interfaceType and a Load Object Type Full Metadata response response, the returned interface mappings can be accessed via response["implementsInterfaces2"][interfaceType]["properties"]. This mapping is from object property type API name to the corresponding interface property type API name.

We recommend caching these property mappings for the lifetime of the subscription.

Python Example

Below is a sample Python script showing how to subscribe to a filtered object set and process updates. This example does not show how to remap interface properties.

# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "websockets",
# ]
# ///
import asyncio
import json
import os
import uuid

from websockets.asyncio.client import connect

TOKEN = ...
FOUNDRY_URL = ...
ONTOLOGY_RID = ...


async def main() -> None:
    filtered_object_set = {
        "type": "filter",
        "objectSet": {
            "type": "base",
            "objectType": "Country",
        },
        "where": {
            "type": "gte",
            "field": "population",
            "value": 10000000,
        },
    }
    subscribe_msg = {
        "id": str(uuid.uuid4()),
        "requests": [
            {
                "objectSet": filtered_object_set,
                "propertySet": ["timeUntilNextFlight", "aircraftRegistration"],
            },
        ],
    }

    open_subscriptions: set[str] = set()

    async with connect(
        f"wss://{FOUNDRY_URL}/api/v2/ontologySubscriptions/ontologies/{ONTOLOGY_RID}/streamSubscriptions",
        subprotocols=[f"Bearer-{TOKEN}"],  # Bearer token authentication
    ) as websocket:
        await websocket.send(json.dumps(subscribe_msg))

        async for message in websocket:
            match json.loads(message):
                case {"type": "subscribeResponses", "responses": [*responses]}:
                    for response in responses:
                        match response:
                            case {"type": "success", "id": identifier}:
                                print(f"Subscribed: ID {identifier}")
                                open_subscriptions.add(identifier)
                            case {"type": "error", "errors": [*errors]}:
                                print(f"Subscription errors: {errors}")
                    if not any(response["type"] == "success" for response in responses):
                        print("All subscriptions failed. Exiting.")
                        return

                case {"type": "objectSetChanged", "updates": [*updates]}:
                    for update in updates:
                        match update:
                            case {"type": "object", "state": state, "object": {**obj}}:
                                print(f"{state}: {obj}")
                            case {
                                "type": "reference",
                                "property": prop,
                                "primaryKey": primary_key,
                            }:
                                print(f"Reference update: {primary_key} <- {prop}")

                case {"type": "refreshObjectSet", "objectType": object_type}:
                    print(f"Refresh required for object type: {object_type}")

                case {
                    "type": "subscriptionClosed",
                    "id": identifier,
                    "cause": {**cause},
                }:
                    print(f"Subscription {identifier} was closed: {cause}")
                    open_subscriptions.remove(identifier)
                    if not open_subscriptions:
                        print("All subscriptions closed. Exiting.")
                        return


if __name__ == "__main__":
    asyncio.run(main())

中文翻译


通过WebSocket订阅对象集(object set)的变更

TypeScript OSDK的.subscribe方法使用WebSocket协议将对象更新流式传输到客户端。您也可以直接连接我们的对象集观察器端点(Object Set Watcher endpoint)(地址为/api/v2/ontologySubscriptions/ontologies/{ontology}/streamSubscriptions)来订阅对象集。

端点(Endpoint)

wss://{foundryUrl}/api/v2/ontologySubscriptions/ontologies/{ontology}/streamSubscriptions
{foundryUrl}替换为您的Foundry实例URL(无需带https://前缀),将{ontology}替换为您的本体(Ontology)的API名称。

身份验证(Authentication)

连接通过在WebSocket中传入"Bearer-{token}"作为子协议(sub-protocol)进行身份验证。请注意,我们的持票令牌(Bearer Token)身份验证的WebSocket版本在令牌前使用-,而非REST API中使用的空格。

消息格式(Message format)

通信通过JSON序列化的消息进行。客户端发送订阅请求,服务器返回订阅确认或错误信息,以及已订阅对象集的对象更新。

订阅对象集

发送如下消息来建立订阅:

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "requests": [
    {
      "objectSet": {
        "type": "base",
        "objectType": "Country"
      },
      "propertySet": ["population", "countryName"],
      "referenceSet": []
    }
  ]
}

字段 类型 是否必填 说明
id UUID 用于关联请求和对应响应的唯一请求ID
requests 数组 要订阅的对象集列表
requests[].objectSet 对象 对象集定义
requests[].propertySet string[] 更新中要包含的属性API名称。如果省略该参数,将返回所有属性。
requests[].referenceSet string[] 要订阅的引用属性API名称(例如地理时间序列(geotime series))
requests[].objectLoadingResponseOptions 对象 可选的响应配置
requests[].objectLoadingResponseOptions.shouldLoadObjectRids 布尔值 是否在响应中包含对象RID(默认值:false)。包含RID会增加延迟,因此仅在必要时开启。

您可以使用标准对象集语法对对象集进行过滤:

{
  "id": "550e8400-e29b-41d4-a716-446655440001",
  "requests":[
    {
      "objectSet": {
        "type": "filter",
        "objectSet": {
          "type": "base",
          "objectType": "Country"
        },
        "where": {
          "type": "gte",
          "field": "population",
          "value": 1000000
        }
      },
      "propertySet": ["population"]
    }
  ]
}

管理订阅(Managing subscriptions)

发送新的订阅消息会更新您的活跃订阅。服务器会将您的新请求列表与现有订阅进行比对:匹配的订阅保持开启,请求中缺失的订阅将被关闭,新增的订阅将被开启。

要取消所有对象集的订阅,发送空的requests数组即可:

{
  "id": "550e8400-e29b-41d4-a716-446655440002",
  "requests": []
}

对象集更新

在订阅的整个生命周期中,客户端会收到不同类型的消息。

subscribeResponses

收到订阅请求后发送,包含每个请求的订阅响应:

{
  "type": "subscribeResponses",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "responses": [
    {
      "type": "success",
      "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
    }
  ]
}
成功响应会包含一个订阅ID,用于标识该订阅对应的更新。错误响应会包含诊断信息:
{
  "type": "subscribeResponses",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "responses": [
    {
      "type": "error",
      "errors": [
        {
          "error": "INVALID_OBJECT_TYPE",
          "args": [{ "name": "objectType", "value": "InvalidType" }]
        }
      ]
    }
  ]
}
如果您收到type"qos"的订阅响应,说明服务器负载过高。我们建议使用带抖动的指数退避算法重试请求。

objectSetChanged

当您订阅的对象集中的对象被添加、更新或移除时发送该消息:

{
  "type": "objectSetChanged",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "updates": [
    {
      "type": "object",
      "object": {
        "__apiName": "Country",
        "__primaryKey": "US",
        "countryName": "United States",
        "population": 331000000
      },
      "state": "ADDED_OR_UPDATED"
    }
  ]
}

字段 说明
object 包含请求的属性的更新后对象
object.__apiName 对象类型的API名称
object.__primaryKey 对象的主键(primary key)值
state 取值为"ADDED_OR_UPDATED"(新增或更新)或"REMOVED"(已移除)

state"REMOVED"时,说明对象已被删除,或不再符合对象集的过滤条件。

对于带引用属性(如地理时间序列)的订阅,您可能会收到引用更新:

{
  "type": "objectSetChanged",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "updates": [
    {
      "type": "reference",
      "objectType": "Aircraft",
      "primaryKey": { "aircraftId": "AC-123" },
      "property": "currentLocation",
      "value": {
        "type": "geotimeSeriesValue",
        "position": { "type": "Point", "coordinates": [-122.4194, 37.7749] },
        "timestamp": "2024-01-15T10:30:00Z"
      }
    }
  ]
}

refreshObjectSet

表示服务器无法提供增量更新。您需要向标准的加载对象集端点发送普通HTTP请求来重新加载对象集:

{
  "type": "refreshObjectSet",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "objectType": "Country"
}

subscriptionClosed

表示订阅已被关闭,可能由多种原因触发,比如发生错误:

{
  "type": "subscriptionClosed",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cause": {
    "type": "error",
    "error": "SUBSCRIPTION_MEMORY_LIMIT_EXCEEDED",
    "args": []
  }
}
当您从请求列表中移除某个订阅时,该订阅也会被关闭:
{
  "type": "subscriptionClosed",
  "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
  "cause": {
    "type": "reason",
    "reason": "USER_CLOSED"
  }
}

限制(Limitations)

  • 使用连接(join)构建的对象集暂不支持完整订阅能力。服务器会监控选中的部分并在其变更时发送通知,但无法保证通知的完整性。
  • 订阅不会在服务器重启后保留,请在客户端实现重连逻辑。
  • 每个订阅都有内存上限。跟踪大量对象的订阅如果超出限制可能会被关闭。
  • 部分高级全文搜索过滤器不支持用于订阅过滤。
  • 订阅接口对象集(interface object set)时,对象更新会以其底层对象类型返回。您在处理更新时必须将对象属性重新映射为接口属性。

接口(Interfaces)

要处理接口对象集的订阅,您需要获取对象属性类型到对应接口属性类型的映射。您可以通过加载对象类型全量元数据端点实现。这个测试版(beta)端点会返回请求的对象类型的全量元数据,包括所有已实现接口的属性映射。

对象类型API名称可通过更新后对象的"__apiName"字段获取。对于API名称为interfaceType的接口,若加载对象类型全量元数据的响应为response,则返回的接口映射可通过response["implementsInterfaces2"][interfaceType]["properties"]获取。该映射的内容为对象属性类型API名称到对应接口属性类型API名称的映射。

我们建议在订阅的生命周期内缓存这些属性映射。

Python示例(Python Example)

以下是Python脚本示例,展示了如何订阅过滤后的对象集并处理更新。本示例没有展示接口属性的重映射逻辑。

# /// script
# requires-python = ">=3.10"
# dependencies = [
#     "websockets",
# ]
# ///
import asyncio
import json
import os
import uuid

from websockets.asyncio.client import connect

TOKEN = ...
FOUNDRY_URL = ...
ONTOLOGY_RID = ...


async def main() -> None:
    filtered_object_set = {
        "type": "filter",
        "objectSet": {
            "type": "base",
            "objectType": "Country",
        },
        "where": {
            "type": "gte",
            "field": "population",
            "value": 10000000,
        },
    }
    subscribe_msg = {
        "id": str(uuid.uuid4()),
        "requests": [
            {
                "objectSet": filtered_object_set,
                "propertySet": ["timeUntilNextFlight", "aircraftRegistration"],
            },
        ],
    }

    open_subscriptions: set[str] = set()

    async with connect(
        f"wss://{FOUNDRY_URL}/api/v2/ontologySubscriptions/ontologies/{ONTOLOGY_RID}/streamSubscriptions",
        subprotocols=[f"Bearer-{TOKEN}"],  # Bearer token authentication
    ) as websocket:
        await websocket.send(json.dumps(subscribe_msg))

        async for message in websocket:
            match json.loads(message):
                case {"type": "subscribeResponses", "responses": [*responses]}:
                    for response in responses:
                        match response:
                            case {"type": "success", "id": identifier}:
                                print(f"Subscribed: ID {identifier}")
                                open_subscriptions.add(identifier)
                            case {"type": "error", "errors": [*errors]}:
                                print(f"Subscription errors: {errors}")
                    if not any(response["type"] == "success" for response in responses):
                        print("All subscriptions failed. Exiting.")
                        return

                case {"type": "objectSetChanged", "updates": [*updates]}:
                    for update in updates:
                        match update:
                            case {"type": "object", "state": state, "object": {**obj}}:
                                print(f"{state}: {obj}")
                            case {
                                "type": "reference",
                                "property": prop,
                                "primaryKey": primary_key,
                            }:
                                print(f"Reference update: {primary_key} <- {prop}")

                case {"type": "refreshObjectSet", "objectType": object_type}:
                    print(f"Refresh required for object type: {object_type}")

                case {
                    "type": "subscriptionClosed",
                    "id": identifier,
                    "cause": {**cause},
                }:
                    print(f"Subscription {identifier} was closed: {cause}")
                    open_subscriptions.remove(identifier)
                    if not open_subscriptions:
                        print("All subscriptions closed. Exiting.")
                        return


if __name__ == "__main__":
    asyncio.run(main())