跳转至

Transforms(转换(Transforms))

Python

Combine shapefiles and convert to GeoJSON

How do I combine multiple shapefiles and convert them to GeoJSON format?

This code uses the geospatial_tools library to read multiple shapefiles, convert their geometries to GeoJSON format, and combine them into a single PySpark DataFrame. It also computes the centroid of each geometry and converts it to a geohash.

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([T.StructField("geoshape", T.StringType()),
                           T.StructField("name", T.StringType()),
                           T.StructField("centroid", T.StringType())])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # Copy all files for the shapefile to the local filesystem
            # There are multiple files associated with a shapefile, such as .prj and .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            # Create a GeoJSON geometry column
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # Convert everything to EPSG:4326 format expected by Foundry
            crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string()
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
                ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
  • Date submitted: 2024-05-23
  • Tags: geospatial, shapefile, geojson, geohash, pyspark, geopandas

Geospatial join with buffer in PySpark

How do I perform a geospatial join with a buffer in PySpark?

This code uses the geospatial_tools library to perform a geospatial join between two datasets, lines, and points, with a 30,000 meters buffer around the points. It then returns a DataFrame with the point_id and line_id.

from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry

@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input()
)
def compute(lines, points):
    lines = lines.select(F.col('id').alias('line_id'), 'geometry')
    points = points.withColumn(
        'geometry', lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326')
        ).withColumn('geometry_buff', buffer('geometry', meters=30000)
        ).select('point_id', 'geometry_buff')
    df = points.spatial_join(
            lines,
            ('geometry_buff', 'geometry'),
            'left'
        ).select(points.point_id, lines.line_id)
    return df
  • Date submitted: 2024-04-25
  • Tags: geospatial, pyspark, geospatial_tools, buffer, spatial_join

中文翻译


转换(Transforms)

Python

合并 shapefile 并转换为 GeoJSON

如何合并多个 shapefile 并将其转换为 GeoJSON 格式?

以下代码使用 geospatial_tools 库读取多个 shapefile,将其几何图形转换为 GeoJSON 格式,并合并为一个 PySpark DataFrame。同时计算每个几何图形的质心(centroid)并将其转换为地理哈希(geohash)。

from transforms.api import transform, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import clean_geometry, centroid, geojson_to_geohash
import tempfile
import shutil
import geopandas as gpd
from pyspark.sql import types as T
from pyspark.sql import functions as F
import json
from shapely.geometry import mapping


@geospatial()
@transform(
    output=Output(),
    input_data=Input(),
)
def compute(ctx, input_data, output):
    fs = input_data.filesystem()
    schema = T.StructType([T.StructField("geoshape", T.StringType()),
                           T.StructField("name", T.StringType()),
                           T.StructField("centroid", T.StringType())])
    shapefiles = [f.path.replace('.shp', '') for f in fs.ls(glob='*shp')]
    combined_data = ctx.spark_session.createDataFrame([], schema)
    for shapefile in shapefiles:  # NOQA
        with tempfile.TemporaryDirectory() as tmp_dir:
            # 将 shapefile 的所有相关文件复制到本地文件系统
            # shapefile 包含多个关联文件,例如 .prj 和 .cpg
            for shapefile_file in fs.ls(glob=f'{shapefile}.*'):
                with open(f'{tmp_dir}/{shapefile_file.path}', 'wb') as tmp_file:
                    with fs.open(shapefile_file.path, 'rb') as f:
                        shutil.copyfileobj(f, tmp_file)
            # 创建 GeoJSON 几何图形列
            pdf = gpd.read_file(f'{tmp_dir}/{shapefile}.shp')
            pdf['geometry'] = pdf.geometry.apply(lambda x: json.dumps(mapping(x)))
            df = ctx.spark_session.createDataFrame(pdf)

            # 将所有数据转换为 Foundry 所需的 EPSG:4326 格式
            crs = gpd.read_file(f'{tmp_dir}/{shapefile}.shp').crs.to_string()
            df = df.withColumn(
                "geoshape",
                clean_geometry('geometry', crs, lat_long=(crs != "EPSG:4326"))
                ).select("geoshape")
            df = df.withColumn('name', F.lit(shapefile))
            df = df.withColumn('centroid', geojson_to_geohash(centroid('geoshape')))
            combined_data = combined_data.unionByName(df)

    return output.write_dataframe(combined_data)
  • 提交日期:2024-05-23
  • 标签:geospatialshapefilegeojsongeohashpysparkgeopandas

在 PySpark 中执行带缓冲区的空间连接(Geospatial join with buffer)

如何在 PySpark 中执行带缓冲区的空间连接?

以下代码使用 geospatial_tools 库,对线数据集和点数据集执行空间连接,并在点周围设置 30,000 米的缓冲区。最终返回包含 point_idline_id 的 DataFrame。

from pyspark.sql import functions as F
from transforms.api import configure, transform_df, Input, Output
from geospatial_tools import geospatial
from geospatial_tools.functions import buffer, lat_long_to_geometry

@configure(profile=['GEOSPARK'])
@geospatial()
@transform_df(
    Output(),
    lines=Input(),
    points=Input()
)
def compute(lines, points):
    lines = lines.select(F.col('id').alias('line_id'), 'geometry')
    points = points.withColumn(
        'geometry', lat_long_to_geometry('latitude', 'longitude', 'EPSG:4326')
        ).withColumn('geometry_buff', buffer('geometry', meters=30000)
        ).select('point_id', 'geometry_buff')
    df = points.spatial_join(
            lines,
            ('geometry_buff', 'geometry'),
            'left'
        ).select(points.point_id, lines.line_id)
    return df
  • 提交日期:2024-04-25
  • 标签:geospatialpysparkgeospatial_toolsbufferspatial_join