forked from networkx/nx-parallel
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbetweenness.py
More file actions
88 lines (74 loc) · 2.59 KB
/
betweenness.py
File metadata and controls
88 lines (74 loc) · 2.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
from joblib import Parallel, delayed
from networkx.algorithms.centrality.betweenness import (
_accumulate_basic,
_accumulate_endpoints,
_rescale,
_single_source_dijkstra_path_basic,
_single_source_shortest_path_basic,
)
from networkx.utils import py_random_state
import nx_parallel as nxp
__all__ = ["betweenness_centrality"]
@py_random_state(5)
def betweenness_centrality(
    G,
    k=None,
    normalized=True,
    weight=None,
    endpoints=False,
    seed=None,
    get_chunks="chunks",
):
    """Compute betweenness centrality in parallel.

    The parallel computation is implemented by dividing the nodes into chunks
    and computing the per-source betweenness contribution of each chunk
    concurrently, then summing the partial dicts and rescaling the total.

    networkx.betweenness_centrality : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.centrality.betweenness_centrality.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes as input and returns an
        iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n` chunks, where `n` is the number of CPU cores.

    Returns
    -------
    dict
        Mapping of each node of `G` to its betweenness centrality value.
    """
    # Unwrap nx_parallel's graph wrapper so the workers see a plain nx graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    if k is None:
        nodes = G.nodes
    else:
        # `seed` has been converted to a random.Random by @py_random_state(5).
        nodes = seed.sample(list(G.nodes), k)

    total_cores = nxp.cpu_count()

    if get_chunks == "chunks":
        node_chunks = nxp.create_iterables(G, "node", total_cores, nodes)
    else:
        node_chunks = get_chunks(nodes)

    bt_cs = Parallel(n_jobs=total_cores)(
        delayed(_betweenness_centrality_node_subset)(G, chunk, weight, endpoints)
        for chunk in node_chunks
    )

    # Reduce the partial solutions. Start from a zeroed dict over all nodes so
    # an empty graph (no chunks at all) does not raise IndexError on bt_cs[0].
    bt_c = dict.fromkeys(G, 0.0)
    for bt in bt_cs:
        for n, v in bt.items():
            bt_c[n] += v

    betweenness = _rescale(
        bt_c,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        k=k,
        endpoints=endpoints,
    )
    return betweenness
def _betweenness_centrality_node_subset(G, nodes, weight=None, endpoints=False):
    """Accumulate the betweenness contributions of the sources in `nodes`.

    Returns a dict mapping every node of `G` to its unnormalized partial
    betweenness score from shortest paths rooted at the nodes in `nodes`.
    Intended as the per-chunk worker for `betweenness_centrality`.
    """
    betweenness = dict.fromkeys(G, 0.0)
    # The accumulator choice is loop-invariant: hoist it out of the loop.
    accumulate = _accumulate_endpoints if endpoints else _accumulate_basic
    for s in nodes:
        # Single-source shortest paths: BFS when unweighted, else Dijkstra.
        if weight is None:
            S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
        else:
            S, P, sigma, _ = _single_source_dijkstra_path_basic(G, s, weight)
        # The accumulators mutate and return `betweenness` plus a per-source
        # delta dict; the delta is not needed here, so it is discarded.
        betweenness, _ = accumulate(betweenness, S, P, sigma, s)
    return betweenness