forked from networkx/nx-parallel
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtournament.py
More file actions
95 lines (74 loc) · 3.28 KB
/
tournament.py
File metadata and controls
95 lines (74 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import networkx as nx
from joblib import Parallel, delayed
from networkx.algorithms.simple_paths import is_simple_path as is_path
import nx_parallel as nxp
# Names exported by `from <this module> import *`; both are defined below.
__all__ = [
"is_reachable",
"tournament_is_strongly_connected",
]
def is_reachable(G, s, t, get_chunks="chunks"):
    """The function parallelizes the calculation of two
    neighborhoods of vertices in `G` and checks closure conditions for each
    neighborhood subset in parallel.

    networkx.tournament.is_reachable : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.tournament.is_reachable.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes as input and returns an
        iterable `node_chunks`. The default chunking is done by slicing the `nodes`
        into `n` chunks, where `n` is the total number of CPU cores available.
    """
    # Contract, as implemented below:
    #   G    graph (or a wrapper exposing the real graph as `.graph_object`)
    #   s, t nodes of G; the question answered is "can t be reached from s?"
    # Returns False as soon as some node's two-neighborhood is a closed set
    # that contains `s` but not `t`; returns True when no node yields such a
    # set (which includes the case where there are no chunks to examine).

    def is_closed(G, nodes):
        # `nodes` counts as closed when every node outside the set has an
        # edge to every node inside the set.
        return all(v in G[u] for u in set(G) - nodes for v in nodes)

    def two_neighborhood_close(G, chunk):
        # Worker for one chunk: True when no node in `chunk` produces a
        # disqualifying two-neighborhood.
        for v in chunk:
            # Two-neighborhood of v: v itself, its direct successors, and
            # everything reachable from v through one intermediate node z.
            S = {
                x
                for x in G
                if x == v or x in G[v] or any(is_path(G, [v, z, x]) for z in G)
            }
            # The two membership tests are O(1), so they run before the
            # quadratic closure test; the conjunction is unchanged, only its
            # evaluation order. Returning here also skips the remaining
            # nodes of the chunk, whose outcome can no longer matter.
            if s in S and t not in S and is_closed(G, S):
                return False
        return True

    # Unwrap backend graph wrappers so the plain graph is what gets shipped
    # to the workers.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    cpu_count = nxp.cpu_count()

    if get_chunks == "chunks":
        # Integer division can give 0 for small graphs; clamp to 1.
        # NOTE(review): presumably `nxp.chunks` takes a per-chunk size, going
        # by the variable name -- confirm against nx_parallel.
        num_in_chunk = max(len(G) // cpu_count, 1)
        node_chunks = nxp.chunks(G, num_in_chunk)
    else:
        node_chunks = get_chunks(G)

    # One task per chunk; the overall answer is the conjunction of the
    # per-chunk verdicts.
    return all(
        Parallel(n_jobs=cpu_count)(
            delayed(two_neighborhood_close)(G, chunk) for chunk in node_chunks
        )
    )
def tournament_is_strongly_connected(G, get_chunks="chunks"):
    """The parallel computation is implemented by dividing the
    nodes into chunks and then checking whether each node is reachable from each
    other node in parallel.

    Note, this function uses the name `tournament_is_strongly_connected` while
    dispatching to the backend implementation. So, `nxp.tournament.is_strongly_connected`
    will result in an error. Use `nxp.tournament_is_strongly_connected` instead.

    networkx.tournament.is_strongly_connected : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.tournament.is_strongly_connected.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes as input and returns an
        iterable `node_chunks`. The default chunking is done by slicing the `nodes`
        into chunks sized from the number of CPU cores available, capped at 10
        nodes per chunk.
    """
    # Work on the underlying graph when handed a backend wrapper.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    def _all_sources_reach(graph, targets):
        # True when every node of `graph` can reach every node in `targets`;
        # stops at the first pair for which reachability fails.
        for target in targets:
            for source in graph:
                if not nx.tournament.is_reachable(graph, source, target):
                    return False
        return True

    n_jobs = nxp.cpu_count()

    if get_chunks == "chunks":
        # Per-chunk size: len(G) // n_jobs, limited to at most 10 and at
        # least 1 (the latter covers graphs smaller than the core count).
        per_chunk = max(min(len(G) // n_jobs, 10), 1)
        node_chunks = nxp.chunks(G, per_chunk)
    else:
        # Caller-supplied chunking strategy.
        node_chunks = get_chunks(G)

    # One task per chunk; the graph is strongly connected only if every
    # chunk reports success.
    return all(
        Parallel(n_jobs=n_jobs)(
            delayed(_all_sources_reach)(G, chunk) for chunk in node_chunks
        )
    )