Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ flake8
mysql
pytest
pytest-cov
pytest-benchmark
pytest-html
pytest-mypy
rasterio;implementation_name!='pypy'
Empty file added tests/benchmarks/__init__.py
Empty file.
118 changes: 118 additions & 0 deletions tests/benchmarks/test_insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import pytest
import shapely
from sqlalchemy import Column
from sqlalchemy import Integer

from geoalchemy2 import Geometry
from geoalchemy2.elements import WKTElement
from geoalchemy2.shape import to_shape

from .. import test_only_with_dialects


@pytest.fixture
def WktTable(base, schema):
class WktTable(base):
__tablename__ = "wkt_table"
__table_args__ = {"schema": schema}
id = Column(Integer, primary_key=True)
geom = Column(Geometry(geometry_type="POINTZM", from_text="ST_GeomFromEWKT", srid=4326))

def __init__(self, geom):
self.geom = geom

return WktTable


@pytest.fixture
def WkbTable(base, schema):
class WkbTable(base):
__tablename__ = "wkb_table"
__table_args__ = {"schema": schema}
id = Column(Integer, primary_key=True)
geom = Column(Geometry(geometry_type="POINTZM", from_text="ST_GeomFromWKB", srid=4326))

def __init__(self, geom):
self.geom = geom

return WkbTable


def create_points(N=50):
"""Create a list of points for benchmarking."""
points = []
for i in range(N):
for j in range(N):
for k in range(N):
wkt = f"POINT({i} {j} {k} {i + j + k})"
points.append(wkt)
return points


def insert_all_points(conn, table, points):
"""Insert all points into the database."""
query = table.insert().values(
[
{
"geom": point,
}
for point in points
]
)
return conn.execute(query)


def _benchmark_insert(conn, table_class, metadata, benchmark, convert_wkb=False, N=50):
"""Benchmark the insert operation."""
print(f"Benchmarking {table_class.__tablename__} insert operation")

# Create the points to insert
points = create_points(N)
print(f"Number of points to insert: {len(points)}")

if convert_wkb:
# Convert WKT to WKB
points = [shapely.io.to_wkb(to_shape(WKTElement(point)), flavor="iso") for point in points]
print(f"Converted points to WKB: {len(points)}")

# Create the table in the database
metadata.drop_all(conn, checkfirst=True)
metadata.create_all(conn)
print(f"Table {table_class.__tablename__} created")

table = table_class.__table__
return benchmark.pedantic(insert_all_points, args=(conn, table, points), iterations=1, rounds=1)


@pytest.mark.parametrize(
"N",
[2, 10, 50],
)
@test_only_with_dialects("postgresql")
def test_insert_wkt(benchmark, WktTable, conn, metadata, N):
"""Benchmark the insert operation for WKT."""
_benchmark_insert(conn, WktTable, metadata, benchmark, N=N)

assert (
conn.execute(
WktTable.__table__.select().where(WktTable.__table__.c.geom.is_not(None))
).rowcount
== N * N * N
)


@pytest.mark.parametrize(
"N",
[2, 10, 50],
)
@test_only_with_dialects("postgresql")
def test_insert_wkb(benchmark, WkbTable, conn, metadata, N):
"""Benchmark the insert operation for WKB."""
_benchmark_insert(conn, WkbTable, metadata, benchmark, convert_wkb=True, N=N)

assert (
conn.execute(
WkbTable.__table__.select().where(WkbTable.__table__.c.geom.is_not(None))
).rowcount
== N * N * N
)