Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions examples/hello/deferredRead/deferred_timing_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#
# Write multiple arrays to a file
# Compare times to
# Reading them back one by one
# Reading them back at once with Deferred read mode
#

import numpy as np
from adios2 import Stream, FileReader
from time import perf_counter as pc

# User data
Nx = 100
Ny = 1000
Nvars = 1000
fname = "deferred_test.bp"

count = [Nx, Ny]
start = [0, 0]
shape = [Nx, Ny]

with Stream(fname, "w") as outf:
data = np.random.rand(Nvars, Nx, Ny)
print(f"Write {Nvars} arrays of [{Nx}, {Ny}] shape")
twrite_start = pc()
for n in range(Nvars):
outf.write(f"data_{n:0>3}", data[n:n + 1, :, :].squeeze(), shape, start, count)
outf.write("N", [Nvars, Nx, Ny]) # will be an array in output
outf.write("Nx", np.array(Nx)) # will be a scalar in output
outf.write("Ny", Ny) # will be a scalar in output
outf.write("Nvars", Nvars) # will be a scalar in output
twrite_end = pc()
print(f"Time of writing: {twrite_end - twrite_start:0.6f}s")

with FileReader(fname) as inpf:
# scalar variables are read as a numpy array with 0 dimension
in_nx = inpf.read("Nx")
in_ny = inpf.read("Ny")
in_nvars = inpf.read("Nvars")
print(f"Incoming nx, ny, nvars = {in_nx}, {in_ny}, {in_nvars}")

tread_start = pc()
for n in range(Nvars):
var = inpf.inquire_variable(f"data_{n:0>3}")
if var is not None:
var.set_selection([start, shape])
data = inpf.read(var)
tread_end = pc()
print(f"Time of reading one by one (Sync mode): {tread_end - tread_start:0.6f}s")

with FileReader(fname) as inpf:
# scalar variables are read as a numpy array with 0 dimension
in_nx = inpf.read("Nx")
in_ny = inpf.read("Ny")
in_nvars = inpf.read("Nvars")
print(f"Incoming nx, ny, nvars = {in_nx}, {in_ny}, {in_nvars}")

in_data = np.random.rand(Nvars, Nx, Ny)

tread_start = pc()
for n in range(Nvars):
var = inpf.inquire_variable(f"data_{n:0>3}")
if var is not None:
var.set_selection([start, shape])
data = inpf.read_in_buffer(var, buffer=in_data[n:n + 1, :, :], defer_read=True)
inpf.read_complete()
tread_end = pc()
print(f"Time of reading at once (Deferred mode): {tread_end - tread_start:0.6f}s")
99 changes: 89 additions & 10 deletions python/adios2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ def _(self, name, content, shape=[], start=[], count=[], operations=None):

self.write(variable, content)

def _read_var(self, variable: Variable):
def _read_var(self, variable: Variable, defer_read: bool = False):
"""
Internal function to read when there is no preallocated buffer submitted.
Settings must be done to Variable before the call.
Expand All @@ -384,6 +384,11 @@ def _read_var(self, variable: Variable):
adios2.Variable object to be read
Use variable.set_selection(), set_block_selection(), set_step_selection()
to prepare a read
defer_read
False: read now and blocking wait for completion (Sync mode)
True: defer reading all requests until read_complete().
The returned numpy array will be filled with data
only after calling read_complete().
Returns
array
resulting array from selection
Expand Down Expand Up @@ -420,8 +425,12 @@ def _read_var(self, variable: Variable):
else:
output_shape = []

mode = bindings.Mode.Sync
if defer_read:
mode = bindings.Mode.Deferred

output = np.zeros(output_shape, dtype=dtype)
self._engine.get(variable, output)
self._engine.get(variable, output, mode)
return output

def _set_variable_settings(self, variable, start, count, block_id, step_selection):
Expand Down Expand Up @@ -466,7 +475,14 @@ def _set_variable_settings(self, variable, start, count, block_id, step_selectio

@singledispatchmethod
def read_in_buffer(
self, variable: Variable, buffer, start=[], count=[], block_id=None, step_selection=None
self,
variable: Variable,
buffer,
start=[],
count=[],
block_id=None,
step_selection=None,
defer_read: bool = False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised the format did not complained about this :)

Suggested change
defer_read: bool = False,
defer_read: bool=False,

):
"""
Read a variable into a preallocated buffer.
Expand Down Expand Up @@ -494,6 +510,12 @@ def read_in_buffer(

step_selection
(list): On the form of [start, count].

defer_read
False: read now and blocking wait for completion (Sync mode)
True: defer reading all requests until read_complete().
The returned numpy array will be filled with data
only after calling read_complete().
"""
variable = self._set_variable_settings(variable, start, count, block_id, step_selection)
# make sure the buffer is a mutable array
Expand Down Expand Up @@ -526,10 +548,23 @@ def read_in_buffer(
if count != buf_size:
raise RuntimeError("Read buffer size {buf_size} does not match variable size {count}")

self._engine.get(variable, buffer)
mode = bindings.Mode.Sync
if defer_read:
mode = bindings.Mode.Deferred

self._engine.get(variable, buffer, mode)

@read_in_buffer.register(str)
def _(self, name: str, buffer, start=[], count=[], block_id=None, step_selection=None):
def _(
self,
name: str,
buffer,
start=[],
count=[],
block_id=None,
step_selection=None,
defer_read: bool = False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
defer_read: bool = False,
defer_read: bool=False,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flake8 does not like your perfectly reasonable suggestions.
python/adios2/stream.py:566: [E252] missing whitespace around parameter equals

):
"""
Read a variable into a preallocated buffer.
Random access read allowed to select steps.
Expand All @@ -554,15 +589,31 @@ def _(self, name: str, buffer, start=[], count=[], block_id=None, step_selection

step_selection
(list): On the form of [start, count].

defer_read
False: read now and blocking wait for completion (Sync mode)
True: defer reading all requests until read_complete().
The returned numpy array will be filled with data
only after calling read_complete().
"""
variable = self._io.inquire_variable(name)
if not variable:
raise ValueError()

self.read_in_buffer(variable, buffer, start, count, block_id, step_selection)
self.read_in_buffer(
variable, buffer, start, count, block_id, step_selection, defer_read=defer_read
)

@singledispatchmethod
def read(self, variable: Variable, start=[], count=[], block_id=None, step_selection=None):
def read(
self,
variable: Variable,
start=[],
count=[],
block_id=None,
step_selection=None,
defer_read: bool = False,
):
"""
Read a variable.
Random access read allowed to select steps.
Expand All @@ -585,6 +636,12 @@ def read(self, variable: Variable, start=[], count=[], block_id=None, step_selec

step_selection
(list): On the form of [start, count].

defer_read
False: read now and blocking wait for completion (Sync mode)
True: defer reading all requests until read_complete().
The returned numpy array will be filled with data
only after calling read_complete().
Returns
array
resulting array from selection
Expand All @@ -593,10 +650,18 @@ def read(self, variable: Variable, start=[], count=[], block_id=None, step_selec
variable = self._set_variable_settings(variable, start, count, block_id, step_selection)
if variable.type() == "string" and variable.single_value() is True:
return self._engine.get(variable)
return self._read_var(variable)
return self._read_var(variable, defer_read=defer_read)

@read.register(str)
def _(self, name: str, start=[], count=[], block_id=None, step_selection=None):
def _(
self,
name: str,
start=[],
count=[],
block_id=None,
step_selection=None,
defer_read: bool = False,
):
"""
Read a variable.
Random access read allowed to select steps.
Expand All @@ -617,6 +682,12 @@ def _(self, name: str, start=[], count=[], block_id=None, step_selection=None):

step_selection
(list): On the form of [start, count].

defer_read
False: read now and blocking wait for completion (Sync mode)
True: defer reading all requests until read_complete().
The returned numpy array will be filled with data
only after calling read_complete().
Returns
array
resulting array from selection
Expand All @@ -625,7 +696,15 @@ def _(self, name: str, start=[], count=[], block_id=None, step_selection=None):
if not variable:
raise ValueError()

return self.read(variable, start, count, block_id, step_selection)
return self.read(variable, start, count, block_id, step_selection, defer_read=defer_read)

def read_complete(self):
"""
Complete reading all deferred read requests.
The returned numpy arrays of each read(..., defer_read=True) will be
filled with data after this call.
"""
self._engine.perform_gets()

def write_attribute(self, name, content, variable_name="", separator="/"):
"""
Expand Down
Loading