1+ """
2+ Download ICON EU 10m wind data (U/V components) from DWD OpenData,
3+ unpack bz2 GRIB2 files in parallel, and merge into a single NetCDF file
4+ compatible with OpenDrift's GenericModelReader.
5+
6+ Requirements:
7+ pip install requests cfgrib xarray netCDF4 eccodes
8+ (eccodes must also be installed system-wide: apt install libeccodes-dev
9+ or conda install -c conda-forge eccodes)
10+ """
import os
import bz2
import re
import argparse
import tempfile
import json
import requests

import numpy as np
import xarray as xr

from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path


# ──────────────────────────────── Helpers ─────────────────────────────────── #

def list_bz2_files(url: str) -> list[str]:
    """Fetch the DWD directory listing and return a sorted list of the
    unique .bz2 filenames it links to."""
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    files = re.findall(r'href="([^"]+\.bz2)"', resp.text)
    return sorted(set(files))
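

# Example (illustrative): listing the U-component files of the 00 UTC run,
# using the same DWD OpenData URL pattern as download_and_merge() below:
#
#   files = list_bz2_files("https://opendata.dwd.de/weather/nwp/icon-eu/grib/00/u_10m/")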


def download_and_decompress(url: str, dest_path: Path) -> Path:
    """Download a .bz2 file, decompress it in memory, and write the GRIB2
    message to *dest_path*.

    Returns *dest_path* on success so futures can report it easily.
    Raises on any HTTP or decompression error.
    """
    resp = requests.get(url, timeout=120, stream=True)
    resp.raise_for_status()
    dest_path.write_bytes(bz2.decompress(resp.content))
    return dest_path
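

# Design note: resp.content and bz2.decompress() hold the whole file in
# memory. ICON-EU single-level fields are typically only a few megabytes
# each, so this is simpler than incremental decompression with
# bz2.BZ2Decompressor; for much larger files, streaming would be safer.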


def parallel_download(tasks: list[tuple[str, Path]], max_workers: int) -> list[Path]:
    """Download and decompress *tasks* = [(url, dest_path), …] in parallel.

    Uses a ThreadPoolExecutor with *max_workers* threads and prints progress
    as each file completes (from the main thread, so prints stay ordered).
    Returns a sorted list of the written paths.
    Raises RuntimeError if any download fails.
    """
    completed = 0
    results: list[Path] = []
    errors: list[str] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(download_and_decompress, url, dest): url
            for url, dest in tasks
        }

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            fname = url.split("/")[-1]
            try:
                path = future.result()
                results.append(path)
                completed += 1
                print(f"  [{completed}/{len(tasks)}] {fname}")
            except Exception as exc:
                errors.append(f"{fname}: {exc}")

    if errors:
        raise RuntimeError(
            f"{len(errors)} download(s) failed:\n" + "\n".join(errors)
        )

    return sorted(results)
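

# Example (illustrative, with BASE being a directory URL like the ones in
# download_and_merge() below): fetching two files into the working directory.
#
#   urls = [BASE + f for f in list_bz2_files(BASE)[:2]]
#   tasks = [(u, Path(u.split("/")[-1]).with_suffix("")) for u in urls]
#   grib_paths = parallel_download(tasks, max_workers=2)
#
# Path.with_suffix("") strips the trailing ".bz2", mirroring the
# f.replace(".bz2", "") used in download_and_merge().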


def rename_for_opendrift(ds: xr.Dataset) -> xr.Dataset:
    """Rename coordinates and set CF attributes so OpenDrift's
    GenericModelReader can auto-detect the variables.

    OpenDrift expects:
        x_wind / y_wind  (CF standard_name)
        longitude / latitude
        time
    """
    ds = ds.rename({"valid_time": "time"})

    # CF standard_name attributes used for auto-detection
    ds["u10"].attrs.update({
        "standard_name": "x_wind",
        "units": "m s-1",
        "long_name": "10 metre U wind component",
    })

    ds["v10"].attrs.update({
        "standard_name": "y_wind",
        "units": "m s-1",
        "long_name": "10 metre V wind component",
    })

    ds.attrs.update({
        "Conventions": "CF-1.7",
        "title": "ICON-EU 10 m wind – DWD OpenData",
        "source": "https://opendata.dwd.de/weather/nwp/icon-eu/",
    })

    return ds
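

# Example (illustrative sketch, assuming OpenDrift is installed): the merged
# file can be loaded with OpenDrift's generic CF reader, which picks up
# x_wind/y_wind via the standard_name attributes set above.
#
#   from opendrift.readers import reader_netCDF_CF_generic
#   reader = reader_netCDF_CF_generic.Reader("ICON_EU_120.nc")
#   print(reader)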

def merge_netCDF(big_file: str, new_ds: xr.Dataset, cutoff_hours: int | None = None):
    """Merge *new_ds* into the existing NetCDF file *big_file*.

    Overlapping timesteps are replaced by the new data; timesteps older than
    *cutoff_hours* (if given) are dropped, keeping a rolling window. The
    result is written to a .PART file first and then renamed over the
    original, so an interrupted write cannot corrupt the existing file.
    """
    big_ds = xr.open_dataset(big_file)

    # Remove overlapping times from big_ds so the new data takes precedence
    ds1_no_overlap = big_ds.sel(time=~big_ds.time.isin(new_ds.time))

    # Concatenate along time and restore chronological order
    merged = xr.concat([ds1_no_overlap, new_ds], dim="time")
    merged = merged.sortby("time")

    # Drop data older than the cutoff, if one was given
    if cutoff_hours is not None:
        cutoff_time = np.datetime64("now") - np.timedelta64(cutoff_hours, "h")
        merged = merged.sel(time=merged.time >= cutoff_time)

    # Load into memory and close the source file before replacing it on disk
    merged = merged.load()
    big_ds.close()

    merged.to_netcdf(f"{big_file}.PART")
    os.remove(big_file)
    os.rename(f"{big_file}.PART", big_file)
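

# Example (illustrative): appending a freshly downloaded run while keeping
# a rolling five-day window:
#
#   merge_netCDF("ICON_EU_120.nc", ds, cutoff_hours=120)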

def load_previous_downloads(file: str) -> dict:
    """Load the record of previously downloaded files from *file*, or return
    an empty record with one entry per ICON-EU forecast run (00–21 UTC)."""
    if os.path.exists(file):
        with open(file, "r") as f:
            last_downloads = json.load(f)
    else:
        last_downloads = {
            "00": [],
            "03": [],
            "06": [],
            "09": [],
            "12": [],
            "15": [],
            "18": [],
            "21": [],
        }
    return last_downloads


# ────────────────────────────────── Main ─────────────────────────────────── #

def download_and_merge(last_downloads_file: str, max_workers: int,
                       output_file: str | None = None, cutoff_hours: int = 120):

    last_downloads = load_previous_downloads(last_downloads_file)

    for frt in last_downloads.keys():  # frt = forecast run time (UTC hour)
        print(f"Looking up run {frt}")
        BASE_URL_U = f"https://opendata.dwd.de/weather/nwp/icon-eu/grib/{frt}/u_10m/"
        BASE_URL_V = f"https://opendata.dwd.de/weather/nwp/icon-eu/grib/{frt}/v_10m/"

        # List the available U and V component files
        print("Listing available files …")
        u_files = list_bz2_files(BASE_URL_U)
        v_files = list_bz2_files(BASE_URL_V)

        if not u_files or not v_files:
            print("No files found.")
            continue

        if sorted([*u_files, *v_files]) == last_downloads[frt]:
            print(f"Run {frt} already downloaded.")
            continue

        last_downloads[frt] = sorted([*u_files, *v_files])

        print(f"  U files: {len(u_files)}   V files: {len(v_files)}")
        print(f"\nDownloading in parallel with {max_workers} workers")

        with tempfile.TemporaryDirectory() as tmpdir:
            tmp = Path(tmpdir)

            # Build task lists: (url, destination_path)
            u_tasks = [(BASE_URL_U + f, tmp / f.replace(".bz2", "")) for f in u_files]
            v_tasks = [(BASE_URL_V + f, tmp / f.replace(".bz2", "")) for f in v_files]

            u_gribs = parallel_download(u_tasks, max_workers=max_workers)
            v_gribs = parallel_download(v_tasks, max_workers=max_workers)

            print("\nLoading GRIB2 files into xarray …")
            ds_u = xr.open_mfdataset(
                u_gribs,
                engine="cfgrib",
                combine="nested",
                concat_dim="valid_time",
                coords="different",
                compat="no_conflicts",
                join="outer",
                parallel=True,
                backend_kwargs={"errors": "ignore"},
            )
            ds_v = xr.open_mfdataset(
                v_gribs,
                engine="cfgrib",
                combine="nested",
                concat_dim="valid_time",
                coords="different",
                compat="no_conflicts",
                join="outer",
                parallel=True,
                backend_kwargs={"errors": "ignore"},
            )

            # Drop scalar coords that differ between timesteps to avoid merge conflicts
            keep = {"valid_time", "latitude", "longitude"}
            ds_u = ds_u.drop_vars([c for c in ds_u.coords if c not in keep], errors="ignore")
            ds_v = ds_v.drop_vars([c for c in ds_v.coords if c not in keep], errors="ignore")

            ds = xr.merge([ds_u, ds_v], join="outer")

            print("\nRenaming variables for OpenDrift compatibility …")
            ds = rename_for_opendrift(ds)

            if output_file:
                if os.path.exists(output_file):
                    merge_netCDF(output_file, ds, cutoff_hours)
                else:
                    ds.to_netcdf(output_file)
            with open(last_downloads_file, "w") as f:
                json.dump(last_downloads, f)
            print("Leaving temporary directory.")
    print("\nDone.")


if __name__ == "__main__":
    arg_parser = argparse.ArgumentParser(
        description="Download ICON-EU 10 m wind from DWD OpenData",
    )

    arg_parser.add_argument(
        "--max_workers",
        type=int,
        default=8,
        help="Number of parallel download workers",
    )

    arg_parser.add_argument(
        "--output",
        type=str,
        default="ICON_EU_120.nc",
        help="Output NetCDF file; merged into if it already exists",
    )

    arg_parser.add_argument(
        "--last_downloads",
        type=str,
        default="last_downloads.json",
        help="JSON file recording the previously downloaded files",
    )

    arg_parser.add_argument(
        "--cutoff_hours",
        type=int,
        default=120,
        help="Data older than this many hours is dropped from the output file",
    )

    args = arg_parser.parse_args()

    download_and_merge(max_workers=args.max_workers,
                       output_file=args.output,
                       last_downloads_file=args.last_downloads,
                       cutoff_hours=args.cutoff_hours)