Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions pyomo/contrib/mpc/data/interpolation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# ___________________________________________________________________________
#
# Pyomo: Python Optimization Modeling Objects
# Copyright (c) 2008-2025
# National Technology and Engineering Solutions of Sandia, LLC
# Under the terms of Contract DE-NA0003525 with National Technology and
# Engineering Solutions of Sandia, LLC, the U.S. Government retains certain
# rights in this software.
# This software is distributed under the 3-clause BSD License.
# ___________________________________________________________________________

from pyomo.common.dependencies import numpy as np


def get_time_index_vec(time_set, time_data):
"""Get the position index of time_data above and below the times in
time_set. This can be used to find positions of points to interpolate
between.
Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
Returns
-------
numpy.array
Position index of the first time in time_data greater than the
corresponding points time_set. If a time is less than all the times
in time_data return 1. If a time is greater than all times time_data
set return the last index of time_data.
"""
pos = np.searchsorted(time_data, time_set)
pos[pos == 0] = 1
pos[pos == len(time_data)] = len(time_data) - 1
return pos


def get_interp_expr_vec(time_set, time_data, data, indexes=None):
"""Depending of the data types contained in data, return a list of
Pyomo expression or float interpolated data at times in time_set.
Parameters
----------
time_set: iterable
Time points to locate
time_data: iterable
Sorted time points to locate time_set in
data: iterable
Data corresponding to times in time_data, must have the same
length as time data.
Returns
-------
list
If data are Pyomo components, this will return Pyomo expressions
interpolation if data are floats, this will return floats.
"""
if indexes is None:
indexes = get_surround_time_vec(time_set, time_data)
expr = [None] * len(time_set)
for i, (l, h, t) in enumerate(zip(indexes - 1, indexes, time_set)):
expr[i] = data[l] + (data[h] - data[l]) / (time_data[h] - time_data[l]) * (
t - time_data[l]
)
return expr
18 changes: 16 additions & 2 deletions pyomo/contrib/mpc/data/series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from pyomo.contrib.mpc.data.get_cuid import get_indexed_cuid
from pyomo.contrib.mpc.data.dynamic_data_base import _is_iterable, _DynamicDataBase
from pyomo.contrib.mpc.data.scalar_data import ScalarData

from pyomo.contrib.mpc.data.interpolation import get_time_index_vec, get_interp_expr_vec

TimeSeriesTuple = namedtuple("TimeSeriesTuple", ["data", "time"])

Expand Down Expand Up @@ -104,7 +104,7 @@ def get_data_at_time_indices(self, indices):
{cuid: values[indices] for cuid, values in self._data.items()}
)

def get_data_at_time(self, time=None, tolerance=0.0):
def get_data_at_time(self, time=None, tolerance=0.0, interpolate=False):
"""
Returns the data associated with the provided time point or points.
This function attempts to map time points to indices, then uses
Expand Down Expand Up @@ -134,6 +134,20 @@ def get_data_at_time(self, time=None, tolerance=0.0):
# return self.
return self
is_iterable = _is_iterable(time)
if interpolate:
if not is_iterable:
time = [time]
idxs = get_time_index_vec(time, self._time)
data = {}
for cuid in self._data:
v = get_interp_expr_vec(time, self._time, self._data[cuid], idxs)
data[cuid] = v
if is_iterable:
return TimeSeriesData(data, list(time))
else:
for cuid in self._data:
data[cuid] = data[cuid][0]
return ScalarData(data)
time_iter = iter(time) if is_iterable else (time,)
indices = []
# Allocate indices list dynamically to support a general iterator
Expand Down
20 changes: 20 additions & 0 deletions pyomo/contrib/mpc/data/tests/test_series_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ def test_get_data_at_time_with_tolerance(self):
with self.assertRaisesRegex(RuntimeError, msg):
new_data = data.get_data_at_time(-0.01, tolerance=1e-3)

def test_get_data_at_time_with_interpolate(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
data = TimeSeriesData(data_dict, m.time)
new_data = data.get_data_at_time(0.05, interpolate=True)
self.assertEqual(ScalarData({m.var[:, "A"]: 1.5, m.var[:, "B"]: 3}), new_data)

t1 = 0.05
new_data = data.get_data_at_time([t1], interpolate=True)
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5], m.var[:, "B"]: [3]}, [t1]), new_data
)

new_t = [0.05, 0.15]
new_data = data.get_data_at_time(new_t, interpolate=True)
self.assertEqual(
TimeSeriesData({m.var[:, "A"]: [1.5, 2.5], m.var[:, "B"]: [3, 5]}, new_t),
new_data,
)

def test_to_serializable(self):
m = self._make_model()
data_dict = {m.var[:, "A"]: [1, 2, 3], m.var[:, "B"]: [2, 4, 6]}
Expand Down
Loading