
Microsoft SQL Server

Connect Foundry to Microsoft SQL Server to read and sync data between SQL Server databases and Foundry.

Supported capabilities

Capability Status
Exploration 🟢 Generally available
Batch syncs 🟢 Generally available
Incremental 🟢 Generally available
Change data capture syncs 🟢 Generally available
Table exports 🟢 Generally available

Setup

  1. Open the Data Connection application and select + New Source in the upper right corner of the screen.
  2. Select MS SQL Server from the available connector types.
  3. Follow the additional configuration prompts to continue the setup of your connector using the information in the sections below.

Learn more about setting up a connector in Foundry.

Authentication

You can authenticate with SQL Server in the following ways:

  1. Username and password: Provide a username and password. We recommend using service credentials rather than individual user credentials.
  2. Active Directory Msi*: Uses the authentication=ActiveDirectoryMSI JDBC setting. You can optionally provide an msiClientId.
  3. Active Directory Password*: Uses the authentication=ActiveDirectoryPassword JDBC setting. You must provide the username and password of an Active Directory user.
  4. Active Directory Service Principal*: Uses the authentication=ActiveDirectoryServicePrincipal JDBC setting. You must specify a principal ID (sometimes called an application or client ID) along with a secret for that principal ID.

For more information on these authentication modes, see the official documentation ↗. For all authentication options, ensure that the provided user and role have the necessary privileges on the target database, as well as permission to read from or write to the target table(s).

* Note that Azure Active Directory is now called Microsoft Entra ID ↗; however, the JDBC options on the SQL Server driver published by Microsoft retain the original names referring to Active Directory.
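The authentication options above differ mainly in which Microsoft JDBC driver properties they set. As a rough illustration only, the sketch below maps each option to its non-secret properties. The mode constants and the `jdbc_auth_properties` function are hypothetical, passing a service principal's client ID as `user` reflects our understanding of Microsoft's driver documentation, and Foundry assembles the real connection for you.

```python
from typing import Dict, Optional

# Hypothetical labels for the four authentication options described above.
USERNAME_PASSWORD = "username_password"
AD_MSI = "active_directory_msi"
AD_PASSWORD = "active_directory_password"
AD_SERVICE_PRINCIPAL = "active_directory_service_principal"


def jdbc_auth_properties(
    mode: str,
    user: Optional[str] = None,
    msi_client_id: Optional[str] = None,
) -> Dict[str, str]:
    """Return the non-secret JDBC properties implied by an authentication mode.

    Passwords and client secrets are intentionally left out so the result is safe to log.
    """
    if mode == USERNAME_PASSWORD:
        if not user:
            raise ValueError("username/password authentication needs a user")
        # No authentication property: the driver falls back to SQL Server logins.
        return {"user": user}
    if mode == AD_MSI:
        props = {"authentication": "ActiveDirectoryMSI"}
        if msi_client_id:
            # Optional: client ID of a user-assigned managed identity.
            props["msiClientId"] = msi_client_id
        return props
    if mode == AD_PASSWORD:
        if not user:
            raise ValueError("ActiveDirectoryPassword needs an Active Directory user")
        return {"authentication": "ActiveDirectoryPassword", "user": user}
    if mode == AD_SERVICE_PRINCIPAL:
        if not user:
            raise ValueError("ActiveDirectoryServicePrincipal needs a principal (client) ID")
        # The principal / application / client ID is passed as the user name.
        return {"authentication": "ActiveDirectoryServicePrincipal", "user": user}
    raise ValueError(f"unknown authentication mode: {mode!r}")
```

For example, `jdbc_auth_properties(AD_MSI, msi_client_id="<client-id>")` yields the `authentication` and `msiClientId` entries, while the secret for the other modes would be supplied separately as the driver's `password`.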

Networking

The Microsoft SQL Server connector requires network access to the SQL Server instance that you wish to connect to. SQL Server connections typically use a hostname and port 1433.

You must add the appropriate egress policies when setting up the source in the Data Connection application.

For SQL Server instances hosted on a cloud service like Azure SQL or AWS RDS, you must add an egress policy for the hostname retrieved from your cloud provider’s console.
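Before adding egress policies, it can help to confirm from a machine on a comparable network that the instance accepts TCP connections on the expected port. The sketch below uses only Python's standard `socket` module; `can_reach` is a hypothetical helper name. A successful check says nothing about credentials, encryption settings, or whether Foundry's own egress policies allow the traffic.

```python
import socket


def can_reach(host: str, port: int = 1433, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    This only shows that something accepts TCP connections on the port;
    it does not perform a SQL Server (TDS) login.
    """
    try:
        # create_connection resolves the name and tries each returned address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections, and timeouts.
        return False
```

For example, `can_reach("<your-server-name>.database.windows.net")` returns False if the name does not resolve or nothing answers on port 1433 within five seconds.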

:::callout{theme="warning"} If Azure redirect mode is used, you must also add egress policies for all resolved IP addresses. The resolved IP addresses can change over time, so you must update the egress policies to allow any new IP address. For more information on public IP addresses for Azure SQL instances, see the Azure SQL documentation ↗. :::

  • To find the hostname for your Azure SQL instance, navigate to the Settings > Properties page in the Azure portal and look for the Server Name field. An example hostname-based egress policy for Azure SQL: <your-server-name>.database.windows.net (port 1433)
  • To find the resolved IP address, run nslookup <your-server-name>.database.windows.net from the command line. The final address in the output is the IP address that this hostname resolves to in Azure. An example IPv4-based egress policy for Azure SQL: x.x.x.x (port 1433). Because Azure SQL load-balances across multiple hosts, you may need to run the nslookup command several times and add every resolved IP address.
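The repeated nslookup step can also be scripted. This sketch (hypothetical helper names, standard library only) resolves the hostname several times, collects every distinct IPv4 address, and formats each one like the egress policy example above. A local DNS cache may return the same answer on every attempt, so treat the result as a starting point rather than a complete list.

```python
import socket
import time
from typing import List


def resolve_ipv4(host: str, port: int = 1433, attempts: int = 5, delay: float = 1.0) -> List[str]:
    """Resolve `host` several times and return every distinct IPv4 address seen.

    Raises socket.gaierror if the name cannot be resolved at all.
    """
    seen = set()
    for i in range(attempts):
        infos = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
        for _family, _type, _proto, _canonname, sockaddr in infos:
            seen.add(sockaddr[0])  # for AF_INET, sockaddr is (address, port)
        if i < attempts - 1:
            time.sleep(delay)  # pause so a load-balanced name has a chance to rotate
    return sorted(seen)


def egress_entries(addresses: List[str], port: int = 1433) -> List[str]:
    """Format addresses in the "x.x.x.x (port 1433)" style used above."""
    return [f"{address} (port {port})" for address in addresses]
```

A typical call would be `egress_entries(resolve_ipv4("<your-server-name>.database.windows.net"))`.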

:::callout{theme="neutral"} If you are connecting to an Azure SQL instance from a Foundry instance that is also hosted in Azure, you will need to use the Proxy connection policy. For traffic originating within Azure, the connection policy defaults to Redirect. Using Redirect for Azure-to-Azure connections would require egress policies for all Azure SQL IP addresses on every port in the range 11000 to 11999. This is possible but not recommended, because it is overly permissive. For details on Azure SQL connection policies, see the official Azure SQL documentation ↗. :::
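To make "overly permissive" concrete, the sketch below lists the destination ports an egress policy would need under each connection policy. It assumes, following Azure's connectivity architecture documentation, that the initial connection to the gateway uses port 1433 under both policies; the constant and function names are hypothetical.

```python
from typing import List

# Port on which clients first reach the Azure SQL gateway.
GATEWAY_PORT = 1433
# Ports used by Redirect connections after the gateway hands the client
# off to the node hosting the database (11000 through 11999 inclusive).
REDIRECT_PORT_RANGE = range(11000, 12000)


def required_egress_ports(connection_policy: str) -> List[int]:
    """Return the destination ports that egress policies must allow for a policy."""
    policy = connection_policy.strip().lower()
    if policy == "proxy":
        return [GATEWAY_PORT]
    if policy == "redirect":
        return [GATEWAY_PORT, *REDIRECT_PORT_RANGE]
    raise ValueError(f"unsupported connection policy: {connection_policy!r}")
```

Under Redirect this comes to 1,001 ports per allowed IP address instead of one, which is why Proxy is the recommended choice for Azure-to-Azure connections.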

Connection details

Option Required? Description
Host type Yes Specify how Foundry should connect with your SQL Server database.

Option 1: Hostname
Provide a hostname. This is the recommended option for all SQL Server connections and should always be used when connecting to an Azure SQL ↗ instance.

Option 2: IPv4
Provide an IPv4 address. If you normally connect using an IPv4 address, either within a corporate network or over the Internet, you can use this option.

Option 3: IPv6
Provide an IPv6 address. Use this option if you normally connect using an IPv6 address.
Port Yes Specify the port to use when connecting. The default port for most SQL Server instances is 1433. For more information on ports, see the official documentation for the version of SQL Server you are connecting to.
Database name Yes The name of the database you're connecting to within your instance of MS SQL Server.
Authentication Yes Configure as described in the Authentication section above.
Require encryption Yes Defaults to enabled. For more details, see Microsoft's documentation for the encrypt setting on the SQL Server JDBC driver:

Connection properties reference ↗
Encryption support examples ↗
Trust server certificate Yes Defaults to disabled. For more details, see Microsoft's documentation for the trustServerCertificate setting on the SQL Server JDBC driver:

Connection properties reference ↗
Network connectivity Yes You must provide egress policies to allow connections to your MS SQL Server instance. Refer to the Networking section for more details.
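The fields in this table correspond to standard Microsoft JDBC driver URL properties (`databaseName`, `encrypt`, `trustServerCertificate`). The sketch below shows one way they could be combined into a connection URL; it is illustrative only, `build_jdbc_url` is a hypothetical name, and it does not describe how Foundry builds its connection. The IPv6 branch passes the address through the `serverName` and `portNumber` properties because, as we understand Microsoft's documentation, a raw IPv6 address is not accepted directly after `jdbc:sqlserver://`.

```python
from typing import Dict, Optional


def build_jdbc_url(
    host: str,
    port: int,
    database: str,
    host_type: str = "hostname",
    require_encryption: bool = True,
    trust_server_certificate: bool = False,
    extra: Optional[Dict[str, str]] = None,
) -> str:
    """Assemble a Microsoft JDBC driver URL from the connection details above.

    Values containing ';' would need {brace} escaping, which this sketch omits.
    """
    props: Dict[str, str] = {}
    if host_type == "ipv6":
        # Raw IPv6 addresses go in the serverName property, not the URL prefix.
        prefix = "jdbc:sqlserver://"
        props["serverName"] = host
        props["portNumber"] = str(port)
    elif host_type in ("hostname", "ipv4"):
        prefix = f"jdbc:sqlserver://{host}:{port}"
    else:
        raise ValueError(f"unknown host type: {host_type!r}")
    props["databaseName"] = database
    # Defaults mirror the table: encryption required, server certificate not blindly trusted.
    props["encrypt"] = "true" if require_encryption else "false"
    props["trustServerCertificate"] = "true" if trust_server_certificate else "false"
    props.update(extra or {})  # e.g. an authentication property
    return prefix + "".join(f";{key}={value}" for key, value in props.items())
```

With the defaults, a hostname connection produces a URL ending in `;encrypt=true;trustServerCertificate=false`, matching the defaults listed in the table.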

Change data capture

The Microsoft SQL Server source supports change data capture syncs.

To enable change data capture for Microsoft SQL Server, you must run a command like the one below to enable CDC on the database.

USE <database>
GO
EXEC sys.sp_cdc_enable_db
GO

Then, run another command on each table that should be recording changelogs:

EXEC sys.sp_cdc_enable_table
    @source_schema = N'<schema>'
  , @source_name = N'<table_name>'
  , @role_name = NULL
  , @capture_instance = NULL
  , @supports_net_changes = 0
  , @filegroup_name = N'PRIMARY';
GO
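`GO` is not a T-SQL statement; it is a batch separator recognized by tools such as sqlcmd and SQL Server Management Studio, so it cannot be sent through a driver such as pymssql. If you prefer to script the commands above, one approach is to connect directly to the target database (in place of `USE <database>`), drop `GO`, and bind the schema and table names as parameters. The helper name `cdc_enable_statements` is hypothetical, and running these statements still requires the same privileges as running them by hand.

```python
from typing import List, Optional, Sequence, Tuple

Statement = Tuple[str, Optional[tuple]]

ENABLE_DB_SQL = "EXEC sys.sp_cdc_enable_db"

# pymssql uses %s placeholders and quotes the bound values client-side.
ENABLE_TABLE_SQL = (
    "EXEC sys.sp_cdc_enable_table"
    " @source_schema = %s,"
    " @source_name = %s,"
    " @role_name = NULL,"
    " @capture_instance = NULL,"
    " @supports_net_changes = 0,"
    " @filegroup_name = %s"
)


def cdc_enable_statements(
    tables: Sequence[Tuple[str, str]],
    filegroup: str = "PRIMARY",
    enable_database: bool = True,
) -> List[Statement]:
    """Return (sql, params) pairs equivalent to the commands above, without GO."""
    statements: List[Statement] = []
    if enable_database:
        statements.append((ENABLE_DB_SQL, None))
    for schema, table in tables:
        statements.append((ENABLE_TABLE_SQL, (schema, table, filegroup)))
    return statements


# Example use (assumes pymssql and a login allowed to enable CDC). autocommit=True
# lets each procedure call commit on its own instead of inside an implicit transaction.
# with pymssql.connect(server="<server>", database="<database>", user="<user>",
#                      password="<password>", autocommit=True) as conn:
#     with conn.cursor() as cursor:
#         for sql, params in cdc_enable_statements([("<schema>", "<table_name>")]):
#             cursor.execute(sql, params)
```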

Once change data capture is enabled for the table(s) you wish to sync to Foundry, you can navigate to the Overview page and select + Create CDC sync to start creating a new change data capture sync.

:::callout{theme="neutral"} The exploration runtime must be running before you can create a change data capture sync. If the runtime is still initializing, wait a few seconds and refresh the page before creating the sync. :::

For more information on these commands and using change data capture (CDC) with Microsoft SQL Server, see the official documentation ↗ for the version of SQL Server in use.

Change data capture permissions

To successfully read CDC data, you will need to ensure you have provided sufficient permissions to the database user.

  • You can verify your permissions by running the following query while connected as the database user that Foundry uses: SELECT HAS_PERMS_BY_NAME('cdc', 'SCHEMA', 'EXECUTE') AS HasExecutePermission; The query returns 1 if that user has EXECUTE permission on the cdc schema and 0 if it does not.
  • You can grant missing permissions by running the following statement in the source database: GRANT EXECUTE ON SCHEMA::cdc TO <USER>;
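Both checks can be wrapped in small helpers, for example in an operational script. In this sketch the helper names are hypothetical; `has_cdc_execute` works with any DB-API cursor (such as one from pymssql), and `grant_cdc_execute_sql` bracket-quotes the user name, doubling any `]`, so that names containing spaces or backslashes stay valid T-SQL identifiers.

```python
from typing import Any

# Evaluates the *current* login's effective permission, so run it while
# connected as the database user that the Foundry source is configured with.
CHECK_CDC_EXECUTE_SQL = "SELECT HAS_PERMS_BY_NAME('cdc', 'SCHEMA', 'EXECUTE')"


def quote_identifier(name: str) -> str:
    """Bracket-quote a T-SQL identifier, doubling any closing brackets."""
    return "[" + name.replace("]", "]]") + "]"


def grant_cdc_execute_sql(user: str) -> str:
    """Build the GRANT statement for a specific database user."""
    return f"GRANT EXECUTE ON SCHEMA::cdc TO {quote_identifier(user)};"


def has_cdc_execute(cursor: Any) -> bool:
    """Run the permission check on an open DB-API cursor."""
    cursor.execute(CHECK_CDC_EXECUTE_SQL)
    row = cursor.fetchone()
    # HAS_PERMS_BY_NAME returns 1 (granted), 0 (not granted), or NULL if the check fails.
    return row is not None and row[0] == 1
```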

Use Microsoft SQL Server sources in code

These examples demonstrate how to connect to a Microsoft SQL Server source using the pymssql Python package in an external transform.

The examples are based on a Fruits table created with the following schema:

CREATE TABLE Fruits (
    FruitName VARCHAR(50) PRIMARY KEY,
    Inventory INT NOT NULL,
    PricePerKg DECIMAL(10, 2) NOT NULL
);
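Both examples below pass the same pymssql connection dictionary. A small helper such as the hypothetical `mssql_connection_parameters` can keep those settings in one place. It takes the secret-lookup function (for example `mssql_source.get_secret`) rather than the source object, so it can be tested without Foundry; the secret name `MSSQL_BASIC_AUTH_PASSWORD` is carried over from the examples.

```python
from typing import Any, Callable, Dict


def mssql_connection_parameters(
    get_secret: Callable[[str], str],
    server: str,
    database: str,
    user: str,
    port: str = "1433",
    secret_name: str = "MSSQL_BASIC_AUTH_PASSWORD",
    timeout: int = 30,
) -> Dict[str, Any]:
    """Return keyword arguments for pymssql.connect(), mirroring the examples."""
    return {
        "server": server,
        "database": database,
        "port": port,
        "user": user,
        "password": get_secret(secret_name),
        # Same setting as the examples: ask the client to insist on an encrypted session.
        "encryption": "require",
        "timeout": timeout,  # query timeout in seconds
    }
```

Inside a transform this could be used as `pymssql.connect(**mssql_connection_parameters(mssql_source.get_secret, "<your_server_name>", "<your_database_name>", "<your_user_name>"))`.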

Read from MSSQL with an external transform

This example reads the rows of the Fruits table whose Inventory value is below 60.

import logging

import pandas as pd
import pymssql
from pandas import DataFrame
from transforms.api import Output, lightweight, transform_pandas
from transforms.external.systems import ResolvedSource, Source, external_systems

logger = logging.getLogger(__name__)

@lightweight
@external_systems(
    mssql_source=Source("<source_rid>")
)
@transform_pandas(
    Output("<dataset_rid>")
)
def compute(mssql_source: ResolvedSource) -> DataFrame:
    # Inventory threshold parameter (this could also be read from an input DataFrame)
    INVENTORY_THRESHOLD = 60

    connection_parameters = {
        "server": "<your_server_name>",
        "database": "<your_database_name>",
        "port": "1433",
        "user": "<your_user_name>",
        "password": mssql_source.get_secret("MSSQL_BASIC_AUTH_PASSWORD"),
        "encryption": "require",
        "timeout": 30
    }
    try:
        with pymssql.connect(**connection_parameters) as connection:
            df = pd.read_sql(
                'SELECT * FROM Fruits WHERE Inventory < %s',
                connection,
                params=(INVENTORY_THRESHOLD,)
            )
    except Exception as e:
        logger.error(f"Error querying MSSQL Fruits table: {e}")
        raise RuntimeError(f"Failed to fetch Fruits data from MSSQL: {e}") from e
    return df
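`pd.read_sql` officially supports SQLAlchemy connectables and sqlite3 connections; with other DB-API connections such as pymssql it still runs, but recent pandas versions emit a `UserWarning`. If you want to avoid that warning without adding SQLAlchemy, one option is to build the DataFrame from a cursor directly. The helper name `query_to_frame` is hypothetical; it accepts any DB-API 2.0 connection, so the placeholder style in the SQL must match the driver (`%s` for pymssql).

```python
from contextlib import closing
from typing import Any, Sequence

import pandas as pd


def query_to_frame(connection: Any, sql: str, params: Sequence[Any] = ()) -> pd.DataFrame:
    """Run a query on a DB-API 2.0 connection and return the result as a DataFrame."""
    # closing() calls cursor.close() even for drivers whose cursors are not context managers.
    with closing(connection.cursor()) as cursor:
        if params:
            cursor.execute(sql, tuple(params))
        else:
            cursor.execute(sql)
        # Each cursor.description entry describes one column; item 0 is its name.
        columns = [column[0] for column in cursor.description]
        rows = cursor.fetchall()
    # Passing the column names keeps the schema even when no rows match.
    return pd.DataFrame.from_records(rows, columns=columns)
```

In the read example, `df = query_to_frame(connection, 'SELECT * FROM Fruits WHERE Inventory < %s', (INVENTORY_THRESHOLD,))` would take the place of the `pd.read_sql` call.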

Write to MSSQL with an external transform

This example upserts rows from an input dataset into the Fruits table: existing fruits are updated and new fruits are inserted. It returns a dataset summarizing the action (updated or inserted) taken for each FruitName.

import logging
from transforms.api import lightweight, Output, transform_pandas, Input
from transforms.external.systems import external_systems, Source
import pymssql
import pandas as pd

logger = logging.getLogger(__name__)

@lightweight
@external_systems(
    mssql_source=Source("<source_rid>")
)
@transform_pandas(
    Output("<output_dataset_rid>"),
    fruits_df=Input("<input_dataset_rid>") # DataFrame with schema [FruitName: String, Inventory: Integer, PricePerKg: Double]
)
def compute(mssql_source, fruits_df):
    # Connection parameters
    connection_parameters = {
        "server": "<your_server_name>",
        "database": "<your_database_name>",
        "port": "1433",
        "user": "<your_user_name>",
        "password": mssql_source.get_secret("MSSQL_BASIC_AUTH_PASSWORD"),
        "encryption": "require",
        "timeout": 30
    }
    results = []
    try:
        with pymssql.connect(**connection_parameters) as connection:
            with connection.cursor() as cursor:
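                for _, row in fruits_df.iterrows():
                    # Cast to plain Python types; pymssql may reject NumPy scalars as query parameters
                    fruit_name = str(row["FruitName"])
                    inventory = int(row["Inventory"])
                    price_per_kg = float(row["PricePerKg"])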

                    # Try to update; if no row updated, then insert
                    update_sql = """
                        UPDATE Fruits
                        SET Inventory = %s, PricePerKg = %s
                        WHERE FruitName = %s
                    """
                    cursor.execute(update_sql, (inventory, price_per_kg, fruit_name))
                    if cursor.rowcount == 0:
                        insert_sql = """
                            INSERT INTO Fruits (FruitName, Inventory, PricePerKg)
                            VALUES (%s, %s, %s)
                        """
                        cursor.execute(insert_sql, (fruit_name, inventory, price_per_kg))
                        results.append({"FruitName": fruit_name, "action": "inserted"})
                    else:
                        results.append({"FruitName": fruit_name, "action": "updated"})
                connection.commit()
    except Exception as e:
        logger.error(f"Error updating MSSQL Fruits table: {e}")
        raise RuntimeError(f"Failed to update Fruits data in MSSQL: {e}") from e
    # Return a DataFrame summarizing the actions
    return pd.DataFrame(results, columns=["FruitName", "action"])
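The example above issues an UPDATE and, when no row matches, an INSERT for each input row. That costs two round trips for new fruits and leaves a short window in which a concurrent writer could insert the same key. One alternative that still reports per-row actions is a single T-SQL `MERGE` per row with `OUTPUT $action`. This is a sketch, not the documented approach: the helper names are hypothetical, and the `WITH (HOLDLOCK)` hint is the commonly suggested way to keep a MERGE-based upsert's match check and write atomic under concurrency.

```python
from typing import Any, Dict, List, Tuple

import pandas as pd

# One upsert per row; OUTPUT $action returns 'INSERT' or 'UPDATE'.
# HOLDLOCK takes a serializable range lock so concurrent upserts of a key do not race.
MERGE_FRUIT_SQL = """
MERGE Fruits WITH (HOLDLOCK) AS target
USING (SELECT %s AS FruitName, %s AS Inventory, %s AS PricePerKg) AS source
    ON target.FruitName = source.FruitName
WHEN MATCHED THEN
    UPDATE SET Inventory = source.Inventory, PricePerKg = source.PricePerKg
WHEN NOT MATCHED THEN
    INSERT (FruitName, Inventory, PricePerKg)
    VALUES (source.FruitName, source.Inventory, source.PricePerKg)
OUTPUT $action;
"""

# Map MERGE's $action values to the labels used by the example above.
ACTION_LABELS = {"INSERT": "inserted", "UPDATE": "updated"}


def to_native_rows(fruits_df: pd.DataFrame) -> List[Tuple[str, int, float]]:
    """Convert DataFrame rows to plain Python values that the driver can quote."""
    columns = fruits_df[["FruitName", "Inventory", "PricePerKg"]]
    return [
        (str(name), int(inventory), float(price))
        for name, inventory, price in columns.itertuples(index=False, name=None)
    ]


def merge_fruits(cursor: Any, fruits_df: pd.DataFrame) -> pd.DataFrame:
    """Upsert every row and return a FruitName/action summary DataFrame."""
    results: List[Dict[str, str]] = []
    for row in to_native_rows(fruits_df):
        cursor.execute(MERGE_FRUIT_SQL, row)
        # fetchall() drains the single OUTPUT row before the next execute.
        action = cursor.fetchall()[0][0]
        results.append({"FruitName": row[0], "action": ACTION_LABELS[action]})
    return pd.DataFrame(results, columns=["FruitName", "action"])
```

Inside the transform, `merge_fruits(cursor, fruits_df)` would replace the loop, followed by the same `connection.commit()`.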
