Transforms
Python
Fetch and update data incrementally from an API using PySpark
How do I fetch data from an API and update it incrementally using external transforms?
This code uses PySpark and the requests library to fetch data from an API within a specified date range and to update the output incrementally. If the API supports paging, the code can also resume from a next-page link stored in the previous output.
from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json
def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>', timeout=60):
    """Fetch records for a date window from the API and return them as a JSON string.

    Args:
        token: Bearer token sent in the Authorization header.
        start_date: Value sent as "from" in the JSON request body.
        end_date: Value sent as "to" in the JSON request body.
        next_link_url: URL the request is POSTed to. The default
            '<YOUR_URL>' is a placeholder that must be replaced with the
            real API endpoint.
        timeout: Seconds to wait for the server (passed to requests).
            Without a timeout, a stalled API would hang the build forever.

    Returns:
        The "data" field of the JSON response, re-serialized with json.dumps.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within `timeout`.
        KeyError: If the response JSON has no "data" field.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    body = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=body, headers=headers, timeout=timeout)
    # Fail with a clear HTTP error rather than a confusing KeyError /
    # JSONDecodeError when the API rejects the request.
    response.raise_for_status()
    payload = response.json()  # parse the body once and reuse it
    # logging.warn is a deprecated alias; warning level is kept so the payload
    # stays as visible in build logs as before.
    logging.warning("API response: %s", payload)
    return json.dumps(payload["data"])
# Placeholder start date used on the first (non-incremental) build, when there
# is no previous output to resume from. Replace it, like the '<YOUR_URL>'
# default of _get_data.
INITIAL_START_DATE = "<YOUR_START_DATE>"


@use_external_systems(
    # NOTE(review): Credential() and EgressPolicy() are shown without
    # arguments; presumably the real credential / egress policy references
    # must be supplied here - confirm against your environment.
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),  # NOTE(review): output dataset path appears to be omitted - fill in.
)
def compute(output, creds, egress, ctx):
    """Fetch API data starting from the last recorded date and append it to the output.

    On an incremental build, the previous output is read to find where to
    resume. The latest row's LAST_MODIFIED_COLUMN value becomes the start
    date. If the output has a NEXT_LINK_COLUMN, that row's next-page link is
    also used as the request URL.

    On the first build, or when no usable previous row exists,
    INITIAL_START_DATE and _get_data's default URL are used instead. The
    response is written as a single row with columns 'date' and 'data',
    in "modify" mode.

    NOTE(review): NEXT_LINK_COLUMN, LAST_MODIFIED_COLUMN and
    REQUEST_TIMESTAMP_COLUMN are referenced but not defined in this snippet;
    define them at module level before running.

    Args:
        output: Transform output dataset; read as 'current' and written to.
        creds: External-system credential; its "token" entry is used as the
            bearer token.
        egress: Egress policy. It is not referenced in the body (presumably
            required so the outbound call is permitted - confirm).
        ctx: Transform context; provides is_incremental and spark_session.
    """
    token = creds.get("token")

    # Defaults for the first build. The original left both names unbound on
    # that path (and next_link_url unbound whenever the output had no
    # NEXT_LINK_COLUMN), so the _get_data call raised UnboundLocalError.
    last_date = INITIAL_START_DATE
    next_link_url = None

    if ctx.is_incremental:
        # localCheckpoint() materializes the read and truncates its lineage;
        # presumably so it is not tied to the write below - confirm.
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .first()  # None (not IndexError) when no row matches
            )
            if latest_row is not None:
                next_link_url = latest_row[NEXT_LINK_COLUMN]
                last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            latest_row = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).first()
            if latest_row is not None and latest_row[LAST_MODIFIED_COLUMN] is not None:
                last_date = latest_row[LAST_MODIFIED_COLUMN]

    today = dt.today().strftime("%Y-%m-%d")
    if next_link_url:
        data = _get_data(token, last_date, today, next_link_url)
    else:
        # No stored next-page link (first build, null link, or no
        # NEXT_LINK_COLUMN): fall back to _get_data's default URL.
        data = _get_data(token, last_date, today)

    # NOTE(review): 'date' stores the *start* of the requested window
    # (last_date), not today. If LAST_MODIFIED_COLUMN refers to this column,
    # the next run will request from the same start date again - confirm
    # whether the window end should be stored instead.
    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")
    output.write_dataframe(df)
- Date submitted: 2024-04-26
- Tags:
API, pyspark, incremental, dataframe, external transform
中文翻译
转换(Transforms)
Python
使用PySpark从API增量获取和更新数据
如何从API获取数据并使用外部转换(External Transforms)进行增量更新?
以下代码使用PySpark和requests库,在指定日期范围内从API获取数据并增量更新输出。如果API支持分页(Paging),该代码还可以从上一次输出中保存的下一页链接处继续获取数据。
from pyspark.sql import functions as F
from transforms.api import incremental, transform, Output
import requests
from transforms.external.systems import EgressPolicy, use_external_systems, Credential
import logging
from datetime import datetime as dt
import json
def _get_data(token, start_date, end_date, next_link_url='<YOUR_URL>', timeout=60):
    """Fetch records for a date window from the API and return them as a JSON string.

    Args:
        token: Bearer token sent in the Authorization header.
        start_date: Value sent as "from" in the JSON request body.
        end_date: Value sent as "to" in the JSON request body.
        next_link_url: URL the request is POSTed to. The default
            '<YOUR_URL>' is a placeholder that must be replaced with the
            real API endpoint.
        timeout: Seconds to wait for the server (passed to requests).
            Without a timeout, a stalled API would hang the build forever.

    Returns:
        The "data" field of the JSON response, re-serialized with json.dumps.

    Raises:
        requests.HTTPError: If the API responds with a 4xx/5xx status.
        requests.Timeout: If the server does not respond within `timeout`.
        KeyError: If the response JSON has no "data" field.
    """
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    body = {
        "from": start_date,
        "to": end_date,
    }
    response = requests.post(next_link_url, json=body, headers=headers, timeout=timeout)
    # Fail with a clear HTTP error rather than a confusing KeyError /
    # JSONDecodeError when the API rejects the request.
    response.raise_for_status()
    payload = response.json()  # parse the body once and reuse it
    # logging.warn is a deprecated alias; warning level is kept so the payload
    # stays as visible in build logs as before.
    logging.warning("API response: %s", payload)
    return json.dumps(payload["data"])
# Placeholder start date used on the first (non-incremental) build, when there
# is no previous output to resume from. Replace it, like the '<YOUR_URL>'
# default of _get_data.
INITIAL_START_DATE = "<YOUR_START_DATE>"


@use_external_systems(
    # NOTE(review): Credential() and EgressPolicy() are shown without
    # arguments; presumably the real credential / egress policy references
    # must be supplied here - confirm against your environment.
    creds=Credential(),
    egress=EgressPolicy(),
)
@incremental()
@transform(
    output=Output(),  # NOTE(review): output dataset path appears to be omitted - fill in.
)
def compute(output, creds, egress, ctx):
    """Fetch API data starting from the last recorded date and append it to the output.

    On an incremental build, the previous output is read to find where to
    resume. The latest row's LAST_MODIFIED_COLUMN value becomes the start
    date. If the output has a NEXT_LINK_COLUMN, that row's next-page link is
    also used as the request URL.

    On the first build, or when no usable previous row exists,
    INITIAL_START_DATE and _get_data's default URL are used instead. The
    response is written as a single row with columns 'date' and 'data',
    in "modify" mode.

    NOTE(review): NEXT_LINK_COLUMN, LAST_MODIFIED_COLUMN and
    REQUEST_TIMESTAMP_COLUMN are referenced but not defined in this snippet;
    define them at module level before running.

    Args:
        output: Transform output dataset; read as 'current' and written to.
        creds: External-system credential; its "token" entry is used as the
            bearer token.
        egress: Egress policy. It is not referenced in the body (presumably
            required so the outbound call is permitted - confirm).
        ctx: Transform context; provides is_incremental and spark_session.
    """
    token = creds.get("token")

    # Defaults for the first build. The original left both names unbound on
    # that path (and next_link_url unbound whenever the output had no
    # NEXT_LINK_COLUMN), so the _get_data call raised UnboundLocalError.
    last_date = INITIAL_START_DATE
    next_link_url = None

    if ctx.is_incremental:
        # localCheckpoint() materializes the read and truncates its lineage;
        # presumably so it is not tied to the write below - confirm.
        previous = output.dataframe('current').localCheckpoint()
        if NEXT_LINK_COLUMN in previous.columns:
            latest_row = (
                previous
                .where(F.col(LAST_MODIFIED_COLUMN).isNotNull())
                .orderBy([F.col(REQUEST_TIMESTAMP_COLUMN).desc(), F.col(LAST_MODIFIED_COLUMN).desc()])
                .first()  # None (not IndexError) when no row matches
            )
            if latest_row is not None:
                next_link_url = latest_row[NEXT_LINK_COLUMN]
                last_date = latest_row[LAST_MODIFIED_COLUMN]
        else:
            latest_row = previous.orderBy(F.col(LAST_MODIFIED_COLUMN).desc()).first()
            if latest_row is not None and latest_row[LAST_MODIFIED_COLUMN] is not None:
                last_date = latest_row[LAST_MODIFIED_COLUMN]

    today = dt.today().strftime("%Y-%m-%d")
    if next_link_url:
        data = _get_data(token, last_date, today, next_link_url)
    else:
        # No stored next-page link (first build, null link, or no
        # NEXT_LINK_COLUMN): fall back to _get_data's default URL.
        data = _get_data(token, last_date, today)

    # NOTE(review): 'date' stores the *start* of the requested window
    # (last_date), not today. If LAST_MODIFIED_COLUMN refers to this column,
    # the next run will request from the same start date again - confirm
    # whether the window end should be stored instead.
    df = ctx.spark_session.createDataFrame([{'date': last_date, 'data': data}])
    output.set_mode("modify")
    output.write_dataframe(df)
- 提交日期:2024-04-26
- 标签:
API、pyspark、incremental、dataframe、external transform