Skip to content

Commit 827c7f0

Browse files
committed
Drop Blosc
Blosc provides some value, but not much in practice; more often it gets in the way and causes pain due to version mismatches.
1 parent cced80d commit 827c7f0

File tree

6 files changed

+7
-85
lines changed

6 files changed

+7
-85
lines changed

continuous_integration/environment-3.9.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ dependencies:
3535
- pytest-repeat
3636
- pytest-rerunfailures
3737
- pytest-timeout
38-
- python-blosc # Only tested here
3938
- python-snappy # Only tested here
4039
- pytorch # Only tested here
4140
- requests

distributed/distributed-schema.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ properties:
687687
description: |
688688
The compression algorithm to use
689689
690-
This could be one of lz4, snappy, zstd, or blosc
690+
This could be one of lz4, snappy, or zstd
691691
692692
offload:
693693
type:

distributed/protocol/compression.py

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,12 @@
99
import random
1010
from collections.abc import Callable
1111
from contextlib import suppress
12-
from functools import partial
1312
from typing import Literal
1413

1514
from tlz import identity
1615

1716
import dask
1817

19-
try:
20-
import blosc
21-
22-
n = blosc.set_nthreads(2)
23-
if hasattr("blosc", "releasegil"):
24-
blosc.set_releasegil(True)
25-
except ImportError:
26-
blosc = False
27-
2818
from distributed.utils import ensure_bytes
2919

3020
compressions: dict[
@@ -122,15 +112,6 @@ def zstd_decompress(data):
122112
compressions["zstd"] = {"compress": zstd_compress, "decompress": zstd_decompress}
123113

124114

125-
with suppress(ImportError):
126-
import blosc
127-
128-
compressions["blosc"] = {
129-
"compress": partial(blosc.compress, clevel=5, cname="lz4"),
130-
"decompress": blosc.decompress,
131-
}
132-
133-
134115
def get_default_compression():
135116
default = dask.config.get("distributed.comm.compression")
136117
if default != "auto":
@@ -212,14 +193,7 @@ def maybe_compress(
212193
else:
213194
nbytes = len(payload)
214195

215-
if default_compression and blosc and type(payload) is memoryview:
216-
# Blosc does itemsize-aware shuffling, resulting in better compression
217-
compressed = blosc.compress(
218-
payload, typesize=payload.itemsize, cname="lz4", clevel=5
219-
)
220-
compression = "blosc"
221-
else:
222-
compressed = compress(ensure_bytes(payload))
196+
compressed = compress(ensure_bytes(payload))
223197

224198
if len(compressed) > 0.9 * nbytes: # full data not very compressible
225199
return None, payload

distributed/protocol/tests/test_numpy.py

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,6 @@ def test_compress_numpy():
225225
assert sum(map(nbytes, frames)) < x.nbytes
226226

227227
header = msgpack.loads(frames[1], raw=False, use_list=False, strict_map_key=False)
228-
try:
229-
import blosc # noqa: F401
230-
except ImportError:
231-
pass
232-
else:
233-
assert all(c == "blosc" for c in header["compression"])
234228

235229

236230
def test_compress_memoryview():
@@ -240,49 +234,12 @@ def test_compress_memoryview():
240234
assert len(compressed) < len(mv)
241235

242236

243-
@pytest.mark.skip
244-
def test_dont_compress_uncompressable_data():
245-
blosc = pytest.importorskip("blosc")
246-
x = np.random.randint(0, 255, size=100000).astype("uint8")
247-
header, [data] = serialize(x)
248-
assert "compression" not in header
249-
assert data == x.data
250-
251-
x = np.ones(1000000)
252-
header, [data] = serialize(x)
253-
assert header["compression"] == ["blosc"]
254-
assert data != x.data
255-
256-
x = np.ones(100)
257-
header, [data] = serialize(x)
258-
assert "compression" not in header
259-
if isinstance(data, memoryview):
260-
assert data.obj.ctypes.data == x.ctypes.data
261-
262-
263237
@gen_cluster(client=True, timeout=60)
264-
async def test_dumps_large_blosc(c, s, a, b):
238+
async def test_dumps_large(c, s, a, b):
265239
x = c.submit(np.ones, BIG_BYTES_SHARD_SIZE * 2, dtype="u1")
266240
await x
267241

268242

269-
def test_compression_takes_advantage_of_itemsize():
270-
pytest.importorskip("lz4")
271-
blosc = pytest.importorskip("blosc")
272-
x = np.arange(1000000, dtype="i8")
273-
274-
assert len(blosc.compress(x.data, typesize=8)) < len(
275-
blosc.compress(x.data, typesize=1)
276-
)
277-
278-
_, a = serialize(x)
279-
aa = [maybe_compress(frame)[1] for frame in a]
280-
_, b = serialize(x.view("u1"))
281-
bb = [maybe_compress(frame)[1] for frame in b]
282-
283-
assert sum(map(nbytes, aa)) < sum(map(nbytes, bb))
284-
285-
286243
@pytest.mark.parametrize(
287244
"x",
288245
[

distributed/protocol/tests/test_protocol.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ def test_maybe_compress(lib, compression):
8080

8181
payload = b"0" * 10000
8282
rc, rd = maybe_compress(f(payload), compression=compression)
83-
# For some reason compressing memoryviews can force blosc...
84-
assert rc in (compression, "blosc")
83+
assert rc == compression
8584
assert compressions[rc]["decompress"](rd) == payload
8685

8786

@@ -218,14 +217,8 @@ def test_maybe_compress_memoryviews():
218217
pytest.importorskip("lz4")
219218
x = np.arange(1000000, dtype="int64")
220219
compression, payload = maybe_compress(x.data)
221-
try:
222-
import blosc # noqa: F401
223-
except ImportError:
224-
assert compression == "lz4"
225-
assert len(payload) < x.nbytes * 0.75
226-
else:
227-
assert compression == "blosc"
228-
assert len(payload) < x.nbytes / 10
220+
assert compression == "lz4"
221+
assert len(payload) < x.nbytes * 0.75
229222

230223

231224
@pytest.mark.parametrize("serializers", [("dask",), ("cuda",)])

distributed/versions.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@
2525
("numpy", lambda p: p.__version__),
2626
("pandas", lambda p: p.__version__),
2727
("lz4", lambda p: p.__version__),
28-
("blosc", lambda p: p.__version__),
2928
]
3029

3130

3231
# only these scheduler packages will be checked for version mismatch
33-
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4", "blosc"}
32+
scheduler_relevant_packages = {pkg for pkg, _ in required_packages} | {"lz4"}
3433

3534

3635
# notes to be displayed for mismatch packages

0 commit comments

Comments
 (0)