Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pyomo/contrib/mpc/data/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2025
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________

from bisect import bisect_right


def _get_time_index_vec(time_set, time_data):
"""Get the position index of time_data above and below the times in
time_set. This can be used to find positions of points to interpolate
between.

Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in

Returns
-------
numpy.array
Position index of the first time in time_data greater than the
corresponding points time_set. If a time is less than all the times
in time_data return 1. If a time is greater than all times time_data
set return the last index of time_data.
"""
pos = [None] * len(time_set)
for i, t in enumerate(time_set):
pos[i] = bisect_right(time_data, t)
if pos[i] == 0:
pos[i] = 1
elif pos[i] == len(time_data):
pos[i] = len(time_data) - 1
return pos


def _get_interp_expr_vec(time_set, time_data, data, indexes=None):
"""Return an array of floats interpolated at the time points in time_set
from data defined at time_data.

Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
data: iterable
Data corresponding to times in time_data, must have the same
length as time data.
indexes: numpy.array
Numpy array of position indexes of the time points to interpolate in the
time data. The format is the same as returned by ``_get_time_index_vec()``.
If this is None, ``_get_time_index_vec()`` is called. The reason to pass
this is to avoid multiple position searches when interpolating multiple
outputs with the same time points.

Returns
-------
list
If data are Pyomo components, this will return Pyomo expressions.
If data are floats, this will return floats.
"""
if indexes is None:
indexes = _get_time_index_vec(time_set, time_data)

Check warning on line 72 in pyomo/contrib/mpc/data/interpolation.py

View check run for this annotation

Codecov / codecov/patch

pyomo/contrib/mpc/data/interpolation.py#L72

Added line #L72 was not covered by tests
expr = [None] * len(time_set)
for i, (h, t) in enumerate(zip(indexes, time_set)):
l = h - 1
expr[i] = data[l] + (data[h] - data[l]) / (time_data[h] - time_data[l]) * (
t - time_data[l]
)
return expr
49 changes: 48 additions & 1 deletion pyomo/contrib/mpc/data/series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
from pyomo.contrib.mpc.data.get_cuid import get_indexed_cuid
from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase
from pyomo.contrib.mpc.data.scalar_data import ScalarData

from pyomo.contrib.mpc.data.interpolation import (
_get_time_index_vec,
_get_interp_expr_vec,
)

TimeSeriesTuple = namedtuple("TimeSeriesTuple", ["data", "time"])

Expand Down Expand Up @@ -152,6 +155,50 @@
indices = indices[0]
return self.get_data_at_time_indices(indices)

def get_interpolated_data(self, time=None, tolerance=0.0):
"""
Returns the data associated with the provided time point or points by
linear interpolation.

Parameters
----------
time: Float or iterable
The time point or points corresponding to returned data.
tolerance: float
Tolerance used when checking if time points are inside the data
range.

Returns
-------
TimeSeriesData or ~scalar_data.ScalarData
TimeSeriesData containing only the specified time points
or dict mapping CUIDs to values at the specified scalar time
point.

"""
if time is None:
# If time is not specified, assume we want the entire time
# set. Skip all the overhead, don't create a new object, and
# return self.
return self

Check warning on line 183 in pyomo/contrib/mpc/data/series_data.py

View check run for this annotation

Codecov / codecov/patch

pyomo/contrib/mpc/data/series_data.py#L183

Added line #L183 was not covered by tests
is_iterable = _is_iterable(time)
if not is_iterable:
time = [time]
for t in time:
if t > self._time[-1] + tolerance or t < self._time[0] - tolerance:
raise RuntimeError("Requesting interpolation outside data range.")
idxs = _get_time_index_vec(time, self._time)
data = {}
for cuid in self._data:
v = _get_interp_expr_vec(time, self._time, self._data[cuid], idxs)
data[cuid] = v
if is_iterable:
return TimeSeriesData(data, list(time))
else:
for cuid in self._data:
data[cuid] = data[cuid][0]
return ScalarData(data)

def to_serializable(self):
"""
Convert to json-serializable object.
Expand Down
57 changes: 57 additions & 0 deletions pyomo/contrib/mpc/data/tests/test_series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,63 @@ def test_get_data_at_time_with_tolerance(self):
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_data_at_time(-0.01, tolerance=1e-3)

def test_get_data_interpolate(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
new_data = data.get_interpolated_data(0.05)
self.assertEqual(ScalarData({m.var[:, "A"]: 1.5, m.var[:, "B"]: 3}), new_data)

t1 = 0.05
new_data = data.get_interpolated_data([t1])
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5], m.var[:, "B"]: [3]}, [t1]), new_data
)

new_t = [0.05, 0.15]
new_data = data.get_interpolated_data(new_t)
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5, 2.5], m.var[:, "B"]: [3, 5]}, new_t),
new_data,
)

def test_get_data_interpolate_range_check(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
msg = "Requesting interpolation outside data range."
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(0.2 + 1e-6)
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(0.0 - 1e-6)
new_data = data.get_interpolated_data(0.2 + 1e-6, tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"]), 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"]), 6, 4)

t1 = 0.2 + 1e-6
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data([t1])
new_data = data.get_interpolated_data([t1], tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 6, 4)

new_t = [0.0 - 1e-6, 0.2 + 1e-6]
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_interpolated_data(new_t)
new_data = data.get_interpolated_data(new_t, tolerance=1e-5)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4)

# check that the exact endpoints don't raise an exception with 0 tol
new_t = [0.0, 0.2]
new_data = data.get_interpolated_data(new_t, tolerance=0.0)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[0], 1, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[0], 2, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "A"])[1], 3, 4)
self.assertAlmostEqual(new_data.get_data_from_key(m.var[:, "B"])[1], 6, 4)

def test_to_serializable(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
Expand Down
Loading