-
-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathcluster.py
More file actions
257 lines (205 loc) · 8.6 KB
/
cluster.py
File metadata and controls
257 lines (205 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
from itertools import combinations, chain
from collections import Counter
from joblib import Parallel, delayed
import nx_parallel as nxp
import networkx as nx
from networkx.algorithms.cluster import (
_directed_weighted_triangles_and_degree_iter,
_directed_triangles_and_degree_iter,
_weighted_triangles_and_degree_iter,
_triangles_and_degree_iter,
)
# Public API of this module: the parallel backend implementations that
# nx-parallel dispatches to in place of the networkx originals.
__all__ = [
    "square_clustering",
    "triangles",
    "clustering",
    "average_clustering",
]
@nxp._configure_if_nx_active()
def square_clustering(G, nodes=None, get_chunks="chunks"):
    """Compute square clustering coefficients in parallel.

    The nodes are split into `node_chunks`, and the square clustering
    coefficient of each chunk is computed on a separate joblib worker,
    over `n_jobs` CPU cores in total.

    networkx.square_clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.square_clustering.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes (or nbunch) as input and
        returns an iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n_jobs` number of chunks.
    """

    def _chunk_coefficients(chunk):
        # C4(v) = (# squares through v) / (# potential squares through v),
        # computed for every node in `chunk`.
        pairs = []
        for v in chunk:
            squares_total = 0
            potential_total = 0
            nbrs_v = neighborhoods[v]
            for u, w in combinations(nbrs_v, 2):
                nbrs_u = neighborhoods[u]
                nbrs_w = neighborhoods[w]
                shared = len((nbrs_u & nbrs_w) - {v})
                squares_total += shared
                # degm corrects the potential count for v itself and for a
                # possible direct u-w edge.
                degm = shared + 1
                if w in nbrs_u:
                    degm += 1
                potential_total += (
                    (len(nbrs_u) - degm) + (len(nbrs_w) - degm) + shared
                )
            coefficient = (
                squares_total / potential_total if potential_total > 0 else squares_total
            )
            pairs.append((v, coefficient))
        return pairs

    if hasattr(G, "graph_object"):
        G = G.graph_object

    # ignore self-loops as per networkx 3.5
    neighborhoods = {v: set(G[v]) - {v} for v in G}
    node_list = list(G.nbunch_iter(nodes))

    n_jobs = nxp.get_n_jobs()
    if get_chunks == "chunks":
        chunk_iter = nxp.chunks(node_list, n_jobs)
    else:
        chunk_iter = get_chunks(node_list)

    pair_lists = Parallel()(
        delayed(_chunk_coefficients)(chunk) for chunk in chunk_iter
    )
    coefficients = dict(chain.from_iterable(pair_lists))

    # Mirror networkx: a single node request returns a scalar, not a dict.
    if nodes in G:
        return coefficients[nodes]
    return coefficients
@nxp._configure_if_nx_active(should_run=nxp.should_run_if_nodes_none)
def triangles(G, nodes=None, get_chunks="chunks"):
    """Count, in parallel, the triangles that include each node as a vertex.

    The nodes are split into `node_chunks`, and the triangle counts for each
    chunk are computed on a separate joblib worker over `n_jobs` CPU cores.

    networkx.triangles: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.triangles.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes (or nbunch) as input and
        returns an iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n_jobs` number of chunks.
    """

    def _count_chunk(chunk, later_nbrs):
        # Each discovered triangle (node1, node2, third) is credited once to
        # every one of its three vertices.
        counts = Counter()
        for node1 in chunk:
            neighbors = later_nbrs[node1]
            for node2 in neighbors:
                third_nodes = neighbors & later_nbrs[node2]
                k = len(third_nodes)
                counts[node1] += k
                counts[node2] += k
                counts.update(third_nodes)
        return counts

    if hasattr(G, "graph_object"):
        G = G.graph_object

    # The parallel path only handles the all-nodes case; fall back to the
    # sequential networkx helpers for a node subset or a single node.
    if nodes is not None:
        if nodes in G:
            return next(_triangles_and_degree_iter(G, nodes))[2] // 2
        return {v: t // 2 for v, d, t, _ in _triangles_and_degree_iter(G, nodes)}

    nodes = list(G)
    # later_nbrs[v] holds only the neighbors of v not yet visited (self-loops
    # excluded), so each triangle is found exactly once across the iteration.
    later_nbrs = {}
    for node, neighbors in G.adjacency():
        later_nbrs[node] = {n for n in neighbors if n not in later_nbrs and n != node}

    n_jobs = nxp.get_n_jobs()
    if get_chunks == "chunks":
        chunk_iter = nxp.chunks(nodes, n_jobs)
    else:
        chunk_iter = get_chunks(nodes)

    partial_counts = Parallel()(
        delayed(_count_chunk)(chunk, later_nbrs) for chunk in chunk_iter
    )
    # Seed every node with 0 so triangle-free nodes appear in the result.
    totals = Counter(dict.fromkeys(G, 0))
    for partial in partial_counts:
        totals.update(partial)
    return totals
@nxp._configure_if_nx_active()
def clustering(G, nodes=None, weight=None, get_chunks="chunks"):
    """Compute clustering coefficients in parallel.

    The nodes are split into `node_chunks`, and the clustering coefficient of
    each chunk is computed on a separate joblib worker over `n_jobs` CPU cores.

    networkx.clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.clustering.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes (or nbunch) as input and
        returns an iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n_jobs` number of chunks.
    """

    def _chunk_clustering(chunk):
        if G.is_directed():
            # dt: total degree, db: reciprocal degree, t: directed triangles.
            if weight is not None:
                it = _directed_weighted_triangles_and_degree_iter(G, chunk, weight)
            else:
                it = _directed_triangles_and_degree_iter(G, chunk)
            return {
                v: 0 if t == 0 else t / ((dt * (dt - 1) - 2 * db) * 2)
                for v, dt, db, t in it
            }
        # Undirected: the formula 2*T/(d*(d-1)) from the docs is t/(d*(d-1))
        # here b/c t == 2*T.
        if weight is not None:
            it = _weighted_triangles_and_degree_iter(G, chunk, weight)
            return {v: 0 if t == 0 else t / (d * (d - 1)) for v, d, t in it}
        it = _triangles_and_degree_iter(G, chunk)
        return {v: 0 if t == 0 else t / (d * (d - 1)) for v, d, t, _ in it}

    if hasattr(G, "graph_object"):
        G = G.graph_object

    n_jobs = nxp.get_n_jobs()
    node_list = list(G.nbunch_iter(nodes))
    if get_chunks == "chunks":
        node_chunks = nxp.chunks(node_list, n_jobs)
    else:
        node_chunks = get_chunks(node_list)

    merged = {}
    for partial in Parallel()(
        delayed(_chunk_clustering)(chunk) for chunk in node_chunks
    ):
        merged.update(partial)

    # Mirror networkx: a single node request returns a scalar, not a dict.
    if nodes in G:
        return merged[nodes]
    return merged
@nxp._configure_if_nx_active()
def average_clustering(
    G, nodes=None, weight=None, count_zeros=True, get_chunks="chunks"
):
    """Compute the average clustering coefficient in parallel.

    The nodes are split into `node_chunks`; each chunk's clustering
    coefficients are computed on a separate joblib worker over `n_jobs` CPU
    cores, and the results are averaged.

    networkx.average_clustering: https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.cluster.average_clustering.html

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in a list of all the nodes (or nbunch) as input and
        returns an iterable `node_chunks`. The default chunking is done by slicing the
        `nodes` into `n_jobs` number of chunks.
    """

    def _chunk_clustering(chunk):
        # Delegate per-chunk work to networkx's sequential clustering.
        return nx.clustering(G, chunk, weight=weight)

    if hasattr(G, "graph_object"):
        G = G.graph_object

    n_jobs = nxp.get_n_jobs()
    node_list = list(G) if nodes is None else nodes
    if get_chunks == "chunks":
        node_chunks = nxp.chunks(node_list, n_jobs)
    else:
        node_chunks = get_chunks(node_list)

    coefficients = {}
    for partial in Parallel()(
        delayed(_chunk_clustering)(chunk) for chunk in node_chunks
    ):
        coefficients.update(partial)

    values = coefficients.values()
    if not count_zeros:
        # Keep only nodes with a nonzero coefficient before averaging.
        values = [v for v in values if abs(v) > 0]
    return sum(values) / len(values)