Skip to content

Commit cc0598b

Browse files
authored
Remove thread-unsafe event logging code (#95)
This would occasionally cause errors because of dask/distributed#5552, and wasn't very useful anyway.
1 parent 2f3e348 commit cc0598b

File tree

1 file changed

+0
-37
lines changed

1 file changed

+0
-37
lines changed

stackstac/rio_reader.py

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from __future__ import annotations
22

3-
import functools
43
import logging
54
import threading
65
import warnings
7-
import weakref
86
from typing import TYPE_CHECKING, Optional, Protocol, Tuple, Type, TypedDict, Union
97

108
import numpy as np
@@ -32,16 +30,6 @@ def _curthread():
3230
return threading.current_thread().name
3331

3432

35-
def log_event(topic: str, msg: dict) -> None:
36-
try:
37-
import distributed
38-
39-
worker = distributed.get_worker()
40-
except (ImportError, ValueError):
41-
return
42-
worker.log_event(topic, dict(msg, thread=_curthread()))
43-
44-
4533
# /TODO
4634

4735

@@ -203,8 +191,6 @@ def __init__(
203191
# but because `close` closes datasets across all threads by simply deleting the current threadlocal
204192
# and replacing it with an empty one, we have to synchronize all access to `self._threadlocal`.
205193

206-
log_event("create_ThreadLocalRioDataset", dict(url=self._url, vrt=bool(vrt)))
207-
208194
def _open(self) -> Union[SelfCleaningDatasetReader, WarpedVRT]:
209195
with self._env.open:
210196
with time(f"Reopen {self._url!r} in {_curthread()}: {{t}}"):
@@ -214,23 +200,16 @@ def _open(self) -> Union[SelfCleaningDatasetReader, WarpedVRT]:
214200
driver=self._driver,
215201
**self._open_options,
216202
)
217-
log_event("open_dataset", dict(url=self._url))
218203
if self._vrt_params:
219204
with self._env.open_vrt:
220205
result = vrt = WarpedVRT(ds, sharing=False, **self._vrt_params)
221-
log_event("open_vrt", dict(url=self._url))
222206
else:
223207
vrt = None
224208

225209
with self._lock:
226210
self._threadlocal.ds = ds
227211
self._threadlocal.vrt = vrt
228212

229-
weakref.ref(
230-
ds, functools.partial(log_event, "close_dataset", dict(url=self._url))
231-
)
232-
weakref.ref(vrt, functools.partial(log_event, "close_vrt", dict(url=self._url)))
233-
# NOTE: functools.partial to hopefully avoid taking a closure over `self`
234213
return result
235214

236215
@property
@@ -277,7 +256,6 @@ def close(self) -> None:
277256
# datasets.
278257
# NOTE: we're assuming here that closing a GDAL dataset from a thread other than the one that created
279258
# it is safe to do, which, knowing GDAL, is quite possibly untrue.
280-
log_event("close_ThreadLocalRioDataset", dict(url=self._url))
281259
with self._lock:
282260
self._threadlocal = threading.local()
283261

@@ -365,14 +343,6 @@ def _open(self) -> ThreadsafeRioDataset:
365343
"a separate STAC asset), so you'll need to exclude this asset from your analysis."
366344
)
367345

368-
log_event("open_dataset_initial", dict(url=self.url))
369-
weakref.ref(
370-
ds,
371-
functools.partial(
372-
log_event, "close_dataset_initial", dict(url=self.url)
373-
),
374-
)
375-
376346
# Only make a VRT if the dataset doesn't match the spatial spec we want
377347
if self.spec.vrt_params != {
378348
"crs": ds.crs.to_epsg(),
@@ -387,13 +357,6 @@ def _open(self) -> ThreadsafeRioDataset:
387357
resampling=self.resampling,
388358
**self.spec.vrt_params,
389359
)
390-
log_event("open_vrt_initial", dict(url=self.url))
391-
weakref.ref(
392-
vrt,
393-
functools.partial(
394-
log_event, "close_vrt_initial", dict(url=self.url)
395-
),
396-
)
397360
else:
398361
logger.info(f"Skipping VRT for {self.url!r}")
399362
vrt = None

0 commit comments

Comments
 (0)