refactor: increased code sharing between CPU and GPU interpretation in RNTuple reading #1470

Open · wants to merge 5 commits into main

260 changes: 39 additions & 221 deletions src/uproot/behaviors/RNTuple.py
@@ -207,6 +207,8 @@ def concatenate(
entry_stop=None,
decompression_executor=None, # TODO: Not implemented yet
library="ak", # TODO: Not implemented yet
backend="cpu",
interpreter="cpu",
ak_add_doc=False,
how=None,
allow_missing=False,
@@ -382,6 +384,8 @@ def concatenate(
decompression_executor=decompression_executor,
array_cache=None,
library=library,
backend=backend,
interpreter=interpreter,
ak_add_doc=ak_add_doc,
how=how,
filter_branch=filter_branch,
@@ -574,8 +578,8 @@ def arrays(
decompression_executor=None, # TODO: Not implemented yet
array_cache="inherit", # TODO: Not implemented yet
library="ak", # TODO: Not implemented yet
backend="cpu", # TODO: Not Implemented yet
use_GDS=False,
backend="cpu",
interpreter="cpu",
ak_add_doc=False,
how=None,
# For compatibility reasons we also accepts kwargs meant for TTrees
@@ -621,10 +625,9 @@ def arrays(
that is used to represent arrays. Options are ``"np"`` for NumPy,
``"ak"`` for Awkward Array, and ``"pd"`` for Pandas. (Not implemented yet.)
backend (str): The backend Awkward Array will use.
use_GDS (bool): If True and ``backend="cuda"`` will use kvikIO bindings
to CuFile to provide direct memory access (DMA) transfers between GPU
memory and storage. KvikIO bindings to nvcomp decompress data
buffers.
interpreter (str): If ``"cpu"``, raw data is interpreted on the CPU. If ``"gpu"`` and
``backend="cuda"``, KvikIO bindings to CuFile and nvCOMP are used to
interpret raw data on the GPU, if available.
ak_add_doc (bool | dict ): If True and ``library="ak"``, add the RField ``description``
to the Awkward ``__doc__`` parameter of the array.
if dict = {key:value} and ``library="ak"``, add the RField ``value`` to the
@@ -656,93 +659,7 @@ def arrays(
See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over
the array in contiguous ranges of entries.
"""
if not use_GDS:
return self._arrays(
expressions,
cut,
filter_name=filter_name,
filter_typename=filter_typename,
filter_field=filter_field,
aliases=aliases, # TODO: Not implemented yet
language=language, # TODO: Not implemented yet
entry_start=entry_start,
entry_stop=entry_stop,
decompression_executor=decompression_executor, # TODO: Not implemented yet
array_cache=array_cache, # TODO: Not implemented yet
library=library, # TODO: Not implemented yet
backend=backend, # TODO: Not Implemented yet
ak_add_doc=ak_add_doc,
how=how,
# For compatibility reasons we also accepts kwargs meant for TTrees
interpretation_executor=interpretation_executor,
filter_branch=filter_branch,
)

elif use_GDS and backend == "cuda":
return self._arrays_GDS(
expressions,
cut,
filter_name=filter_name,
filter_typename=filter_typename,
filter_field=filter_field,
aliases=aliases, # TODO: Not implemented yet
language=language, # TODO: Not implemented yet
entry_start=entry_start,
entry_stop=entry_stop,
decompression_executor=decompression_executor, # TODO: Not implemented yet
array_cache=array_cache, # TODO: Not implemented yet
library=library, # TODO: Not implemented yet
backend=backend, # TODO: Not Implemented yet
ak_add_doc=ak_add_doc,
how=how,
# For compatibility reasons we also accepts kwargs meant for TTrees
interpretation_executor=interpretation_executor,
filter_branch=filter_branch,
)

elif use_GDS and backend != "cuda":
raise NotImplementedError(
f"Backend {backend} GDS support not implemented.".format(backend)
)

def _arrays(
self,
expressions=None, # TODO: Not implemented yet
cut=None, # TODO: Not implemented yet
*,
filter_name=no_filter,
filter_typename=no_filter,
filter_field=no_filter,
aliases=None, # TODO: Not implemented yet
language=uproot.language.python.python_language, # TODO: Not implemented yet
entry_start=None,
entry_stop=None,
decompression_executor=None, # TODO: Not implemented yet
array_cache="inherit", # TODO: Not implemented yet
library="ak", # TODO: Not implemented yet
backend="cpu", # TODO: Not Implemented yet
ak_add_doc=False,
how=None,
# For compatibility reasons we also accepts kwargs meant for TTrees
interpretation_executor=None,
filter_branch=unset,
):
"""
Returns a group of arrays from the ``RNTuple``.

For example:

.. code-block:: python

>>> my_ntuple.arrays()
<Array [{my_vector: [1, 2]}, {...}] type='2 * {my_vector: var * int64}'>

See also :ref:`uproot.behaviors.RNTuple.HasFields.array` to read a single
``RField`` as an array.

See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over
the array in contiguous ranges of entries.
"""
# This temporarily provides basic functionality while expressions are properly implemented
if expressions is not None:
if filter_name == no_filter:
@@ -782,18 +699,34 @@ def _arrays(
container_dict = {}
_recursive_find(form, target_cols)

if interpreter == "gpu" and backend == "cuda":
clusters_datas = self.ntuple.gpu_read_clusters(
target_cols, start_cluster_idx, stop_cluster_idx
)
clusters_datas._decompress()
content_dict = self.ntuple.gpu_deserialize_decompressed_content(
clusters_datas,
start_cluster_idx,
stop_cluster_idx,
pad_missing_element=True,
)

for key in target_cols:
if "column" in key and "union" not in key:
key_nr = int(key.split("-")[1])

if interpreter == "cpu":
content = self.ntuple.read_col_pages(
key_nr,
range(start_cluster_idx, stop_cluster_idx),
pad_missing_element=True,
)
elif interpreter == "gpu" and backend == "cuda":
content = content_dict[key_nr]
elif interpreter == "gpu":
raise NotImplementedError(
f"Backend {backend} GDS support not implemented."
)
dtype_byte = self.ntuple.column_records[key_nr].type

content = self.ntuple.read_col_pages(
key_nr,
range(start_cluster_idx, stop_cluster_idx),
dtype_byte=dtype_byte,
pad_missing_element=True,
)
_fill_container_dict(container_dict, content, key, dtype_byte)

cluster_offset = cluster_starts[start_cluster_idx]
@@ -804,6 +737,7 @@
cluster_num_entries,
container_dict,
allow_noncanonical_form=True,
backend="cuda" if interpreter == "gpu" and backend == "cuda" else "cpu",
)[entry_start:entry_stop]

arrays = uproot.extras.awkward().to_backend(arrays, backend=backend)
@@ -825,126 +759,6 @@
raise ValueError(
f"unrecognized 'how' parameter: {how}. Options are None, tuple, list and dict."
)

return arrays

def _arrays_GDS(
self,
expressions=None, # TODO: Not implemented yet
cut=None, # TODO: Not implemented yet
*,
filter_name=no_filter,
filter_typename=no_filter,
filter_field=no_filter,
aliases=None, # TODO: Not implemented yet
language=uproot.language.python.python_language, # TODO: Not implemented yet
entry_start=None,
entry_stop=None,
decompression_executor=None, # TODO: Not implemented yet
array_cache="inherit", # TODO: Not implemented yet
library="ak", # TODO: Not implemented yet
backend="cuda", # TODO: Not Implemented yet
ak_add_doc=False,
how=None,
# For compatibility reasons we also accepts kwargs meant for TTrees
interpretation_executor=None,
filter_branch=unset,
):
"""
Current GDS support is limited to nvidia GPUs. The python library kvikIO is
a required dependency for Uproot GDS reading which can be installed by
calling pip install uproot[GDS_cuX] where X corresponds to the major cuda
version available on the user's system.

Returns a group of arrays from the ``RNTuple``.

For example:

.. code-block:: python

>>> my_ntuple.arrays(useGDS = True, backend = "cuda")
<Array [{my_vector: [1, 2]}, {...}] type='2 * {my_vector: var * int64}'>


"""
# This temporarily provides basic functionality while expressions are properly implemented
if expressions is not None:
if filter_name == no_filter:
filter_name = expressions
else:
raise ValueError(
"Expressions are not supported yet. They are currently equivalent to filter_name."
)

#####
# Find clusters to read that contain data from entry_start to entry_stop
entry_start, entry_stop = (
uproot.behaviors.TBranch._regularize_entries_start_stop(
self.num_entries, entry_start, entry_stop
)
)
clusters = self.ntuple.cluster_summaries
cluster_starts = numpy.array([c.num_first_entry for c in clusters])
start_cluster_idx = (
numpy.searchsorted(cluster_starts, entry_start, side="right") - 1
)
stop_cluster_idx = numpy.searchsorted(cluster_starts, entry_stop, side="right")
cluster_num_entries = numpy.sum(
[c.num_entries for c in clusters[start_cluster_idx:stop_cluster_idx]]
)

# Get form for requested columns
form = self.to_akform(
filter_name=filter_name,
filter_typename=filter_typename,
filter_field=filter_field,
filter_branch=filter_branch,
)

# Only read columns mentioned in the awkward form
target_cols = []
container_dict = {}

_recursive_find(form, target_cols)

#####
# Read and decompress all columns' data
clusters_datas = self.ntuple.gpu_read_clusters(
target_cols, start_cluster_idx, stop_cluster_idx
)
clusters_datas._decompress()
#####
# Deserialize decompressed datas
content_dict = self.ntuple.gpu_deserialize_decompressed_content(
clusters_datas, start_cluster_idx, stop_cluster_idx
)
#####
# Reconstitute arrays to an awkward array
container_dict = {}
# Debugging
for key in target_cols:
if "column" in key and "union" not in key:
key_nr = int(key.split("-")[1])

dtype_byte = self.ntuple.column_records[key_nr].type
content = content_dict[key_nr]
_fill_container_dict(container_dict, content, key, dtype_byte)

cluster_offset = cluster_starts[start_cluster_idx]
entry_start -= cluster_offset
entry_stop -= cluster_offset

arrays = uproot.extras.awkward().from_buffers(
form,
cluster_num_entries,
container_dict,
allow_noncanonical_form=True,
backend="cuda",
)[entry_start:entry_stop]

# Free memory
del content_dict, container_dict, clusters_datas

return arrays
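
For readers migrating existing code, a sketch of how the removed GDS entry point maps onto the unified ``arrays`` call (the object name is a placeholder):

.. code-block:: python

    # Before this pull request (dedicated GDS code path):
    arrays = my_ntuple.arrays(use_GDS=True, backend="cuda")

    # After this pull request (shared code path, selected via `interpreter`):
    arrays = my_ntuple.arrays(backend="cuda", interpreter="gpu")
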

def __array__(self, *args, **kwargs):
@@ -972,6 +786,8 @@ def iterate(
step_size="100 MB",
decompression_executor=None, # TODO: Not implemented yet
library="ak", # TODO: Not implemented yet
backend="cpu",
interpreter="cpu",
ak_add_doc=False,
how=None,
report=False, # TODO: Not implemented yet
@@ -1082,13 +898,15 @@ def iterate(
)
# TODO: This can be done more efficiently
for start in range(0, self.num_entries, step_size):
yield self._arrays(
yield self.arrays(
filter_name=filter_name,
filter_typename=filter_typename,
filter_field=filter_field,
entry_start=start,
entry_stop=start + step_size,
library=library,
backend=backend,
interpreter=interpreter,
how=how,
filter_branch=filter_branch,
)
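
Since ``iterate`` now forwards ``backend`` and ``interpreter`` to ``arrays``, a chunked GPU read can be sketched as follows (names and step size are placeholders):

.. code-block:: python

    import uproot

    ntuple = uproot.open("example.root")["Events"]  # placeholder names

    # Each chunk is read, decompressed, and deserialized on the GPU and comes
    # back as a CUDA-backed Awkward Array.
    for chunk in ntuple.iterate(step_size="100 MB", backend="cuda", interpreter="gpu"):
        process(chunk)  # `process` is a placeholder for user code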