Skip to content

Commit b66d834

Browse files
authored
Merge pull request #9 from lincc-frameworks/downloader-bandwidth-measure
Adding bandwith measurement to downloadCutout
2 parents 827babf + db26056 commit b66d834

File tree

3 files changed

+228
-38
lines changed

3 files changed

+228
-38
lines changed

src/fibad/download.py

Lines changed: 173 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import contextlib
2+
import datetime
23
import os
4+
import urllib.request
35
from pathlib import Path
46
from typing import Union
57

@@ -20,8 +22,12 @@
2022
def working_directory(path: Path):
2123
"""
2224
Context Manager to change our working directory.
23-
2425
Supports downloadCutouts which always writes to cwd.
26+
27+
Parameters
28+
----------
29+
path : Path
30+
Path that we change `Path.cwd()` while we are active.
2531
"""
2632
old_cwd = Path.cwd()
2733
os.chdir(path)
@@ -34,58 +40,85 @@ def working_directory(path: Path):
3440
def run(args, config):
3541
"""
3642
Main entrypoint for downloading cutouts from HSC for use with fibad
43+
44+
Parameters
45+
----------
46+
args : list
47+
Command line arguments (unused)
48+
config : dict
49+
Runtime configuration, which is only read by this function
3750
"""
3851

3952
config = config.get("download", {})
4053

41-
print("Download command")
54+
print("Download command Start")
4255

56+
fits_file = config.get("fits_file", "")
57+
print(f"Reading in fits catalog: {fits_file}")
4358
# Filter the fits file for the fields we want
4459
column_names = ["object_id"] + variable_fields
45-
locations = filterfits(config.get("fits_file"), column_names)
60+
locations = filterfits(fits_file, column_names)
4661

47-
# Sort by tract, ra, dec to optimize speed that the cutout server can serve us
48-
#
49-
# TODO: See if this sort is performed by downloadCutouts
50-
# It appears downloadCutouts is doing some sorting prior to download, but
51-
# unclear if it is the same sort
52-
locations.sort(variable_fields)
62+
# TODO slice up the locations to multiplex across connections if necessary, but for now
63+
# we simply mask off a few
64+
offset = config.get("offset", 0)
65+
end = offset + config.get("num_sources", 10)
66+
locations = locations[offset:end]
5367

54-
# TODO slice up the locations
55-
locations = locations[0:10]
56-
57-
# make a list of rects
68+
# Make a list of rects to pass to downloadCutout
5869
rects = create_rects(locations, offset=0, default=rect_from_config(config))
5970

6071
# Configure global parameters for the downloader
6172
dC.set_max_connections(num=config.get("max_connections", 2))
6273

74+
print("Requesting cutouts")
6375
# pass the rects to the cutout downloader
6476
download_cutout_group(
6577
rects=rects, cutout_dir=config.get("cutout_dir"), user=config["username"], password=config["password"]
6678
)
6779

68-
print(locations)
80+
# print(locations)
81+
print("Done")
6982

7083

7184
# TODO add error checking
7285
def filterfits(filename: str, column_names: list[str]) -> Table:
73-
"""
74-
Read a fits file with the required column names for making cutouts
86+
"""Read a fits file with the required column names for making cutouts
7587
76-
Returns an astropy table containing only the necessary fields
7788
78-
The easiest way to make one of these is to select from the main HSC catalog
89+
90+
The easiest way to make such a fits file is to select from the main HSC catalog
91+
92+
Parameters
93+
----------
94+
filename : str
95+
The fits file to read in
96+
column_names : list[str]
97+
The columns that are filtered out
98+
99+
Returns
100+
-------
101+
Table
102+
Returns an astropy table containing only the fields specified in column_names
79103
"""
80104
t = Table.read(filename)
81105
columns = [t[column] for column in column_names]
82106
return hstack(columns, uniq_col_name="{table_name}", table_names=column_names)
83107

84108

85109
def rect_from_config(config: dict) -> dC.Rect:
86-
"""
87-
Takes our Download config and loads cutout config
110+
"""Takes our runtime config and loads cutout config
88111
common to all cutouts into a prototypical Rect for downloading
112+
113+
Parameters
114+
----------
115+
config : dict
116+
Runtime config, only the download section
117+
118+
Returns
119+
-------
120+
dC.Rect
121+
A single rectangle with fields `sw`, `sh`, `filter`, `rerun`, and `type` populated from the config
89122
"""
90123
return dC.Rect.create(
91124
sw=config["sw"],
@@ -97,15 +130,32 @@ def rect_from_config(config: dict) -> dC.Rect:
97130

98131

99132
def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) -> list[dC.Rect]:
100-
"""
101-
Create the rects we will need to pass to the downloader.
133+
"""Create the rects we will need to pass to the downloader.
102134
One Rect per location in our list of sky locations.
103135
104136
Rects are created with all fields in the default rect pre-filled
105137
106138
Offset here is to allow multiple downloads on different sections of the source list
107139
without file name clobbering during the download phase. The offset is intended to be
108140
the index of the start of the locations table within some larger fits file.
141+
142+
Parameters
143+
----------
144+
locations : Table
145+
Table containing ra, dec locations in the sky
146+
offset : int, optional
147+
Index to start the `lineno` field in the rects at, by default 0. The purpose of this is to allow
148+
multiple downloads on different sections of a larger source list without file name clobbering during
149+
the download phase. This is important because `lineno` in a rect can becomes a file name parameter
150+
The offset is intended to be the index of the start of the locations table within some larger fits
151+
file.
152+
default : dC.Rect, optional
153+
The default Rect that contains properties common to all sky locations, by default None
154+
155+
Returns
156+
-------
157+
list[dC.Rect]
158+
Rects populated with sky locations from the table
109159
"""
110160
rects = []
111161
for index, location in enumerate(locations):
@@ -118,11 +168,108 @@ def create_rects(locations: Table, offset: int = 0, default: dC.Rect = None) ->
118168
return rects
119169

120170

121-
def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):
171+
stats = {
172+
"request_duration": datetime.timedelta(), # Time from request sent to first byte from the server
173+
"response_duration": datetime.timedelta(), # Total time spent recieving and processing a response
174+
"request_size_bytes": 0, # Total size of all requests
175+
"response_size_bytes": 0, # Total size of all responses
176+
"snapshots": 0, # Number of fits snapshots downloaded
177+
}
178+
179+
180+
def _stat_accumulate(name: str, value: Union[int, datetime.timedelta]):
181+
"""Accumulate a sum into the global stats dict
182+
183+
Parameters
184+
----------
185+
name : str
186+
Name of the stat. Assumed to exist in the dict already.
187+
value : Union[int, datetime.timedelta]
188+
How much time or count to add to the stat
189+
"""
190+
global stats
191+
stats[name] += value
192+
193+
194+
def _print_stats():
195+
"""Print the accumulated stats including bandwidth calculated from duration and sizes
196+
197+
This prints out multiple lines with `\r` at the end in order to create a continuously updating
198+
line of text during download if your terminal supports it.
199+
"""
200+
global stats
201+
202+
total_dur_s = (stats["request_duration"] + stats["response_duration"]).total_seconds()
203+
204+
resp_s = stats["response_duration"].total_seconds()
205+
down_rate_mb_s = (stats["response_size_bytes"] / (1024**2)) / resp_s
206+
207+
req_s = stats["request_duration"].total_seconds()
208+
up_rate_mb_s = (stats["request_size_bytes"] / (1024**2)) / req_s
209+
210+
snapshot_rate = stats["snapshots"] / total_dur_s
211+
212+
print(
213+
f"Stats: Duration: {total_dur_s:.2f} s, Files: {stats['snapshots']}, \
214+
Upload: {up_rate_mb_s:.2f} MB/s, Download: {down_rate_mb_s:.2f} MB/s File rate: {snapshot_rate:.2f} files/s",
215+
end="\r",
216+
flush=True,
217+
)
218+
219+
220+
def request_hook(
221+
request: urllib.request.Request,
222+
request_start: datetime.datetime,
223+
response_start: datetime.datetime,
224+
response_size: int,
225+
chunk_size: int,
226+
):
227+
"""This hook is called on each chunk of snapshots downloaded.
228+
It is called immediately after the server has finished responding to the
229+
request, so datetime.datetime.now() is the end moment of the request
230+
231+
Parameters
232+
----------
233+
request : urllib.request.Request
234+
The request object relevant to this call
235+
request_start : datetime.datetime
236+
The moment the request was handed off to urllib.request.urlopen()
237+
response_start : datetime.datetime
238+
The moment there were bytes from the server to process
239+
response_size : int
240+
The size of the response from the server in bytes
241+
chunk_size : int
242+
The number of cutout files recieved in this request
122243
"""
123-
Download cutouts to the given directory
124244

125-
Calls downloadCutout.download, so supports long lists of rects and
245+
now = datetime.datetime.now()
246+
247+
_stat_accumulate("request_duration", response_start - request_start)
248+
_stat_accumulate("response_duration", now - response_start)
249+
_stat_accumulate("request_size_bytes", len(request.data))
250+
_stat_accumulate("response_size_bytes", response_size)
251+
_stat_accumulate("snapshots", chunk_size)
252+
253+
_print_stats()
254+
255+
256+
def download_cutout_group(rects: list[dC.Rect], cutout_dir: Union[str, Path], user, password):
257+
"""Download cutouts to the given directory
258+
259+
Calls downloadCutout.download, so supports long lists of rects beyond the limits of the HSC API
260+
261+
Parameters
262+
----------
263+
rects : list[dC.Rect]
264+
The rects we would like to download
265+
cutout_dir : Union[str, Path]
266+
The directory to put the files
267+
user : _type_
268+
Username for HSC's download service to use
269+
password : _type_
270+
Password for HSC's download service to use
126271
"""
127272
with working_directory(Path(cutout_dir)):
128-
dC.download(rects, user=user, password=password, onmemory=False)
273+
dC.download(rects, user=user, password=password, onmemory=True, request_hook=request_hook)
274+
275+
print("") # Print a newline so the stats stay and look pretty.

0 commit comments

Comments
 (0)