
Commit 508732d

[FIX] spreadsheet: batch process spreadsheet_revision.commands
Some databases have `spreadsheet_revision` records with over 10 million characters in `commands`. When the number of such records is high, loading them all at once leads to memory errors. We distribute the records into buckets of at most `memory_cap` total size and use a named cursor to fetch and process them bucket by bucket. A command larger than `memory_cap` occupies a bucket of its own.
1 parent 86409e5 commit 508732d
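
The bucket assignment is a greedy sequential packing: revisions are ordered by LENGTH(commands), and each row joins the current bucket unless it would push the running total past MEMORY_CAP, in which case a new bucket is started. Here is a minimal Python sketch of the logic the recursive CTE in the diff below implements (pack_buckets and rows are illustrative names, not part of the commit):

MEMORY_CAP = 2 * 10**8  # 200MB of `commands` text per bucket

def pack_buckets(rows, cap=MEMORY_CAP):
    # `rows` is a list of (id, length) pairs, pre-sorted by length.
    # Greedily append ids to the current bucket until adding the
    # next row would exceed `cap`, then start a new bucket.
    buckets, current, total = [], [], 0
    for rid, length in rows:
        if current and total + length > cap:
            buckets.append(current)
            current, total = [], 0
        current.append(rid)
        total += length
    if current:
        buckets.append(current)
    return buckets

# Example: pack_buckets([(3, 80), (2, 90), (1, 150)], cap=200)
# returns [[3, 2], [1]]; a row longer than `cap` is never split,
# it simply occupies a bucket of its own.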

File tree

1 file changed: +77 −20 lines


src/util/spreadsheet/misc.py

Lines changed: 77 additions & 20 deletions
@@ -1,32 +1,89 @@
-from .. import json
+from .. import json, pg
+
+MEMORY_CAP = 2 * 10**8  # 200MB
 
 
 def iter_commands(cr, like_all=(), like_any=()):
     if not (bool(like_all) ^ bool(like_any)):
         raise ValueError("Please specify `like_all` or `like_any`, not both")
+
     cr.execute(
         """
-        SELECT id,
-               commands
-          FROM spreadsheet_revision
-         WHERE commands LIKE {}(%s::text[])
-        """.format("ALL" if like_all else "ANY"),
+        WITH RECURSIVE
+             start_bucket AS (
+                 SELECT 1 AS bucket
+             ),
+             ordered_rows AS (
+                 SELECT id,
+                        LENGTH(commands) AS length,
+                        ROW_NUMBER() OVER (ORDER BY LENGTH(commands), id) AS rownum
+                   FROM spreadsheet_revision
+                  WHERE commands LIKE {}(%s::text[])
+             ),
+             assign AS (
+                 SELECT o.id AS id,
+                        o.length,
+                        o.rownum,
+                        sb.bucket AS bucket,
+                        o.length AS sum
+                   FROM ordered_rows o, start_bucket sb
+                  WHERE o.rownum = 1
+
+                  UNION ALL
+
+                 SELECT o.id AS id,
+                        o.length,
+                        o.rownum,
+                        CASE
+                            WHEN a.sum + o.length > {memory_cap} THEN a.bucket + 1
+                            ELSE a.bucket
+                        END AS bucket,
+                        CASE
+                            WHEN a.sum + o.length > {memory_cap} THEN o.length
+                            ELSE a.sum + o.length
+                        END AS sum
+                   FROM assign a
+                   JOIN ordered_rows o
+                     ON o.rownum = a.rownum + 1
+             )
+        SELECT count(*), ARRAY_AGG(id)
+          FROM assign
+         GROUP BY bucket
+         ORDER BY bucket;
+        """.format("ALL" if like_all else "ANY", memory_cap=MEMORY_CAP),
         [list(like_all or like_any)],
     )
-    for revision_id, data in cr.fetchall():
-        data_loaded = json.loads(data)
-        if "commands" not in data_loaded:
-            continue
-        data_old = json.dumps(data_loaded, sort_keys=True)
-
-        changed = yield data_loaded["commands"]
-        if changed is None:
-            changed = data_old != json.dumps(data_loaded, sort_keys=True)
-
-        if changed:
-            cr.execute(
-                "UPDATE spreadsheet_revision SET commands=%s WHERE id=%s", [json.dumps(data_loaded), revision_id]
-            )
+    buckets = cr.fetchall()
+    if not buckets:
+        return
+
+    with pg.named_cursor(cr) as ncr:
+        ncr.execute(
+            """
+            SELECT id,
+                   commands
+              FROM spreadsheet_revision
+             WHERE id IN %s
+             ORDER BY LENGTH(commands), id
+            """,
+            [tuple([i for bucket in buckets for i in bucket[1]])],
+        )
+        for size, _ in buckets:
+            for revision_id, data in ncr.fetchmany(size=size):
+                data_loaded = json.loads(data)
+                if "commands" not in data_loaded:
+                    continue
+                data_old = json.dumps(data_loaded, sort_keys=True)
+
+                changed = yield data_loaded["commands"]
+                if changed is None:
+                    changed = data_old != json.dumps(data_loaded, sort_keys=True)
+
+                if changed:
+                    cr.execute(
+                        "UPDATE spreadsheet_revision SET commands=%s WHERE id=%s",
+                        [json.dumps(data_loaded), revision_id],
+                    )
 
 
 def process_commands(cr, callback, *args, **kwargs):
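
Note that iter_commands remains a two-way generator: each revision's command list is handed to the caller via yield, and the caller may send() back a boolean saying whether it modified the list; sending None makes the generator detect changes itself by re-serializing and comparing. A sketch of how a driver along the lines of process_commands can consume it (the callback contract shown here is an assumption, not taken from this commit):

def drive(cr, callback, like_all=(), like_any=()):
    # Illustrative driver; assumes `callback(commands)` mutates the
    # command list in place and returns True, False, or None.
    gen = iter_commands(cr, like_all=like_all, like_any=like_any)
    try:
        commands = next(gen)  # prime the generator up to its first yield
        while True:
            changed = callback(commands)
            commands = gen.send(changed)  # resume with the caller's verdict
    except StopIteration:
        pass

The named cursor is what keeps memory bounded: pg.named_cursor presumably wraps a server-side (named) cursor, so each fetchmany(size=size) call materializes only one bucket of rows, i.e. at most about MEMORY_CAP characters of `commands`, on the client at a time.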
