Logging(日志记录)¶
It's possible to output various debugging information from PySpark in Foundry.
Code Workbook¶
Python's built in print pipes to the Output section of the Code Workbook to the right of the code editor where errors normally appear.
def new_dataset(some_input_dataset):
print("example log output")
example log output
Code Repositories¶
Code Repositories uses Python's built in logging library ↗. This is widely documented online and allows you to control logging level (ERROR, WARNING, INFO) for easier filtering.
Logging output appears in both your output dataset's log files, and in your build's driver logs (Dataset -> Details -> Files -> Log Files, and Builds -> Build -> Job status logs; select "Driver logs", respectively).
import logging
logger = logging.getLogger(__name__)
@transform_df(
...
)
def some_transformation(some_input):
logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: example log output
Logging from inside a Python UDF¶
Spark captures logging output from the top-level driver process that creates your query, such as from the some_transformation function above. However, it does not capture logs written from inside of User Defined Functions (UDFs). If you are using a UDF within your PySpark query and need to log data, create and call a second UDF that returns the data you wish to capture.
@transform_df(
...
)
def some_transformation(some_input):
logger.info("log output related to the overall query")
@F.udf("integer")
def custom_function(integer_input):
return integer_input + 5
@F.udf("string")
def custom_log(integer_input):
return "Original integer was %d before adding 5" % integer_input
df = (
some_input
.withColumn("new_integer", custom_function(F.col("example_integer_col"))
.withColumn("debugging", custom_log(F.col("example_integer_col"))
)
Examples¶
We often want to log information about what's happening in our query. PySpark has a number of ways to introspect DataFrames and we can send this information to the logging mechanisms described above.
We will use Code Workbook's print syntax in these examples but print can be substituted for Transforms & Authoring's logger.
DataFrame Columns¶
We can introspect the columns that exist on a DataFrame with df.columns This produces a list of strings.
def employee_phone_numbers(employee_df, phone_number_df):
print("employee columns are {}".format(employee_df.columns))
print("phone columns are {}".format(phone_df.columns))
df = employee_df.join(phone_number_df, 'employee_id', 'left')
print("joined columns are {}".format(df.columns))
employee columns are ['name', 'employee_id']
phone columns are ['phone_number', 'employee_id']
joined columns are ['name', 'employee_id', 'phone_number']
Verifying join behavior¶
Suppose we are performing a left join, expect a one-to-one relationship and want to verify that the number of rows in our left DataFrame has stayed the same.
def employee_phone_numbers(employee_df, phone_number_df):
original_employee_rows = employee_df.count()
print("Incoming employee rows {}".format(original_employee_rows))
df = employee_df.join(phone_number_df, 'employee_id', 'left')
rows_after_join = df.count()
print("Final employee rows {}".format(rows_after_join))
if rows_after_join > original_employee_rows:
print("Some employees have multiple phone numbers!")
else:
print("Data is correct")
Incoming employee rows 100
Final employee rows 105
Some employees have multiple phone numbers!
Spark Query Plan¶
You can access the optimized physical plan that Spark will run to generate a given DataFrame by calling .explain().
def employee_phone_numbers(employee, phone):
employee = employee.where(F.month(employee.birthday) == F.month(F.current_date()))
df = employee.join(phone, 'employee_id', 'left')
df.explain()
== Physical Plan ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
:- *(2) Filter (month(birthday#9733) = 10)
: +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
+- *(1) Project [phone_number#9728, employee_id#9729]
+- *(1) Filter isnotnull(employee_id#9729)
+- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>
Viewing Data¶
Suppose we would like to see which employees have the most phone numbers. We will derive the dataset we're interested in (employees with more than one number) and call .take(3) to retrieve the top 3 rows as a list. Alternatively .collect() retrieves all rows of a DataFrame as a list.
:::callout{theme="warning" title="Warning"}
Pulling too much data into your Python environment can easily cause it to run out of memory. Only collect() small amounts of data.
:::
def multiple_numbers(phone_numbers):
df = phone_numbers.groupBy('employee_id').agg(
F.count('phone_number').alias('numbers')
).where('numbers' > 1).sort(F.col('numbers').desc())
print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]
中文翻译¶
日志记录¶
在 Foundry 中,可以通过 PySpark 输出各种调试信息。
代码工作簿(Code Workbook)¶
Python 内置的 print 函数会将输出管道传输至 Code Workbook 的“输出”部分,该部分位于代码编辑器右侧,通常用于显示错误信息。
def new_dataset(some_input_dataset):
print("example log output")
example log output
代码库(Code Repositories)¶
Code Repositories 使用 Python 内置的日志记录库 ↗。该库在网上有详尽的文档,并允许您控制日志级别(ERROR、WARNING、INFO),以便于过滤。
日志输出会同时显示在输出数据集的日志文件以及构建的驱动日志中(分别通过 Dataset -> Details -> Files -> Log Files 和 Builds -> Build -> Job status logs 路径查看;并选择 "Driver logs")。
import logging
logger = logging.getLogger(__name__)
@transform_df(
...
)
def some_transformation(some_input):
logger.info("example log output")
INFO [2018-01-01T12:00:00] some_transformation: example log output
在 Python UDF 内部记录日志¶
Spark 会捕获创建查询的顶层驱动进程的日志输出,例如上述 some_transformation 函数产生的日志。但是,它不会捕获在用户自定义函数(UDFs)内部写入的日志。如果您在 PySpark 查询中使用了 UDF 并需要记录数据,请创建并调用第二个 UDF,使其返回您希望捕获的数据。
@transform_df(
...
)
def some_transformation(some_input):
logger.info("log output related to the overall query")
@F.udf("integer")
def custom_function(integer_input):
return integer_input + 5
@F.udf("string")
def custom_log(integer_input):
return "Original integer was %d before adding 5" % integer_input
df = (
some_input
.withColumn("new_integer", custom_function(F.col("example_integer_col"))
.withColumn("debugging", custom_log(F.col("example_integer_col"))
)
示例¶
我们通常需要记录查询中正在执行的操作信息。PySpark 提供了多种内省 DataFrame 的方法,我们可以将这些信息发送到上述日志记录机制中。
以下示例将使用 Code Workbook 的 print 语法,但在 Transforms & Authoring 中,可以将 print 替换为 logger。
DataFrame 列¶
我们可以通过 df.columns 内省 DataFrame 中存在的列。这将生成一个字符串列表。
def employee_phone_numbers(employee_df, phone_number_df):
print("employee columns are {}".format(employee_df.columns))
print("phone columns are {}".format(phone_df.columns))
df = employee_df.join(phone_number_df, 'employee_id', 'left')
print("joined columns are {}".format(df.columns))
employee columns are ['name', 'employee_id']
phone columns are ['phone_number', 'employee_id']
joined columns are ['name', 'employee_id', 'phone_number']
验证连接行为¶
假设我们正在执行左连接,预期为一对一关系,并希望验证左侧 DataFrame 的行数是否保持不变。
def employee_phone_numbers(employee_df, phone_number_df):
original_employee_rows = employee_df.count()
print("Incoming employee rows {}".format(original_employee_rows))
df = employee_df.join(phone_number_df, 'employee_id', 'left')
rows_after_join = df.count()
print("Final employee rows {}".format(rows_after_join))
if rows_after_join > original_employee_rows:
print("Some employees have multiple phone numbers!")
else:
print("Data is correct")
Incoming employee rows 100
Final employee rows 105
Some employees have multiple phone numbers!
Spark 查询计划¶
您可以通过调用 .explain() 来访问 Spark 为生成给定 DataFrame 而运行的优化后的物理计划(Physical Plan)。
def employee_phone_numbers(employee, phone):
employee = employee.where(F.month(employee.birthday) == F.month(F.current_date()))
df = employee.join(phone, 'employee_id', 'left')
df.explain()
== Physical Plan ==
*(2) Project [employee_id#9734, name#9732, birthday#9733, phone_number#9728]
+- *(2) BroadcastHashJoin [employee_id#9734], [employee_id#9729], LeftOuter, BuildRight
:- *(2) Filter (month(birthday#9733) = 10)
: +- *(2) FileScan parquet !ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36:ri.foundry.main.transaction.00000000-e98a-c557-a20f-5eea5f373e36@00000000-1ebd-4a81-9f64-2d4c8a8472bc:master.ri.foundry.main.dataset.6ad20cd7-45b0-4312-b096-05f57487f650[name#9732,birthday#9733,employee_id#9734] Batched: true, Format: Parquet, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<name:string,birthday:date,employee_id:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
+- *(1) Project [phone_number#9728, employee_id#9729]
+- *(1) Filter isnotnull(employee_id#9729)
+- *(1) FileScan csv !ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db:ri.foundry.main.transaction.00000000-e989-4f9a-90d5-996f088611db@00000000-1ebc-f483-b75d-dbcc3292d9e4:master.ri.foundry.main.dataset.f5bf4c77-37c0-4e29-8a68-814c35442bbd[phone_number#9728,employee_id#9729] Batched: false, Format: CSV, Location: FoundryCatalogFileIndex[sparkfoundry://.../datasets/ri.f..., PartitionFilters: [], PushedFilters: [IsNotNull(employee_id)], ReadSchema: struct<phone_number:int,employee_id:int>
查看数据¶
假设我们想查看哪些员工拥有的电话号码最多。我们将派生出感兴趣的数据集(拥有多个号码的员工),并调用 .take(3) 以列表形式检索前 3 行。或者,使用 .collect() 以列表形式检索 DataFrame 的所有行。
:::callout{theme="warning" title="警告"}
将过多数据拉取到 Python 环境中很容易导致内存溢出。请仅对少量数据使用 collect()。
:::
def multiple_numbers(phone_numbers):
df = phone_numbers.groupBy('employee_id').agg(
F.count('phone_number').alias('numbers')
).where('numbers' > 1).sort(F.col('numbers').desc())
print(df.take(3))
[Row(employee_id=70, numbers=4), Row(employee_id=90, numbers=2), Row(employee_id=25, numbers=2)]