# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import annotations
import importlib
import json
import os
import urllib.request
import warnings
from datetime import datetime
from enum import Enum
from typing import Any, ClassVar, Optional, Union
from botocore.errorfactory import ClientError
from networkx import DiGraph, complete_graph, from_edgelist
from braket.ahs.analog_hamiltonian_simulation import AnalogHamiltonianSimulation
from braket.annealing.problem import Problem
from braket.aws.aws_quantum_task import AwsQuantumTask
from braket.aws.aws_quantum_task_batch import AwsQuantumTaskBatch
from braket.aws.aws_session import AwsSession
from braket.aws.queue_information import QueueDepthInfo, QueueType
from braket.circuits import Circuit, Gate, QubitSet
from braket.circuits.gate_calibrations import GateCalibrations
from braket.circuits.noise_model import NoiseModel
from braket.device_schema import DeviceCapabilities, ExecutionDay, GateModelQpuParadigmProperties
from braket.device_schema.dwave import DwaveProviderProperties
# TODO: Remove device_action module once this is added to init in the schemas repo
from braket.device_schema.pulse.pulse_device_action_properties_v1 import PulseDeviceActionProperties
from braket.devices.device import Device
from braket.ir.blackbird import Program as BlackbirdProgram
from braket.ir.openqasm import Program as OpenQasmProgram
from braket.parametric.free_parameter import FreeParameter
from braket.parametric.free_parameter_expression import _is_float
from braket.pulse import ArbitraryWaveform, Frame, Port, PulseSequence
from braket.pulse.waveforms import _parse_waveform_from_calibration_schema
from braket.schema_common import BraketSchemaBase
[docs]
class AwsDeviceType(str, Enum):
"""Possible AWS device types"""
SIMULATOR = "SIMULATOR"
QPU = "QPU"
[docs]
class AwsDevice(Device):
"""Amazon Braket implementation of a device.
Use this class to retrieve the latest metadata about the device and to run a quantum task on the
device.
"""
REGIONS = ("us-east-1", "us-west-1", "us-west-2", "eu-west-2")
DEFAULT_SHOTS_QPU = 1000
DEFAULT_SHOTS_SIMULATOR = 0
DEFAULT_MAX_PARALLEL = 10
_GET_DEVICES_ORDER_BY_KEYS = frozenset({"arn", "name", "type", "provider_name", "status"})
_RIGETTI_GATES_TO_BRAKET: ClassVar[dict[str, str | None]] = {
# Rx_12 does not exist in the Braket SDK, it is a gate between |1> and |2>.
"Rx_12": None,
"Cz": "CZ",
"Cphaseshift": "CPhaseShift",
"Xy": "XY",
}
def __init__(
self,
arn: str,
aws_session: Optional[AwsSession] = None,
noise_model: Optional[NoiseModel] = None,
):
"""Initializes an `AwsDevice`.
Args:
arn (str): The ARN of the device
aws_session (Optional[AwsSession]): An AWS session object. Default is `None`.
noise_model (Optional[NoiseModel]): The Braket noise model to apply to the circuit
before execution. Noise model can only be added to the devices that support
noise simulation.
Note:
Some devices (QPUs) are physically located in specific AWS Regions. In some cases,
the current `aws_session` connects to a Region other than the Region in which the QPU is
physically located. When this occurs, a cloned `aws_session` is created for the Region
the QPU is located in.
See `braket.aws.aws_device.AwsDevice.REGIONS` for the AWS regions provider
devices are located in across the AWS Braket service.
This is not a device specific tuple.
"""
super().__init__(name=None, status=None)
self._arn = arn
self._gate_calibrations = None
self._properties = None
self._provider_name = None
self._poll_interval_seconds = None
self._type = None
self._aws_session = self._get_session_and_initialize(aws_session or AwsSession())
self._ports = None
self._frames = None
if noise_model:
self._validate_device_noise_model_support(noise_model)
self._noise_model = noise_model
[docs]
def run(
self,
task_specification: Union[
Circuit,
Problem,
OpenQasmProgram,
BlackbirdProgram,
PulseSequence,
AnalogHamiltonianSimulation,
],
s3_destination_folder: Optional[AwsSession.S3DestinationFolder] = None,
shots: Optional[int] = None,
poll_timeout_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_TIMEOUT,
poll_interval_seconds: Optional[float] = None,
inputs: Optional[dict[str, float]] = None,
gate_definitions: Optional[dict[tuple[Gate, QubitSet], PulseSequence]] = None,
reservation_arn: str | None = None,
*aws_quantum_task_args: Any,
**aws_quantum_task_kwargs: Any,
) -> AwsQuantumTask:
"""Run a quantum task specification on this device. A quantum task can be a circuit or an
annealing problem.
Args:
task_specification (Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, PulseSequence, AnalogHamiltonianSimulation]):
Specification of quantum task (circuit, OpenQASM program or AHS program)
to run on device.
s3_destination_folder (Optional[S3DestinationFolder]): The S3 location to
save the quantum task's results to. Default is `<default_bucket>/tasks` if evoked outside a
Braket Hybrid Job, `<Job Bucket>/jobs/<job name>/tasks` if evoked inside a Braket Hybrid Job.
shots (Optional[int]): The number of times to run the circuit or annealing problem.
Default is 1000 for QPUs and 0 for simulators.
poll_timeout_seconds (float): The polling timeout for `AwsQuantumTask.result()`,
in seconds. Default: 5 days.
poll_interval_seconds (Optional[float]): The polling interval for `AwsQuantumTask.result()`,
in seconds. Defaults to the ``getTaskPollIntervalMillis`` value specified in
``self.properties.service`` (divided by 1000) if provided, otherwise 1 second.
inputs (Optional[dict[str, float]]): Inputs to be passed along with the
IR. If the IR supports inputs, the inputs will be updated with this value.
Default: {}.
gate_definitions (Optional[dict[tuple[Gate, QubitSet], PulseSequence]]): A
`dict[tuple[Gate, QubitSet], PulseSequence]]` for a user defined gate calibration.
The calibration is defined for a particular `Gate` on a particular `QubitSet`
and is represented by a `PulseSequence`.
Default: None.
reservation_arn (str | None): The reservation ARN provided by Braket Direct
to reserve exclusive usage for the device to run the quantum task on.
Note: If you are creating tasks in a job that itself was created reservation ARN,
those tasks do not need to be created with the reservation ARN.
Default: None.
*aws_quantum_task_args (Any): Arbitrary arguments.
**aws_quantum_task_kwargs (Any): Arbitrary keyword arguments.
Returns:
AwsQuantumTask: An AwsQuantumTask that tracks the execution on the device.
Examples:
>>> circuit = Circuit().h(0).cnot(0, 1)
>>> device = AwsDevice("arn1")
>>> device.run(circuit, ("bucket-foo", "key-bar"))
>>> circuit = Circuit().h(0).cnot(0, 1)
>>> device = AwsDevice("arn2")
>>> device.run(task_specification=circuit,
>>> s3_destination_folder=("bucket-foo", "key-bar"))
>>> circuit = Circuit().h(0).cnot(0, 1)
>>> device = AwsDevice("arn3")
>>> device.run(task_specification=circuit,
>>> s3_destination_folder=("bucket-foo", "key-bar"), disable_qubit_rewiring=True)
>>> problem = Problem(
>>> ProblemType.ISING,
>>> linear={1: 3.14},
>>> quadratic={(1, 2): 10.08},
>>> )
>>> device = AwsDevice("arn4")
>>> device.run(problem, ("bucket-foo", "key-bar"),
>>> device_parameters={
>>> "providerLevelParameters": {"postprocessingType": "SAMPLING"}}
>>> )
See Also:
`braket.aws.aws_quantum_task.AwsQuantumTask.create()`
""" # noqa E501
if self._noise_model:
task_specification = self._apply_noise_model_to_circuit(task_specification)
return AwsQuantumTask.create(
self._aws_session,
self._arn,
task_specification,
s3_destination_folder
or (
AwsSession.parse_s3_uri(os.environ.get("AMZN_BRAKET_TASK_RESULTS_S3_URI"))
if "AMZN_BRAKET_TASK_RESULTS_S3_URI" in os.environ
else None
)
or (self._aws_session.default_bucket(), "tasks"),
shots if shots is not None else self._default_shots,
poll_timeout_seconds=poll_timeout_seconds,
poll_interval_seconds=poll_interval_seconds or self._poll_interval_seconds,
inputs=inputs,
gate_definitions=gate_definitions,
reservation_arn=reservation_arn,
*aws_quantum_task_args,
**aws_quantum_task_kwargs,
)
[docs]
def run_batch(
self,
task_specifications: Union[
Union[
Circuit,
Problem,
OpenQasmProgram,
BlackbirdProgram,
PulseSequence,
AnalogHamiltonianSimulation,
],
list[
Union[
Circuit,
Problem,
OpenQasmProgram,
BlackbirdProgram,
PulseSequence,
AnalogHamiltonianSimulation,
]
],
],
s3_destination_folder: Optional[AwsSession.S3DestinationFolder] = None,
shots: Optional[int] = None,
max_parallel: Optional[int] = None,
max_connections: int = AwsQuantumTaskBatch.MAX_CONNECTIONS_DEFAULT,
poll_timeout_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_TIMEOUT,
poll_interval_seconds: float = AwsQuantumTask.DEFAULT_RESULTS_POLL_INTERVAL,
inputs: Optional[Union[dict[str, float], list[dict[str, float]]]] = None,
gate_definitions: Optional[dict[tuple[Gate, QubitSet], PulseSequence]] = None,
reservation_arn: Optional[str] = None,
*aws_quantum_task_args,
**aws_quantum_task_kwargs,
) -> AwsQuantumTaskBatch:
"""Executes a batch of quantum tasks in parallel
Args:
task_specifications (Union[Union[Circuit, Problem, OpenQasmProgram, BlackbirdProgram, PulseSequence, AnalogHamiltonianSimulation], list[Union[ Circuit, Problem, OpenQasmProgram, BlackbirdProgram, PulseSequence, AnalogHamiltonianSimulation]]]): # noqa
Single instance or list of circuits, annealing problems, pulse sequences,
or photonics program to run on device.
s3_destination_folder (Optional[S3DestinationFolder]): The S3 location to
save the quantum tasks' results to. Default is `<default_bucket>/tasks` if evoked outside a
Braket Job, `<Job Bucket>/jobs/<job name>/tasks` if evoked inside a Braket Job.
shots (Optional[int]): The number of times to run the circuit or annealing problem.
Default is 1000 for QPUs and 0 for simulators.
max_parallel (Optional[int]): The maximum number of quantum tasks to run on AWS in parallel.
Batch creation will fail if this value is greater than the maximum allowed
concurrent quantum tasks on the device. Default: 10
max_connections (int): The maximum number of connections in the Boto3 connection pool.
Also the maximum number of thread pool workers for the batch. Default: 100
poll_timeout_seconds (float): The polling timeout for `AwsQuantumTask.result()`,
in seconds. Default: 5 days.
poll_interval_seconds (float): The polling interval for `AwsQuantumTask.result()`,
in seconds. Defaults to the ``getTaskPollIntervalMillis`` value specified in
``self.properties.service`` (divided by 1000) if provided, otherwise 1 second.
inputs (Optional[Union[dict[str, float], list[dict[str, float]]]]): Inputs to be
passed along with the IR. If the IR supports inputs, the inputs will be updated
with this value. Default: {}.
gate_definitions (Optional[dict[tuple[Gate, QubitSet], PulseSequence]]): A
`dict[tuple[Gate, QubitSet], PulseSequence]]` for a user defined gate calibration.
The calibration is defined for a particular `Gate` on a particular `QubitSet`
and is represented by a `PulseSequence`. Default: None.
reservation_arn (Optional[str]): The reservation ARN provided by Braket Direct
to reserve exclusive usage for the device to run the quantum task on.
Note: If you are creating tasks in a job that itself was created reservation ARN,
those tasks do not need to be created with the reservation ARN.
Default: None.
Returns:
AwsQuantumTaskBatch: A batch containing all of the quantum tasks run
See Also:
`braket.aws.aws_quantum_task_batch.AwsQuantumTaskBatch`
""" # noqa E501
if self._noise_model:
task_specifications = [
self._apply_noise_model_to_circuit(task_specification)
for task_specification in task_specifications
]
return AwsQuantumTaskBatch(
AwsSession.copy_session(self._aws_session, max_connections=max_connections),
self._arn,
task_specifications,
s3_destination_folder
or (
AwsSession.parse_s3_uri(os.environ.get("AMZN_BRAKET_TASK_RESULTS_S3_URI"))
if "AMZN_BRAKET_TASK_RESULTS_S3_URI" in os.environ
else None
)
or (self._aws_session.default_bucket(), "tasks"),
shots if shots is not None else self._default_shots,
max_parallel=max_parallel if max_parallel is not None else self._default_max_parallel,
max_workers=max_connections,
poll_timeout_seconds=poll_timeout_seconds,
poll_interval_seconds=poll_interval_seconds or self._poll_interval_seconds,
inputs=inputs,
gate_definitions=gate_definitions,
reservation_arn=reservation_arn,
*aws_quantum_task_args,
**aws_quantum_task_kwargs,
)
def _get_session_and_initialize(self, session: AwsSession) -> AwsSession:
device_region = AwsDevice.get_device_region(self._arn)
return (
self._get_regional_device_session(session)
if device_region
else self._get_non_regional_device_session(session)
)
def _get_regional_device_session(self, session: AwsSession) -> AwsSession:
device_region = AwsDevice.get_device_region(self._arn)
region_session = (
session
if session.region == device_region
else AwsSession.copy_session(session, device_region)
)
try:
self._populate_properties(region_session)
return region_session
except ClientError as e:
raise (
ValueError(f"'{self._arn}' not found")
if e.response["Error"]["Code"] == "ResourceNotFoundException"
else e
) from e
def _get_non_regional_device_session(self, session: AwsSession) -> AwsSession:
current_region = session.region
try:
self._populate_properties(session)
return session
except ClientError as e:
if e.response["Error"]["Code"] != "ResourceNotFoundException":
raise e
if "qpu" not in self._arn:
raise ValueError(f"Simulator '{self._arn}' not found in '{current_region}'") from e
# Search remaining regions for QPU
for region in frozenset(AwsDevice.REGIONS) - {current_region}:
region_session = AwsSession.copy_session(session, region)
try:
self._populate_properties(region_session)
return region_session
except ClientError as e:
if e.response["Error"]["Code"] != "ResourceNotFoundException":
raise e
raise ValueError(f"QPU '{self._arn}' not found")
def _populate_properties(self, session: AwsSession) -> None:
metadata = session.get_device(self._arn)
self._name = metadata.get("deviceName")
self._status = metadata.get("deviceStatus")
self._type = AwsDeviceType(metadata.get("deviceType"))
self._provider_name = metadata.get("providerName")
self._properties = BraketSchemaBase.parse_raw_schema(metadata.get("deviceCapabilities"))
device_poll_interval = self._properties.service.getTaskPollIntervalMillis
self._poll_interval_seconds = (
device_poll_interval / 1000.0
if device_poll_interval
else AwsQuantumTask.DEFAULT_RESULTS_POLL_INTERVAL
)
self._topology_graph = None
self._frames = None
self._ports = None
@property
def type(self) -> str:
"""str: Return the device type"""
return self._type
@property
def provider_name(self) -> str:
"""str: Return the provider name"""
return self._provider_name
@property
def aws_session(self) -> AwsSession:
return self._aws_session
@property
def arn(self) -> str:
"""str: Return the ARN of the device"""
return self._arn
@property
def gate_calibrations(self) -> Optional[GateCalibrations]:
"""Calibration data for a QPU. Calibration data is shown for gates on particular gubits.
If a QPU does not expose these calibrations, None is returned.
Returns:
Optional[GateCalibrations]: The calibration object. Returns `None` if the data
is not present.
"""
if not self._gate_calibrations:
self._gate_calibrations = self.refresh_gate_calibrations()
return self._gate_calibrations
@property
def is_available(self) -> bool:
"""Returns true if the device is currently available.
Returns:
bool: Return if the device is currently available.
"""
if self.status != "ONLINE":
return False
is_available_result = False
current_datetime_utc = datetime.utcnow()
for execution_window in self.properties.service.executionWindows:
weekday = current_datetime_utc.weekday()
current_time_utc = current_datetime_utc.time().replace(microsecond=0)
if current_time_utc < execution_window.windowEndHour < execution_window.windowStartHour:
weekday = (weekday - 1) % 7
matched_day = execution_window.executionDay == ExecutionDay.EVERYDAY
matched_day = matched_day or (
execution_window.executionDay == ExecutionDay.WEEKDAYS and weekday < 5
)
matched_day = matched_day or (
execution_window.executionDay == ExecutionDay.WEEKENDS and weekday > 4
)
ordered_days = (
ExecutionDay.MONDAY,
ExecutionDay.TUESDAY,
ExecutionDay.WEDNESDAY,
ExecutionDay.THURSDAY,
ExecutionDay.FRIDAY,
ExecutionDay.SATURDAY,
ExecutionDay.SUNDAY,
)
matched_day = matched_day or (
execution_window.executionDay in ordered_days
and ordered_days.index(execution_window.executionDay) == weekday
)
matched_time = (
execution_window.windowStartHour < execution_window.windowEndHour
and execution_window.windowStartHour
<= current_time_utc
<= execution_window.windowEndHour
) or (
execution_window.windowEndHour < execution_window.windowStartHour
and (
current_time_utc >= execution_window.windowStartHour
or current_time_utc <= execution_window.windowEndHour
)
)
is_available_result = is_available_result or (matched_day and matched_time)
return is_available_result
@property
# TODO: Add a link to the boto3 docs
def properties(self) -> DeviceCapabilities:
"""DeviceCapabilities: Return the device properties
Please see `braket.device_schema` in amazon-braket-schemas-python_
.. _amazon-braket-schemas-python: https://github.com/aws/amazon-braket-schemas-python
"""
return self._properties
@property
def topology_graph(self) -> DiGraph:
"""DiGraph: topology of device as a networkx `DiGraph` object.
Examples:
>>> import networkx as nx
>>> device = AwsDevice("arn1")
>>> nx.draw_kamada_kawai(device.topology_graph, with_labels=True, font_weight="bold")
>>> topology_subgraph = device.topology_graph.subgraph(range(8))
>>> nx.draw_kamada_kawai(topology_subgraph, with_labels=True, font_weight="bold")
>>> print(device.topology_graph.edges)
Returns:
DiGraph: topology of QPU as a networkx `DiGraph` object. `None` if the topology
is not available for the device.
"""
if not self._topology_graph:
self._topology_graph = self._construct_topology_graph()
return self._topology_graph
def _construct_topology_graph(self) -> DiGraph:
"""Construct topology graph. If no such metadata is available, return `None`.
Returns:
DiGraph: topology of QPU as a networkx `DiGraph` object.
"""
if hasattr(self.properties, "paradigm") and isinstance(
self.properties.paradigm, GateModelQpuParadigmProperties
):
if self.properties.paradigm.connectivity.fullyConnected:
return complete_graph(
int(self.properties.paradigm.qubitCount), create_using=DiGraph()
)
adjacency_lists = self.properties.paradigm.connectivity.connectivityGraph
edges = []
for item in adjacency_lists.items():
i = item[0]
edges.extend([(int(i), int(j)) for j in item[1]])
return from_edgelist(edges, create_using=DiGraph())
elif hasattr(self.properties, "provider") and isinstance(
self.properties.provider, DwaveProviderProperties
):
edges = self.properties.provider.couplers
return from_edgelist(edges, create_using=DiGraph())
else:
return None
@property
def _default_shots(self) -> int:
return (
AwsDevice.DEFAULT_SHOTS_QPU if "qpu" in self.arn else AwsDevice.DEFAULT_SHOTS_SIMULATOR
)
@property
def _default_max_parallel(self) -> int:
return AwsDevice.DEFAULT_MAX_PARALLEL
def __repr__(self):
return f"Device('name': {self.name}, 'arn': {self.arn})"
def __eq__(self, other: AwsDevice):
if isinstance(other, AwsDevice):
return self.arn == other.arn
return NotImplemented
@property
def frames(self) -> dict[str, Frame]:
"""Returns a dict mapping frame ids to the frame objects for predefined frames
for this device.
"""
self._update_pulse_properties()
return self._frames or {}
@property
def ports(self) -> dict[str, Port]:
"""Returns a dict mapping port ids to the port objects for predefined ports
for this device.
"""
self._update_pulse_properties()
return self._ports or {}
[docs]
@staticmethod
def get_devices(
arns: Optional[list[str]] = None,
names: Optional[list[str]] = None,
types: Optional[list[AwsDeviceType]] = None,
statuses: Optional[list[str]] = None,
provider_names: Optional[list[str]] = None,
order_by: str = "name",
aws_session: Optional[AwsSession] = None,
) -> list[AwsDevice]:
"""Get devices based on filters and desired ordering. The result is the AND of
all the filters `arns`, `names`, `types`, `statuses`, `provider_names`.
Examples:
>>> AwsDevice.get_devices(provider_names=['Rigetti'], statuses=['ONLINE'])
>>> AwsDevice.get_devices(order_by='provider_name')
>>> AwsDevice.get_devices(types=['SIMULATOR'])
Args:
arns (Optional[list[str]]): device ARN filter, default is `None`
names (Optional[list[str]]): device name filter, default is `None`
types (Optional[list[AwsDeviceType]]): device type filter, default is `None`
QPUs will be searched for all regions and simulators will only be
searched for the region of the current session.
statuses (Optional[list[str]]): device status filter, default is `None`. When `None`
is used, RETIRED devices will not be returned. To include RETIRED devices in
the results, use a filter that includes "RETIRED" for this parameter.
provider_names (Optional[list[str]]): provider name filter, default is `None`
order_by (str): field to order result by, default is `name`.
Accepted values are ['arn', 'name', 'type', 'provider_name', 'status']
aws_session (Optional[AwsSession]): An AWS session object.
Default is `None`.
Raises:
ValueError: order_by not in ['arn', 'name', 'type', 'provider_name', 'status']
Returns:
list[AwsDevice]: list of AWS devices
"""
if order_by not in AwsDevice._GET_DEVICES_ORDER_BY_KEYS:
raise ValueError(
f"order_by '{order_by}' must be in {AwsDevice._GET_DEVICES_ORDER_BY_KEYS}"
)
types = frozenset(types or AwsDeviceType)
aws_session = aws_session or AwsSession()
device_map = {}
session_region = aws_session.boto_session.region_name
search_regions = (
(session_region,) if types == {AwsDeviceType.SIMULATOR} else AwsDevice.REGIONS
)
for region in search_regions:
session_for_region = (
aws_session
if region == session_region
else AwsSession.copy_session(aws_session, region)
)
# Simulators are only instantiated in the same region as the AWS session
types_for_region = sorted(
types if region == session_region else types - {AwsDeviceType.SIMULATOR}
)
try:
region_device_arns = [
result["deviceArn"]
for result in session_for_region.search_devices(
arns=arns,
names=names,
types=types_for_region,
statuses=statuses,
provider_names=provider_names,
)
]
device_map |= {
arn: AwsDevice(arn, session_for_region)
for arn in region_device_arns
if arn not in device_map
}
except ClientError as e:
error_code = e.response["Error"]["Code"]
warnings.warn(
f"{error_code}: Unable to search region '{region}' for devices."
" Please check your settings or try again later."
f" Continuing without devices in '{region}'.",
stacklevel=1,
)
devices = list(device_map.values())
devices.sort(key=lambda x: getattr(x, order_by))
return devices
def _update_pulse_properties(self) -> None:
if not hasattr(self.properties, "pulse") or not isinstance(
self.properties.pulse, PulseDeviceActionProperties
):
return
if self._ports is None:
self._ports = {}
port_data = self.properties.pulse.ports
for port_id, port in port_data.items():
self._ports[port_id] = Port(
port_id=port_id, dt=port.dt, properties=json.loads(port.json())
)
if self._frames is None:
self._frames = {}
if frame_data := self.properties.pulse.frames:
for frame_id, frame in frame_data.items():
self._frames[frame_id] = Frame(
frame_id=frame_id,
port=self._ports[frame.portId],
frequency=frame.frequency,
phase=frame.phase,
is_predefined=True,
properties=json.loads(frame.json()),
)
[docs]
@staticmethod
def get_device_region(device_arn: str) -> str:
"""Gets the region from a device arn.
Args:
device_arn (str): The device ARN.
Raises:
ValueError: Raised if the ARN is not properly formatted
Returns:
str: the region of the ARN.
"""
try:
return device_arn.split(":")[3]
except IndexError as e:
raise ValueError(
f"Device ARN is not a valid format: {device_arn}. For valid Braket ARNs, "
"see 'https://docs.aws.amazon.com/braket/latest/developerguide/braket-devices.html'"
) from e
[docs]
def queue_depth(self) -> QueueDepthInfo:
"""Task queue depth refers to the total number of quantum tasks currently waiting
to run on a particular device.
Returns:
QueueDepthInfo: Instance of the QueueDepth class representing queue depth
information for quantum tasks and hybrid jobs.
Queue depth refers to the number of quantum tasks and hybrid jobs queued on a particular
device. The normal tasks refers to the quantum tasks not submitted via Hybrid Jobs.
Whereas, the priority tasks refers to the total number of quantum tasks waiting to run
submitted through Amazon Braket Hybrid Jobs. These tasks run before the normal tasks.
If the queue depth for normal or priority quantum tasks is greater than 4000, we display
their respective queue depth as '>4000'. Similarly, for hybrid jobs if there are more
than 1000 jobs queued on a device, display the hybrid jobs queue depth as '>1000'.
Additionally, for QPUs if hybrid jobs queue depth is 0, we display information about
priority and count of the running hybrid job.
Example:
Queue depth information for a running job.
>>> device = AwsDevice(Device.Amazon.SV1)
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '0',
<QueueType.PRIORITY: 'Priority'>: '1'}, jobs='0 (1 prioritized job(s) running)')
If more than 4000 quantum tasks queued on a device.
>>> device = AwsDevice(Device.Amazon.DM1)
>>> print(device.queue_depth())
QueueDepthInfo(quantum_tasks={<QueueType.NORMAL: 'Normal'>: '>4000',
<QueueType.PRIORITY: 'Priority'>: '2000'}, jobs='100')
"""
metadata = self.aws_session.get_device(arn=self.arn)
queue_metadata = metadata.get("deviceQueueInfo")
queue_info = {}
for response in queue_metadata:
queue_name = response.get("queue")
queue_priority = response.get("queuePriority")
queue_size = response.get("queueSize")
if queue_name == "QUANTUM_TASKS_QUEUE":
priority_enum = QueueType(queue_priority)
queue_info.setdefault("quantum_tasks", {})[priority_enum] = queue_size
else:
queue_info["jobs"] = queue_size
return QueueDepthInfo(**queue_info)
[docs]
def refresh_gate_calibrations(self) -> Optional[GateCalibrations]:
"""Refreshes the gate calibration data upon request.
If the device does not have calibration data, None is returned.
Raises:
URLError: If the URL provided returns a non 2xx response.
Returns:
Optional[GateCalibrations]: the calibration data for the device. None
is returned if the device does not have a gate calibrations URL associated.
"""
if (
hasattr(self.properties, "pulse")
and hasattr(self.properties.pulse, "nativeGateCalibrationsRef")
and self.properties.pulse.nativeGateCalibrationsRef
):
try:
with urllib.request.urlopen(
self.properties.pulse.nativeGateCalibrationsRef.split("?")[0]
) as f:
json_calibration_data = self._parse_calibration_json(
json.loads(f.read().decode("utf-8"))
)
return GateCalibrations(json_calibration_data)
except urllib.error.URLError as e:
raise urllib.error.URLError(
f"Unable to reach {self.properties.pulse.nativeGateCalibrationsRef}"
) from e
else:
return None
def _parse_waveforms(self, waveforms_json: dict) -> dict:
waveforms = {}
for waveform in waveforms_json:
parsed_waveform = _parse_waveform_from_calibration_schema(waveforms_json[waveform])
waveforms[parsed_waveform.id] = parsed_waveform
return waveforms
def _parse_pulse_sequence(
self, calibration: dict, waveforms: dict[ArbitraryWaveform]
) -> PulseSequence:
return PulseSequence._parse_from_calibration_schema(calibration, waveforms, self.frames)
def _parse_calibration_json(
self, calibration_data: dict
) -> dict[tuple[Gate, QubitSet], PulseSequence]:
"""Takes the json string from the device calibration URL and returns a structured dictionary of
corresponding `dict[tuple[Gate, QubitSet], PulseSequence]` to represent the calibration data.
Args:
calibration_data (dict): The data to be parsed. Based on
https://github.com/aws/amazon-braket-schemas-python/blob/main/src/braket/device_schema/pulse/native_gate_calibrations_v1.py.
Returns:
dict[tuple[Gate, QubitSet], PulseSequence]: The
structured data based on a mapping of `tuple[Gate, Qubit]` to its calibration represented as a
`PulseSequence`.
""" # noqa: E501
waveforms = self._parse_waveforms(calibration_data["waveforms"])
parsed_calibration_data = {}
for qubit_node in calibration_data["gates"]:
qubit = calibration_data["gates"][qubit_node]
for gate_node in qubit:
for gate in qubit[gate_node]:
gate_capitalized = getattr(
self,
f"_{self.provider_name.upper()}_GATES_TO_BRAKET",
{},
).get(gate_node.capitalize(), gate_node.capitalize())
gate_obj = (
getattr(importlib.import_module("braket.circuits.gates"), gate_capitalized)
if gate_capitalized is not None
else None
)
qubits = QubitSet([int(x) for x in gate["qubits"]])
if gate_obj is None:
# We drop out gates that are not implemented in the BDK
continue
argument = None
if gate["arguments"]:
argument = (
float(gate["arguments"][0])
if _is_float(gate["arguments"][0])
else FreeParameter(gate["arguments"][0])
)
gate_qubit_key = (
(gate_obj(argument), qubits) if argument else (gate_obj(), qubits)
)
gate_qubit_pulse = self._parse_pulse_sequence(gate["calibrations"], waveforms)
parsed_calibration_data[gate_qubit_key] = gate_qubit_pulse
return parsed_calibration_data