From 3eb29c0c4665a50b548cc6483191b00896c686d5 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:35:21 -0400 Subject: [PATCH 01/17] Add pgstac to the search path in migrations sql --- sql/999_version.sql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/999_version.sql b/sql/999_version.sql index 8046936c..d6c8d117 100644 --- a/sql/999_version.sql +++ b/sql/999_version.sql @@ -1 +1,3 @@ +SET SEARCH_PATH TO pgstac, public; + INSERT INTO migrations (version) VALUES ('0.2.7'); From 7d1f7c755c4f8124babf60c73056fc0a4387bbc7 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:35:37 -0400 Subject: [PATCH 02/17] Add version script to pgstac initialization script. This adds the entry to the migrations table which was missing prior, and would cause "pypgstac migrate" to fail on new databases --- pgstac.sql | 1 + 1 file changed, 1 insertion(+) diff --git a/pgstac.sql b/pgstac.sql index 98c88402..6b423951 100644 --- a/pgstac.sql +++ b/pgstac.sql @@ -3,4 +3,5 @@ BEGIN; \i sql/002_collections.sql \i sql/003_items.sql \i sql/004_search.sql +\i sql/999_version.sql COMMIT; \ No newline at end of file From b4e3b367d049b65336f301c250d3d7e26a78c38e Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:36:42 -0400 Subject: [PATCH 03/17] Add python entries to .gitignore --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 1f927703..e6fa1141 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,6 @@ .envrc pypgstac/dist +*.pyc +*.egg-info +*.eggs +venv \ No newline at end of file From c7961609f48048fab26e752f6f5ee1265adef6bb Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:37:08 -0400 Subject: [PATCH 04/17] Add docker-compose and Docker.dev for docker-based testing --- Dockerfile.dev | 15 +++++++++++++++ docker-compose.yml | 33 +++++++++++++++++++++++++++++++++ pypgstac/requirements-dev.txt | 3 +++ 3 files changed, 51 insertions(+) create mode 100644 Dockerfile.dev create mode 100644 docker-compose.yml create mode 100644 pypgstac/requirements-dev.txt diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 00000000..d3324e1a --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,15 @@ +FROM python:3.8-slim + +ENV CURL_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt + +RUN mkdir -p /opt/src/pypgstac + +WORKDIR /opt/src/pypgstac + +COPY pypgstac/requirements-dev.txt /opt/src/pypgstac/requirements-dev.txt +RUN pip install -r requirements-dev.txt + +COPY pypgstac /opt/src/pypgstac +RUN pip install -e . + +WORKDIR /opt/src \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..8bf1fe97 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,33 @@ +services: + dev: + container_name: pgstac-dev + image: pgstac-dev + build: + context: . + dockerfile: Dockerfile.dev + depends_on: + - database + volumes: + - ./:/opt/src + environment: + - PGUSER=username + - PGPASSWORD=password + - PGHOST=database + - PGDATABASE=postgis + database: + container_name: pqgstac-db + image: pqstac-db + build: + context: . + dockerfile: Dockerfile + environment: + - POSTGRES_USER=username + - POSTGRES_PASSWORD=password + - POSTGRES_DB=postgis + ports: + - "5432:5432" + volumes: + - pgstac-pgdata:/var/lib/postgresql/data + - ./:/opt/src +volumes: + pgstac-pgdata: diff --git a/pypgstac/requirements-dev.txt b/pypgstac/requirements-dev.txt new file mode 100644 index 00000000..1625db04 --- /dev/null +++ b/pypgstac/requirements-dev.txt @@ -0,0 +1,3 @@ +flake8==3.8.4 +black==20.8b1 +mypy==0.800 \ No newline at end of file From 7269c8f3fc23aa13f83643e9ee77c6774624f618 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:37:48 -0400 Subject: [PATCH 05/17] Add scripts to control dev environment --- .flake8 | 4 ++++ README.md | 46 ++++++++++++++++++++++++++++++++++++++---- mypy.ini | 4 ++++ scripts/bin/format | 23 +++++++++++++++++++++ scripts/bin/test | 33 ++++++++++++++++++++++++++++++ scripts/cibuild | 19 ++++++++++++++++++ scripts/cipublish | 0 scripts/console | 50 ++++++++++++++++++++++++++++++++++++++++++++++ scripts/format | 21 +++++++++++++++++++ scripts/migrate | 24 ++++++++++++++++++++++ scripts/server | 39 ++++++++++++++++++++++++++++++++++++ scripts/setup | 45 +++++++++++++++++++++++++++++++++++++++++ scripts/test | 32 +++++++++++++++++++++++++++++ scripts/update | 33 ++++++++++++++++++++++++++++++ 14 files changed, 369 insertions(+), 4 deletions(-) create mode 100644 .flake8 create mode 100644 mypy.ini create mode 100755 scripts/bin/format create mode 100755 scripts/bin/test create mode 100755 scripts/cibuild create mode 100755 scripts/cipublish create mode 100755 scripts/console create mode 100755 scripts/format create mode 100755 scripts/migrate create mode 100755 scripts/server create mode 100755 scripts/setup create mode 100755 scripts/test create mode 100755 scripts/update diff --git a/.flake8 b/.flake8 new file mode 100644 index 00000000..cbc2b651 --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +max-line-length = 88 +extend-ignore = E203, W503, E731, E722 +per-file-ignores = __init__.py:F401 diff --git a/README.md b/README.md index 717d0df0..02e26386 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ poetry build pypgstac pip install dist/pypgstac-[version]-py3-none-any.whl ``` -# Migrations +## Migrations To install the latest version of pgstac on an empty database, you can directly load the primary source code. ``` psql -f pgstac.sql @@ -33,20 +33,20 @@ For each new version of PGStac, two migrations should be added to pypgstac/pypgs - pgstac.[version].sql (equivalent to `cat sql/*.sql > migration.sql && echo "insert into migrations (versions) VALUES ('[version]')" >> migration.sql) - pgstac.[version].[fromversion].sql (Migration to move from existing version to new version, can be created by hand or using the makemigration.sh tool below) -## Running Migrations +### Running Migrations Migrations can be installed by either directly running the appropriate migration sql file for from and target PGStac versions: `psql -f pypgstac/pypgstac/migrations/pgstac.0.1.8.sql` Or by using pypgstac: `pypgstac migrate` -## Creating Migrations Using Schema Diff +### Creating Migrations Using Schema Diff To create a migration from a previous version of pgstac you can calculate the migration from the running instance of pgstac using the makemigration.sh command. This will use docker to copy the schema of the existing database and the new sql into new docker databases and create/test the migration between the two. ``` makemigration.sh postgresql://myuser:mypassword@myhost:myport/mydatabase ``` -# Bulk Data Loading +## Bulk Data Loading A python utility is included which allows to load data from any source openable by smart-open using python in a memory efficient streaming manner using PostgreSQL copy. There are options for collections and items and can be used either as a command line or a library. To load an ndjson of items directly using copy (will fail on any duplicate ids but is the fastest option to load new data you know will not conflict) @@ -63,3 +63,41 @@ To upsert any records, adding anything new and replacing anything with the same ``` pypgstac load items --method upsert ``` + +## Development + +PGStac uses a dockerized development environment. You can set this up using: + +```bash +scripts/setup +``` + +To bring up the development database: +``` +scripts/server +``` + +To run tests, use: +```bash +scripts/test +``` + +To rebuild docker images: +```bash +scripts/update +``` + +To drop into a console, use +```bash +scripts/console +``` + +To drop into a psql console on the database container, use: +```bash +scripts/console --db +``` + +To run migrations on the development database, use +```bash +scripts/migrate +``` \ No newline at end of file diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..5d9c85c9 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,4 @@ +[mypy] +ignore_missing_imports = True +disallow_untyped_defs = True +namespace_packages = True diff --git a/scripts/bin/format b/scripts/bin/format new file mode 100755 index 00000000..2aac5936 --- /dev/null +++ b/scripts/bin/format @@ -0,0 +1,23 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Format code. + +This scripts is meant to be run inside the dev container. + +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + echo "Formatting pypgstac..." + black pypgstac/pypgstac + black pypgstac/tests +fi diff --git a/scripts/bin/test b/scripts/bin/test new file mode 100755 index 00000000..f4f1da63 --- /dev/null +++ b/scripts/bin/test @@ -0,0 +1,33 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Runs tests for the project. + +This scripts is meant to be run inside the dev container. + +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + + echo "Running mypy..." + mypy pypgstac/pypgstac pypgstac/tests + + echo "Running black..." + black --check pypgstac/pypgstac pypgstac/tests + + echo "Running flake8..." + flake8 pypgstac/pypgstac pypgstac/tests + + echo "Running unit tests..." + python -m unittest discover pypgstac/tests + +fi diff --git a/scripts/cibuild b/scripts/cibuild new file mode 100755 index 00000000..f7b2af69 --- /dev/null +++ b/scripts/cibuild @@ -0,0 +1,19 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +CI build for this project. +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + scripts/setup + scripts/test +fi diff --git a/scripts/cipublish b/scripts/cipublish new file mode 100755 index 00000000..e69de29b diff --git a/scripts/console b/scripts/console new file mode 100755 index 00000000..6224bfba --- /dev/null +++ b/scripts/console @@ -0,0 +1,50 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") [--db] +Start a console in the dev container + +--db: Instead, start a psql console in the database container. +" +} + +while [[ "$#" > 0 ]]; do case $1 in + --db) + DB_CONSOLE=1 + shift + ;; + *) + usage "Unknown parameter passed: $1" + shift + shift + ;; + esac; done + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + + if [[ "${DB_CONSOLE}" ]]; then + scripts/server --detach + + docker-compose \ + -f docker-compose.yml \ + run --rm \ + database \ + psql postgres://username:password@database:5432/postgis + + exit 0 + fi + + # Run database migrations + docker-compose \ + -f docker-compose.yml \ + run --rm dev \ + /bin/bash + +fi diff --git a/scripts/format b/scripts/format new file mode 100755 index 00000000..75afec38 --- /dev/null +++ b/scripts/format @@ -0,0 +1,21 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Format code in this project + +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + docker-compose \ + run --rm \ + dev scripts/bin/format; +fi diff --git a/scripts/migrate b/scripts/migrate new file mode 100755 index 00000000..808773c4 --- /dev/null +++ b/scripts/migrate @@ -0,0 +1,24 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Run migrations against the development database. +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + + # Run database migrations + docker-compose \ + -f docker-compose.yml \ + run --rm dev \ + bash -c "pypgstac pgready && pypgstac migrate" + +fi \ No newline at end of file diff --git a/scripts/server b/scripts/server new file mode 100755 index 00000000..9e6c8bd9 --- /dev/null +++ b/scripts/server @@ -0,0 +1,39 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo $1 + + echo -n \ + "Usage: $(basename "$0") [--detach] +Runs the development database. + +--detach: Run in detached mode. +" +} + +DETACH_ARG="" + +while [[ "$#" > 0 ]]; do case $1 in + --detach) + DETACH_ARG="--detach" + shift + ;; + --help) + usage + exit 0 + shift + ;; + *) break ;; + esac; done + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + docker-compose \ + -f docker-compose.yml \ + up ${DETACH_ARG} $@ +fi diff --git a/scripts/setup b/scripts/setup new file mode 100755 index 00000000..b575910f --- /dev/null +++ b/scripts/setup @@ -0,0 +1,45 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Sets up this project for development. +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + # Build docker containers + scripts/update + + echo "migrating..." + scripts/migrate + + echo "Bringing up database..." + scripts/server --detach + + echo "Ingesting development data..." + docker-compose \ + -f docker-compose.yml \ + run --rm \ + dev \ + pypgstac load collections \ + /opt/src/test/testdata/collections.ndjson \ + --method upsert + + docker-compose \ + -f docker-compose.yml \ + run --rm \ + dev \ + pypgstac load items \ + /opt/src/test/testdata/items.ndjson \ + --method upsert + + echo "Done." + +fi diff --git a/scripts/test b/scripts/test new file mode 100755 index 00000000..ad563ff6 --- /dev/null +++ b/scripts/test @@ -0,0 +1,32 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") [--dev, --db, --migrations, --deploy] +Runs tests for the project. + +" +} + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + echo "Running database tests..." + # TODO: Fix these tests + # scripts/server --detach; + # docker-compose \ + # run --rm \ + # -e PGUSER=username \ + # -e PGPASSWORD=password \ + # -e PGHOST=database \ + # database /opt/src/test/test.sh; + + echo "Running pypgstac tests..." + docker-compose \ + run --rm \ + dev scripts/bin/test; +fi diff --git a/scripts/update b/scripts/update new file mode 100755 index 00000000..1b5848d6 --- /dev/null +++ b/scripts/update @@ -0,0 +1,33 @@ +#!/bin/bash + +set -e + +if [[ "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") [--no-cache] +Builds the docker containers for this project. + +--no-cache: Rebuild all containers from scratch. +" +} + +# Parse args +NO_CACHE=""; +while [[ "$#" > 0 ]]; do case $1 in + --no-cache) NO_CACHE="--no-cache"; shift;; + --help) usage; exit 0; shift;; + *) usage "Unknown parameter passed: $1"; shift; shift;; +esac; done + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + + echo "==Building images..." + docker-compose \ + -f docker-compose.yml \ + build ${NO_CACHE} + +fi From 4f04b70982a53e37a9a6f94d7eaa85cf36596915 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:38:06 -0400 Subject: [PATCH 06/17] Modify pypgstac to pass mypy test --- pypgstac/pypgstac/pypgstac.py | 178 +++++++++++++++++++++------------- 1 file changed, 113 insertions(+), 65 deletions(-) diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index 62f5ab64..b142b791 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -1,8 +1,11 @@ import asyncio +from io import TextIOWrapper import os -from typing import List +import time +from typing import Any, AsyncGenerator, Dict, Iterable, List, Optional, TypeVar, Union import asyncpg +from asyncpg.connection import Connection import typer import orjson from smart_open import open @@ -21,11 +24,11 @@ migrations_dir = os.path.join(dirname, "migrations") -def pglogger(conn, message): +def pglogger(message: str) -> None: logging.debug(message) -async def con_init(conn): +async def con_init(conn: Connection) -> None: """Use orjson for json returns.""" await conn.set_type_codec( "json", @@ -42,33 +45,36 @@ async def con_init(conn): class DB: - pg_connection_string = None - connection = None + pg_connection_string: Optional[str] = None + connection: Optional[Connection] = None - def __init__(self, pg_connection_string: str = None): + def __init__(self, pg_connection_string: Optional[str] = None) -> None: self.pg_connection_string = pg_connection_string - async def create_connection(self): - self.connection = await asyncpg.connect( + async def create_connection(self) -> Connection: + connection: Connection = await asyncpg.connect( self.pg_connection_string, server_settings={ "search_path": "pgstac,public", "application_name": "pypgstac", }, ) - await con_init(self.connection) + await con_init(connection) + self.connection = connection return self.connection - async def __aenter__(self): + async def __aenter__(self) -> Connection: if self.connection is None: await self.create_connection() + assert self.connection is not None return self.connection - async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.connection.close() + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + if self.connection: + await self.connection.close() -async def run_migration(dsn: str = None): +async def run_migration(dsn: Optional[str] = None) -> str: conn = await asyncpg.connect(dsn=dsn) async with conn.transaction(): try: @@ -102,27 +108,33 @@ async def run_migration(dsn: str = None): f"Pypgstac does not have a migration from {oldversion} to {version} ({migration_file})" ) - with open(migration_file) as f: - migration_sql = f.read() - logging.debug(migration_sql) - async with conn.transaction(): - conn.add_log_listener(pglogger) - await conn.execute(migration_sql) - await conn.execute( - """ - INSERT INTO pgstac.migrations (version) - VALUES ($1); - """, - version, - ) - - await conn.close() + open_migration_file = open(migration_file) + if isinstance(open_migration_file, TextIOWrapper): + with open_migration_file as f: + migration_sql = f.read() + logging.debug(migration_sql) + async with conn.transaction(): + conn.add_log_listener(pglogger) + await conn.execute(migration_sql) + await conn.execute( + """ + INSERT INTO pgstac.migrations (version) + VALUES ($1); + """, + version, + ) + + await conn.close() + else: + raise IOError(f"Unable to open {migration_file}") return version @app.command() -def migrate(dsn: str = None): - typer.echo(asyncio.run(run_migration(dsn))) +def migrate(dsn: Optional[str] = None) -> None: + """Migrate a pgstac database""" + version = asyncio.run(run_migration(dsn)) + typer.echo(f'pgstac version {version}') class loadopt(str, Enum): @@ -135,35 +147,42 @@ class tables(str, Enum): items = "items" collections = "collections" +# Types of iterable that load_iterator can support +T = TypeVar('T', Iterable[bytes], Iterable[Dict[str, Any]], Iterable[str]) + -async def aiter(list: List): - for i in list: - if isinstance(i, bytes): - i = i.decode("utf-8") - elif isinstance(i, dict): - i = orjson.dumps(i).decode("utf-8") - if isinstance(i, str): - line = "\n".join( - [ - i.rstrip() - .replace(r"\n", r"\\n") - .replace(r"\t", r"\\t") - ] - ).encode("utf-8") - yield line +async def aiter(list: T) -> AsyncGenerator[bytes, None]: + for item in list: + item_str: str + if isinstance(item, bytes): + item_str = item.decode("utf-8") + elif isinstance(item, dict): + item_str = orjson.dumps(item).decode("utf-8") + elif isinstance(item, str): + item_str = item else: - raise Exception(f"Could not parse {i}") + raise ValueError(f"Cannot load iterator with values of type {type(item)} (value {item})") + + line = "\n".join( + [ + item_str.rstrip() + .replace(r"\n", r"\\n") + .replace(r"\t", r"\\t") + ] + ).encode("utf-8") + yield line -async def copy(iter, table: tables, conn: asyncpg.Connection): + +async def copy(iter: T, table: tables, conn: asyncpg.Connection) -> None: logger.debug(f"copying to {table} directly") logger.debug(f"iter: {iter}") - iter = aiter(iter) + bytes_iter = aiter(iter) async with conn.transaction(): logger.debug("Copying data") await conn.copy_to_table( table, - source=iter, + source=bytes_iter, columns=["content"], format="csv", quote=chr(27), @@ -179,10 +198,10 @@ async def copy(iter, table: tables, conn: asyncpg.Connection): async def copy_ignore_duplicates( - iter, table: tables, conn: asyncpg.Connection -): + iter: T, table: tables, conn: asyncpg.Connection +) -> None: logger.debug(f"inserting to {table} ignoring duplicates") - iter = aiter(iter) + bytes_iter = aiter(iter) async with conn.transaction(): await conn.execute( """ @@ -192,7 +211,7 @@ async def copy_ignore_duplicates( ) await conn.copy_to_table( "pgstactemp", - source=iter, + source=bytes_iter, columns=["content"], format="csv", quote=chr(27), @@ -218,9 +237,9 @@ async def copy_ignore_duplicates( logger.debug("Data Inserted") -async def copy_upsert(iter, table: tables, conn: asyncpg.Connection): +async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None: logger.debug(f"upserting to {table}") - iter = aiter(iter) + bytes_iter = aiter(iter) async with conn.transaction(): await conn.execute( """ @@ -230,7 +249,7 @@ async def copy_upsert(iter, table: tables, conn: asyncpg.Connection): ) await conn.copy_to_table( "pgstactemp", - source=iter, + source=bytes_iter, columns=["content"], format="csv", quote=chr(27), @@ -258,24 +277,28 @@ async def copy_upsert(iter, table: tables, conn: asyncpg.Connection): async def load_iterator( - iter, table: tables, conn: asyncpg.Connection, method: loadopt = "insert" + iter: T, table: tables, conn: asyncpg.Connection, method: loadopt = loadopt.insert ): logger.debug(f"Load Iterator Connection: {conn}") - if method == "insert": + if method == loadopt.insert: await copy(iter, table, conn) - elif method == "insert_ignore": + elif method == loadopt.insert_ignore: await copy_ignore_duplicates(iter, table, conn) else: await copy_upsert(iter, table, conn) async def load_ndjson( - file: str, table: tables, method: loadopt = "insert", dsn: str = None -): + file: str, table: tables, method: loadopt = loadopt.insert, dsn: str = None +) -> None: print(f"loading {file} into {table} using {method}") - with open(file, "rb") as f: - async with DB(dsn) as conn: - await load_iterator(f, table, conn, method) + open_file = open(file, "rb") + if isinstance(open_file, TextIOWrapper): + with open_file as f: + async with DB(dsn) as conn: + await load_iterator(f, table, conn, method) + else: + raise IOError(f"Cannot read {file}") @app.command() @@ -286,13 +309,38 @@ def load( method: loadopt = typer.Option( "insert", prompt="How to deal conflicting ids" ), -): +) -> None: + "Load STAC data into a pgstac database." typer.echo( asyncio.run( load_ndjson(file=file, table=table, dsn=dsn, method=method) ) ) +@app.command() +def pgready(dsn: Optional[str] = None) -> None: + """Wait for a pgstac database to accept connections""" + async def wait_on_connection() -> bool: + cnt = 0 + + print("Waiting for pgstac to come online...", end="", flush=True) + while True: + if cnt > 150: + raise Exception("Unable to connect to database") + try: + print(".", end="", flush=True) + conn = await asyncpg.connect() + await conn.execute("SELECT 1") + await conn.close() + print("success!") + return True + except Exception: + time.sleep(0.1) + cnt += 1 + + + asyncio.run(wait_on_connection()) + if __name__ == "__main__": app() From 5614e31ab4ec1f21cdc6503153b81ca4f8ba54e9 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:38:48 -0400 Subject: [PATCH 07/17] Run formatting on pypgstac --- pypgstac/pypgstac/__init__.py | 2 +- pypgstac/pypgstac/pypgstac.py | 33 ++++++++++++--------------------- 2 files changed, 13 insertions(+), 22 deletions(-) diff --git a/pypgstac/pypgstac/__init__.py b/pypgstac/pypgstac/__init__.py index 407b8a2b..6cd38b74 100644 --- a/pypgstac/pypgstac/__init__.py +++ b/pypgstac/pypgstac/__init__.py @@ -1 +1 @@ -__version__ = '0.2.7' +__version__ = "0.2.7" diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index b142b791..8d215f50 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -93,9 +93,7 @@ async def run_migration(dsn: Optional[str] = None) -> str: logging.debug(f"Target database already at version: {version}") return version if oldversion is None: - logging.debug( - f"No pgstac version set, installing {version} from scratch" - ) + logging.debug(f"No pgstac version set, installing {version} from scratch") migration_file = os.path.join(migrations_dir, f"pgstac.{version}.sql") else: logging.debug(f"Migrating from {oldversion} to {version}.") @@ -134,7 +132,7 @@ async def run_migration(dsn: Optional[str] = None) -> str: def migrate(dsn: Optional[str] = None) -> None: """Migrate a pgstac database""" version = asyncio.run(run_migration(dsn)) - typer.echo(f'pgstac version {version}') + typer.echo(f"pgstac version {version}") class loadopt(str, Enum): @@ -147,8 +145,9 @@ class tables(str, Enum): items = "items" collections = "collections" + # Types of iterable that load_iterator can support -T = TypeVar('T', Iterable[bytes], Iterable[Dict[str, Any]], Iterable[str]) +T = TypeVar("T", Iterable[bytes], Iterable[Dict[str, Any]], Iterable[str]) async def aiter(list: T) -> AsyncGenerator[bytes, None]: @@ -161,15 +160,12 @@ async def aiter(list: T) -> AsyncGenerator[bytes, None]: elif isinstance(item, str): item_str = item else: - raise ValueError(f"Cannot load iterator with values of type {type(item)} (value {item})") - + raise ValueError( + f"Cannot load iterator with values of type {type(item)} (value {item})" + ) line = "\n".join( - [ - item_str.rstrip() - .replace(r"\n", r"\\n") - .replace(r"\t", r"\\t") - ] + [item_str.rstrip().replace(r"\n", r"\\n").replace(r"\t", r"\\t")] ).encode("utf-8") yield line @@ -306,20 +302,16 @@ def load( table: tables, file: str, dsn: str = None, - method: loadopt = typer.Option( - "insert", prompt="How to deal conflicting ids" - ), + method: loadopt = typer.Option("insert", prompt="How to deal conflicting ids"), ) -> None: "Load STAC data into a pgstac database." - typer.echo( - asyncio.run( - load_ndjson(file=file, table=table, dsn=dsn, method=method) - ) - ) + typer.echo(asyncio.run(load_ndjson(file=file, table=table, dsn=dsn, method=method))) + @app.command() def pgready(dsn: Optional[str] = None) -> None: """Wait for a pgstac database to accept connections""" + async def wait_on_connection() -> bool: cnt = 0 @@ -338,7 +330,6 @@ async def wait_on_connection() -> bool: time.sleep(0.1) cnt += 1 - asyncio.run(wait_on_connection()) From c8033c942e9d3c03a93bd42950f935f255150fdd Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:43:04 -0400 Subject: [PATCH 08/17] Fix linting errors --- pypgstac/pypgstac/pypgstac.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index 8d215f50..168a11d3 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -2,7 +2,7 @@ from io import TextIOWrapper import os import time -from typing import Any, AsyncGenerator, Dict, Iterable, List, Optional, TypeVar, Union +from typing import Any, AsyncGenerator, Dict, Iterable, Optional, TypeVar import asyncpg from asyncpg.connection import Connection @@ -87,7 +87,9 @@ async def run_migration(dsn: Optional[str] = None) -> str: except asyncpg.exceptions.UndefinedTableError: oldversion = None logging.debug( - f"Old Version: {oldversion} New Version: {version} Migrations Dir: {migrations_dir}" + f"Old Version: {oldversion} " + f"New Version: {version} " + f"Migrations Dir: {migrations_dir}" ) if oldversion == version: logging.debug(f"Target database already at version: {version}") @@ -103,7 +105,8 @@ async def run_migration(dsn: Optional[str] = None) -> str: if not os.path.exists(migration_file): raise Exception( - f"Pypgstac does not have a migration from {oldversion} to {version} ({migration_file})" + "Pypgstac does not have a migration " + f"from {oldversion} to {version} ({migration_file})" ) open_migration_file = open(migration_file) @@ -186,7 +189,7 @@ async def copy(iter: T, table: tables, conn: asyncpg.Connection) -> None: ) logger.debug("Backfilling partitions") await conn.execute( - f""" + """ SELECT backfill_partitions(); """ ) @@ -254,7 +257,7 @@ async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None: logger.debug("Data Copied") if table == "collections": await conn.execute( - f""" + """ INSERT INTO collections (content) SELECT content FROM pgstactemp ON CONFLICT (id) DO UPDATE @@ -265,7 +268,7 @@ async def copy_upsert(iter: T, table: tables, conn: asyncpg.Connection) -> None: if table == "items": logger.debug("Upserting Data") await conn.execute( - f""" + """ SELECT upsert_item(content) FROM pgstactemp; """ From 0fb52a441772a38e9215629d78719e701dc1679c Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:44:42 -0400 Subject: [PATCH 09/17] Fix unused arg in new pgready command --- pypgstac/pypgstac/pypgstac.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index 168a11d3..6e6f8abf 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -324,7 +324,7 @@ async def wait_on_connection() -> bool: raise Exception("Unable to connect to database") try: print(".", end="", flush=True) - conn = await asyncpg.connect() + conn = await asyncpg.connect(dsn=dsn) await conn.execute("SELECT 1") await conn.close() print("success!") From 177ab1ceca21af4f03b61b8217338b2600a33a56 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:57:15 -0400 Subject: [PATCH 10/17] Use BufferedIOBase, not TextIOWrapper --- pypgstac/pypgstac/pypgstac.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index 6e6f8abf..58faa2d8 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -1,5 +1,5 @@ import asyncio -from io import TextIOWrapper +from io import BufferedIOBase import os import time from typing import Any, AsyncGenerator, Dict, Iterable, Optional, TypeVar @@ -110,7 +110,7 @@ async def run_migration(dsn: Optional[str] = None) -> str: ) open_migration_file = open(migration_file) - if isinstance(open_migration_file, TextIOWrapper): + if isinstance(open_migration_file, BufferedIOBase): with open_migration_file as f: migration_sql = f.read() logging.debug(migration_sql) @@ -170,6 +170,7 @@ async def aiter(list: T) -> AsyncGenerator[bytes, None]: line = "\n".join( [item_str.rstrip().replace(r"\n", r"\\n").replace(r"\t", r"\\t")] ).encode("utf-8") + yield line @@ -292,7 +293,7 @@ async def load_ndjson( ) -> None: print(f"loading {file} into {table} using {method}") open_file = open(file, "rb") - if isinstance(open_file, TextIOWrapper): + if isinstance(open_file, BufferedIOBase): with open_file as f: async with DB(dsn) as conn: await load_iterator(f, table, conn, method) From 19dbd6645eaf091da336807815a37f70efa21889 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 19:57:24 -0400 Subject: [PATCH 11/17] Add test to load test data --- pypgstac/tests/test_load.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 pypgstac/tests/test_load.py diff --git a/pypgstac/tests/test_load.py b/pypgstac/tests/test_load.py new file mode 100644 index 00000000..7a7088f2 --- /dev/null +++ b/pypgstac/tests/test_load.py @@ -0,0 +1,22 @@ +import asyncio +from pathlib import Path +import unittest + +from pypgstac.pypgstac import load_ndjson, loadopt, tables + +HERE = Path(__file__).parent +TEST_DATA_DIR = HERE.parent.parent / "test" / "testdata" +TEST_COLLECTIONS = TEST_DATA_DIR / "collections.ndjson" +TEST_ITEMS = TEST_DATA_DIR / "items.ndjson" + + +class LoadTest(unittest.TestCase): + def test_load_testdata_succeeds(self) -> None: + asyncio.run( + load_ndjson( + str(TEST_COLLECTIONS), table=tables.collections, method=loadopt.upsert + ) + ) + asyncio.run( + load_ndjson(str(TEST_ITEMS), table=tables.items, method=loadopt.upsert) + ) From 26df853ba27eeef830974e0d6f6d2511c4fda59f Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 20:00:22 -0400 Subject: [PATCH 12/17] Fix issue with load - end lines with a newline --- pypgstac/pypgstac/pypgstac.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pypgstac/pypgstac/pypgstac.py b/pypgstac/pypgstac/pypgstac.py index 58faa2d8..43cecf8b 100755 --- a/pypgstac/pypgstac/pypgstac.py +++ b/pypgstac/pypgstac/pypgstac.py @@ -167,11 +167,12 @@ async def aiter(list: T) -> AsyncGenerator[bytes, None]: f"Cannot load iterator with values of type {type(item)} (value {item})" ) - line = "\n".join( + lines = "\n".join( [item_str.rstrip().replace(r"\n", r"\\n").replace(r"\t", r"\\t")] - ).encode("utf-8") + ) + encoded_lines = (lines + "\n").encode("utf-8") - yield line + yield encoded_lines async def copy(iter: T, table: tables, conn: asyncpg.Connection) -> None: From d49b59be493f39c59a8e86f1a04e14c9cc65d4fc Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 20:01:04 -0400 Subject: [PATCH 13/17] Fix collection ID to match items --- test/testdata/collections.ndjson | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/testdata/collections.ndjson b/test/testdata/collections.ndjson index 7239730e..054b93f5 100644 --- a/test/testdata/collections.ndjson +++ b/test/testdata/collections.ndjson @@ -1 +1 @@ -{"id":"naip","stac_version":"1.0.0-beta.2","description":"The National Agriculture Imagery Program (NAIP) acquires aerial imagery\\nduring the agricultural growing seasons in the continental U.S.\\n\\nNAIP projects are contracted each year based upon available funding and the\\nFSA imagery acquisition cycle. Beginning in 2003, NAIP was acquired on\\na 5-year cycle. 2008 was a transition year, and a three-year cycle began\\nin 2009.\\n\\nNAIP imagery is acquired at a one-meter ground sample distance (GSD) with a\\nhorizontal accuracy that matches within six meters of photo-identifiable\\nground control points, which are used during image inspection.\\n\\nOlder images were collected using 3 bands (Red, Green, and Blue: RGB), but\\nnewer imagery is usually collected with an additional near-infrared band\\n(RGBN).","links":[{"rel":"root","href":"/collection.json","type":"application/json"},{"rel":"self","href":"/collection.json","type":"application/json"}],"stac_extensions":[],"title":"NAIP: National Agriculture Imagery Program","extent":{"spatial":{"bbox":[[-124.784,24.744,-66.951,49.346]]},"temporal":{"interval":[["2011-01-01T00:00:00Z","2019-01-01T00:00:00Z"]]}},"license":"PDDL-1.0","providers":[{"name":"USDA Farm Service Agency","roles":["producer","licensor"],"url":"https://www.fsa.usda.gov/programs-and-services/aerial-photography/imagery-programs/naip-imagery/"}]} +{"id":"usda-naip","stac_version":"1.0.0-beta.2","description":"The National Agriculture Imagery Program (NAIP) acquires aerial imagery\\nduring the agricultural growing seasons in the continental U.S.\\n\\nNAIP projects are contracted each year based upon available funding and the\\nFSA imagery acquisition cycle. Beginning in 2003, NAIP was acquired on\\na 5-year cycle. 2008 was a transition year, and a three-year cycle began\\nin 2009.\\n\\nNAIP imagery is acquired at a one-meter ground sample distance (GSD) with a\\nhorizontal accuracy that matches within six meters of photo-identifiable\\nground control points, which are used during image inspection.\\n\\nOlder images were collected using 3 bands (Red, Green, and Blue: RGB), but\\nnewer imagery is usually collected with an additional near-infrared band\\n(RGBN).","links":[{"rel":"root","href":"/collection.json","type":"application/json"},{"rel":"self","href":"/collection.json","type":"application/json"}],"stac_extensions":[],"title":"NAIP: National Agriculture Imagery Program","extent":{"spatial":{"bbox":[[-124.784,24.744,-66.951,49.346]]},"temporal":{"interval":[["2011-01-01T00:00:00Z","2019-01-01T00:00:00Z"]]}},"license":"PDDL-1.0","providers":[{"name":"USDA Farm Service Agency","roles":["producer","licensor"],"url":"https://www.fsa.usda.gov/programs-and-services/aerial-photography/imagery-programs/naip-imagery/"}]} From fb5bda3e76bc4835a94889f6b060193678bd3b61 Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Thu, 15 Jul 2021 20:09:01 -0400 Subject: [PATCH 14/17] Add CI build and publish workflows --- .github/workflows/continuous-integration.yml | 16 ++++++ .github/workflows/release.yml | 30 +++++++++++ scripts/cipublish | 57 ++++++++++++++++++++ 3 files changed, 103 insertions(+) create mode 100644 .github/workflows/continuous-integration.yml create mode 100644 .github/workflows/release.yml diff --git a/.github/workflows/continuous-integration.yml b/.github/workflows/continuous-integration.yml new file mode 100644 index 00000000..30f98373 --- /dev/null +++ b/.github/workflows/continuous-integration.yml @@ -0,0 +1,16 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +jobs: + test: + name: test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Execute linters and test suites + run: ./scripts/cibuild diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 00000000..532ab81d --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,30 @@ +name: Release + +on: + push: + tags: + - "*" + +jobs: + release: + name: release + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set up Python 3.x + uses: actions/setup-python@v2 + with: + python-version: "3.x" + + - name: Install release dependencies + run: | + python -m pip install --upgrade pip + pip install setuptools wheel twine + + - name: Build and publish package + env: + TWINE_USERNAME: ${{ secrets.PYPI_STACUTILS_USERNAME }} + TWINE_PASSWORD: ${{ secrets.PYPI_STACUTILS_PASSWORD }} + run: | + scripts/cipublish diff --git a/scripts/cipublish b/scripts/cipublish index e69de29b..25a0b885 100755 --- a/scripts/cipublish +++ b/scripts/cipublish @@ -0,0 +1,57 @@ +#!/bin/bash + +set -e + +if [[ -n "${CI}" ]]; then + set -x +fi + +function usage() { + echo -n \ + "Usage: $(basename "$0") +Publish pypgstac + +Options: +--test Publish to test pypi +" +} + +POSITIONAL=() +while [[ $# -gt 0 ]] +do + key="$1" + case $key in + + --help) + usage + exit 0 + shift + ;; + + --test) + TEST_PYPI="--repository testpypi" + shift + ;; + + *) # unknown option + POSITIONAL+=("$1") # save it in an array for later + shift # past argument + ;; + esac +done +set -- "${POSITIONAL[@]}" # restore positional parameters + +# Fail if this isn't CI and we aren't publishing to test pypi +if [ -z "${TEST_PYPI}" ] && [ -z "${CI}" ]; then + echo "Only CI can publish to pypi" + exit 1 +fi + +if [ "${BASH_SOURCE[0]}" = "${0}" ]; then + pushd pypgstac + rm -rf dist + pip install build twine + python -m build --sdist --wheel + twine upload ${TEST_PYPI} dist/* + popd +fi From 4ca6ab1397982728c0774ed8f3fad4e62d70f38e Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Fri, 16 Jul 2021 13:36:19 -0400 Subject: [PATCH 15/17] Set pypgstac on the python path in the dev container --- Dockerfile.dev | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Dockerfile.dev b/Dockerfile.dev index d3324e1a..8d7ac48c 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -12,4 +12,6 @@ RUN pip install -r requirements-dev.txt COPY pypgstac /opt/src/pypgstac RUN pip install -e . +ENV PYTHONPATH=/opt/src/pypgstac:${PYTHONPATH} + WORKDIR /opt/src \ No newline at end of file From 67df40c10950b45423496c144bdce5c514c4977f Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Fri, 16 Jul 2021 15:03:04 -0400 Subject: [PATCH 16/17] Don't install an editable version in the dev image This causes the required egg-info to be created in the source directory, which is mounted in when running the dev container and can cause issues. Because the source is on the PYTHONPATH any edits mounted into the container will be used over the installed source --- Dockerfile.dev | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.dev b/Dockerfile.dev index 8d7ac48c..9978c0ff 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -10,7 +10,7 @@ COPY pypgstac/requirements-dev.txt /opt/src/pypgstac/requirements-dev.txt RUN pip install -r requirements-dev.txt COPY pypgstac /opt/src/pypgstac -RUN pip install -e . +RUN pip install . ENV PYTHONPATH=/opt/src/pypgstac:${PYTHONPATH} From d999dac7369203c4c8805af6f949f9e74d8ff4cb Mon Sep 17 00:00:00 2001 From: Rob Emanuele Date: Fri, 16 Jul 2021 15:04:52 -0400 Subject: [PATCH 17/17] Fix typo in docker-compose --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8bf1fe97..6b1d7828 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,8 +15,8 @@ services: - PGHOST=database - PGDATABASE=postgis database: - container_name: pqgstac-db - image: pqstac-db + container_name: pgstac-db + image: pgstac-db build: context: . dockerfile: Dockerfile