Source code for braket.timings.time_series

# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from __future__ import annotations

from collections import OrderedDict
from collections.abc import Iterator
from dataclasses import dataclass
from decimal import Decimal
from enum import Enum
from numbers import Number
from typing import Optional


@dataclass
class TimeSeriesItem:
    """A single sample of a time series: one value recorded at one time."""

    # Time coordinate of the sample.
    time: Number
    # Value of the series at ``time``.
    value: Number
class StitchBoundaryCondition(str, Enum):
    """Selects the value given to the shared boundary point when two time series are stitched.

    Members also subclass ``str``, so each one compares equal to its plain string value.
    """

    # Average of the last value of the left series and the first value of the right series.
    MEAN = "mean"
    # Last value of the left series.
    LEFT = "left"
    # First value of the right series.
    RIGHT = "right"
class TimeSeries:
    """A collection of (time, value) points that is read back ordered by time.

    Points are stored in an ``OrderedDict`` keyed by time. Sorting is lazy: inserting a
    point out of order only flags the series as unsorted, and the next read re-sorts it.
    """

    def __init__(self):
        self._series = OrderedDict()
        # Becomes False when a point is inserted out of time order; reset by _ensure_sorted.
        self._sorted = True
        # Largest time appended in order so far (-1 is the initial sentinel).
        self._largest_time = -1

    def put(
        self,
        time: Number,
        value: Number,
    ) -> TimeSeries:
        """Puts a value to the time series at the given time. A value passed to an existing
        time will overwrite the current value.

        Args:
            time (Number): The time of the value.
            value (Number): The value to add to the time series.

        Returns:
            TimeSeries: returns self (to allow for chaining).
        """
        if time not in self._series:
            if time > self._largest_time:
                self._largest_time = time
            else:
                # New point lands behind an existing one: defer the re-sort to the next read.
                self._sorted = False
        # Overwriting an existing time keeps its position, so ordering is unaffected.
        self._series[time] = TimeSeriesItem(time, value)
        return self

    def times(self) -> list[Number]:
        """Returns the times in the time series.

        Returns:
            list[Number]: The times in the time series, in ascending order.
        """
        self._ensure_sorted()
        return [item.time for item in self._series.values()]

    def values(self) -> list[Number]:
        """Returns the values in the time series.

        Returns:
            list[Number]: The values in the time series, ordered by their time.
        """
        self._ensure_sorted()
        return [item.value for item in self._series.values()]

    def __iter__(self) -> Iterator:
        self._ensure_sorted()
        return iter(self._series.values())

    def __len__(self):
        return len(self._series)

    def _ensure_sorted(self) -> None:
        """Re-sorts the stored points by time if an out-of-order insert has happened."""
        if not self._sorted:
            # Keys (times) are unique, so sorting the (time, item) pairs orders by time only.
            self._series = OrderedDict(sorted(self._series.items()))
            self._sorted = True

    @staticmethod
    def from_lists(times: list[float], values: list[float]) -> TimeSeries:
        """Create a time series from the list of time and value points

        Args:
            times (list[float]): list of time points
            values (list[float]): list of value points

        Returns:
            TimeSeries: time series constructed from lists

        Raises:
            ValueError: If the len of `times` does not equal len of `values`.
        """
        if len(times) != len(values):
            # Adjacent literals are joined at compile time; a backslash continuation inside
            # the string would leak the source indentation into the message.
            raise ValueError(
                f"The lengths of the times({len(times)}) "
                f"and values({len(values)}) lists are not equal."
            )

        series = TimeSeries()
        for t, v in zip(times, values):
            series.put(t, v)
        return series

    @staticmethod
    def constant_like(times: list | float | TimeSeries, constant: float = 0.0) -> TimeSeries:
        """Obtain a constant time series given another time series or the list of time points,
        and the constant values.

        Args:
            times (list | float | TimeSeries): list of time points, a single time point,
                or a time series whose time points are reused
            constant (float): constant value

        Returns:
            TimeSeries: A constant time series
        """
        if isinstance(times, Number):
            # A bare number is treated as a single time point.
            time_points = [times]
        elif hasattr(times, "times"):
            # A TimeSeries (or look-alike): reuse its time points.
            time_points = times.times()
        else:
            # A list or any other iterable of time points.
            time_points = times

        series = TimeSeries()
        for t in time_points:
            series.put(t, constant)
        return series

    def concatenate(self, other: TimeSeries) -> TimeSeries:
        """Concatenates two time series into a single time series.
        The time points in the final time series are obtained by concatenating two lists of
        time points from the first and the second time series. Similarly, the values in the
        final time series is a concatenated list of the values in the first and the second
        time series.

        Args:
            other (TimeSeries): The second time series to be concatenated

        Notes:
            Keeps the time points in both time series unchanged.
            Requires that the time points in the first TimeSeries are at earlier times than
            the time points in the second TimeSeries.

        Returns:
            TimeSeries: The concatenated time series.

        Raises:
            ValueError: If neither time series is empty and the time points in the first
                TimeSeries are not strictly smaller than in the second.

        Example:
        ::
            time_series_1 = TimeSeries.from_lists(times=[0, 0.1], values=[1, 2])
            time_series_2 = TimeSeries.from_lists(times=[0.2, 0.3], values=[4, 5])

            concat_ts = time_series_1.concatenate(time_series_2)

            Result:
                concat_ts.times() = [0, 0.1, 0.2, 0.3]
                concat_ts.values() = [1, 2, 4, 5]
        """
        own_times = self.times()
        other_times = other.times()
        # times() returns ascending lists, so the extremes are the end elements.
        if own_times and other_times and other_times[0] <= own_times[-1]:
            raise ValueError(
                "The time points in the first TimeSeries must be strictly smaller "
                "then the time points in the second TimeSeries."
            )

        merged = TimeSeries()
        for t, v in zip(own_times + other_times, self.values() + other.values()):
            merged.put(t, v)
        return merged

    def stitch(
        self, other: TimeSeries, boundary: StitchBoundaryCondition = StitchBoundaryCondition.MEAN
    ) -> TimeSeries:
        """Stitch two time series to a single time series. The time points of the
        second time series are shifted such that the first time point of the second series
        coincides with the last time point of the first series.
        The boundary point value is handled according to StitchBoundaryCondition argument value.

        Args:
            other (TimeSeries): The second time series to be stitched with.
            boundary (StitchBoundaryCondition): {"mean", "left", "right"}. Boundary point handler.
                Possible options are
                - "mean" - take the average of the boundary value points of the first
                and the second time series.
                - "left" - use the last value from the left time series as the boundary point.
                - "right" - use the first value from the right time series as the boundary point.

        Returns:
            TimeSeries: The stitched time series. If either series is empty, a copy of the
            other one is returned unchanged.

        Raises:
            ValueError: If both series are non-empty and boundary is not one of
                {"mean", "left", "right"}.

        Example (StitchBoundaryCondition.MEAN):
        ::
            time_series_1 = TimeSeries.from_lists(times=[0, 0.1], values=[1, 2])
            time_series_2 = TimeSeries.from_lists(times=[0.2, 0.4], values=[4, 5])

            stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.MEAN)

            Result:
                stitch_ts.times() = [0, 0.1, 0.3]
                stitch_ts.values() = [1, 3, 5]

        Example (StitchBoundaryCondition.LEFT):
        ::
            stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.LEFT)

            Result:
                stitch_ts.times() = [0, 0.1, 0.3]
                stitch_ts.values() = [1, 2, 5]

        Example (StitchBoundaryCondition.RIGHT):
        ::
            stitch_ts = time_series_1.stitch(time_series_2, boundary=StitchBoundaryCondition.RIGHT)

            Result:
                stitch_ts.times() = [0, 0.1, 0.3]
                stitch_ts.values() = [1, 4, 5]
        """
        own_times, own_values = self.times(), self.values()
        other_times, other_values = other.times(), other.values()

        if not own_times:
            return TimeSeries.from_lists(times=other_times, values=other_values)
        if not other_times:
            return TimeSeries.from_lists(times=own_times, values=own_values)

        # Shift the second series so that its first point lands on our last point.
        left_t, right_t = own_times[-1], other_times[0]
        shifted_times = [t - right_t + left_t for t in other_times]
        new_times = own_times + shifted_times[1:]

        left, right = own_values[-1], other_values[0]
        if boundary == StitchBoundaryCondition.MEAN:
            # Dividing by an int (instead of multiplying by 0.5) also works for Decimal
            # values, e.g. those produced by discretize().
            boundary_value = (left + right) / 2
        elif boundary == StitchBoundaryCondition.LEFT:
            boundary_value = left
        elif boundary == StitchBoundaryCondition.RIGHT:
            boundary_value = right
        else:
            raise ValueError(
                f"Boundary handler value {boundary} is not allowed. "
                "Possible options are: 'mean', 'left', 'right'."
            )

        # The two boundary points collapse into one point carrying boundary_value.
        new_values = own_values[:-1] + [boundary_value] + other_values[1:]

        stitched = TimeSeries()
        for t, v in zip(new_times, new_values):
            stitched.put(t, v)
        return stitched

    @staticmethod
    def _round_to_resolution(number: Number, resolution: Optional[Decimal]) -> Decimal:
        """Converts `number` to Decimal, rounded to the closest multiple of `resolution`
        unless `resolution` is None."""
        exact = Decimal(number)
        if resolution is None:
            return exact
        return round(exact / resolution) * resolution

    def discretize(
        self, time_resolution: Optional[Decimal], value_resolution: Optional[Decimal]
    ) -> TimeSeries:
        """Creates a discretized version of the time series,
        rounding all times and values to the closest multiple of the
        corresponding resolution.

        Args:
            time_resolution (Optional[Decimal]): Time resolution; if None, times are only
                converted to Decimal.
            value_resolution (Optional[Decimal]): Value resolution; if None, values are only
                converted to Decimal.

        Returns:
            TimeSeries: A new discretized time series.
        """
        discretized_ts = TimeSeries()
        for item in self:
            discretized_ts.put(
                time=TimeSeries._round_to_resolution(item.time, time_resolution),
                value=TimeSeries._round_to_resolution(item.value, value_resolution),
            )
        return discretized_ts

    @staticmethod
    def periodic_signal(times: list[float], values: list[float], num_repeat: int = 1) -> TimeSeries:
        """Create a periodic time series by repeating the same block multiple times.

        Args:
            times (list[float]): List of time points in a single block
            values (list[float]): Values for the time series in a single block
            num_repeat (int): Number of block repetitions

        Raises:
            ValueError: If the first and last values are not the same, or if `times` and
                `values` have different lengths.

        Returns:
            TimeSeries: A new periodic time series. Empty if the block is empty.
        """
        # len() rather than truthiness so array-like inputs are not evaluated as booleans.
        if len(values) > 0 and values[0] != values[-1]:
            raise ValueError("The first and last values must coincide to guarantee periodicity")

        repeating_block = TimeSeries.from_lists(times=times, values=values)
        periodic = TimeSeries()
        for _ in range(num_repeat):
            periodic = periodic.stitch(repeating_block)
        return periodic

    @staticmethod
    def trapezoidal_signal(
        area: float, value_max: float, slew_rate_max: float, time_separation_min: float = 0.0
    ) -> TimeSeries:
        """Get a trapezoidal time series with specified area, maximum value, maximum slew rate
        and minimum separation of time points

        Args:
            area (float): Total area under the time series
            value_max (float): The maximum value of the time series
            slew_rate_max (float): The maximum slew rate
            time_separation_min (float): The minimum separation of time points

        Raises:
            ValueError: If `area`, `value_max` or `slew_rate_max` is not positive, or if
                `time_separation_min` is negative.

        Returns:
            TimeSeries: A trapezoidal time series

        Notes:
            The area of a time series f(t) is defined as the time integral of
            f(t) from t=0 to t=T, where T is the duration.
            We also assume the trapezoidal time series starts and ends at zero.
        """
        if area <= 0.0:
            raise ValueError("The area of the trapezoidal time series has to be positive.")
        if value_max <= 0.0:
            raise ValueError("The maximum value of the trapezoidal time series has to be positive.")
        if slew_rate_max <= 0.0:
            raise ValueError(
                "The maximum slew rate of the trapezoidal time series has to be positive."
            )
        if time_separation_min < 0.0:
            raise ValueError(
                "The minimum separation of time points of the trapezoidal time series "
                "has to be non-negative."
            )

        # Determine the ramp time to reach the max allowed value
        t_ramp = max(time_separation_min, value_max / slew_rate_max)

        # The max achievable area if there are 3 time points: [0, t_ramp, 2 * t_ramp]
        area_threshold_1 = t_ramp * value_max

        # The max achievable area if there are 4 time points:
        # [0, t_ramp, t_ramp + time_separation_min, 2 * t_ramp + time_separation_min]
        area_threshold_2 = (t_ramp + time_separation_min) * value_max

        if area <= area_threshold_1:
            # Triangle: lower the peak so that the area matches.
            value = area / t_ramp
            times = [0, t_ramp, 2 * t_ramp]
            values = [0, value, 0]
        elif area <= area_threshold_2:
            # Trapezoid with the shortest allowed plateau: lower the peak to match the area.
            value = area / (t_ramp + time_separation_min)
            times = [0, t_ramp, t_ramp + time_separation_min, 2 * t_ramp + time_separation_min]
            values = [0, value, value, 0]
        else:
            # Full-height trapezoid: lengthen the plateau to supply the remaining area.
            t_plateau = (area - area_threshold_2) / value_max + time_separation_min
            times = [0, t_ramp, t_ramp + t_plateau, 2 * t_ramp + t_plateau]
            values = [0, value_max, value_max, 0]

        return TimeSeries.from_lists(times, values)
# TODO: Verify if this belongs here. def _all_close(first: TimeSeries, second: TimeSeries, tolerance: Number = 1e-7) -> bool: """Returns True if the times and values in two time series are all within (less than) a given tolerance range. The values in the TimeSeries must be numbers that can be subtracted from each-other, support getting the absolute value, and can be compared against the tolerance. Args: first (TimeSeries): A time series. second (TimeSeries): A time series. tolerance (Number): The tolerance value. Returns: bool: True if the times and values in two time series are all within (less than) a given tolerance range. If the time series are not the same size, this function will return False. """ if len(first) != len(second): return False if len(first) == 0: return True first_times = first.times() second_times = second.times() first_values = first.values() second_values = second.values() for index in range(len(first)): if abs(first_times[index] - second_times[index]) >= tolerance: return False if abs(first_values[index] - second_values[index]) >= tolerance: return False return True