diff --git a/.circleci/config.yml b/.circleci/config.yml
index 907e1850..ab1ae916 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -12,6 +12,11 @@ executors:
     machine:
       image: ubuntu-2404:current
 
+  gpu-executor:
+    machine:
+      image: linux-cuda-12:default
+    resource_class: gpu.nvidia.small.multi
+
 jobs:
   lint:
     executor: python-executor
@@ -79,6 +84,45 @@ jobs:
           name: Run NetworkX tests
          command: ./run_nx_tests.sh
 
+  test-gpu:
+    parameters:
+      python_version:
+        type: string
+    executor: gpu-executor
+    steps:
+      - checkout
+
+      - run:
+          name: Set up ArangoDB
+          command: |
+            chmod +x starter.sh
+            ./starter.sh
+
+      - run:
+          name: Setup Python
+          command: |
+            pyenv --version
+            pyenv install -f << parameters.python_version >>
+            pyenv global << parameters.python_version >>
+
+      - run:
+          name: Setup pip
+          command: python -m pip install --upgrade pip setuptools wheel
+
+      - run:
+          name: Install packages
+          command: pip install .[dev]
+
+      - run:
+          name: Install cuda related dependencies
+          command: |
+            pip install pylibcugraph-cu12 --extra-index-url https://pypi.nvidia.com
+            pip install nx-cugraph-cu12 --extra-index-url https://pypi.nvidia.com
+
+      - run:
+          name: Run local gpu tests
+          command: pytest tests/test.py -k "test_gpu" --run-gpu-tests
+
 workflows:
   version: 2
   build:
@@ -87,4 +131,11 @@ workflows:
       - test:
           matrix:
             parameters:
-              python_version: ["3.10", "3.11", "3.12.2"]
\ No newline at end of file
+              python_version: ["3.10", "3.11", "3.12.2"]
+      - test-gpu:
+          requires:
+            - lint
+            - test
+          matrix:
+            parameters:
+              python_version: ["3.10", "3.11"] # "3.12" # TODO: Revisit 3.12
\ No newline at end of file
diff --git a/_nx_arangodb/__init__.py b/_nx_arangodb/__init__.py
index 557dcef7..e79e600c 100644
--- a/_nx_arangodb/__init__.py
+++ b/_nx_arangodb/__init__.py
@@ -82,6 +82,7 @@ def get_info():
         "read_parallelism": None,
         "read_batch_size": None,
         "write_batch_size": None,
+        "use_gpu": True,
     }
 
     return d
diff --git a/nx_arangodb/classes/digraph.py b/nx_arangodb/classes/digraph.py
index d1931963..7a3bbf33 100644
--- a/nx_arangodb/classes/digraph.py
+++ b/nx_arangodb/classes/digraph.py
@@ -34,6 +34,7 @@ def __init__(
         read_parallelism: int = 10,
         read_batch_size: int = 100000,
         write_batch_size: int = 50000,
+        write_async: bool = True,
         symmetrize_edges: bool = False,
         use_experimental_views: bool = False,
         *args: Any,
@@ -50,6 +51,7 @@ def __init__(
             read_parallelism,
             read_batch_size,
             write_batch_size,
+            write_async,
             symmetrize_edges,
             use_experimental_views,
             *args,
diff --git a/nx_arangodb/classes/graph.py b/nx_arangodb/classes/graph.py
index 23ab36ac..1a728ee4 100644
--- a/nx_arangodb/classes/graph.py
+++ b/nx_arangodb/classes/graph.py
@@ -3,8 +3,6 @@
 from typing import Any, Callable, ClassVar
 
 import networkx as nx
-import numpy as np
-import numpy.typing as npt
 from adbnx_adapter import ADBNX_Adapter
 from arango import ArangoClient
 from arango.cursor import Cursor
@@ -57,6 +55,7 @@ def __init__(
         read_parallelism: int = 10,
         read_batch_size: int = 100000,
         write_batch_size: int = 50000,
+        write_async: bool = True,
         symmetrize_edges: bool = False,
         use_experimental_views: bool = False,
         *args: Any,
@@ -168,7 +167,7 @@ def edge_type_func(u: str, v: str) -> str:
                 incoming_graph_data,
                 edge_definitions=edge_definitions,
                 batch_size=self.write_batch_size,
-                use_async=True,
+                use_async=write_async,
             )
 
         else:
@@ -211,6 +210,7 @@ def _set_arangodb_backend_config(self) -> None:
         config.read_parallelism = self.read_parallelism
         config.read_batch_size = self.read_batch_size
         config.write_batch_size = self.write_batch_size
+        config.use_gpu = True  # Only used by default if nx-cugraph is available
 
     def _set_factory_methods(self) -> None:
         """Set the factory methods for the graph, _node, and _adj dictionaries.
diff --git a/nx_arangodb/classes/multidigraph.py b/nx_arangodb/classes/multidigraph.py
index 597c37b8..8bee4830 100644
--- a/nx_arangodb/classes/multidigraph.py
+++ b/nx_arangodb/classes/multidigraph.py
@@ -33,6 +33,7 @@ def __init__(
         read_parallelism: int = 10,
         read_batch_size: int = 100000,
         write_batch_size: int = 50000,
+        write_async: bool = True,
         symmetrize_edges: bool = False,
         use_experimental_views: bool = False,
         *args: Any,
@@ -49,6 +50,7 @@ def __init__(
             read_parallelism,
             read_batch_size,
             write_batch_size,
+            write_async,
             symmetrize_edges,
             use_experimental_views,
             *args,
diff --git a/nx_arangodb/classes/multigraph.py b/nx_arangodb/classes/multigraph.py
index 4efa205b..a90d43de 100644
--- a/nx_arangodb/classes/multigraph.py
+++ b/nx_arangodb/classes/multigraph.py
@@ -34,6 +34,7 @@ def __init__(
         read_parallelism: int = 10,
         read_batch_size: int = 100000,
         write_batch_size: int = 50000,
+        write_async: bool = True,
         symmetrize_edges: bool = False,
         use_experimental_views: bool = False,
         *args: Any,
@@ -50,6 +51,7 @@ def __init__(
             read_parallelism,
             read_batch_size,
             write_batch_size,
+            write_async,
             symmetrize_edges,
             use_experimental_views,
             *args,
diff --git a/nx_arangodb/convert.py b/nx_arangodb/convert.py
index 0133111f..4da24451 100644
--- a/nx_arangodb/convert.py
+++ b/nx_arangodb/convert.py
@@ -16,11 +16,11 @@
     import numpy as np
     import nx_cugraph as nxcg
 
-    GPU_ENABLED = True
-    logger.info("NetworkX-cuGraph is enabled.")
+    GPU_AVAILABLE = True
+    logger.info("NetworkX-cuGraph is available.")
 except Exception as e:
-    GPU_ENABLED = False
-    logger.info(f"NetworkX-cuGraph is disabled: {e}.")
+    GPU_AVAILABLE = False
+    logger.info(f"NetworkX-cuGraph is unavailable: {e}.")
 
 __all__ = [
     "_to_nx_graph",
@@ -58,7 +58,7 @@ def _to_nxadb_graph(
     raise TypeError(f"Expected nxadb.Graph or nx.Graph; got {type(G)}")
 
 
-if GPU_ENABLED:
+if GPU_AVAILABLE:
 
     def _to_nxcg_graph(G: Any, as_directed: bool = False) -> nxcg.Graph:
         logger.debug(f"_to_nxcg_graph for {G.__class__.__name__}")
@@ -161,7 +161,7 @@ def nxadb_to_nx(G: nxadb.Graph) -> nx.Graph:
     return G_NX
 
 
-if GPU_ENABLED:
+if GPU_AVAILABLE:
 
     def nxadb_to_nxcg(G: nxadb.Graph, as_directed: bool = False) -> nxcg.Graph:
         if G.use_nxcg_cache and G.nxcg_graph is not None:
diff --git a/nx_arangodb/interface.py b/nx_arangodb/interface.py
index ada504d0..725c110d 100644
--- a/nx_arangodb/interface.py
+++ b/nx_arangodb/interface.py
@@ -64,7 +64,7 @@ def _auto_func(func_name: str, /, *args: Any, **kwargs: Any) -> Any:
 
     # TODO: Use `nx.config.backends.arangodb.backend_priority` instead
     backend_priority = []
-    if nxadb.convert.GPU_ENABLED:
+    if nxadb.convert.GPU_AVAILABLE and nx.config.backends.arangodb.use_gpu:
         backend_priority.append("cugraph")
 
     for backend in backend_priority:
diff --git a/tests/conftest.py b/tests/conftest.py
index 2756d557..f807426f 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,5 +1,7 @@
 import logging
 import os
+import sys
+from io import StringIO
 from typing import Any
 
 import networkx as nx
@@ -14,6 +16,7 @@
 logger.setLevel(logging.INFO)
 
 db: StandardDatabase
+run_gpu_tests: bool
 
 
 def pytest_addoption(parser: Any) -> None:
@@ -21,6 +24,9 @@ def pytest_addoption(parser: Any) -> None:
     parser.addoption("--dbName", action="store", default="_system")
     parser.addoption("--username", action="store", default="root")
     parser.addoption("--password", action="store", default="test")
+    parser.addoption(
+        "--run-gpu-tests", action="store_true", default=False, help="Run GPU tests"
+    )
 
 
 def pytest_configure(config: Any) -> None:
@@ -48,6 +54,9 @@ def pytest_configure(config: Any) -> None:
     os.environ["DATABASE_PASSWORD"] = con["password"]
     os.environ["DATABASE_NAME"] = con["dbName"]
 
+    global run_gpu_tests
+    run_gpu_tests = config.getoption("--run-gpu-tests")
+
 
 @pytest.fixture(scope="function")
 def load_karate_graph() -> None:
@@ -100,3 +109,28 @@ def create_line_graph(load_attributes: set[str]) -> nxadb.Graph:
         name="LineGraph",
         edge_collections_attributes=load_attributes,
     )
+
+
+def create_grid_graph(graph_cls: type[nxadb.Graph]) -> nxadb.Graph:
+    global db
+    if db.has_graph("GridGraph"):
+        return graph_cls(name="GridGraph")
+
+    grid_graph = nx.grid_graph(dim=(500, 500))
+    return graph_cls(
+        incoming_graph_data=grid_graph, name="GridGraph", write_async=False
+    )
+
+
+# Taken from:
+# https://stackoverflow.com/questions/16571150/how-to-capture-stdout-output-from-a-python-function-call
+class Capturing(list[str]):
+    def __enter__(self):
+        self._stdout = sys.stdout
+        sys.stdout = self._stringio = StringIO()
+        return self
+
+    def __exit__(self, *args):
+        self.extend(self._stringio.getvalue().splitlines())
+        del self._stringio  # free up some memory
+        sys.stdout = self._stdout
diff --git a/tests/test.py b/tests/test.py
index 1b37f654..40718fc0 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -1,7 +1,7 @@
+import time
 from typing import Any, Callable, Dict, Union
 
 import networkx as nx
-import phenolrs
 import pytest
 from arango import DocumentDeleteError
 from phenolrs.networkx.typings import (
@@ -15,7 +15,7 @@
 from nx_arangodb.classes.dict.adj import AdjListOuterDict, EdgeAttrDict, EdgeKeyDict
 from nx_arangodb.classes.dict.node import NodeAttrDict, NodeDict
 
-from .conftest import create_line_graph, db
+from .conftest import Capturing, create_grid_graph, create_line_graph, db, run_gpu_tests
 
 
 G_NX = nx.karate_club_graph()
@@ -38,7 +38,11 @@ def assert_same_dict_values(
     if type(next(iter(d2.keys()))) == int:
         d2 = {f"person/{k}": v for k, v in d2.items()}
 
-    assert d1.keys() == d2.keys(), "Dictionaries have different keys"
+    d1_keys = set(d1.keys())
+    d2_keys = set(d2.keys())
+    difference = d1_keys ^ d2_keys
+    assert difference == set(), "Dictionaries have different keys"
+
     for key in d1:
         m = f"Values for key '{key}' are not equal up to digit {digit}"
         assert round(d1[key], digit) == round(d2[key], digit), m
@@ -50,10 +54,12 @@ def assert_bc(d1: dict[str | int, float], d2: dict[str | int, float]) -> None:
     assert_same_dict_values(d1, d2, 14)
 
 
-def assert_pagerank(d1: dict[str | int, float], d2: dict[str | int, float]) -> None:
+def assert_pagerank(
+    d1: dict[str | int, float], d2: dict[str | int, float], digit: int = 15
+) -> None:
     assert d1
     assert d2
-    assert_same_dict_values(d1, d2, 15)
+    assert_same_dict_values(d1, d2, digit)
 
 
 def assert_louvain(l1: list[set[Any]], l2: list[set[Any]]) -> None:
@@ -315,6 +321,57 @@ def test_shortest_path_remote_algorithm(load_karate_graph: Any) -> None:
     assert r_3 != r_4
 
 
+@pytest.mark.parametrize(
+    "graph_cls",
+    [
+        (nxadb.Graph),
+        (nxadb.DiGraph),
+        (nxadb.MultiGraph),
+        (nxadb.MultiDiGraph),
+    ],
+)
+def test_gpu_pagerank(graph_cls: type[nxadb.Graph]) -> None:
+    if not run_gpu_tests:
+        pytest.skip("GPU tests are disabled")
+
+    graph = create_grid_graph(graph_cls)
+
+    assert nxadb.convert.GPU_AVAILABLE is True
+    assert nx.config.backends.arangodb.use_gpu is True
+
+    res_gpu = None
+    res_cpu = None
+
+    # Measure GPU execution time
+    start_gpu = time.time()
+
+    # Note: While this works, we should use the logger or some alternative
+    # approach testing this. Via stdout is not the best way to test this.
+    with Capturing() as output_gpu:
+        res_gpu = nx.pagerank(graph)
+
+    assert any(
+        "NXCG Graph construction took" in line for line in output_gpu
+    ), "Expected output not found in GPU execution"
+
+    gpu_time = time.time() - start_gpu
+
+    # Disable GPU and measure CPU execution time
+    nx.config.backends.arangodb.use_gpu = False
+    start_cpu = time.time()
+    with Capturing() as output_cpu:
+        res_cpu = nx.pagerank(graph)
+
+    output_cpu_list = list(output_cpu)
+    assert len(output_cpu_list) == 1
+    assert "Graph 'GridGraph' load took" in output_cpu_list[0]
+
+    cpu_time = time.time() - start_cpu
+
+    assert gpu_time < cpu_time, "GPU execution should be faster than CPU execution"
+    assert_pagerank(res_gpu, res_cpu, 10)
+
+
 @pytest.mark.parametrize(
     "graph_cls",
     [