ENH : adding parallel implementations of all_pairs_ algos (#33)
Merged
Commits (showing changes from 21 of 24 commits):
- f9f1ff4 initial commit (Schefflera-Arboricola)
- d7fcf95 rm extra docs (Schefflera-Arboricola)
- fcf6612 added unweighted.py rm johnson and all non all_pairs_ algos from weig… (Schefflera-Arboricola)
- 1f68d7d added all_pairs_node_connectivity (Schefflera-Arboricola)
- 94de91f Update weighted.py (Schefflera-Arboricola)
- 485270d modifying G (Schefflera-Arboricola)
- a6a8f0a fixed all_pairs_node_connectivity (Schefflera-Arboricola)
- 0cba23e un-updated ParallelGraph class (Schefflera-Arboricola)
- e9c6c06 style fixes (Schefflera-Arboricola)
- 5bc797e changed all_pairs def (Schefflera-Arboricola)
- 414f2c8 adding directed to _calculate_all_pairs_node_connectivity_subset (Schefflera-Arboricola)
- 4f8a751 updated docs of all funcs (Schefflera-Arboricola)
- 00b310b style fix (Schefflera-Arboricola)
- 20ec806 added benchmarks (Schefflera-Arboricola)
- 01e42ac style fix (Schefflera-Arboricola)
- bbbc5f2 added 6 heatmaps (no speedups in 3) (Schefflera-Arboricola)
- cbda3e8 used loky backend in approximation.connectivity.all_pairs_node_connec… (Schefflera-Arboricola)
- c7a341b added get_chunks to shortest paths algos (Schefflera-Arboricola)
- 95b9217 added chunking in all_pairs_node_connectivity, added benchmarks, fixe… (Schefflera-Arboricola)
- a96dbff updated docstrings of all 9 funcs (Schefflera-Arboricola)
- 34ef348 Merge branch 'main' into shortest_paths (dschult)
- 07db6dc Merge branch 'main' into shortest_paths (dschult)
- 5c28ad1 typo fix (Schefflera-Arboricola)
- e0c000b Merge branch 'main' into shortest_paths (Schefflera-Arboricola)
@@ -0,0 +1,22 @@ (new file)

```python
from .common import (
    backends,
    num_nodes,
    edge_prob,
    get_cached_gnp_random_graph,
    Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_approximate_all_pairs_node_connectivity(
        self, backend, num_nodes, edge_prob
    ):
        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
        if backend == "parallel":
            G = nxp.ParallelGraph(G)
        _ = nx.algorithms.approximation.connectivity.all_pairs_node_connectivity(G)
```
@@ -0,0 +1,20 @@ (new file)

```python
from .common import (
    backends,
    num_nodes,
    edge_prob,
    get_cached_gnp_random_graph,
    Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    params = [(backends), (num_nodes), (edge_prob)]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_node_connectivity(self, backend, num_nodes, edge_prob):
        G = get_cached_gnp_random_graph(num_nodes, edge_prob)
        if backend == "parallel":
            G = nxp.ParallelGraph(G)
        _ = nx.algorithms.connectivity.connectivity.all_pairs_node_connectivity(G)
```
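These benchmark classes follow the ASV convention: each `time_*` method is invoked once per element of the cross product of `params`. A standalone sketch of that expansion, using hypothetical parameter values (the actual ones live in `common.py` and are not shown in this diff):

```python
import itertools

backends = ["parallel", "networkx"]  # hypothetical values, for illustration only
num_nodes = [50, 100]                # hypothetical values
edge_prob = [0.3, 0.6]               # hypothetical values

# One benchmark invocation per (backend, num_nodes, edge_prob) combination
cases = list(itertools.product(backends, num_nodes, edge_prob))
print(len(cases))  # → 8
```

So with two values per parameter, each benchmark method is timed under eight distinct configurations.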
@@ -0,0 +1 @@ (new file)

```python
from .connectivity import *
```
@@ -0,0 +1,71 @@ (new file)

```python
"""Parallel implementations of fast approximation for node connectivity"""
import itertools
from joblib import Parallel, delayed
from networkx.algorithms.approximation.connectivity import local_node_connectivity
import nx_parallel as nxp

__all__ = [
    "all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, cutoff=None, get_chunks="chunks"):
    """The parallel implementation first divides the list of all permutations (in
    case of directed graphs) and combinations (in case of undirected graphs) of
    `nbunch` into chunks, creates a generator to lazily compute the local node
    connectivities for each chunk, and then employs joblib's `Parallel` function
    to execute these computations in parallel across all available CPU cores. At
    the end, the results are aggregated into a single dictionary and returned.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and returns
        an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of
        directed graphs and `combinations` in case of undirected graphs. The
        default is to create chunks by slicing the list into `n` chunks, where
        `n` is the number of CPU cores, such that the size of each chunk is at
        most 10, and at least 1.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.approximation.connectivity.all_pairs_node_connectivity.html
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    if nbunch is None:
        nbunch = G
    else:
        nbunch = set(nbunch)

    directed = G.is_directed()
    if directed:
        iter_func = itertools.permutations
    else:
        iter_func = itertools.combinations

    all_pairs = {n: {} for n in nbunch}

    def _process_pair_chunk(pairs_chunk):
        return [
            (u, v, local_node_connectivity(G, u, v, cutoff=cutoff))
            for u, v in pairs_chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    total_cores = nxp.cpu_count()
    if get_chunks == "chunks":
        num_in_chunk = max(min(len(pairs) // total_cores, 10), 1)
        pairs_chunks = nxp.chunks(pairs, num_in_chunk)
    else:
        pairs_chunks = get_chunks(pairs)

    nc_chunk_generator = (  # nc = node connectivity
        delayed(_process_pair_chunk)(pairs_chunk) for pairs_chunk in pairs_chunks
    )

    for nc_chunk in Parallel(n_jobs=total_cores)(nc_chunk_generator):
        for u, v, k in nc_chunk:
            all_pairs[u][v] = k
            if not directed:
                all_pairs[v][u] = k
    return all_pairs
```
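The default chunking rule described in the docstring can be sketched on its own. This is a simplified standalone version, with a local `chunks` helper standing in for `nxp.chunks` (assumed here to yield fixed-size slices):

```python
import itertools


def chunks(lst, n):
    # Yield successive n-sized slices of lst (stands in for nxp.chunks).
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


def default_pair_chunks(nbunch, directed, total_cores):
    # permutations for directed graphs, combinations for undirected ones
    iter_func = itertools.permutations if directed else itertools.combinations
    pairs = list(iter_func(nbunch, 2))
    # each chunk holds at most 10 pairs and at least 1
    num_in_chunk = max(min(len(pairs) // total_cores, 10), 1)
    return list(chunks(pairs, num_in_chunk))


# 4 nodes, undirected: C(4, 2) = 6 pairs; 6 // 4 = 1 pair per chunk, so 6 chunks
print(len(default_pair_chunks(range(4), directed=False, total_cores=4)))  # → 6
# 4 nodes, directed: P(4, 2) = 12 pairs; 12 // 4 = 3 pairs per chunk, so 4 chunks
print(len(default_pair_chunks(range(4), directed=True, total_cores=4)))  # → 4
```

The cap of 10 pairs per chunk keeps individual tasks small enough to balance load across workers on large graphs.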
@@ -0,0 +1 @@ (new file)

```python
from .connectivity import *
```
@@ -0,0 +1,80 @@ (new file)

```python
"""
Parallel flow based connectivity algorithms
"""

import itertools
from networkx.algorithms.flow import build_residual_network
from networkx.algorithms.connectivity.utils import build_auxiliary_node_connectivity
from networkx.algorithms.connectivity.connectivity import local_node_connectivity
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
    "all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, flow_func=None, get_chunks="chunks"):
    """The parallel implementation first divides the list of all permutations (in
    case of directed graphs) and combinations (in case of undirected graphs) of
    `nbunch` into chunks, creates a generator to lazily compute the local node
    connectivities for each chunk, and then employs joblib's `Parallel` function
    to execute these computations in parallel across all available CPU cores. At
    the end, the results are aggregated into a single dictionary and returned.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and returns
        an iterable `pairs_chunks`, here `iter_func` is `permutations` in case of
        directed graphs and `combinations` in case of undirected graphs. The
        default is to create chunks by slicing the list into `n` chunks, where
        `n` is the number of CPU cores, such that the size of each chunk is at
        most 10, and at least 1.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.connectivity.connectivity.all_pairs_node_connectivity.html
    """
    if hasattr(G, "graph_object"):
        G = G.graph_object

    if nbunch is None:
        nbunch = G
    else:
        nbunch = set(nbunch)

    directed = G.is_directed()
    if directed:
        iter_func = itertools.permutations
    else:
        iter_func = itertools.combinations

    all_pairs = {n: {} for n in nbunch}

    # Reuse auxiliary digraph and residual network
    H = build_auxiliary_node_connectivity(G)
    R = build_residual_network(H, "capacity")
    kwargs = {"flow_func": flow_func, "auxiliary": H, "residual": R}

    def _process_pair_chunk(pairs_chunk):
        return [
            (u, v, local_node_connectivity(G, u, v, **kwargs)) for u, v in pairs_chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    total_cores = nxp.cpu_count()
    if get_chunks == "chunks":
        num_in_chunk = max(min(len(pairs) // total_cores, 10), 1)
        pairs_chunks = nxp.chunks(pairs, num_in_chunk)
    else:
        pairs_chunks = get_chunks(pairs)

    nc_chunk_generator = (  # nc = node connectivity
        delayed(_process_pair_chunk)(pairs_chunk) for pairs_chunk in pairs_chunks
    )

    for nc_chunk in Parallel(n_jobs=total_cores)(nc_chunk_generator):
        for u, v, k in nc_chunk:
            all_pairs[u][v] = k
            if not directed:
                all_pairs[v][u] = k
    return all_pairs
```
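A caller-supplied `get_chunks` only needs to accept the list of pairs and return an iterable of chunks. A minimal sketch of one alternative strategy, grouping pairs by source node (a hypothetical helper, not part of this PR):

```python
from collections import defaultdict


def get_chunks_by_source(pairs):
    # Group pairs by their source node so each worker handles one source's pairs.
    groups = defaultdict(list)
    for u, v in pairs:
        groups[u].append((u, v))
    return list(groups.values())


pairs = [(0, 1), (0, 2), (1, 2)]
print(get_chunks_by_source(pairs))  # → [[(0, 1), (0, 2)], [(1, 2)]]
```

Such a function would then be passed as `get_chunks=get_chunks_by_source`; whether it beats the default fixed-size chunking depends on the graph's degree distribution.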
@@ -1 +1,3 @@

```python
from .generic import *
from .weighted import *
from .unweighted import *
```
@@ -0,0 +1,57 @@ (new file)

```python
from networkx.algorithms.shortest_paths.generic import single_source_all_shortest_paths
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
    "all_pairs_all_shortest_paths",
]


def all_pairs_all_shortest_paths(
    G, weight=None, method="dijkstra", get_chunks="chunks"
):
    """The parallel implementation first divides the nodes into chunks, creates a
    generator to lazily compute all shortest paths from each node in a
    `node_chunk` to all other nodes, and then employs joblib's `Parallel`
    function to execute these computations in parallel across all available CPU
    cores.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and
        returns an iterable `node_chunks`. The default chunking is done by
        slicing `G.nodes` into `n` chunks, where `n` is the number of CPU cores.

    networkx.single_source_all_shortest_paths : https://github.com/networkx/networkx/blob/de85e3fe52879f819e7a7924474fc6be3994e8e4/networkx/algorithms/shortest_paths/generic.py#L606
    """

    def _process_node_chunk(node_chunk):
        return [
            (
                n,
                dict(
                    single_source_all_shortest_paths(G, n, weight=weight, method=method)
                ),
            )
            for n in node_chunk
        ]

    if hasattr(G, "graph_object"):
        G = G.graph_object

    nodes = G.nodes
    total_cores = nxp.cpu_count()

    if get_chunks == "chunks":
        num_in_chunk = max(len(nodes) // total_cores, 1)
        node_chunks = nxp.chunks(nodes, num_in_chunk)
    else:
        node_chunks = get_chunks(nodes)

    paths_chunk_generator = (
        delayed(_process_node_chunk)(node_chunk) for node_chunk in node_chunks
    )

    for path_chunk in Parallel(n_jobs=total_cores)(paths_chunk_generator):
        for path in path_chunk:
            yield path
```
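The chunk-map-flatten pattern used throughout this PR (split work into chunks, process each chunk in a worker, then flatten results in chunk order) can be sketched with only the standard library. This uses `concurrent.futures` threads in place of joblib, purely to illustrate the control flow:

```python
from concurrent.futures import ThreadPoolExecutor


def chunks(lst, n):
    # Yield successive n-sized slices of lst.
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


def parallel_map_chunks(func, items, n_workers=4):
    # Split items into roughly one chunk per worker, process chunks
    # concurrently, then flatten results in chunk order (the same
    # aggregation step the functions above perform).
    items = list(items)
    num_in_chunk = max(len(items) // n_workers, 1)
    item_chunks = list(chunks(items, num_in_chunk))
    with ThreadPoolExecutor(max_workers=n_workers) as ex:
        results = ex.map(lambda chunk: [func(x) for x in chunk], item_chunks)
    return [r for chunk_result in results for r in chunk_result]


print(parallel_map_chunks(lambda x: x * x, range(6), n_workers=3))
# → [0, 1, 4, 9, 16, 25]
```

Because `ThreadPoolExecutor.map` returns results in submission order, the flattened output matches the input order regardless of which worker finishes first; joblib's `Parallel` gives the same ordering guarantee for the generator of delayed calls.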