
Integer Scaling In Zarr 3 #2926

Closed
mickyals opened this issue Mar 23, 2025 · 4 comments

@mickyals

I'm currently converting thousands of .nc files into a single Zarr store. However, the Zarr store needs much more storage than the discrete .nc files combined. I've encountered a similar issue on Stack Overflow, but that was in reference to Zarr 2. On disk, the arrays within each .nc file are stored as int16, but the conversion works on the opened files, which are float32. My question is: how can the stored arrays be compressed further? Is it possible to store them as int16 and have them decoded back to float on read? I know Zarr 3 has brought relatively significant changes compared to previous iterations, so perhaps there are some pointers you can provide for more efficient compression and storage?

@rabernat
Contributor

Could you share more details, please? What's the original encoding of the NetCDF files? How are you writing the Zarr store? What do you mean by "much more storage"?

If you could provide a minimal reproducible example we will be able to help you much more effectively. As currently worded, your question is a bit vague.

@mickyals
Author

Hi, really sorry for the lack of detail. Here goes:

I am working to turn hundreds of thousands of GOES .nc files into a Zarr store for easier access and use in my research. Currently, each file is quantized with a scale_factor and add_offset, so the stored arrays are int16, halving the number of bytes required compared to the float32 arrays produced when the data is accessed and viewed via xarray or similar packages.

For example, the C16 band on GOES-18:

    Variable: CMI_C16
    {'dtype': dtype('int16'), 'source': 'https://redoak.cs.toronto.edu/twitcher/ows/proxy/thredds/dodsC/datasets/GOES/GOES18/ABI-L2-MCMIPF/2022/209/00/OR_ABI-L2-MCMIPF-M6_G18_s20222090000172_e20222090009486_c20222090009581.nc', 'original_shape': (5424, 5424), '_FillValue': -1, '_Unsigned': 'true', 'scale_factor': 0.05508153, 'add_offset': 92.7, 'coordinates': 'band_id_C16 band_wavelength_C16 t y x'}
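The packing follows the standard CF scale/offset convention. A minimal numpy sketch of the round trip, using the scale_factor and add_offset from the encoding above (the sample physical values are illustrative):

```python
import numpy as np

# Values taken from the CMI_C16 encoding above
scale_factor, add_offset = 0.05508153, 92.7

# Physical (float32) -> packed integer, as stored on disk in the .nc file
physical = np.array([150.0, 250.0, 350.0], dtype=np.float32)
packed = np.round((physical - add_offset) / scale_factor).astype(np.uint16)  # '_Unsigned': 'true'

# Packed -> physical, as xarray presents the data after CF decoding
decoded = (packed * scale_factor + add_offset).astype(np.float32)

# Round-trip error is bounded by half a quantization step
assert np.all(np.abs(decoded - physical) <= scale_factor / 2 + 1e-6)
```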

There are workarounds for this; for example, I could simply add the required metadata so that xarray scales the data on read. But I suspect that may not be ideal, since each file may have unique scale and offset values, so it would be best to apply the quantization at the Zarr storage level.

My current pipeline ingests GOES data in geostationary projection, regrids it to rectilinear using xESMF, and writes the resulting float arrays into Zarr storage as float32. However, I've noticed how important the quantization is for efficient storage: a single 16-band .nc file grows from ~300 MB to 1.2 GB even with the compression level maxed out. I guess what I am asking here is whether quantization is also supported by Zarr 3, so files can be both quantized and compressed for storage.

    Type               : Array
    Zarr format        : 3
    Data type          : DataType.float32
    Shape              : (5, 8133, 8130)
    Chunk shape        : (336, 512, 512)
    Order              : C
    Read-only          : False
    Store type         : LocalStore
    Filters            : ()
    Serializer         : BytesCodec(endian=<Endian.little: 'little'>)
    Compressors        : (ZstdCodec(level=9, checksum=False),)
    No. bytes          : 1322425800 (1.2G)
    No. bytes stored   : 325550794
    Storage ratio      : 4.1
    Chunks Initialized : 0

Main Processing Code


    def process_files(self, file_paths: list[str], store_name: str, target_grid_file: str,
                      regridder_weights: str = None, satellite: str = 'west',
                      chunks: tuple[int] = (336, 256, 256), shards: int = None,
                      dt_units: str = "seconds since 2000-01-01T12:00:00", dt_calendar: str = "proleptic_gregorian"):
        """

        :param file_paths: List of input NetCDF files
        :param store_name: Output Zarr store name
        :param target_grid: Target grid definition file
        :param regridder_weights: Precomputed regridder weights
        :param satellite: Satellite identifier ('west' or 'east')
        :param chunks: Zarr chunk dimensions
        :param batch_size: Number of files to process per batch
        """
        LOGGER.debug("Starting processing netCDF input files")
        LOGGER.debug("Creating sample dataset to calculate initial extents and weights using netCDF file at '%s'", file_paths[0])
        # Initialize components
        sample_ds = xr.open_dataset(file_paths[0], chunks='auto', drop_variables=self.config.UNUSED_DS_VARIABLES)

        # Calculate geocoordinates
        lons, lats = self.calculate_geocoordinates(sample_ds, satellite)

        # Set coordinates
        sample_ds = self.add_coordinates(sample_ds, lons, lats)

        # Target grid
        target_ds = xr.open_dataset(target_grid_file, chunks='auto')

        # Initialize regridder
        self.regridder = xe.Regridder(
            sample_ds,
            target_ds,
            method='bilinear',
            periodic=False,
            weights=regridder_weights
        )

        sample_ds = self.regridder(sample_ds['CMI_C07'])
        mask = sample_ds.where(sample_ds < 197.3).values

        # Initialize Zarr store
        self.initialize_zarr_store(store_name, self.config.GOES18_METADATA if 'west' in satellite else self.config.GOES16_METADATA)

        # Create coordinate arrays
        LOGGER.debug("Create coordinate arrays from sample dataset")
        if 'lat' not in self.root_group:
            lat = zarr.create_array(self.zarr_store, name='lat', shape=sample_ds['lat'].values.shape, dtype=np.float32,
                                    attributes=self.config.GOES18_LAT_LON_METADATA['lat'] if 'west' in satellite else self.config.GOES16_LAT_LON_METADATA['lat'],
                                    dimension_names=['lat'])
            lat[:] = sample_ds['lat'].values

        if 'lon' not in self.root_group:
            lon = zarr.create_array(self.zarr_store, name='lon', shape=sample_ds['lon'].values.shape, dtype=np.float32,
                                   attributes=self.config.GOES18_LAT_LON_METADATA['lon'] if 'west' in satellite else self.config.GOES16_LAT_LON_METADATA['lon'],
                                   dimension_names=['lon'])
            lon[:] = sample_ds['lon'].values
        
        batch_size = chunks[0]
        # Process in batches
        # iterate through each batch
        LOGGER.debug("Starting batch processing of netCDF files. Total files: %d Batch size: %d", len(file_paths), batch_size)
        for i in range(0, len(file_paths), batch_size):
            # Open and concatenate files
            batch_end = min(len(file_paths), i + batch_size)
            LOGGER.debug("Starting batch of files from '%s' to '%s' (indexes %d to %d)", file_paths[i], file_paths[batch_end - 1], i, batch_end)
            ds = self.add_coordinates(xr.open_mfdataset(
                file_paths[i: batch_end],
                combine="nested",
                concat_dim="t",
                chunks='auto',
                drop_variables=self.config.UNUSED_DS_VARIABLES
            ), lats, lons)

            # Encode time values
            encoded_time, _, _ = xr.coding.times.encode_cf_datetime(ds['t'].values, units=dt_units,
                                                                    calendar=dt_calendar, dtype=np.float64)
            # Write time values to Zarr
            if 't' in self.root_group:
                ta = zarr.open_array(self.zarr_store, path="t")
                ta.append(encoded_time)
            else:
                ta = zarr.create_array(self.zarr_store, name="t", shape=encoded_time.shape, dtype=np.float64,
                                       attributes={"units": dt_units, "calendar": dt_calendar, "standard_name": "time"},
                                       dimension_names=['t'])
                ta[:] = encoded_time

            # Process bands
            with concurrent.futures.ThreadPoolExecutor() as executor:
                executor.map(partial(self.process_band, ds=ds, chunks=chunks, mask=mask, shards=shards),
                             self.config.BANDS)
            LOGGER.info("Completed conversion of batch from file %d to %d.", i, batch_end)
        # Consolidate metadata
        zarr.consolidate_metadata(self.zarr_store)

processor = GOESProcessor(args.satellite, config, compressors=zarr.codecs.ZstdCodec(level=args.compression_level))
processor.process_files(
        file_paths=file_paths,
        store_name=args.store_path,
        target_grid_file=get_target_grid_file(args.satellite, args.grid_lon_extent),
        regridder_weights=args.regridder_weight_file,
        satellite=args.satellite,
        chunks=(args.chunk_size, 512, 512)
    )

Git Repo of code

@rabernat
Contributor

I guess what I am asking here is whether quantization is also supported by Zarr 3 so files can be compressed and quantized for storage.

Yes you can definitely achieve this with Zarr! However, you should realize that such compression is lossy, i.e. it will introduce noise and reduce the effective precision of your data. You need to decide how important that is for your application vs. storage volume.

With Xarray and Zarr, you could achieve this by manually setting dtype (to int16) plus scale_factor and add_offset in the Xarray encoding for each data variable. With this approach, you will need to manually pick appropriate values for scale_factor and add_offset.

I would note that the scale_factor and add_offset approach is considered old-fashioned by modern standards, and more sophisticated lossy methods have been developed. Check out xbitinfo for a theoretically grounded approach to reducing precision. With this approach, you analyze your data to figure out how many "significant" bits it actually has, and then truncate the binary data appropriately. Running this truncated data through a lossless compressor such as zstd usually delivers significant compression ratios.
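The core trick behind this can be sketched directly in numpy; `bitround` below is a hypothetical helper (xbitinfo's own API differs), assuming float32 input and round-half-up:

```python
import numpy as np

def bitround(a: np.ndarray, keepbits: int) -> np.ndarray:
    """Keep only `keepbits` mantissa bits of float32 data (sketch of bit rounding).

    The zeroed trailing bits make the array far more compressible with a
    lossless codec such as zstd, at the cost of reduced precision.
    """
    assert 0 < keepbits < 23                               # float32 has 23 explicit mantissa bits
    drop = 23 - keepbits
    bits = np.asarray(a, dtype=np.float32).view(np.uint32)
    half = np.uint32(1 << (drop - 1))                      # for round-half-up
    mask = np.uint32((0xFFFFFFFF << drop) & 0xFFFFFFFF)    # clears the dropped bits
    return ((bits + half) & mask).view(np.float32)

x = np.array([1.2345678, 3.1415927], dtype=np.float32)
r = bitround(x, keepbits=7)
# Relative error is bounded by about 2**-keepbits, and the
# trailing mantissa bits of the result are all zero.
```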

You could also look into PCodec, which does (lossless) wonders on floating point data.

@mickyals
Author

Will give this a try. Thanks! 🙏
