Skip to content

Commit 8082fdd

Browse files
mkittimanzt
authored and committed
Implement Zstandard streaming decompression (#47)
* Initial attempt at implementing Zstandard streaming decompression * Drop Node 16 due to exceptions support, bump setup-node to v4 * Add streaming decompression tests for streaming compression * Apply prettier * Add EXPORT_EXCEPTION_HANDLING_HELPERS * Code clean up * Fix typo in build.sh * Free decompression stream when throwing or exiting * Throw error if there is additional input data after decompressing frame * Run prettier on test/zstd.test.js * Update with recommendations from Janelia SciCompSoft code review * Update codecs/zstd/zstd_codec.cpp * Add changeset
1 parent b9a8ca9 commit 8082fdd

File tree

11 files changed

+214
-55
lines changed

11 files changed

+214
-55
lines changed

.changeset/forty-seals-grow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'numcodecs': patch
3+
---
4+
5+
feat: Support Zstandard streaming decompression

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ jobs:
1616
runs-on: ubuntu-latest
1717
strategy:
1818
matrix:
19-
node-version: [16.x, 18.x, 20.x]
19+
node-version: [18.x, 20.x, 22.x]
2020
steps:
2121
- uses: actions/checkout@v3
22-
- uses: actions/setup-node@v3
22+
- uses: actions/setup-node@v4
2323
with:
2424
node-version: ${{ matrix.node-version }}
2525
- run: npm ci
@@ -29,7 +29,7 @@ jobs:
2929
runs-on: ubuntu-latest
3030
steps:
3131
- uses: actions/checkout@v3
32-
- uses: actions/setup-node@v3
32+
- uses: actions/setup-node@v4
3333
with:
3434
node-version: 20
3535
- run: npm ci

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ dist/
2121

2222
# ignore codec builds
2323
codecs/**/build/
24+
# pixi environments
25+
.pixi
26+
*.egg-info

codecs/zstd/build.sh

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,18 @@ cd ../../../
4949
${OPTIMIZE} \
5050
-I "$CODEC_DIR/lib" \
5151
--closure 1 \
52-
--bind \
52+
-fwasm-exceptions \
5353
-s ALLOW_MEMORY_GROWTH=1 \
5454
-s MODULARIZE=1 \
5555
-s EXPORT_ES6=1 \
5656
-s USE_ES6_IMPORT_META=0 \
5757
-s ENVIRONMENT="webview" \
5858
-s MALLOC=emmalloc \
5959
-s EXPORT_NAME="zstd_codec" \
60+
-s EXPORT_EXCEPTION_HANDLING_HELPERS=1 \
6061
-x c++ \
6162
--std=c++17 \
63+
-lembind \
6264
-lzstd \
6365
-L "$BUILD_DIR/lib" \
6466
-o "zstd_codec.js"

codecs/zstd/zstd_codec.cpp

Lines changed: 102 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,114 @@ val compress(std::string source, int level)
2020

2121
val decompress(std::string source)
2222
{
23+
// number of bytes to grow the output buffer if more space is needed
24+
const size_t DEST_GROWTH_SIZE = ZSTD_DStreamOutSize();
25+
2326
// setup source buffer
2427
const char *source_ptr = source.c_str();
2528
int source_size = source.size();
2629

30+
// create and initialize decompression stream / context
31+
// use the streaming API so that we can handle unknown frame content size
32+
ZSTD_DStream *zds = ZSTD_createDStream();
33+
34+
size_t status = ZSTD_initDStream(zds);
35+
if (ZSTD_isError(status)) {
36+
ZSTD_freeDStream(zds);
37+
throw std::runtime_error("zstd codec error: " + std::string(ZSTD_getErrorName(status)));
38+
}
39+
40+
ZSTD_inBuffer input = {
41+
.src = (void*) source.c_str(),
42+
.size = (size_t) source.size(),
43+
.pos = 0
44+
};
45+
ZSTD_outBuffer output = {
46+
.dst = NULL,
47+
.size = 0,
48+
.pos = 0,
49+
};
50+
2751
// setup destination buffer
28-
int dest_size = ZSTD_getFrameContentSize(source_ptr, source_size);
29-
dest_ptr = (char *)malloc((size_t)dest_size);
52+
unsigned long long dest_size = ZSTD_getFrameContentSize(source_ptr, source_size);
53+
54+
// If ZSTD_compressStream was used, we may not know the frame content size.
55+
// https://github.com/manzt/numcodecs.js/issues/46
56+
if (dest_size == ZSTD_CONTENTSIZE_UNKNOWN) {
57+
// guess decompressed buffer size based on source size
58+
dest_size = source_size*2;
59+
60+
// Initialize the destination size to DEST_GROWTH_SIZE (default: 128 KiB) at minimum
61+
if (dest_size < DEST_GROWTH_SIZE)
62+
dest_size = DEST_GROWTH_SIZE;
63+
64+
} else if (dest_size == ZSTD_CONTENTSIZE_ERROR) {
65+
ZSTD_freeDStream(zds);
66+
throw std::runtime_error("zstd codec error: content size error");
67+
} else if (dest_size < 0) {
68+
// unknown error
69+
ZSTD_freeDStream(zds);
70+
throw std::runtime_error("zstd codec error: unknown ZSTD_getFrameContentSize error");
71+
}
72+
73+
// the output buffer will either be assigned to dest_ptr to be freed by free_result, or freed on error
74+
output.dst = malloc((size_t) dest_size);
75+
76+
if (output.dst == NULL) {
77+
// error, cannot allocate memory
78+
ZSTD_freeDStream(zds);
79+
throw std::runtime_error("zstd codec error: cannot allocate output buffer");
80+
}
81+
82+
output.size = dest_size;
83+
84+
// Call ZSTD_decompressStream repeatedly until the frame is complete (status == 0) and all input is consumed, or until ZSTD_isError(status) reports an error
85+
do {
86+
status = ZSTD_decompressStream(zds, &output, &input);
87+
88+
if (ZSTD_isError(status)) {
89+
if (dest_ptr == output.dst)
90+
dest_ptr = (char *) NULL;
91+
ZSTD_freeDStream(zds);
92+
free(output.dst);
93+
throw std::runtime_error("zstd codec error: " + std::string(ZSTD_getErrorName(status)));
94+
}
95+
96+
if (status > 0 && output.pos == output.size ) {
97+
// attempt to expand output buffer in DEST_GROWTH_SIZE increments
98+
size_t new_size = output.size + DEST_GROWTH_SIZE;
99+
100+
if (new_size < output.size || new_size < DEST_GROWTH_SIZE) {
101+
// overflow error
102+
ZSTD_freeDStream(zds);
103+
free(output.dst);
104+
throw std::runtime_error("zstd codec error: output buffer overflow");
105+
}
106+
107+
// Increase output buffer size
108+
void *new_dst = realloc(output.dst, new_size);
109+
110+
if (new_dst == NULL) {
111+
// free the original pointer if realloc fails.
112+
ZSTD_freeDStream(zds);
113+
free(output.dst);
114+
throw std::runtime_error("zstd codec error: could not expand output buffer");
115+
}
116+
// the old output.dst is freed by realloc if it succeeds
117+
output.dst = new_dst;
118+
119+
output.size = new_size;
120+
}
121+
122+
// status > 0 indicates there are additional bytes to process in this frame
123+
// status == 0 and input.pos < input.size suggests there may be an additional frame
124+
} while (status > 0 || input.pos < input.size);
125+
126+
ZSTD_freeDStream(zds);
127+
128+
dest_ptr = (char *) output.dst;
30129

31-
int decompressed_size = ZSTD_decompress(dest_ptr, dest_size, source_ptr, source_size);
32-
return val(typed_memory_view(decompressed_size, (uint8_t *)dest_ptr));
130+
return val(typed_memory_view(output.pos, (uint8_t *)dest_ptr));
33131
}
34132

35133
void free_result()

codecs/zstd/zstd_codec.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export interface ZstdModule extends EmscriptenModule {
22
compress(data: BufferSource, level: number): Uint8Array;
33
decompress(data: BufferSource): Uint8Array;
44
free_result(): void;
5+
getExceptionMessage(err: WebAssembly.Exception): [string, string];
56
}
67

78
declare const moduleFactory: EmscriptenModuleFactory<ZstdModule>;

0 commit comments

Comments
 (0)