跳转至

Local environment(本地环境)

Python

Add resource to project

How can I add a resource to a project using an API call?

This code uses the requests library to send an HTTP POST request to the Compass API, importing a reference to an artifact into a specified project. It includes error handling, logging, and optional proxy configuration.

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json

'''
Import a reference to an artifact as a reference to the project specified
'''

# Headers
headers = {
    'Authorization': 'Bearer xxx',  # Replace 'xxx' with your bearer token
    'Content-Type': 'application/json',
}

# Host
host = 'subdomain.domain.extension:port'

# Proxies
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# Set the level of logging to be shown
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

###############
## VARIABLES ##
###############
RESSOURCE_TO_ADD = "ri......"
PROJECT_TO_ADD_TO = "ri.compass.main.folder.xxxx-xxx-xxx-xxx-xxxx"

# Throw an error if request fails in some way with information about it
# Send the import request. Transport failures and HTTP error statuses are both
# re-raised below with as much detail as is available.
try:
    print('Beginning script for ...')

    # Request body: one entry per resource to reference from the project
    source_data = {
        "requests": [
            {"resourceRid": RESSOURCE_TO_ADD},
        ]
    }

    # Serialize the JSON
    data = json.dumps(source_data)
    response = http.post(
        f'https://{host}/compass/api/projects/imports/{PROJECT_TO_ADD_TO}/import',
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('Completed request')
    print('The result of the script is ...')
    raw_response = response.text
    print(raw_response)
    print(response.status_code)

    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.post itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "no response received"
    raise Exception(
        f"An error occurred in the request.\nIt failed due to: {detail}\nException: {e}") from e
  • Date submitted: 2024-03-26
  • Tags: API, python, compass

Dataset row count

How can I calculate in bulk the number of rows in many datasets?

This code uses the Foundry API to trigger row count computation for a list of dataset RIDs. It sends a POST request to the Foundry Stats API with the dataset RID and branch as parameters.

from shutil import ExecError
from wsgiref import headers
import requests
from urllib3 import Retry
import json
import pprint

'''
Script will trigger row count computation on the set of provided dataset rids
'''

# Base variables
base_url = "https://STACK_NAME.palantircloud.com"
branch = "master"

DATASETS_RIDS = [
    "ri.foundry.main.dataset.6d2cd3de-0052-xxxxx-c7ae2c4ab1d8"
]

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json'
}

# Proxies
proxyDict = {
    # "https": "https://proxyIfNeeded:port"
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)

def trigger_row_count(dataset_rid, branch):
    """Trigger the row count computation for one dataset branch.

    POSTs to the foundry-stats endpoint built from ``dataset_rid`` and
    ``branch``, pretty-prints the decoded JSON body and returns it.
    """
    stats_url = f'{base_url}/foundry-stats/api/stats/datasets/{dataset_rid}/branches/{branch}'
    reply = http.post(stats_url, headers=headers, proxies=proxyDict)
    parsed = json.loads(reply.text)
    pprint.pprint(parsed)
    return parsed


# One request per configured dataset
for curr_dataset_rid in DATASETS_RIDS:
    trigger_row_count(curr_dataset_rid, branch)
  • Date submitted: 2024-03-26
  • Tags: export, python, metrics, metadata, local

Get superset of columns across datasets

How can I get the set of all columns across multiple datasets?

This code uses the requests library to fetch the schema of each dataset in a list of target datasets, and then iterates through the fields in the schema to create a dictionary containing the frequency of each column in the superset of columns.

import time

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import requests
import json
import pprint
import logging
import datetime
import collections


'''
Script that generates the superset of columns with their frequency from a set of datasets
'''

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json',
}

## STACK_NAME
# The scheme must be part of the URL: it is used as-is to build the request
# URL, and requests rejects URLs without a scheme (MissingSchema).
base_url = "https://STACK_NAME.palantircloud.com"
branch = "master"


target_datasets = ["ri.foundry.main.dataset.4c2ac089-xxxx-4df863eaf823"]

# Proxies
proxyDict = {
    #"https": "https://proxyIfNeeded:port"
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)


# Maps "<field name> - <field type>" to the number of times it was seen
global_list_fields = {}

for curr_dataset in target_datasets:
    # Get schema of the dataset
    print("Step 1. Get Schema of dataset")
    schema_url = f'{base_url}/foundry-metadata/api/schemas/datasets/{curr_dataset}/branches/{branch}'
    response = http.get(schema_url, headers=headers, proxies=proxyDict)
    print("Step 1. Response of getting schema of dataset")
    raw_response = response.text
    print(raw_response)
    curr_schema = json.loads(raw_response)

    for field in curr_schema["schema"]["fieldSchemaList"]:
        curr_key = f"{field['name']} - {field['type']}"
        # First sighting starts at 0, then every sighting adds one
        global_list_fields.setdefault(curr_key, 0)
        global_list_fields[curr_key] += 1

print("Unsorted dict")
pprint.pprint(global_list_fields)

# Order the entries by ascending frequency
sorted_dict = dict(sorted(global_list_fields.items(), key=lambda entry: entry[1]))
print("Sorted dict")
pprint.pprint(sorted_dict)
  • Date submitted: 2024-03-26
  • Tags: python, API, metadata, code repositories, code authoring, local

OSS direct call

How do I perform aggregations on object sets using the Object Set Service (OSS)?

This code demonstrates how to make direct calls to the Object Set Service (OSS) to perform aggregations on object sets. It is useful for debugging or understanding the data returned by OSS, which is used by other services like OSDK and frontends.

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json
import pprint

'''
Direct calls to object-set-service (OSS) to perform aggregations, etc.
This is not intended for "actual use" in production, but can be useful for debugging 
or going one layer deeper, for instance to debug or understand what is actually returned by OSS.
OSS is used under the hood by other services, like OSDK and frontends.
'''

# Headers
headers = {
    'Authorization': 'Bearer xxx', # Replace 'xxx' with your bearer token
    'Content-Type': 'application/json',
}

# Host
host = 'subdomain.domain.extension:port'

# Proxies
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# Set the level of logging to be shown
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
    )

###############
## VARIABLES ##
###############


# Throw an error if request fails in some way with information about it
# Send the aggregation request. Transport failures and HTTP error statuses are
# both re-raised below with as much detail as is available.
try:
    print('Beginning script for ...')

    # Data - Example payload to OSS
    # (bound to `source_data`, the name that is serialized below)
    source_data = {
        "executionMode": "PREFER_ACCURACY",  # OSS will default to "PREFER_SPEED" which can provide inaccurate results
        "objectSet": {
            "base": {
                "objectTypeId": "af-20m-instances-obv2"  # The object type id
            },
            "type": "base"
        },
        "aggregation": {
            "metrics": {},
            "subAggregations": {
                "test": {
                    "type": "metrics",
                    "metrics": {
                        "dimension": {
                            "type": "propertyValue",
                            "propertyValue": {
                                "propertyId": "example_bucket",  # The property of the object to aggregate by
                                "bucketing": {
                                    "type": "exactValue",
                                    "exactValue": {
                                        "maxBuckets": 10  # The number of buckets the response should contain
                                    }
                                }
                            }
                        },
                        "ordering": [
                            {
                                "type": "valueOrdering",
                                "valueOrdering": {
                                    "direction": "DESCENDING",
                                    "metricName": "countM"
                                }
                            }
                        ],
                        "metrics": {
                            "countM": {
                                "type": "count",
                                "count": {}
                            }
                        }
                    }
                }
            }
        }
    }

    # Serialize the JSON
    data = json.dumps(source_data)

    response = http.put(
        f'https://{host}/object-set-service/api/aggregate',
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('Completed request')
    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()
    print('The result of the script is ...')
    pprint.pprint(response.json())

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.put itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "no response received"
    raise Exception(f"An error occurred in the request.\nIt failed due to: {detail}\nException: {e}") from e
  • Date submitted: 2024-03-26
  • Tags: ontology, aggregation, objects, python, API, local

Ping Foundry: No token required

How can I ping my Foundry instance or account without an authentication token?

This code demonstrates how to use the Python requests library to send a ping request to a Palantir Cloud Stack with a specified proxy and retry settings.

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import requests

headers = {
    # On principle : no token required ! 'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json',
}

## STACK
base_url = "https://STACK_NAME.palantircloud.com"

# Proxies
proxyDict = {
    "https": "http://proxy.host.com:3333"
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)


# Call the Compass ping endpoint; the headers above carry no Authorization entry
print("Pinging ... ")
ping_url = f"{base_url}/compass/api/ping"
response = http.get(ping_url, headers=headers, proxies=proxyDict)
print("Ping response :")
raw_response = response.text
print(raw_response)
  • Date submitted: 2024-03-26
  • Tags: API, python, compass, local

Get the path for a given resource RID

How can I find the path of a resource from its RID?

This code uses the requests library to send an HTTP GET request to the specified host with the given RID, and retrieves the path of the resource. It also handles retries and proxies.

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry

'''
Script to return the path of a given Resource IDentifier (RID).
'''

# Headers
headers = {
    'Authorization': 'Bearer xxx', # Replace 'xxx' with your bearer token
    'Content-Type': 'application/json',
}

# Host
host = 'host.com:443'

# Proxies
proxyDict = {
    'https': 'http://proxy.domain.com:3333'
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# Enter the rid of the resource you want the path of
RESOURCE_RID = ''

# Raise an error with details if the path lookup fails
try:
    print(f'Fetching path for rid {RESOURCE_RID} ...')
    response = http.get(f'https://{host}/compass/api/resources/{RESOURCE_RID}/path-json', headers=headers, proxies=proxyDict)
    print('Completed request')
    # Turn 4xx/5xx answers into an HTTPError so an error body is never
    # printed as if it were the path
    response.raise_for_status()
    print(f'The path is: {response.text}')
except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.get itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "no response received"
    raise Exception(f"An error occurred in the request.\nReturning the path for the repository: {RESOURCE_RID} failed due to: {detail}\nException: {e}") from e
  • Date submitted: 2024-03-26
  • Tags: api, python, metadata, local

Trigger action using API

How can I manually trigger an action on an object?

This code uses the requests library to send HTTP requests to the actions API, iterating over a list of IDs and triggering an action for each ID with custom parameters.

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import pprint
import uuid
import requests
import json
import time

'''
Script that will trigger an action.
'''

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json'
}

# Name of the stack
STACK = "STACK_NAME.palantircloud.com"
# List of ids, could be any list of parameters you want to iterate on
list_ids = ["123", "456"]

# Proxies
proxyDict = {
    "https": "https://proxyIfNeeded:port"
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)

# Iterate over the list of ids and trigger one action per ID.
for curr_id in list_ids:
    # Example values to interpolate into your own payload; the sample payload
    # below is static and does not use them.
    curr_uuid = "GENERATED-OBJECT-" + str(uuid.uuid4())  # To generate a uuid
    curr_title = "GENERATED-" + str(time.time())  # To generate a timestamp
    user_rid = "xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx"  # if a user rid is needed

    try:
        # Generate the payload of the action. Look at a network tab from slate/Workshop to obtain it - or build it from scratch.
        # The last line closes three objects: the parameter, "parameters" and the
        # outer request object (the outer brace was missing, making the JSON invalid).
        # Replace each placeholder parameter rid with a distinct real one.
        payload = r'{"actionTypeRid":"ri.actions.main.action-type.xxxxx-xxxx-xxxx-xxxxxxxx",' \
                  r'"parameters":{"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2021-09-30T23:59:59+02:00","type":"timestamp"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"stringList":{"strings":["mystring1","mystring2"]},"type":"stringList"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"string":"my_string","type":"string"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2022-01-01T00:23:26+00:00","type":"timestamp"}}}'

        response = http.post(f'https://{STACK}/actions/api/actions', headers=headers, proxies=proxyDict, data=payload)
        print(f"Raw response of action call with : {curr_id}\r\n")
        raw_response = response.json()
        pprint.pprint(raw_response, indent=4)

    except Exception as e:
        # Best effort: report the failure and carry on with the next id
        print(e)
  • Date submitted: 2024-03-26
  • Tags: action, objects, ontology, actions on objects, python, api, local

Upload local file to dataset

What API can I use to upload a local file to a dataset in Foundry?

This code uses the Foundry API to upload a file to a specified dataset. It sets up headers, host, proxies, and retries for the request, and then reads the file and sends it as a POST request to the dataset's files:upload endpoint.

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json

# Headers
headers = {
    'Authorization': 'Bearer xxx',  # Replace 'xxx' with your bearer token
    'Content-type': 'application/octet-stream', ### IMPORTANT !
}

# Host
host = 'subdomain.domain.extension:port'

# Proxies
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# Retries
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# Set the level of logging to be shown
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

###############
## VARIABLES ##
###############

# Dataset RID to upload to
datasetRid = "rid.123..."

params = {
    'filePath': 'folder_name/my-file.csv',
}

# Throw an error if request fails in some way with information about it
# Upload the file in a single call. Transport failures and HTTP error statuses
# are both re-raised below with as much detail as is available.
try:
    print('Beginning script for one-call upload')

    # Data: read the raw bytes so the file is uploaded exactly as stored on
    # disk. Reading it as text and stripping '\n' / '\r' would merge every
    # CSV row into a single line.
    with open('./data_example_file.csv', 'rb') as f:
        data = f.read()

    response = http.post(
        f'https://{host}/api/v1/datasets/{datasetRid}/files:upload',
        params=params,
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('Completed request')
    print(f'The result of the script is {response.status_code} - {response.text}')

    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.post itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "no response received"
    raise Exception(
        f"An error occurred in the request.\nIt failed due to: {detail}\nException: {e}") from e
  • Date submitted: 2024-04-04
  • Tags: API, file upload, dataset, python, csv, local

中文翻译


本地环境

Python

向项目添加资源

如何通过 API 调用向项目添加资源?

以下代码使用 requests 库向 Compass API 发送 HTTP POST 请求,将某个制品的引用导入到指定项目中。代码包含错误处理、日志记录和可选的代理配置。

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json

'''
将制品的引用作为引用导入到指定项目中
'''

# 请求头
headers = {
    'Authorization': 'Bearer xxx',  # 将 'xxx' 替换为你的 bearer token
    'Content-Type': 'application/json',
}

# 主机
host = 'subdomain.domain.extension:port'

# 代理
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# 设置日志显示级别
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

###############
## 变量定义  ##
###############
RESSOURCE_TO_ADD = "ri......"
PROJECT_TO_ADD_TO = "ri.compass.main.folder.xxxx-xxx-xxx-xxx-xxxx"

# 如果请求以某种方式失败,则抛出包含详细信息的错误
# Send the import request. Transport failures and HTTP error statuses are both
# re-raised below with as much detail as is available.
try:
    print('开始执行脚本...')

    # Request body: one entry per resource to reference from the project
    source_data = {
        "requests": [
            {"resourceRid": RESSOURCE_TO_ADD},
        ]
    }

    # Serialize the JSON
    data = json.dumps(source_data)
    response = http.post(
        f'https://{host}/compass/api/projects/imports/{PROJECT_TO_ADD_TO}/import',
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('请求已完成')
    print('脚本执行结果为...')
    raw_response = response.text
    print(raw_response)
    print(response.status_code)

    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.post itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "未收到响应"
    raise Exception(
        f"请求过程中发生错误。\n失败原因:{detail}\n异常信息:{e}") from e
  • 提交日期:2024-03-26
  • 标签:API, python, compass

数据集行数统计

如何批量计算多个数据集的行数?

以下代码使用 Foundry API 触发对一组数据集 RID 的行数计算。它向 Foundry Stats API 发送 POST 请求,并将数据集 RID 和分支作为参数传入。

from shutil import ExecError
from wsgiref import headers
import requests
from urllib3 import Retry
import json
import pprint

'''
脚本将触发对提供的多个数据集 rid 的行数计算
'''

# 基础变量
base_url = "https://STACK_NAME.palantircloud.com"
branch = "master"

DATASETS_RIDS = [
    "ri.foundry.main.dataset.6d2cd3de-0052-xxxxx-c7ae2c4ab1d8"
]

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json'
}

# 代理
proxyDict = {
    # "https": "https://proxyIfNeeded:port"
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = requests.adapters.HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)

def trigger_row_count(dataset_rid, branch):
    response = http.post(f'{base_url}/foundry-stats/api/stats/datasets/{dataset_rid}/branches/{branch}', headers=headers,
                         proxies=proxyDict)
    raw_response = response.text
    curr_response = json.loads(raw_response)
    pprint.pprint(curr_response)

    return curr_response

for curr_dataset_rid in DATASETS_RIDS :
    trigger_row_count(curr_dataset_rid, branch)
  • 提交日期:2024-03-26
  • 标签:export, python, metrics, metadata, local

获取跨数据集的列超集

如何获取多个数据集的所有列的集合?

以下代码使用 requests 库获取目标数据集列表中每个数据集的 schema,然后遍历 schema 中的字段,创建一个包含列超集中每个列出现频率的字典。

import time

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import requests
import json
import pprint
import logging
import datetime
import collections


'''
脚本从一组数据集中生成包含列及其出现频率的超集
'''

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json',
}

## STACK_NAME
# The scheme must be part of the URL: it is used as-is to build the request
# URL, and requests rejects URLs without a scheme (MissingSchema).
base_url = "https://STACK_NAME.palantircloud.com"
branch = "master"


target_datasets = ["ri.foundry.main.dataset.4c2ac089-xxxx-4df863eaf823"]

# 代理
proxyDict = {
    #"https": "https://proxyIfNeeded:port"
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)


global_list_fields = {}

for curr_dataset in target_datasets :
    # 获取数据集的 schema
    print(f"步骤 1. 获取数据集的 Schema")
    response = http.get(f'{base_url}/foundry-metadata/api/schemas/datasets/{curr_dataset}/branches/{branch}', headers=headers, proxies=proxyDict)
    print(f"步骤 1. 获取数据集 schema 的响应")
    raw_response = response.text
    print(raw_response)
    curr_schema = json.loads(raw_response)
    list_fields = curr_schema["schema"]["fieldSchemaList"]

    for field in list_fields:
        curr_key = f"{field['name']} - {field['type']}"
        # 递增计数器
        global_list_fields[curr_key] = global_list_fields.get(curr_key, 0) + 1

print("未排序的字典")
pprint.pprint(global_list_fields)

# 排序
sorted_dict = {k: v for k, v in sorted(global_list_fields.items(), key=lambda item: item[1])}
print("排序后的字典")
pprint.pprint(sorted_dict)
  • 提交日期:2024-03-26
  • 标签:python, API, metadata, code repositories, code authoring, local

OSS 直接调用

如何使用对象集服务(OSS)对对象集执行聚合操作?

以下代码演示了如何直接调用对象集服务(OSS)对对象集执行聚合操作。这对于调试或理解 OSS 返回的数据非常有用,OSS 被其他服务(如 OSDK 和前端)在底层使用。

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json
import pprint

'''
直接调用对象集服务(OSS)执行聚合等操作。
此代码不适用于生产环境中的"实际使用",但对于调试或深入一层非常有用,
例如调试或理解 OSS 实际返回的内容。
OSS 被其他服务(如 OSDK 和前端)在底层使用。
'''

# 请求头
headers = {
    'Authorization': 'Bearer xxx', # 将 'xxx' 替换为你的 bearer token
    'Content-Type': 'application/json',
}

# 主机
host = 'subdomain.domain.extension:port'

# 代理
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# 设置日志显示级别
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
    )

###############
## 变量定义  ##
###############


# 如果请求以某种方式失败,则抛出包含详细信息的错误
# Send the aggregation request. Transport failures and HTTP error statuses are
# both re-raised below with as much detail as is available.
try:
    print('开始执行脚本...')

    # Data - Example payload to OSS
    # (bound to `source_data`, the name that is serialized below)
    source_data = {
        "executionMode": "PREFER_ACCURACY",  # OSS will default to "PREFER_SPEED" which can provide inaccurate results
        "objectSet": {
            "base": {
                "objectTypeId": "af-20m-instances-obv2"  # The object type id
            },
            "type": "base"
        },
        "aggregation": {
            "metrics": {},
            "subAggregations": {
                "test": {
                    "type": "metrics",
                    "metrics": {
                        "dimension": {
                            "type": "propertyValue",
                            "propertyValue": {
                                "propertyId": "example_bucket",  # The property of the object to aggregate by
                                "bucketing": {
                                    "type": "exactValue",
                                    "exactValue": {
                                        "maxBuckets": 10  # The number of buckets the response should contain
                                    }
                                }
                            }
                        },
                        "ordering": [
                            {
                                "type": "valueOrdering",
                                "valueOrdering": {
                                    "direction": "DESCENDING",
                                    "metricName": "countM"
                                }
                            }
                        ],
                        "metrics": {
                            "countM": {
                                "type": "count",
                                "count": {}
                            }
                        }
                    }
                }
            }
        }
    }

    # Serialize the JSON
    data = json.dumps(source_data)

    response = http.put(
        f'https://{host}/object-set-service/api/aggregate',
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('请求已完成')
    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()
    print('脚本执行结果为...')
    pprint.pprint(response.json())

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.put itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "未收到响应"
    raise Exception(f"请求过程中发生错误。\n失败原因:{detail}\n异常信息:{e}") from e
  • 提交日期:2024-03-26
  • 标签:ontology, aggregation, objects, python, API, local

Ping Foundry:无需 Token

如何在没有身份验证 Token 的情况下 Ping 我的 Foundry 实例或账户?

以下代码演示了如何使用 Python requests 库,在指定代理和重试设置下向 Palantir Cloud Stack 发送 ping 请求。

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import requests

headers = {
    # 原则上:不需要 token!'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json',
}

## STACK
base_url = "https://STACK_NAME.palantircloud.com"

# 代理
proxyDict = {
    "https": "http://proxy.host.com:3333"
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)


print("正在 Ping ... ")
response = http.get(f"{base_url}/compass/api/ping", headers=headers, proxies=proxyDict)
print("Ping 响应:")
raw_response = response.text
print(raw_response)
  • 提交日期:2024-03-26
  • 标签:API, python, compass, local

获取给定资源 RID 的路径

如何通过资源 RID 查找其路径?

以下代码使用 requests 库向指定主机发送 HTTP GET 请求,并传入给定的 RID,从而获取资源的路径。代码还处理了重试和代理设置。

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry

'''
脚本用于返回给定资源标识符(RID)的路径。
'''

# 请求头
headers = {
    'Authorization': 'Bearer xxx', # 将 'xxx' 替换为你的 bearer token
    'Content-Type': 'application/json',
}

# 主机
host = 'host.com:443'

# 代理
proxyDict = {
    'https': 'http://proxy.domain.com:3333'
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# 输入你想要获取路径的资源的 rid
RESOURCE_RID = ''

# Raise an error with details if the path lookup fails
try:
    print(f'正在获取 rid {RESOURCE_RID} 的路径...')
    response = http.get(f'https://{host}/compass/api/resources/{RESOURCE_RID}/path-json', headers=headers, proxies=proxyDict)
    print('请求已完成')
    # Turn 4xx/5xx answers into an HTTPError so an error body is never
    # printed as if it were the path
    response.raise_for_status()
    print(f'路径为:{response.text}')
except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.get itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "未收到响应"
    raise Exception(f"请求过程中发生错误。\n获取仓库 {RESOURCE_RID} 的路径失败,原因:{detail}\n异常信息:{e}") from e
  • 提交日期:2024-03-26
  • 标签:api, python, metadata, local

使用 API 触发操作

如何手动触发对象上的操作?

以下代码使用 requests 库向 actions API 发送 HTTP 请求,遍历 ID 列表并为每个 ID 使用自定义参数触发一个操作。

from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
import pprint
import uuid
import requests
import json
import time

'''
脚本将触发一个操作。
'''

headers = {
    'Authorization': 'Bearer eyg_PUT_YOUR_TOKEN_HERE_xxxx',
    'Content-Type': 'application/json'
}

# Stack 名称
STACK = "STACK_NAME.palantircloud.com"
# ID 列表,可以是任何你想要遍历的参数列表
list_ids = ["123", "456"]

# 代理
proxyDict = {
    "https": "https://proxyIfNeeded:port"
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount("https://", adapter)

# Iterate over the list of ids and trigger one action per ID.
for curr_id in list_ids:
    # Example values to interpolate into your own payload; the sample payload
    # below is static and does not use them.
    curr_uuid = "GENERATED-OBJECT-" + str(uuid.uuid4())  # To generate a uuid
    curr_title = "GENERATED-" + str(time.time())  # To generate a timestamp
    user_rid = "xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx"  # if a user rid is needed

    try:
        # Generate the payload of the action. Look at a network tab from slate/Workshop to obtain it - or build it from scratch.
        # The last line closes three objects: the parameter, "parameters" and the
        # outer request object (the outer brace was missing, making the JSON invalid).
        # Replace each placeholder parameter rid with a distinct real one.
        payload = r'{"actionTypeRid":"ri.actions.main.action-type.xxxxx-xxxx-xxxx-xxxxxxxx",' \
                  r'"parameters":{"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2021-09-30T23:59:59+02:00","type":"timestamp"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"stringList":{"strings":["mystring1","mystring2"]},"type":"stringList"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"string":"my_string","type":"string"},' \
                  r'"ri.actions.main.parameter.xxxxx-xxxx-xxxx-xxxxxxxx":{"timestamp":"2022-01-01T00:23:26+00:00","type":"timestamp"}}}'

        response = http.post(f'https://{STACK}/actions/api/actions', headers=headers, proxies=proxyDict, data=payload)
        print(f"使用 {curr_id} 调用操作的原始响应\r\n")
        raw_response = response.json()
        pprint.pprint(raw_response, indent=4)

    except Exception as e:
        # Best effort: report the failure and carry on with the next id
        print(e)
  • 提交日期:2024-03-26
  • 标签:action, objects, ontology, actions on objects, python, api, local

将本地文件上传到数据集

我可以使用什么 API 将本地文件上传到 Foundry 中的数据集?

以下代码使用 Foundry API 将文件上传到指定的数据集。它设置了请求头、主机、代理和重试机制,然后读取文件并通过 POST 请求将其发送到数据集的 files:upload 端点。

from requests.adapters import HTTPAdapter
import requests
from urllib3 import Retry
import logging
import json

# 请求头
headers = {
    'Authorization': 'Bearer xxx',  # 将 'xxx' 替换为你的 bearer token
    'Content-type': 'application/octet-stream', ### 重要!
}

# 主机
host = 'subdomain.domain.extension:port'

# 代理
proxyDict = {
    'https': 'protocol://subdomain.domain.extension:port'
}

# 重试机制
retry = Retry(connect=1, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
http = requests.Session()
http.mount('https://', adapter)

# 设置日志显示级别
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

###############
## 变量定义  ##
###############

# 要上传到的数据集 RID
datasetRid = "rid.123..."

params = {
    'filePath': 'folder_name/my-file.csv',
}

# 如果请求以某种方式失败,则抛出包含详细信息的错误
# Upload the file in a single call. Transport failures and HTTP error statuses
# are both re-raised below with as much detail as is available.
try:
    print('开始执行单次上传脚本')

    # Data: read the raw bytes so the file is uploaded exactly as stored on
    # disk. Reading it as text and stripping '\n' / '\r' would merge every
    # CSV row into a single line.
    with open('./data_example_file.csv', 'rb') as f:
        data = f.read()

    response = http.post(
        f'https://{host}/api/v1/datasets/{datasetRid}/files:upload',
        params=params,
        data=data,
        headers=headers,
        # Uncomment if a proxy is required
        # proxies=proxyDict
    )

    print('请求已完成')
    print(f'脚本执行结果为 {response.status_code} - {response.text}')

    # Turn 4xx/5xx answers into an HTTPError so they reach the handler below
    response.raise_for_status()

except requests.exceptions.RequestException as e:
    # `response` is never assigned when http.post itself raises (connection
    # error, timeout, ...), so take the details from the exception instead.
    failed = e.response
    detail = f"{failed.status_code} - {failed.text}" if failed is not None else "未收到响应"
    raise Exception(
        f"请求过程中发生错误。\n失败原因:{detail}\n异常信息:{e}") from e
  • 提交日期:2024-04-04
  • 标签:API, file upload, dataset, python, csv, local