Transforms(转换(Transforms))¶
Python¶
Combine shapefiles and convert to GeoJSON¶
How do I combine multiple shapefiles and convert them to GeoJSON format?
This code uses the geospatial_tools library to read multiple shapefiles, convert their geometries to GeoJSON format, and combine them into a single PySpark DataFrame. It also computes the centroid of each geometry and converts it to a geohash.
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping
@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    """Combine every shapefile in the input dataset into one GeoJSON DataFrame.

    Each ``.shp`` file found in ``input_data`` is copied, together with the
    files that share its base name, into a local temporary directory and read
    with geopandas. Its geometries are serialized to GeoJSON strings, passed
    through ``clean_geometry``, tagged with the shapefile's base path in a
    ``name`` column, and given a ``centroid`` column computed with
    ``geojson_to_geohash(centroid(...))``.

    Args:
        ctx: Transform context; only ``ctx.spark_session`` is used.
        input_data: Input whose filesystem holds the shapefiles.
        output: Output that receives the combined DataFrame.

    Returns:
        The result of ``output.write_dataframe``.

    Raises:
        ValueError: If a shapefile carries no coordinate reference system.
    """
    import os  # Local import keeps this snippet self-contained.

    fs = input_data.filesystem()
    schema = T.StructType([
        T.StructField("geoshape", T.StringType()),
        T.StructField("name", T.StringType()),
        T.StructField("centroid", T.StringType()),
    ])

    # Strip only the trailing extension; str.replace would also remove a
    # '.shp' that happens to appear earlier in the path.
    shp_suffix = '.shp'
    shapefiles = [
        f.path[:-len(shp_suffix)] for f in fs.ls(glob=f'*{shp_suffix}')
    ]
    combined_data = ctx.spark_session.createDataFrame([], schema)

    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem.
            # There are multiple files associated with a shapefile, such as
            # .prj and .cpg.
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                local_path = f'{tmp_dir}/{shapefile_file.path}'
                # Dataset paths may contain sub-directories that do not yet
                # exist under the temporary directory.
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                with open(local_path, 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)

            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')

            # Capture the CRS before the geometry column is overwritten with
            # strings, so the file does not have to be read a second time.
            if pdf.crs is None:
                raise ValueError(
                    f"Shapefile '{shapefile}' has no coordinate reference "
                    "system (missing .prj file?)"
                )
            crs = pdf.crs.to_string()

            # Create a GeoJSON geometry column
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert everything to EPSG:4326 format expected by Foundry
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
            ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
- Date submitted: 2024-05-23
- Tags:
geospatial,shapefile,geojson,geohash,pyspark,geopandas
Geospatial join with buffer in PySpark¶
How do I perform a geospatial join with a buffer in PySpark?
This code uses the geospatial_tools library to perform a geospatial join between two datasets (lines and points), with a 30,000-meter buffer around the points. It then returns a DataFrame with the point_id and line_id.
from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry
@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input()
)
def compute(lines, points):
    """Join buffered points to lines and return the matching id pairs.

    Args:
        lines: DataFrame with an ``id`` column and a ``geometry`` column.
        points: DataFrame with ``point_id``, ``latitude`` and ``longitude``
            columns.

    Returns:
        DataFrame with the columns ``point_id`` and ``line_id``.
    """
    # Keep only the line identifier (renamed) and its geometry.
    line_geoms = lines.select(F.col('id').alias('line_id'), 'geometry')

    # Build a geometry from each point's coordinates ...
    point_geoms = points.withColumn(
        'geometry',
        lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326'),
    )
    # ... then buffer it by 30,000 meters and drop everything else.
    buffered_points = point_geoms.withColumn(
        'geometry_buff',
        buffer('geometry', meters=30000),
    ).select('point_id', 'geometry_buff')

    # 'left' is passed as the join type; the tuple names the geometry column
    # to use on each side (points first, lines second).
    joined = buffered_points.spatial_join(
        line_geoms,
        ('geometry_buff', 'geometry'),
        'left'
    )
    return joined.select(buffered_points.point_id, line_geoms.line_id)
- Date submitted: 2024-04-25
- Tags:
geospatial,pyspark,geospatial_tools,buffer,spatial_join
中文翻译¶
转换(Transforms)¶
Python¶
合并 shapefile 并转换为 GeoJSON¶
如何合并多个 shapefile 并将其转换为 GeoJSON 格式?
以下代码使用 geospatial_tools 库读取多个 shapefile,将其几何图形转换为 GeoJSON 格式,并合并为一个 PySpark DataFrame。同时计算每个几何图形的质心(centroid)并将其转换为地理哈希(geohash)。
from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping
@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    """Combine every shapefile in the input dataset into one GeoJSON DataFrame.

    Each ``.shp`` file found in ``input_data`` is copied, together with the
    files that share its base name, into a local temporary directory and read
    with geopandas. Its geometries are serialized to GeoJSON strings, passed
    through ``clean_geometry``, tagged with the shapefile's base path in a
    ``name`` column, and given a ``centroid`` column computed with
    ``geojson_to_geohash(centroid(...))``.

    Args:
        ctx: Transform context; only ``ctx.spark_session`` is used.
        input_data: Input whose filesystem holds the shapefiles.
        output: Output that receives the combined DataFrame.

    Returns:
        The result of ``output.write_dataframe``.

    Raises:
        ValueError: If a shapefile carries no coordinate reference system.
    """
    import os  # Local import keeps this snippet self-contained.

    fs = input_data.filesystem()
    schema = T.StructType([
        T.StructField("geoshape", T.StringType()),
        T.StructField("name", T.StringType()),
        T.StructField("centroid", T.StringType()),
    ])

    # Strip only the trailing extension; str.replace would also remove a
    # '.shp' that happens to appear earlier in the path.
    shp_suffix = '.shp'
    shapefiles = [
        f.path[:-len(shp_suffix)] for f in fs.ls(glob=f'*{shp_suffix}')
    ]
    combined_data = ctx.spark_session.createDataFrame([], schema)

    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem.
            # A shapefile has several associated files, such as .prj and .cpg.
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                local_path = f'{tmp_dir}/{shapefile_file.path}'
                # Dataset paths may contain sub-directories that do not yet
                # exist under the temporary directory.
                os.makedirs(os.path.dirname(local_path), exist_ok=True)
                with open(local_path, 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)

            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')

            # Capture the CRS before the geometry column is overwritten with
            # strings, so the file does not have to be read a second time.
            if pdf.crs is None:
                raise ValueError(
                    f"Shapefile '{shapefile}' has no coordinate reference "
                    "system (missing .prj file?)"
                )
            crs = pdf.crs.to_string()

            # Create the GeoJSON geometry column
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert all data to the EPSG:4326 format required by Foundry
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
            ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
- 提交日期:2024-05-23
- 标签:
geospatial、shapefile、geojson、geohash、pyspark、geopandas
在 PySpark 中执行带缓冲区的空间连接(Geospatial join with buffer)¶
如何在 PySpark 中执行带缓冲区的空间连接?
以下代码使用 geospatial_tools 库,对线数据集和点数据集执行空间连接,并在点周围设置 30,000 米的缓冲区。最终返回包含 point_id 和 line_id 的 DataFrame。
from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry
@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input()
)
def compute(lines, points):
    """Join buffered points to lines and return the matching id pairs.

    Args:
        lines: DataFrame with an ``id`` column and a ``geometry`` column.
        points: DataFrame with ``point_id``, ``latitude`` and ``longitude``
            columns.

    Returns:
        DataFrame with the columns ``point_id`` and ``line_id``.
    """
    # Keep only the line identifier (renamed) and its geometry.
    line_geoms = lines.select(F.col('id').alias('line_id'), 'geometry')

    # Build a geometry from each point's coordinates ...
    point_geoms = points.withColumn(
        'geometry',
        lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326'),
    )
    # ... then buffer it by 30,000 meters and drop everything else.
    buffered_points = point_geoms.withColumn(
        'geometry_buff',
        buffer('geometry', meters=30000),
    ).select('point_id', 'geometry_buff')

    # 'left' is passed as the join type; the tuple names the geometry column
    # to use on each side (points first, lines second).
    joined = buffered_points.spatial_join(
        line_geoms,
        ('geometry_buff', 'geometry'),
        'left'
    )
    return joined.select(buffered_points.point_id, line_geoms.line_id)
- 提交日期:2024-04-25
- 标签:
geospatial、pyspark、geospatial_tools、buffer、spatial_join