diff --git a/docs/source/gflow_utils.rst b/docs/source/gflow_utils.rst new file mode 100644 index 000000000..8c7e00376 --- /dev/null +++ b/docs/source/gflow_utils.rst @@ -0,0 +1,12 @@ +Gflow Utils +=========== + +:mod:`graphqomb.gflow_utils` module ++++++++++++++++++++++++++++++++++++ + +.. automodule:: graphqomb.gflow_utils + +Functions +--------- + +.. autofunction:: graphqomb.gflow_utils.gflow_wrapper diff --git a/docs/source/graphstate.rst b/docs/source/graphstate.rst index 9e228eddb..392e90718 100644 --- a/docs/source/graphstate.rst +++ b/docs/source/graphstate.rst @@ -24,11 +24,3 @@ Functions .. autofunction:: graphqomb.graphstate.compose .. autofunction:: graphqomb.graphstate.bipartite_edges .. autofunction:: graphqomb.graphstate.odd_neighbors - -Auxiliary Classes ------------------- -.. autoclass:: graphqomb.graphstate.LocalCliffordExpansion - :members: - -.. autoclass:: graphqomb.graphstate.ExpansionMaps - :members: diff --git a/docs/source/references.rst b/docs/source/references.rst index 5bc984987..f6039a287 100644 --- a/docs/source/references.rst +++ b/docs/source/references.rst @@ -15,6 +15,7 @@ Module reference random_objects feedforward focus_flow + gflow_utils command pattern pauli_frame @@ -22,3 +23,4 @@ Module reference scheduler stim_compiler visualizer + zxgraphstate diff --git a/docs/source/zxgraphstate.rst b/docs/source/zxgraphstate.rst new file mode 100644 index 000000000..c790cfd67 --- /dev/null +++ b/docs/source/zxgraphstate.rst @@ -0,0 +1,32 @@ +ZXGraphState +============ + +:mod:`graphqomb.zxgraphstate` module ++++++++++++++++++++++++++++++++++++++ + +.. automodule:: graphqomb.zxgraphstate + +ZX Graph State +-------------- + +.. autoclass:: graphqomb.zxgraphstate.ZXGraphState + :members: + :show-inheritance: + :member-order: bysource + +Functions +--------- + +.. autofunction:: graphqomb.zxgraphstate.complete_graph_edges + +Auxiliary Classes +------------------ + +.. autoclass:: graphqomb.zxgraphstate.InputLocalCliffordExpansion + :members: + +.. autoclass:: graphqomb.zxgraphstate.OutputLocalCliffordExpansion + :members: + +.. autoclass:: graphqomb.zxgraphstate.ExpansionMaps + :members: diff --git a/examples/zxgraph_simplification.py b/examples/zxgraph_simplification.py new file mode 100644 index 000000000..6123e7f9f --- /dev/null +++ b/examples/zxgraph_simplification.py @@ -0,0 +1,136 @@ +""" +Basic example of simplifying a ZX-diagram. +========================================== + +By using the full_reduce method, +we can remove all the internal Clifford nodes and some non-Clifford nodes from the graph state, +which generates a simpler ZX-diagram. +This example is a simple demonstration of the simplification process. + +Note that as a result of the simplification, local Clifford operations are applied to the input/output nodes. +""" + +# %% + +from __future__ import annotations + +from copy import deepcopy + +import numpy as np + +from graphqomb.gflow_utils import gflow_wrapper +from graphqomb.qompiler import qompile +from graphqomb.random_objects import generate_random_flow_graph +from graphqomb.simulator import PatternSimulator, SimulatorBackend +from graphqomb.visualizer import visualize +from graphqomb.zxgraphstate import ZXGraphState + +# %% +# Prepare an initial random graph state with flow +graph, flow = generate_random_flow_graph(width=3, depth=4, edge_p=0.5) +zx_graph, _ = ZXGraphState.from_base_graph_state(graph) +visualize(zx_graph) + +# %% +# We can compile the graph state into a measurement pattern, simulate it, and get the resulting statevector. +pattern = qompile(zx_graph, flow) +sim = PatternSimulator(pattern, backend=SimulatorBackend.StateVector) +sim.simulate() +statevec_original = sim.state + + +# %% +def print_boundary_lcs(zxgraph: ZXGraphState) -> None: + lc_map = zxgraph.local_cliffords + for node in zxgraph.input_node_indices | zxgraph.output_node_indices: + # check lc on input and output nodes + lc = lc_map.get(node, None) + if lc is not None: + if node in zxgraph.input_node_indices: + print(f"Input node {node} has local Clifford: alpha={lc.alpha}, beta={lc.beta}, gamma={lc.gamma}") + else: + print(f"Output node {node} has local Clifford: alpha={lc.alpha}, beta={lc.beta}, gamma={lc.gamma}") + else: + print(f"Node {node} has no local Clifford.") + + +def print_meas_bses(graph: ZXGraphState) -> None: + print("node | plane | angle (/pi)") + for node in graph.input_node_indices: + print(f"{node} (input)", graph.meas_bases[node].plane, graph.meas_bases[node].angle / np.pi) + for node in graph.physical_nodes - set(graph.input_node_indices) - set(graph.output_node_indices): + print(node, graph.meas_bases[node].plane, graph.meas_bases[node].angle / np.pi) + for node in graph.output_node_indices: + print(f"{node} (output)", "-", "-") + + +# %% +print_boundary_lcs(zx_graph) + +# %% +# Initial graph state before simplification +print_meas_bses(zx_graph) + + +# %% +# Simplify the graph state by full_reduce method +zx_graph_smp = deepcopy(zx_graph) +zx_graph_smp.full_reduce() + +# %% +# Simplified graph state after full_reduce. +visualize(zx_graph_smp) +print_meas_bses(zx_graph_smp) +print_boundary_lcs(zx_graph_smp) + + +# %% +# Let us compare the graph state before and after simplification. +# We simulate the pattern obtained from the simplified graph state. +# Note that we need to call the `expand_local_cliffords` method before generating the pattern to get the gflow. + +zx_graph_smp.expand_local_cliffords() +print("input_node_indices: ", set(zx_graph_smp.input_node_indices)) +print("output_node_indices: ", set(zx_graph_smp.output_node_indices)) +print("local_cliffords: ", zx_graph_smp.local_cliffords) + +print_meas_bses(zx_graph_smp) +visualize(zx_graph_smp) +print_boundary_lcs(zx_graph_smp) + +# %% +# Now we can obtain the gflow for the simplified graph state. +# Then, we compile the simplified graph state into a measurement pattern, +# simulate it, and get the resulting statevector. +gflow_smp = gflow_wrapper(zx_graph_smp) +pattern_smp = qompile(zx_graph_smp, gflow_smp) +sim_smp = PatternSimulator(pattern_smp, backend=SimulatorBackend.StateVector) +sim_smp.simulate() + +# %% +statevec_smp = sim_smp.state +# %% +# normalization check +print("norm of original statevector:", np.linalg.norm(statevec_original.state())) +print("norm of simplified statevector:", np.linalg.norm(statevec_smp.state())) + +# %% +# Finally, we compare the expectation values of random observables before and after simplification. +rng = np.random.default_rng() +for i in range(len(zx_graph.input_node_indices)): + rand_mat = rng.random((2, 2)) + 1j * rng.random((2, 2)) + rand_mat += rand_mat.T.conj() + exp = statevec_original.expectation(rand_mat, [i]) + exp_cr = statevec_smp.expectation(rand_mat, [i]) + print("Expectation values for rand_mat\n===============================") + print("rand_mat: \n", rand_mat) + print("Original: \t\t", exp) + print("After simplification: \t", exp_cr) + +print("norm: ", np.linalg.norm(statevec_original.state()), np.linalg.norm(statevec_smp.state())) +print("data shape: ", statevec_original.state().shape, statevec_smp.state().shape) +psi_org = statevec_original.state() +psi_smp = statevec_smp.state() +print("inner product: ", np.abs(np.vdot(psi_org, psi_smp))) + +# %% diff --git a/graphqomb/euler.py b/graphqomb/euler.py index 0b4e0bb92..6d073f9c0 100644 --- a/graphqomb/euler.py +++ b/graphqomb/euler.py @@ -213,17 +213,52 @@ def conjugate(self) -> LocalClifford: return LocalClifford(-self.gamma, -self.beta, -self.alpha) -def meas_basis_info(vector: NDArray[np.complex128]) -> tuple[Plane, float]: +def _meas_basis_candidates(vector: NDArray[np.complex128]) -> list[tuple[Plane, float]]: + r"""Return candidate measurement planes and angles corresponding to a vector. + + Parameters + ---------- + vector : `numpy.typing.NDArray`\[`numpy.complex128`\] + 1 qubit state vector + + Returns + ------- + `list`\[`tuple`\[`Plane`, `float`\]\] + candidate measurement planes and angles + """ + theta, phi = bloch_sphere_coordinates(vector) + candidates: list[tuple[Plane, float]] = [] + if is_clifford_angle(phi): + # YZ or XZ plane + if is_close_angle(2 * phi, 0): # 0 or pi + xz_theta = -theta if is_close_angle(phi, math.pi) else theta + candidates.append((Plane.XZ, xz_theta)) + if is_close_angle(phi, 0) and is_close_angle(2 * theta, 0): + candidates.append((Plane.YZ, theta)) + if is_close_angle(2 * (phi - math.pi / 2), 0): + yz_theta = -theta if is_close_angle(phi, 3 * math.pi / 2) else theta + candidates.append((Plane.YZ, yz_theta)) + if is_clifford_angle(theta) and not is_clifford_angle(theta / 2): + # XY plane + phi = phi + math.pi if is_close_angle(theta, 3 * math.pi / 2) else phi + candidates.append((Plane.XY, phi)) + + return candidates + + +def meas_basis_info(vector: NDArray[np.complex128], expected_plane: Plane | None = None) -> tuple[Plane, float]: r"""Return the measurement plane and angle corresponding to a vector. Parameters ---------- vector : `numpy.typing.NDArray`\[`numpy.complex128`\] 1 qubit state vector + expected_plane : `Plane` | `None`, optional + expected measurement plane to preserve gflow existence, by default None Returns ------- - `tuple`\[`Plane`, `float`] + `tuple`\[`Plane`, `float`\] measurement plane and angle Raises @@ -231,23 +266,17 @@ def meas_basis_info(vector: NDArray[np.complex128]) -> tuple[Plane, float]: ValueError if the vector does not lie on any of 3 planes """ - theta, phi = bloch_sphere_coordinates(vector) - if is_clifford_angle(phi): - # YZ or XZ plane - if is_clifford_angle(phi / 2): # 0 or pi - if is_close_angle(phi, math.pi): - theta = -theta - return Plane.XZ, theta - if is_close_angle(phi, 3 * math.pi / 2): - theta = -theta - return Plane.YZ, theta - if is_clifford_angle(theta) and not is_clifford_angle(theta / 2): - # XY plane - if is_close_angle(theta, 3 * math.pi / 2): - phi += math.pi - return Plane.XY, phi - msg = "The vector does not lie on any of 3 planes" - raise ValueError(msg) + candidates = _meas_basis_candidates(vector) + + if not candidates: + msg = "The vector does not lie on any of 3 planes" + raise ValueError(msg) + + if expected_plane is not None: + for plane, angle in candidates: + if plane == expected_plane: + return plane, angle + return candidates[0] # TODO(masa10-f): Algebraic backend for this computation(#023) @@ -275,7 +304,7 @@ def update_lc_lc(lc1: LocalClifford, lc2: LocalClifford) -> LocalClifford: # TODO(masa10-f): Algebraic backend for this computation(#023) -def update_lc_basis(lc: LocalClifford, basis: MeasBasis) -> PlannerMeasBasis: +def update_lc_basis(lc: LocalClifford, basis: MeasBasis, expected_plane: Plane | None = None) -> PlannerMeasBasis: """Update a `MeasBasis` object with an action of `LocalClifford` object. Parameters @@ -284,17 +313,19 @@ def update_lc_basis(lc: LocalClifford, basis: MeasBasis) -> PlannerMeasBasis: `LocalClifford` basis : `MeasBasis` `MeasBasis` + expected_plane : `Plane` | `None`, optional + expected measurement plane to preserve gflow existence, by default None Returns ------- `PlannerMeasBasis` updated `PlannerMeasBasis` """ - matrix = lc.matrix() + matrix = lc.matrix().conjugate().T vector = basis.vector() updated_vector = np.asarray(matrix @ vector, dtype=np.complex128) - plane, angle = meas_basis_info(updated_vector) + plane, angle = meas_basis_info(updated_vector, expected_plane=expected_plane) return PlannerMeasBasis(plane, angle) diff --git a/graphqomb/gflow_utils.py b/graphqomb/gflow_utils.py new file mode 100644 index 000000000..a107db716 --- /dev/null +++ b/graphqomb/gflow_utils.py @@ -0,0 +1,68 @@ +"""Utilities for generalized flow (gflow) computation. + +This module provides: + +- `gflow_wrapper`: Thin adapter around ``swiflow.gflow`` so that gflow can be computed directly + from a `BaseGraphState` instance. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import networkx as nx +from swiflow import gflow +from swiflow.common import Plane as SfPlane +from typing_extensions import assert_never + +from graphqomb.common import Plane + +if TYPE_CHECKING: + from networkx import Graph as NxGraph + + from graphqomb.graphstate import BaseGraphState + + +def gflow_wrapper(graphstate: BaseGraphState) -> dict[int, set[int]]: + """Utilize ``swiflow.gflow`` to search gflow. + + Parameters + ---------- + graphstate : `BaseGraphState` + graph state to find gflow + + Returns + ------- + ``dict[int, set[int]]`` + gflow object + + Raises + ------ + ValueError + If no gflow is found + """ + graph: NxGraph[int] = nx.Graph() + graph.add_nodes_from(graphstate.physical_nodes) + graph.add_edges_from(graphstate.physical_edges) + + bases = graphstate.meas_bases + planes = {node: bases[node].plane for node in bases} + swiflow_planes: dict[int, SfPlane] = {} + for node, plane in planes.items(): + if plane == Plane.XY: + swiflow_planes[node] = SfPlane.XY + elif plane == Plane.YZ: + swiflow_planes[node] = SfPlane.YZ + elif plane == Plane.XZ: + swiflow_planes[node] = SfPlane.XZ + else: + assert_never(plane) + + gflow_object = gflow.find( + graph, graphstate.input_node_indices.keys(), graphstate.output_node_indices.keys(), swiflow_planes + ) + if gflow_object is None: + msg = "No flow found" + raise ValueError(msg) + + return gflow_object.f diff --git a/graphqomb/graphstate.py b/graphqomb/graphstate.py index 4253ac542..bc59ab9ac 100644 --- a/graphqomb/graphstate.py +++ b/graphqomb/graphstate.py @@ -4,8 +4,6 @@ - `BaseGraphState`: Abstract base class for Graph State. - `GraphState`: Minimal implementation of Graph State. -- `LocalCliffordExpansion`: Local Clifford expansion. -- `ExpansionMaps`: Expansion maps for local clifford operators. - `compose`: Function to compose two graph states sequentially. - `bipartite_edges`: Function to create a complete bipartite graph between two sets of nodes. - `odd_neighbors`: Function to get odd neighbors of a node. @@ -21,15 +19,14 @@ from abc import ABC from collections.abc import Hashable, Iterable, Mapping, Sequence from collections.abc import Set as AbstractSet -from typing import TYPE_CHECKING, NamedTuple, TypeVar +from typing import TYPE_CHECKING, TypeVar import typing_extensions -from graphqomb.common import MeasBasis, Plane, PlannerMeasBasis -from graphqomb.euler import update_lc_basis, update_lc_lc - if TYPE_CHECKING: - from graphqomb.euler import LocalClifford + from typing_extensions import Self + + from graphqomb.common import MeasBasis NodeT = TypeVar("NodeT", bound=Hashable) @@ -178,7 +175,6 @@ class GraphState(BaseGraphState): __physical_nodes: set[int] __physical_edges: dict[int, set[int]] __meas_bases: dict[int, MeasBasis] - __local_cliffords: dict[int, LocalClifford] __node_counter: int @@ -188,7 +184,6 @@ def __init__(self) -> None: self.__physical_nodes = set() self.__physical_edges = {} self.__meas_bases = {} - self.__local_cliffords = {} self.__node_counter = 0 @@ -257,17 +252,6 @@ def meas_bases(self) -> dict[int, MeasBasis]: """ return self.__meas_bases.copy() - @property - def local_cliffords(self) -> dict[int, LocalClifford]: - r"""Return local clifford nodes. - - Returns - ------- - `dict`\[`int`, `LocalClifford`\] - local clifford nodes. - """ - return self.__local_cliffords.copy() - def _check_meas_basis(self) -> None: """Check if the measurement basis is set for all physical nodes except output nodes. @@ -363,7 +347,6 @@ def remove_physical_node(self, node: int) -> None: if node in self.output_node_indices: del self.__output_node_indices[node] self.__meas_bases.pop(node, None) - self.__local_cliffords.pop(node, None) def remove_physical_edge(self, node1: int, node2: int) -> None: """Remove a physical edge from the graph state. @@ -413,6 +396,54 @@ def register_input(self, node: int, q_index: int) -> None: raise ValueError(msg) self.__input_node_indices[node] = q_index + def unregister_input(self, node: int) -> int: + """Remove the input label from the node. + + Parameters + ---------- + node : `int` + node index + + Returns + ------- + `int` + logical qubit index of the unregistered input node + + Raises + ------ + ValueError + If the node is not registered as an input node. + """ + self._ensure_node_exists(node) + if node not in self.__input_node_indices: + msg = "The node is not registered as an input node." + raise ValueError(msg) + return self.__input_node_indices.pop(node) + + def replace_input(self, old_node: int, new_node: int) -> None: + """Replace the input node with a new node. + + Parameters + ---------- + old_node : `int` + node index whose input label to be replaced + new_node : `int` + node index to be set as the new input node + + Raises + ------ + ValueError + If the new_node is already registered as an input node. + """ + self._ensure_node_exists(new_node) + + q_index = self.unregister_input(old_node) + try: + self.register_input(new_node, q_index) + except ValueError: + self.register_input(old_node, q_index) # rollback + raise + @typing_extensions.override def register_output(self, node: int, q_index: int) -> None: """Mark the node as an output node. @@ -440,42 +471,66 @@ def register_output(self, node: int, q_index: int) -> None: raise ValueError(msg) self.__output_node_indices[node] = q_index - @typing_extensions.override - def assign_meas_basis(self, node: int, meas_basis: MeasBasis) -> None: - """Set the measurement basis of the node. + def unregister_output(self, node: int) -> int: + """Remove the output label from the node. Parameters ---------- node : `int` node index - meas_basis : `MeasBasis` - measurement basis + + Returns + ------- + `int` + logical qubit index of the unregistered output node + + Raises + ------ + ValueError + If the node is not registered as an output node. """ self._ensure_node_exists(node) - self.__meas_bases[node] = meas_basis + if node not in self.__output_node_indices: + msg = "The node is not registered as an output node." + raise ValueError(msg) + return self.__output_node_indices.pop(node) - def apply_local_clifford(self, node: int, lc: LocalClifford) -> None: - """Apply a local clifford to the node. + def replace_output(self, old_node: int, new_node: int) -> None: + """Replace the output node with a new node. + + Parameters + ---------- + old_node : `int` + node index whose output label to be replaced + new_node : `int` + node index to be set as the new output node + + Raises + ------ + ValueError + If the new_node is already registered as an output node. + """ + self._ensure_node_exists(new_node) + q_index = self.unregister_output(old_node) + try: + self.register_output(new_node, q_index) + except ValueError: + self.register_output(old_node, q_index) # rollback + raise + + @typing_extensions.override + def assign_meas_basis(self, node: int, meas_basis: MeasBasis) -> None: + """Set the measurement basis of the node. Parameters ---------- node : `int` node index - lc : `LocalClifford` - local clifford operator + meas_basis : `MeasBasis` + measurement basis """ self._ensure_node_exists(node) - if node in self.input_node_indices or node in self.output_node_indices: - original_lc = self._pop_local_clifford(node) - if original_lc is not None: - new_lc = update_lc_lc(lc, original_lc) - self.__local_cliffords[node] = new_lc - else: - self.__local_cliffords[node] = lc - else: - self._check_meas_basis() - new_meas_basis = update_lc_basis(lc.conjugate(), self.meas_bases[node]) - self.assign_meas_basis(node, new_meas_basis) + self.__meas_bases[node] = meas_basis @typing_extensions.override def neighbors(self, node: int) -> set[int]: @@ -499,127 +554,18 @@ def check_canonical_form(self) -> None: r"""Check if the graph state is in canonical form. The definition of canonical form is: - 1. No Clifford operators applied. - 2. All non-output nodes have measurement basis + All non-output nodes have measurement basis Raises ------ ValueError If the graph state is not in canonical form. """ - if self.__local_cliffords: - msg = "Clifford operators are applied." - raise ValueError(msg) for node in self.physical_nodes - set(self.output_node_indices): if self.meas_bases.get(node) is None: msg = "All non-output nodes must have measurement basis." raise ValueError(msg) - def expand_local_cliffords(self) -> ExpansionMaps: - r"""Expand local Clifford operators applied on the input and output nodes. - - Returns - ------- - `ExpansionMaps` - A tuple of dictionaries mapping input and output node indices to the new node indices created. - """ - input_node_map = self._expand_input_local_cliffords() - output_node_map = self._expand_output_local_cliffords() - return ExpansionMaps(input_node_map, output_node_map) - - def _pop_local_clifford(self, node: int) -> LocalClifford | None: - """Pop local clifford of the node. - - Parameters - ---------- - node : `int` - node index to remove local clifford. - - Returns - ------- - `LocalClifford` | `None` - removed local clifford - """ - return self.__local_cliffords.pop(node, None) - - def _expand_input_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: - r"""Expand local Clifford operators applied on the input nodes. - - Returns - ------- - `dict`\[`int`, `LocalCliffordExpansion`\] - A dictionary mapping input node indices to the new node indices created. - """ - node_index_addition_map: dict[int, LocalCliffordExpansion] = {} - new_input_indices: dict[int, int] = {} - for input_node, q_index in self.input_node_indices.items(): - lc = self._pop_local_clifford(input_node) - if lc is None: - new_input_indices[input_node] = q_index - continue - - new_node_index0 = self.add_physical_node() - new_input_indices[new_node_index0] = q_index - new_node_index1 = self.add_physical_node() - new_node_index2 = self.add_physical_node() - - self.add_physical_edge(new_node_index0, new_node_index1) - self.add_physical_edge(new_node_index1, new_node_index2) - self.add_physical_edge(new_node_index2, input_node) - - self.assign_meas_basis(new_node_index0, PlannerMeasBasis(Plane.XY, lc.alpha)) - self.assign_meas_basis(new_node_index1, PlannerMeasBasis(Plane.XY, lc.beta)) - self.assign_meas_basis(new_node_index2, PlannerMeasBasis(Plane.XY, lc.gamma)) - - node_index_addition_map[input_node] = LocalCliffordExpansion( - new_node_index0, new_node_index1, new_node_index2 - ) - - self.__input_node_indices = {} - for new_input_index, q_index in new_input_indices.items(): - self.register_input(new_input_index, q_index) - - return node_index_addition_map - - def _expand_output_local_cliffords(self) -> dict[int, LocalCliffordExpansion]: - r"""Expand local Clifford operators applied on the output nodes. - - Returns - ------- - `dict`\[`int`, `LocalCliffordExpansion`\] - A dictionary mapping output node indices to the new node indices created. - """ - node_index_addition_map: dict[int, LocalCliffordExpansion] = {} - new_output_index_map: dict[int, int] = {} - for output_node, q_index in self.output_node_indices.items(): - lc = self._pop_local_clifford(output_node) - if lc is None: - new_output_index_map[output_node] = q_index - continue - - new_node_index0 = self.add_physical_node() - new_node_index1 = self.add_physical_node() - new_node_index2 = self.add_physical_node() - new_output_index_map[new_node_index2] = q_index - - self.add_physical_edge(output_node, new_node_index0) - self.add_physical_edge(new_node_index0, new_node_index1) - self.add_physical_edge(new_node_index1, new_node_index2) - - self.assign_meas_basis(output_node, PlannerMeasBasis(Plane.XY, lc.alpha)) - self.assign_meas_basis(new_node_index0, PlannerMeasBasis(Plane.XY, lc.beta)) - self.assign_meas_basis(new_node_index1, PlannerMeasBasis(Plane.XY, lc.gamma)) - - node_index_addition_map[output_node] = LocalCliffordExpansion( - new_node_index0, new_node_index1, new_node_index2 - ) - - self.__output_node_indices = {} - for new_output_index, q_index in new_output_index_map.items(): - self.register_output(new_output_index, q_index) - - return node_index_addition_map - @classmethod def from_graph( # noqa: C901, PLR0912 cls, @@ -735,8 +681,7 @@ def from_graph( # noqa: C901, PLR0912 def from_base_graph_state( cls, base: BaseGraphState, - copy_local_cliffords: bool = True, - ) -> tuple[GraphState, dict[int, int]]: + ) -> tuple[Self, dict[int, int]]: r"""Create a new GraphState from an existing BaseGraphState instance. This method creates a complete copy of the graph structure, including nodes, @@ -747,11 +692,6 @@ def from_base_graph_state( ---------- base : `BaseGraphState` The source graph state to copy from. - copy_local_cliffords : `bool`, optional - Whether to copy local Clifford operators if the source is a GraphState. - If True and the source has local Cliffords, they are copied. - If False, local Cliffords are not copied (canonical form only). - Default is True. Returns ------- @@ -784,30 +724,9 @@ def from_base_graph_state( for node, meas_basis in base.meas_bases.items(): graph_state.assign_meas_basis(node_map[node], meas_basis) - # Copy local Clifford operators if requested and source is GraphState - if copy_local_cliffords and isinstance(base, GraphState): - for node, lc in base.local_cliffords.items(): - # Access private attribute to copy local cliffords - graph_state.apply_local_clifford(node_map[node], lc) - return graph_state, node_map -class LocalCliffordExpansion(NamedTuple): - """Local Clifford expansion map for each input/output node.""" - - node1: int - node2: int - node3: int - - -class ExpansionMaps(NamedTuple): - """Expansion maps for inputs and outputs with Local Clifford.""" - - input_node_map: dict[int, LocalCliffordExpansion] - output_node_map: dict[int, LocalCliffordExpansion] - - def compose( # noqa: C901 graph1: BaseGraphState, graph2: BaseGraphState ) -> tuple[GraphState, dict[int, int], dict[int, int]]: diff --git a/graphqomb/random_objects.py b/graphqomb/random_objects.py index d43bffc0d..d299d5b74 100644 --- a/graphqomb/random_objects.py +++ b/graphqomb/random_objects.py @@ -11,10 +11,13 @@ import numpy as np +from graphqomb.circuit import MBQCCircuit from graphqomb.common import default_meas_basis from graphqomb.graphstate import GraphState if TYPE_CHECKING: + from collections.abc import Sequence + from numpy.random import Generator @@ -80,3 +83,48 @@ def generate_random_flow_graph( flow[node_index - width] = {node_index} return graph, flow + + +def random_circ( + width: int, + depth: int, + rng: np.random.Generator | None = None, + edge_p: float = 0.5, + angle_candidates: Sequence[float] = (0.0, np.pi / 3, 2 * np.pi / 3, np.pi), +) -> MBQCCircuit: + r"""Generate a random MBQC circuit. + + Parameters + ---------- + width : `int` + circuit width + depth : `int` + circuit depth + rng : `numpy.random.Generator`, optional + random number generator, by default numpy.random.default_rng() + edge_p : `float`, optional + probability of adding CZ gate, by default 0.5 + angle_candidates : `collections.abc.Sequence[float]`, optional + sequence of angles, by default (0, np.pi / 3, 2 * np.pi / 3, np.pi) + + Returns + ------- + `MBQCCircuit` + generated MBQC circuit + """ + if rng is None: + rng = np.random.default_rng() + circ = MBQCCircuit(width) + for d in range(depth): + for j in range(width): + circ.j(j, rng.choice(angle_candidates)) + if d < depth - 1: + for j in range(width): + if rng.random() < edge_p: + circ.cz(j, (j + 1) % width) + num = rng.integers(0, width) + if num > 0: + target = sorted(set(rng.choice(range(width), num))) + circ.phase_gadget(target, rng.choice(angle_candidates)) + + return circ diff --git a/graphqomb/zxgraphstate.py b/graphqomb/zxgraphstate.py new file mode 100644 index 000000000..ae0b96aea --- /dev/null +++ b/graphqomb/zxgraphstate.py @@ -0,0 +1,875 @@ +"""ZXGraph State classes for Measurement-based Quantum Computing. + +This module provides: + +- `ZXGraphState`: Graph State for the ZX-calculus. +- `complete_graph_edges`: Return a set of edges for the complete graph on the given nodes. +- `InputLocalCliffordExpansion`: Local Clifford expansion for input nodes. +- `OutputLocalCliffordExpansion`: Local Clifford expansion for output nodes. +- `ExpansionMaps`: Expansion maps for local clifford operators. +""" + +from __future__ import annotations + +import sys +from collections import defaultdict +from itertools import combinations +from typing import TYPE_CHECKING, NamedTuple + +import numpy as np +import typing_extensions + +from graphqomb.common import ( + Plane, + PlannerMeasBasis, + is_clifford_angle, + is_close_angle, +) +from graphqomb.euler import LocalClifford, update_lc_basis, update_lc_lc +from graphqomb.graphstate import BaseGraphState, GraphState, bipartite_edges + +if sys.version_info >= (3, 10): + from typing import TypeAlias +else: + from typing_extensions import TypeAlias + +if TYPE_CHECKING: + from collections.abc import Callable, Iterable + from collections.abc import Set as AbstractSet + + CliffordRule: TypeAlias = tuple[Callable[[int, float], bool], Callable[[int], None]] + + +class ZXGraphState(GraphState): + r"""Graph State for the ZX-calculus. + + Attributes + ---------- + input_nodes : `set`\[`int`\] + set of input nodes + output_nodes : `set`\[`int`\] + set of output nodes + physical_nodes : `set`\[`int`\] + set of physical nodes + physical_edges : `set`\[`tuple`\[`int`, `int`\]\] + physical edges + meas_bases : `dict`\[`int`, `MeasBasis`\] + measurement bases + q_indices : `dict`\[`int`, `int`\] + qubit indices + """ + + __local_cliffords: dict[int, LocalClifford] + + def __init__(self) -> None: + super().__init__() + self.__local_cliffords = {} + + @property + def local_cliffords(self) -> dict[int, LocalClifford]: + r"""Return local clifford nodes. + + Returns + ------- + `dict`\[`int`, `LocalClifford`\] + local clifford nodes. + """ + return self.__local_cliffords.copy() + + def apply_local_clifford(self, node: int, lc: LocalClifford, expected_plane: Plane | None = None) -> None: + """Apply a local clifford to the node. + + Parameters + ---------- + node : `int` + node index + lc : `LocalClifford` + local clifford operator + expected_plane : `Plane` | `None`, optional + expected measurement plane to preserve gflow existence, by default None + """ + self._ensure_node_exists(node) + if node in self.input_node_indices or node in self.output_node_indices: + original_lc = self._pop_local_clifford(node) + if original_lc is not None: + new_lc = update_lc_lc(lc, original_lc) + self.__local_cliffords[node] = new_lc + else: + self.__local_cliffords[node] = lc + else: + self._check_meas_basis() + new_meas_basis = update_lc_basis(lc, self.meas_bases[node], expected_plane=expected_plane) + self.assign_meas_basis(node, new_meas_basis) + + def remove_physical_node(self, node: int) -> None: + """Remove a physical node from the zxgraph state. + + Parameters + ---------- + node : `int` + node index + """ + super().remove_physical_node(node) + self.__local_cliffords.pop(node, None) + + @typing_extensions.override + def check_canonical_form(self) -> None: + r"""Check if the graph state is in canonical form. + + The definition of canonical form is: + 1. No Clifford operators applied. + 2. All non-output nodes have measurement basis + + Raises + ------ + ValueError + If the graph state is not in canonical form. + """ + if self.__local_cliffords: + msg = "Clifford operators are applied." + raise ValueError(msg) + super().check_canonical_form() + + def _pop_local_clifford(self, node: int) -> LocalClifford | None: + """Pop local clifford of the node. + + Parameters + ---------- + node : `int` + node index to remove local clifford. + + Returns + ------- + `LocalClifford` | `None` + removed local clifford + """ + return self.__local_cliffords.pop(node, None) + + @classmethod + def from_base_graph_state( + cls, + base: BaseGraphState, + copy_local_cliffords: bool = True, + ) -> tuple[ZXGraphState, dict[int, int]]: + r"""Create a new ZXGraphState from an existing BaseGraphState instance. + + This method creates a complete copy of the graph structure, including nodes, + edges, input/output registrations, and measurement bases. Useful for creating + mutable copies or converting between ZXGraphState implementations. + + Parameters + ---------- + base : `BaseGraphState` + The source graph state to copy from. + copy_local_cliffords : `bool`, optional + Whether to copy local Clifford operators if the source is a ZXGraphState. + If True and the source has local Cliffords, they are copied. + If False, local Cliffords are not copied (canonical form only). + Default is True. + + Returns + ------- + `tuple`\[`ZXGraphState`, `dict`\[`int`, `int`\]\] + - Created ZXGraphState instance + - Mapping from source node indices to new node indices + """ + zxgraph_state, node_map = super().from_base_graph_state(base) + + # Copy local Clifford operators if requested and source is ZXGraphState + if not copy_local_cliffords or not isinstance(base, ZXGraphState) or not hasattr(base, "local_cliffords"): + return zxgraph_state, node_map + for node, lc in base.local_cliffords.items(): + zxgraph_state.apply_local_clifford(node_map[node], lc) + + return zxgraph_state, node_map + + @property + def _clifford_rules(self) -> tuple[CliffordRule, ...]: + r"""Tuple of rules (check_func, action_func) for removing local clifford nodes. + + The rules are applied in the order they are defined. + + Returns + ------- + `tuple`\[`CliffordRule`, ...\] + Tuple of rules (check_func, action_func) before removing local clifford nodes. + If check_func(node) returns True, action_func(node) is executed. + Then, the removal of the local clifford node is performed if possible. + """ + return ( + (self._needs_lc, self.local_complement), + (self._is_trivial_meas, lambda _: None), + ( + self._needs_pivot, + lambda node: self.pivot( + node, + min(self.neighbors(node) - set(self.input_node_indices) - set(self.output_node_indices)), + ), + ), + ) + + def _update_connections( + self, rmv_edges: AbstractSet[tuple[int, int]], new_edges: AbstractSet[tuple[int, int]] + ) -> None: + r"""Update the physical edges of the graph state. + + Parameters + ---------- + rmv_edges : `collections.abc.Set`\[`tuple`\[`int`, `int`\]\] + edges to remove + new_edges : `collections.abc.Set`\[`tuple`\[`int`, `int`\]\] + edges to add + """ + for edge in rmv_edges: + self.remove_physical_edge(edge[0], edge[1]) + for edge in new_edges: + self.add_physical_edge(edge[0], edge[1]) + + def local_complement(self, node: int) -> None: + r"""Local complement operation on the graph state: G*u. + + Non-input node u gets Rx(pi/2) and its neighbors get Rz(-pi/2). + The edges between the neighbors of u are complemented. + + Parameters + ---------- + node : `int` + node index. The node must not be an input node. + + Raises + ------ + ValueError + If the node does not exist, is an input node, or the graph is not a ZX-diagram. + + Notes + ----- + Here we adopt the definition (lemma) of local complementation from [1]. + In some literature, local complementation is defined with Rx(-pi/2) on the target node + and Rz(pi/2) on the neighbors, which is strictly equivalent. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 2.31 and lemma 4.3 + """ + self._ensure_node_exists(node) + if node in self.input_node_indices: + msg = "Cannot apply local complement to input node." + raise ValueError(msg) + self._check_meas_basis() + + nbrs: set[int] = self.neighbors(node) + nbr_pairs: set[tuple[int, int]] = complete_graph_edges(nbrs) + new_edges = nbr_pairs - self.physical_edges + rmv_edges = self.physical_edges & nbr_pairs + + self._update_connections(rmv_edges, new_edges) + + # apply local clifford to node + lc = LocalClifford(0, np.pi / 2, 0) + old_meas_basis = self.meas_bases.get(node, None) + if old_meas_basis is None: + self.apply_local_clifford(node, lc) + else: + # assure gflow existence + plane_map: dict[Plane, Plane] = { + Plane.XY: Plane.XZ, + Plane.XZ: Plane.XY, + Plane.YZ: Plane.YZ, + } + self.apply_local_clifford(node, lc, expected_plane=plane_map[old_meas_basis.plane]) + + # apply local clifford to neighbors + lc = LocalClifford(-np.pi / 2, 0, 0) + plane_map = { + Plane.XY: Plane.XY, + Plane.XZ: Plane.YZ, + Plane.YZ: Plane.XZ, + } + for v in nbrs: + old_meas_basis = self.meas_bases.get(v, None) + if old_meas_basis is None: + self.apply_local_clifford(v, lc) + continue + + self.apply_local_clifford(v, lc, expected_plane=plane_map[old_meas_basis.plane]) + + def _pivot(self, node1: int, node2: int) -> None: + """Pivot edges around nodes u and v in the graph state. + + Parameters + ---------- + node1 : `int` + node index + node2 : `int` + node index + """ + node1_nbrs = self.neighbors(node1) - {node2} + node2_nbrs = self.neighbors(node2) - {node1} + nbr_a = node1_nbrs & node2_nbrs + nbr_b = node1_nbrs - node2_nbrs + nbr_c = node2_nbrs - node1_nbrs + nbr_pairs = [ + bipartite_edges(nbr_a, nbr_b), + bipartite_edges(nbr_a, nbr_c), + bipartite_edges(nbr_b, nbr_c), + ] + + # complement edges between nbr_a, nbr_b, nbr_c + rmv_edges: set[tuple[int, int]] = set() + rmv_edges.update(*(p & self.physical_edges for p in nbr_pairs)) + add_edges: set[tuple[int, int]] = set() + add_edges.update(*(p - self.physical_edges for p in nbr_pairs)) + self._update_connections(rmv_edges, add_edges) + + # swap node u and node v + for b in nbr_b: + self.remove_physical_edge(node1, b) + self.add_physical_edge(node2, b) + for c in nbr_c: + self.remove_physical_edge(node2, c) + self.add_physical_edge(node1, c) + + def pivot(self, node1: int, node2: int) -> None: + """Pivot operation on the graph state: G∧(uv) (= G*u*v*u = G*v*u*v) for neighboring nodes u and v. + + Parameters + ---------- + node1 : `int` + node index. The node must not be an input node. + node2 : `int` + node index. The node must not be an input node. + + Raises + ------ + ValueError + If the nodes are input nodes, or the graph is not a ZX-diagram. + + Notes + ----- + Here we adopt the definition (lemma) of pivot from [1]: + Rz(pi/2) Rx(pi/2) Rz(pi/2) on the target nodes, + Rz(pi) on the common neighbors of both target nodes. + + In some literature, pivot is defined as below:: + + Rz(pi/2) Rx(-pi/2) Rz(pi/2) on the target nodes, + Rz(pi) on all the neighbors of both target nodes (not including the target nodes). + + These definitions are equivalent. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 2.32 and lemma 4.5 + """ + self._ensure_node_exists(node1) + self._ensure_node_exists(node2) + if node1 in self.input_node_indices or node2 in self.input_node_indices: + msg = "Cannot apply pivot to input node" + raise ValueError(msg) + self._check_meas_basis() + common_nbrs = self.neighbors(node1) & self.neighbors(node2) + self._pivot(node1, node2) + + # update node1 and node2 measurement + plane_map: dict[Plane, Plane] = { + Plane.XY: Plane.YZ, + Plane.XZ: Plane.XZ, + Plane.YZ: Plane.XY, + } + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + for a in {node1, node2}: + old_meas_basis = self.meas_bases.get(a, None) + if old_meas_basis is None: + self.apply_local_clifford(a, lc) + continue + + self.apply_local_clifford(a, lc, expected_plane=plane_map[old_meas_basis.plane]) + + # update nodes measurement of neighbors + plane_map = { + Plane.XY: Plane.XY, + Plane.XZ: Plane.XZ, + Plane.YZ: Plane.YZ, + } + lc = LocalClifford(np.pi, 0, 0) + for w in common_nbrs: + old_meas_basis = self.meas_bases.get(w, None) + if old_meas_basis is None: + self.apply_local_clifford(w, lc) + continue + + self.apply_local_clifford(w, lc, expected_plane=plane_map[old_meas_basis.plane]) + + def _is_trivial_meas(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node does not need any operation in order to perform _remove_clifford. + + For this operation, the followings must hold: + measurement plane = YZ or XZ + measurement angle = 0 or pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node is a removable Clifford node. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.7 + """ + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + return (self.meas_bases[node].plane in {Plane.YZ, Plane.XZ}) and is_close_angle(2 * alpha, 0, atol) + + def _needs_lc(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node needs a local complementation in order to perform _remove_clifford. + + For this operation, the followings must hold: + measurement plane = XY or YZ + measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node needs a local complementation. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.8 + """ + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + return self.meas_bases[node].plane in {Plane.XY, Plane.YZ} and is_close_angle(2 * (alpha - np.pi / 2), 0, atol) + + def _needs_pivot(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node needs a pivot operation in order to perform _remove_clifford. + + The pivot operation is performed on the node and its non-input neighbor. + For this operation, either of the following must hold: + (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node needs a pivot operation. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.9 + """ + non_input_nbrs = self.neighbors(node) - set(self.input_node_indices) - set(self.output_node_indices) + if not non_input_nbrs: + return False + + alpha = self.meas_bases[node].angle % (2.0 * np.pi) + # (a) measurement plane = XY and measurement angle = 0 or pi (mod 2pi) + case_a = self.meas_bases[node].plane == Plane.XY and is_close_angle(2 * alpha, 0, atol) + # (b) measurement plane = XZ and measurement angle = 0.5 pi or 1.5 pi (mod 2pi) + case_b = self.meas_bases[node].plane == Plane.XZ and is_close_angle(2 * (alpha - np.pi / 2), 0, atol) + return case_a or case_b + + def _remove_clifford(self, node: int, atol: float = 1e-9) -> None: + """Perform the Clifford node removal. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Raises + ------ + ValueError + If the node is not a Clifford node. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Lemma 4.7 + """ + a_pi = self.meas_bases[node].angle % (2.0 * np.pi) + if not is_close_angle(2 * a_pi, 0, atol): + msg = "This node cannot be removed by _remove_clifford." + raise ValueError(msg) + if self.meas_bases[node].plane not in {Plane.YZ, Plane.XZ}: + msg = "This node cannot be removed by _remove_clifford (plane must be YZ or XZ)." + raise ValueError(msg) + neighbors = self.neighbors(node) + self.remove_physical_node(node) + + lc = LocalClifford(a_pi, 0, 0) + plane_map = { + Plane.XY: Plane.XY, + Plane.XZ: Plane.XZ, + Plane.YZ: Plane.YZ, + } + for v in neighbors: + old_meas_basis = self.meas_bases.get(v, None) + if old_meas_basis is None: + self.apply_local_clifford(v, lc) + continue + + self.apply_local_clifford(v, lc, expected_plane=plane_map[old_meas_basis.plane]) + + def remove_clifford(self, node: int, atol: float = 1e-9) -> None: + """Remove the local Clifford node. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Raises + ------ + ValueError + 1. If the node is an input node. + 2. If the node is not a Clifford node. + 3. If all neighbors are input nodes + in some special cases ((meas_plane, meas_angle) = (XY, a pi), (XZ, a pi/2) for a = 0, 1). + 4. If the node has no neighbors that are not connected only to output nodes. + + References + ---------- + [1] Backens et al., Quantum 5, 421 (2021); arXiv:2003.01664v3 [quant-ph]. Theorem 4.12 + """ + self._ensure_node_exists(node) + if node in self.input_node_indices or node in self.output_node_indices: + msg = "Clifford node removal not allowed for input or output nodes." + raise ValueError(msg) + + if not ( + is_clifford_angle(self.meas_bases[node].angle, atol) + and self.meas_bases[node].plane in {Plane.XY, Plane.XZ, Plane.YZ} + ): + msg = "This node is not a Clifford node." + raise ValueError(msg) + + for check, action in self._clifford_rules: + if not check(node, atol): + continue + action(node) + if node in self.input_node_indices or node in self.output_node_indices: + msg = "Clifford node removal not allowed for input or output nodes." + raise ValueError(msg) + self._remove_clifford(node, atol) + return + + msg = "This Clifford node is unremovable." + raise ValueError(msg) + + def is_removable_clifford(self, node: int, atol: float = 1e-9) -> bool: + """Check if the node is a removable Clifford node. + + Parameters + ---------- + node : `int` + node index + atol : `float`, optional + absolute tolerance, by default 1e-9 + + Returns + ------- + `bool` + True if the node is a removable Clifford node. + """ + return any( + [ + self._is_trivial_meas(node, atol), + self._needs_lc(node, atol), + self._needs_pivot(node, atol), + ] + ) + + def remove_cliffords(self, atol: float = 1e-9) -> None: + """Remove all local clifford nodes which are removable. + + Parameters + ---------- + atol : `float`, optional + absolute tolerance, by default 1e-9 + """ + self._check_meas_basis() + while any( + self.is_removable_clifford(n, atol) + for n in (self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices)) + ): + for check, action in self._clifford_rules: + while True: + candidates = self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices) + clifford_node = next((node for node in candidates if check(node, atol)), None) + if clifford_node is None: + break + action(clifford_node) + self._remove_clifford(clifford_node, atol) + + def _extract_yz_adjacent_pair(self) -> tuple[int, int] | None: + r"""Call inside convert_to_phase_gadget. + + Find a pair of adjacent nodes that are both measured in the YZ-plane. + + Returns + ------- + `tuple`\[`int`, `int`\] | `None` + A pair of adjacent nodes that are both measured in the YZ-plane, or None if no such pair exists. + """ + yz_nodes = {node for node, basis in self.meas_bases.items() if basis.plane == Plane.YZ} + for u, v in self.physical_edges: + if u in yz_nodes and v in yz_nodes: + return (min(u, v), max(u, v)) + return None + + def _extract_xz_node(self) -> int | None: + """Call inside convert_to_phase_gadget. + + Find a node that is measured in the XZ-plane. + + Returns + ------- + `int` | `None` + A node that is measured in the XZ-plane, or None if no such node exists. + """ + for node, basis in self.meas_bases.items(): + if basis.plane == Plane.XZ: + return node + return None + + def convert_to_phase_gadget(self) -> None: + """Convert a ZX-diagram with gflow in MBQC+LC form into its phase-gadget form while preserving gflow.""" + while True: + if pair := self._extract_yz_adjacent_pair(): + self.pivot(*pair) + continue + if u := self._extract_xz_node(): + self.local_complement(u) + continue + break + + def merge_yz_to_xy(self) -> None: + """Merge YZ-measured nodes that have only one neighbor with an XY-measured node. + + If a node u is measured in the YZ-plane and u has only one neighbor v with a XY-measurement, + then the node u can be merged into the node v. + """ + target_candidates = { + u for u, basis in self.meas_bases.items() if (basis.plane == Plane.YZ and len(self.neighbors(u)) == 1) + } + target_nodes = { + u + for u in target_candidates + if ( + (v := next(iter(self.neighbors(u)))) + and (mb := self.meas_bases.get(v, None)) is not None + and mb.plane == Plane.XY + ) + } + for u in target_nodes: + (v,) = self.neighbors(u) + new_angle = (self.meas_bases[u].angle - self.meas_bases[v].angle) % (2.0 * np.pi) + self.assign_meas_basis(v, PlannerMeasBasis(Plane.XY, -new_angle)) + self.remove_physical_node(u) + + def merge_yz_nodes(self) -> None: + """Merge isolated YZ-measured nodes into a single node. + + If u, v nodes are measured in the YZ-plane and u, v have the same neighbors, + then u, v can be merged into a single node. + """ + min_nodes = 2 + yz_nodes = {u for u, basis in self.meas_bases.items() if basis.plane == Plane.YZ} + if len(yz_nodes) < min_nodes: + return + neighbor_groups: dict[frozenset[int], list[int]] = defaultdict(list) + for u in yz_nodes: + neighbors = frozenset(self.neighbors(u)) + neighbor_groups[neighbors].append(u) + + for neighbors, nodes in neighbor_groups.items(): + if len(nodes) < min_nodes or len(neighbors) < min_nodes: + continue + new_angle = sum(self.meas_bases[v].angle for v in nodes) % (2.0 * np.pi) + self.assign_meas_basis(nodes[0], PlannerMeasBasis(Plane.YZ, new_angle)) + for v in nodes[1:]: + self.remove_physical_node(v) + + def full_reduce(self, atol: float = 1e-9) -> None: + """Reduce all Clifford nodes and some non-Clifford nodes. + + Repeat the following steps until there are no non-Clifford nodes: + 1. remove_cliffords + 2. convert_to_phase_gadget + 3. merge_yz_to_xy + 4. merge_yz_nodes + 5. if there are some removable Clifford nodes, back to step 1. + + Parameters + ---------- + atol : `float`, optional + absolute tolerance, by default 1e-9 + """ + while True: + self.remove_cliffords(atol) + self.convert_to_phase_gadget() + self.merge_yz_to_xy() + self.merge_yz_nodes() + if not any( + self.is_removable_clifford(node, atol) + for node in self.physical_nodes - set(self.input_node_indices) - set(self.output_node_indices) + ): + break + + def expand_local_cliffords(self) -> ExpansionMaps: + r"""Expand local Clifford operators applied on the input and output nodes. + + Returns + ------- + `ExpansionMaps` + A tuple of dictionaries mapping input and output node indices to the new node indices created. + """ + input_node_map = self._expand_input_local_cliffords() + output_node_map = self._expand_output_local_cliffords() + return ExpansionMaps(input_node_map, output_node_map) + + def _expand_input_local_cliffords(self) -> dict[int, InputLocalCliffordExpansion]: + r"""Expand local Clifford operators applied on the input nodes. + + Returns + ------- + `dict`\[`int`, `InputLocalCliffordExpansion`\] + A dictionary mapping input node indices to the new node indices created. + """ + node_index_addition_map: dict[int, InputLocalCliffordExpansion] = {} + for old_input in self.input_node_indices: + lc = self._pop_local_clifford(old_input) + if lc is None: + continue + + new_input = self.add_physical_node() + new_node = self.add_physical_node() + + self.add_physical_edge(new_input, new_node) + self.add_physical_edge(new_node, old_input) + + self.replace_input(old_input, new_input) + + self.assign_meas_basis(new_input, PlannerMeasBasis(Plane.XY, -lc.gamma)) + self.assign_meas_basis(new_node, PlannerMeasBasis(Plane.XY, -lc.beta)) + + if is_close_angle(2 * lc.alpha, 0.0): + plane_map = { + Plane.XY: Plane.XY, + Plane.XZ: Plane.XZ, + Plane.YZ: Plane.YZ, + } + else: + plane_map = { + Plane.XY: Plane.XY, + Plane.XZ: Plane.YZ, + Plane.YZ: Plane.XZ, + } + meas_basis = self.meas_bases[old_input] + meas_basis = update_lc_basis( + LocalClifford(lc.alpha, 0.0, 0.0), + meas_basis, + expected_plane=plane_map[meas_basis.plane], + ) + self.assign_meas_basis(old_input, meas_basis) + node_index_addition_map[old_input] = InputLocalCliffordExpansion(new_input, new_node) + + return node_index_addition_map + + def _expand_output_local_cliffords(self) -> dict[int, OutputLocalCliffordExpansion]: + r"""Expand local Clifford operators applied on the output nodes. + + Returns + ------- + `dict`\[`int`, `OutputLocalCliffordExpansion`\] + A dictionary mapping output node indices to the new node indices created. + """ + node_index_addition_map: dict[int, OutputLocalCliffordExpansion] = {} + for old_output_node in self.output_node_indices: + lc = self._pop_local_clifford(old_output_node) + if lc is None: + continue + + new_node1 = self.add_physical_node() + new_node2 = self.add_physical_node() + new_node3 = self.add_physical_node() + new_output_node = self.add_physical_node() + + self.add_physical_edge(old_output_node, new_node1) + self.add_physical_edge(new_node1, new_node2) + self.add_physical_edge(new_node2, new_node3) + self.add_physical_edge(new_node3, new_output_node) + + self.replace_output(old_output_node, new_output_node) + + self.assign_meas_basis(old_output_node, PlannerMeasBasis(Plane.XY, -lc.alpha)) + self.assign_meas_basis(new_node1, PlannerMeasBasis(Plane.XY, -lc.beta)) + self.assign_meas_basis(new_node2, PlannerMeasBasis(Plane.XY, -lc.gamma)) + self.assign_meas_basis(new_node3, PlannerMeasBasis(Plane.XY, 0.0)) + + node_index_addition_map[old_output_node] = OutputLocalCliffordExpansion( + new_node1, new_node2, new_node3, new_output_node + ) + + return node_index_addition_map + + +def complete_graph_edges(nodes: Iterable[int]) -> set[tuple[int, int]]: + r"""Return a set of edges for the complete graph on the given nodes. + + Parameters + ---------- + nodes : `Iterable`\[`int`\] + nodes + + Returns + ------- + `set`\[`tuple`\[`int`, `int`\]\] + edges of the complete graph + """ + return {(min(u, v), max(u, v)) for u, v in combinations(nodes, 2)} + + +class InputLocalCliffordExpansion(NamedTuple): + """Local Clifford expansion map.""" + + node1: int + node2: int + + +class OutputLocalCliffordExpansion(NamedTuple): + """Output local Clifford expansion map.""" + + node1: int + node2: int + node3: int + output_node: int + + +class ExpansionMaps(NamedTuple): + """Expansion maps for inputs and outputs with Local Clifford.""" + + input_node_map: dict[int, InputLocalCliffordExpansion] + output_node_map: dict[int, OutputLocalCliffordExpansion] diff --git a/requirements.txt b/requirements.txt index 59ee09d07..2316d62c6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,5 @@ numpy>=1.22,<3 matplotlib networkx ortools>=9,<10 +swiflow typing_extensions diff --git a/tests/test_euler.py b/tests/test_euler.py index fb3614998..fe99a4258 100644 --- a/tests/test_euler.py +++ b/tests/test_euler.py @@ -9,6 +9,7 @@ from graphqomb.euler import ( LocalClifford, LocalUnitary, + _meas_basis_candidates, bloch_sphere_coordinates, euler_decomposition, meas_basis_info, @@ -91,6 +92,37 @@ def test_bloch_sphere_coordinates_corner(plane: Plane, angle: float) -> None: assert np.isclose(inner_product, 1) +DEGENERATE_CASES: list[tuple[Plane, float, Plane, float]] = [ + (Plane.XZ, 0.0, Plane.YZ, 0.0), + (Plane.XZ, np.pi, Plane.YZ, np.pi), + (Plane.XZ, 0.5 * np.pi, Plane.XY, 0.0), + (Plane.XZ, 1.5 * np.pi, Plane.XY, np.pi), + (Plane.YZ, 0.5 * np.pi, Plane.XY, 0.5 * np.pi), + (Plane.YZ, 1.5 * np.pi, Plane.XY, 1.5 * np.pi), +] + + +def test_equivalence() -> None: + for plane1, angle1, plane2, angle2 in DEGENERATE_CASES: + basis1 = meas_basis(plane1, angle1) + basis2 = meas_basis(plane2, angle2) + inner_product = abs(np.vdot(basis1, basis2)) + assert np.isclose(inner_product, 1) + + +@pytest.mark.parametrize("case", DEGENERATE_CASES) +def test_meas_basis_candidates(case: tuple[Plane, float, Plane, float]) -> None: + plane1, angle1, plane2, angle2 = case + basis = meas_basis(plane1, angle1) + candidates = _meas_basis_candidates(basis) + expected_candidates = [(plane1, angle1), (plane2, angle2)] + assert len(candidates) == len(expected_candidates) + for expected in expected_candidates: + assert any( + candidate[0] == expected[0] and is_close_angle(candidate[1], expected[1]) for candidate in candidates + ) + + @pytest.mark.parametrize("plane", list(Plane)) def test_meas_basis_info(plane: Plane, rng: np.random.Generator) -> None: angle = rng.uniform(0, 2 * np.pi) @@ -100,6 +132,17 @@ def test_meas_basis_info(plane: Plane, rng: np.random.Generator) -> None: assert is_close_angle(angle, angle_get), f"Expected {angle}, got {angle_get}" +@pytest.mark.parametrize("case", DEGENERATE_CASES) +def test_meas_basis_info_degenerate(case: tuple[Plane, float, Plane, float]) -> None: + plane1, angle1, plane2, angle2 = case + basis = meas_basis(plane1, angle1) + + for expected_plane, expected_angle in ((plane1, angle1), (plane2, angle2)): + plane_get, angle_get = meas_basis_info(basis, expected_plane=expected_plane) + assert expected_plane == plane_get + assert is_close_angle(expected_angle, angle_get) + + def test_local_clifford(random_clifford_angles: tuple[float, float, float]) -> None: lc = LocalClifford(*random_clifford_angles) assert is_unitary(lc.matrix()) @@ -130,8 +173,8 @@ def test_lc_basis_update( angle = rng.uniform(0, 2 * np.pi) basis = PlannerMeasBasis(plane, angle) basis_updated = update_lc_basis(lc, basis) - ref_updated_basis = lc.matrix() @ basis.vector() - inner_product = abs(np.vdot(basis_updated.vector(), ref_updated_basis)) + ref_updated_vector = lc.conjugate().matrix() @ basis.vector() + inner_product = abs(np.vdot(basis_updated.vector(), ref_updated_vector)) assert np.isclose(inner_product, 1) @@ -140,17 +183,16 @@ def test_local_complement_target_update(plane: Plane, rng: np.random.Generator) lc = LocalClifford(0, np.pi / 2, 0) measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { Plane.XY: (Plane.XZ, lambda angle: angle + np.pi / 2), - Plane.XZ: (Plane.XY, lambda angle: np.pi / 2 - angle), + Plane.XZ: (Plane.XY, lambda angle: -angle + np.pi / 2), Plane.YZ: (Plane.YZ, lambda angle: angle + np.pi / 2), } angle = rng.random() * 2 * np.pi meas_basis = PlannerMeasBasis(plane, angle) - result_basis = update_lc_basis(lc.conjugate(), meas_basis) + result_basis = update_lc_basis(lc, meas_basis) ref_plane, ref_angle_func = measurement_action[plane] ref_angle = ref_angle_func(angle) - assert result_basis.plane == ref_plane assert is_close_angle(result_basis.angle, ref_angle) @@ -167,9 +209,80 @@ def test_local_complement_neighbors(plane: Plane, rng: np.random.Generator) -> N angle = rng.random() * 2 * np.pi meas_basis = PlannerMeasBasis(plane, angle) - result_basis = update_lc_basis(lc.conjugate(), meas_basis) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_pivot_target_update(plane: Plane, rng: np.random.Generator) -> None: + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.YZ, lambda angle: -1 * angle), + Plane.XZ: (Plane.XZ, lambda angle: (np.pi / 2 - angle)), + Plane.YZ: (Plane.XY, lambda angle: -1 * angle), + } + + angle = rng.random() * 2 * np.pi + + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) ref_plane, ref_angle_func = measurement_action[plane] ref_angle = ref_angle_func(angle) assert result_basis.plane == ref_plane assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_pivot_neighbors(plane: Plane, rng: np.random.Generator) -> None: + lc = LocalClifford(np.pi, 0, 0) + measurement_action: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: (angle + np.pi) % (2.0 * np.pi)), + Plane.XZ: (Plane.XZ, lambda angle: -1 * angle), + Plane.YZ: (Plane.YZ, lambda angle: -1 * angle), + } + + angle = rng.random() * 2 * np.pi + + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_remove_clifford_update(plane: Plane, rng: np.random.Generator) -> None: + measurement_action: dict[Plane, tuple[Plane, Callable[[float, float], float]]] = { + Plane.XY: ( + Plane.XY, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else alpha + np.pi) % (2.0 * np.pi), + ), + Plane.XZ: ( + Plane.XZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else -alpha) % (2.0 * np.pi), + ), + Plane.YZ: ( + Plane.YZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0) else -alpha) % (2.0 * np.pi), + ), + } + + angle = rng.random() * 2 * np.pi + + a_pi = np.pi + for a_pi in (0.0, np.pi): + lc = LocalClifford(a_pi, 0, 0) + meas_basis = PlannerMeasBasis(plane, angle) + result_basis = update_lc_basis(lc, meas_basis) + ref_plane, ref_angle_func = measurement_action[plane] + ref_angle = ref_angle_func(a_pi, angle) + + assert result_basis.plane == ref_plane + assert is_close_angle(result_basis.angle, ref_angle) diff --git a/tests/test_graphstate.py b/tests/test_graphstate.py index e649ff916..a8430833a 100644 --- a/tests/test_graphstate.py +++ b/tests/test_graphstate.py @@ -6,7 +6,6 @@ import pytest from graphqomb.common import Plane, PlannerMeasBasis -from graphqomb.euler import LocalClifford from graphqomb.graphstate import GraphState, bipartite_edges, odd_neighbors @@ -207,24 +206,6 @@ def test_check_canonical_form_input_output_mismatch(canonical_graph: GraphState) canonical_graph.check_canonical_form() -def test_check_canonical_form_with_local_clifford_false(canonical_graph: GraphState) -> None: - """Test if the graph is in canonical form with local Clifford operator.""" - local_clifford = LocalClifford() - in_node = next(iter(canonical_graph.input_node_indices)) - canonical_graph.apply_local_clifford(in_node, local_clifford) - with pytest.raises(ValueError, match="Clifford operators are applied"): - canonical_graph.check_canonical_form() - - -def test_check_canonical_form_with_local_clifford_expansion_true(canonical_graph: GraphState) -> None: - """Test if the graph is in canonical form with local Clifford operator expansion.""" - local_clifford = LocalClifford() - in_node = next(iter(canonical_graph.input_node_indices)) - canonical_graph.apply_local_clifford(in_node, local_clifford) - canonical_graph.expand_local_cliffords() - canonical_graph.check_canonical_form() # Should not raise an exception - - def test_check_canonical_form_missing_meas_basis_false(canonical_graph: GraphState) -> None: """Test if the graph is in canonical form with missing measurement basis.""" _ = canonical_graph.add_physical_node() diff --git a/tests/test_zxgraphstate.py b/tests/test_zxgraphstate.py new file mode 100644 index 000000000..6351c8cda --- /dev/null +++ b/tests/test_zxgraphstate.py @@ -0,0 +1,1238 @@ +"""Tests for ZXGraphState + +Measurement actions for the followings are used: + - Local complement (LC): MEAS_ACTION_LC_* + - Pivot (PV): MEAS_ACTION_PV_* + - Remove Cliffords (RC): MEAS_ACTION_RC + +Reference: + M. Backens et al., Quantum 5, 421 (2021). + https://doi.org/10.22331/q-2021-03-25-421 +""" + +from __future__ import annotations + +import itertools +import operator +from copy import deepcopy +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +from graphqomb.common import Plane, PlannerMeasBasis, is_close_angle +from graphqomb.euler import LocalClifford +from graphqomb.gflow_utils import gflow_wrapper +from graphqomb.qompiler import qompile +from graphqomb.random_objects import generate_random_flow_graph +from graphqomb.simulator import PatternSimulator, SimulatorBackend +from graphqomb.zxgraphstate import ZXGraphState + +if TYPE_CHECKING: + from collections.abc import Sequence + from typing import Callable + + Measurements = list[tuple[int, PlannerMeasBasis]] + +MEAS_ACTION_LC_TARGET: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XZ, lambda angle: angle + np.pi / 2), + Plane.XZ: (Plane.XY, lambda angle: -angle + np.pi / 2), + Plane.YZ: (Plane.YZ, lambda angle: angle + np.pi / 2), +} +MEAS_ACTION_LC_NEIGHBORS: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: angle + np.pi / 2), + Plane.XZ: (Plane.YZ, lambda angle: angle), + Plane.YZ: (Plane.XZ, operator.neg), +} +MEAS_ACTION_PV_TARGET: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.YZ, operator.neg), + Plane.XZ: (Plane.XZ, lambda angle: (np.pi / 2 - angle)), + Plane.YZ: (Plane.XY, operator.neg), +} +MEAS_ACTION_PV_NEIGHBORS: dict[Plane, tuple[Plane, Callable[[float], float]]] = { + Plane.XY: (Plane.XY, lambda angle: (angle + np.pi) % (2.0 * np.pi)), + Plane.XZ: (Plane.XZ, operator.neg), + Plane.YZ: (Plane.YZ, operator.neg), +} +ATOL = 1e-9 +MEAS_ACTION_RC: dict[Plane, tuple[Plane, Callable[[float, float], float]]] = { + Plane.XY: ( + Plane.XY, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else alpha + np.pi) % (2.0 * np.pi), + ), + Plane.XZ: ( + Plane.XZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else -alpha) % (2.0 * np.pi), + ), + Plane.YZ: ( + Plane.YZ, + lambda a_pi, alpha: (alpha if is_close_angle(a_pi, 0, ATOL) else -alpha) % (2.0 * np.pi), + ), +} + + +def plane_combinations(n: int) -> list[tuple[Plane, ...]]: + """Generate all combinations of planes of length n. + + Parameters + ---------- + n : int + The length of the combinations. n > 1. + + Returns + ------- + list[tuple[Plane, ...]] + A list of tuples containing all combinations of planes of length n. + """ + return list(itertools.product(Plane, repeat=n)) + + +@pytest.fixture +def rng() -> np.random.Generator: + return np.random.default_rng() + + +@pytest.fixture +def canonical_zxgraph() -> ZXGraphState: + graph = ZXGraphState() + in_node = graph.add_physical_node() + out_node = graph.add_physical_node() + + q_idx = 0 + graph.register_input(in_node, q_idx) + graph.register_output(out_node, q_idx) + graph.assign_meas_basis(in_node, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)) + return graph + + +@pytest.fixture +def zx_graph() -> ZXGraphState: + """Generate an empty ZXGraphState object. + + Returns + ------- + ZXGraphState: An empty ZXGraphState object. + """ + return ZXGraphState() + + +def _initialize_graph( + zx_graph: ZXGraphState, + nodes: range, + edges: set[tuple[int, int]], + inputs: Sequence[int] = (), + outputs: Sequence[int] = (), +) -> None: + """Initialize a ZXGraphState object with the given nodes and edges. + + Parameters + ---------- + zx_graph : ZXGraphState + The ZXGraphState object to initialize. + nodes : range + nodes to add to the graph. + edges : list[tuple[int, int]] + edges to add to the graph. + inputs : Sequence[int], optional + input nodes, by default (). + outputs : Sequence[int], optional + output nodes, by default (). + + Raises + ------ + ValueError + If the number of output nodes is greater than the number of input nodes. + """ + for _ in nodes: + zx_graph.add_physical_node() + for i, j in edges: + zx_graph.add_physical_edge(i, j) + + node2q_index: dict[int, int] = {} + q_indices: list[int] = [] + + for i, node in enumerate(inputs): + zx_graph.register_input(node, q_index=i) + node2q_index[node] = i + q_indices.append(i) + + if len(outputs) > len(q_indices): + msg = "Cannot assign valid q_index to all output nodes." + raise ValueError(msg) + + for i, node in enumerate(outputs): + q_index = node2q_index.get(node, q_indices[i]) + zx_graph.register_output(node, q_index) + + +def _apply_measurements(zx_graph: ZXGraphState, measurements: Measurements) -> None: + for node_id, planner_meas_basis in measurements: + if node_id in zx_graph.output_node_indices: + continue + zx_graph.assign_meas_basis(node_id, planner_meas_basis) + + +def _test( + zx_graph: ZXGraphState, + exp_nodes: set[int], + exp_edges: set[tuple[int, int]], + exp_measurements: Measurements, +) -> None: + assert zx_graph.physical_nodes == exp_nodes + assert zx_graph.physical_edges == exp_edges + for node_id, planner_meas_basis in exp_measurements: + assert zx_graph.meas_bases[node_id].plane == planner_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node_id].angle, planner_meas_basis.angle) + + +def test_check_canonical_form_with_local_clifford_false(canonical_zxgraph: ZXGraphState) -> None: + """Test if the graph is in canonical form with local Clifford operator.""" + local_clifford = LocalClifford() + in_node = next(iter(canonical_zxgraph.input_node_indices)) + canonical_zxgraph.apply_local_clifford(in_node, local_clifford) + with pytest.raises(ValueError, match="Clifford operators are applied"): + canonical_zxgraph.check_canonical_form() + + +def test_check_canonical_form_with_local_clifford_expansion_true(canonical_zxgraph: ZXGraphState) -> None: + """Test if the graph is in canonical form with local Clifford operator expansion.""" + local_clifford = LocalClifford() + in_node = next(iter(canonical_zxgraph.input_node_indices)) + canonical_zxgraph.apply_local_clifford(in_node, local_clifford) + canonical_zxgraph.expand_local_cliffords() + canonical_zxgraph.check_canonical_form() # Should not raise an exception + + +def test_apply_local_clifford_to_planner_meas_basis_xy_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xy_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xy_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XY, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, -angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_xz_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(XZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XZ, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, -angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_1(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(-np.pi / 2, 0.0, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XZ, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_2(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(0.0, np.pi / 2, 0.0) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.YZ, angle + np.pi / 2) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_apply_local_clifford_to_planner_meas_basis_yz_3(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Test apply_local_clifford correctly updates the PlannerMeasBasis(YZ, angle).""" + node = zx_graph.add_physical_node() + ref_zx_graph = deepcopy(zx_graph) + + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.YZ, angle)) + lc = LocalClifford(np.pi / 2, np.pi / 2, np.pi / 2) + zx_graph.apply_local_clifford(node, lc) + meas_vector = zx_graph.meas_bases[node].vector() + + ref_meas_basis = PlannerMeasBasis(Plane.XY, -angle) + ref_zx_graph.assign_meas_basis(node, ref_meas_basis) + ref_meas_vector = ref_zx_graph.meas_bases[node].vector() + assert np.isclose(abs(np.vdot(meas_vector, ref_meas_vector)), 1) + assert zx_graph.meas_bases[node].plane == ref_meas_basis.plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_meas_basis.angle) + + +def test_local_complement_fails_if_nonexistent_node(zx_graph: ZXGraphState) -> None: + """Test local complement raises an error if the node does not exist.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.local_complement(0) + + +def test_local_complement_fails_if_not_zx_graph(zx_graph: ZXGraphState) -> None: + """Test local complement raises an error if the graph is not a ZX-diagram.""" + node = zx_graph.add_physical_node() + with pytest.raises(ValueError, match="Measurement basis not set for node 0"): + zx_graph.local_complement(node) + + +def test_local_complement_fails_with_input_node(zx_graph: ZXGraphState) -> None: + """Test local complement fails with input node.""" + node = zx_graph.add_physical_node() + zx_graph.register_input(node, q_index=0) + with pytest.raises(ValueError, match=r"Cannot apply local complement to input node."): + zx_graph.local_complement(node) + + +def test_local_clifford_updates_xy_basis(zx_graph: ZXGraphState, rng: np.random.Generator) -> None: + """Applying an LC should update stored XY angles using the public convention.""" + node = zx_graph.add_physical_node() + angle = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(node, PlannerMeasBasis(Plane.XY, angle)) + lc = LocalClifford(0.0, 0.0, -np.pi / 2) + + zx_graph.apply_local_clifford(node, lc) + meas_basis_vector = zx_graph.meas_bases[node].vector() + ref_vector = PlannerMeasBasis(Plane.XY, angle + np.pi / 2).vector() + inner_product = abs(np.vdot(meas_basis_vector, ref_vector)) + assert np.isclose(inner_product, 1) + + +@pytest.mark.parametrize("plane", list(Plane)) +def test_local_complement_with_no_edge(zx_graph: ZXGraphState, plane: Plane, rng: np.random.Generator) -> None: + """Test local complement with a graph with a single node and no edges.""" + angle = rng.random() * 2 * np.pi + ref_plane, ref_angle_func = MEAS_ACTION_LC_TARGET[plane] + ref_angle = ref_angle_func(angle) + node = zx_graph.add_physical_node() + zx_graph.assign_meas_basis(node, PlannerMeasBasis(plane, angle)) + + zx_graph.local_complement(node) + assert zx_graph.physical_edges == set() + assert zx_graph.meas_bases[node].plane == ref_plane + assert is_close_angle(zx_graph.meas_bases[node].angle, ref_angle) + + +@pytest.mark.parametrize("planes", plane_combinations(3)) +def test_local_complement_with_minimal_graph( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test local complement with a minimal graph.""" + for _ in range(3): + zx_graph.add_physical_node() + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + for i in range(3): + zx_graph.assign_meas_basis(i, PlannerMeasBasis(planes[i], angles[i])) + zx_graph.local_complement(1) + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_NEIGHBORS[planes[0]] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_TARGET[planes[1]] + ref_plane2, ref_angle_func2 = MEAS_ACTION_LC_NEIGHBORS[planes[2]] + ref_angle0 = ref_angle_func0(angles[0]) + ref_angle1 = ref_angle_func1(angles[1]) + ref_angle2 = ref_angle_func2(angles[2]) + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle0)), + (1, PlannerMeasBasis(ref_plane1, ref_angle1)), + (2, PlannerMeasBasis(ref_plane2, ref_angle2)), + ] + _test(zx_graph, exp_nodes={0, 1, 2}, exp_edges={(0, 1), (0, 2), (1, 2)}, exp_measurements=exp_measurements) + + zx_graph.local_complement(1) + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_NEIGHBORS[ref_plane0] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_TARGET[ref_plane1] + ref_plane2, ref_angle_func2 = MEAS_ACTION_LC_NEIGHBORS[ref_plane2] + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle_func0(ref_angle0))), + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(ref_angle1))), + (2, PlannerMeasBasis(ref_plane2, ref_angle_func2(ref_angle2))), + ] + _test( + zx_graph, + exp_nodes={0, 1, 2}, + exp_edges={(0, 1), (1, 2)}, + exp_measurements=exp_measurements, + ) + + +@pytest.mark.parametrize(("plane1", "plane3"), plane_combinations(2)) +def test_local_complement_on_output_node( + zx_graph: ZXGraphState, plane1: Plane, plane3: Plane, rng: np.random.Generator +) -> None: + """Test local complement on an output node.""" + _initialize_graph(zx_graph, nodes=range(5), edges={(0, 1), (1, 2), (2, 3), (3, 4)}, inputs=(0, 4), outputs=(2,)) + angle1 = rng.random() * 2 * np.pi + angle3 = rng.random() * 2 * np.pi + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.0)), + (1, PlannerMeasBasis(plane1, angle1)), + (3, PlannerMeasBasis(plane3, angle3)), + (4, PlannerMeasBasis(Plane.XY, 0.0)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.local_complement(2) + + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_NEIGHBORS[plane1] + ref_plane3, ref_angle_func3 = MEAS_ACTION_LC_NEIGHBORS[plane3] + exp_measurements = [ + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(measurements[1][1].angle))), + (3, PlannerMeasBasis(ref_plane3, ref_angle_func3(measurements[2][1].angle))), + ] + _test( + zx_graph, + exp_nodes={0, 1, 2, 3, 4}, + exp_edges={(0, 1), (1, 2), (1, 3), (2, 3), (3, 4)}, + exp_measurements=exp_measurements, + ) + assert zx_graph.meas_bases.get(2) is None + + +@pytest.mark.parametrize(("plane0", "plane1"), plane_combinations(2)) +def test_local_complement_with_two_nodes_graph( + zx_graph: ZXGraphState, plane0: Plane, plane1: Plane, rng: np.random.Generator +) -> None: + """Test local complement with a graph with two nodes.""" + zx_graph.add_physical_node() + zx_graph.add_physical_node() + zx_graph.add_physical_edge(0, 1) + angle0 = rng.random() * 2 * np.pi + angle1 = rng.random() * 2 * np.pi + zx_graph.assign_meas_basis(0, PlannerMeasBasis(plane0, angle0)) + zx_graph.assign_meas_basis(1, PlannerMeasBasis(plane1, angle1)) + zx_graph.local_complement(0) + + ref_plane0, ref_angle_func0 = MEAS_ACTION_LC_TARGET[plane0] + ref_plane1, ref_angle_func1 = MEAS_ACTION_LC_NEIGHBORS[plane1] + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle_func0(angle0))), + (1, PlannerMeasBasis(ref_plane1, ref_angle_func1(angle1))), + ] + _test(zx_graph, exp_nodes={0, 1}, exp_edges={(0, 1)}, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize("planes", plane_combinations(3)) +def test_local_complement_4_times( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test 4 times local complement returns to the original graph.""" + for _ in range(3): + zx_graph.add_physical_node() + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + for i in range(3): + zx_graph.assign_meas_basis(i, PlannerMeasBasis(planes[i], angles[i])) + + for _ in range(4): + zx_graph.local_complement(1) + + exp_measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(3)] + _test(zx_graph, exp_nodes={0, 1, 2}, exp_edges={(0, 1), (1, 2)}, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + "alpha", + [0, np.pi / 2, np.pi, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_xy_plane(zx_graph: ZXGraphState, alpha: float) -> None: + """Test expanding local Clifford operators on an input node with XY measurement plane.""" + old_input_node = zx_graph.add_physical_node() + output_node = zx_graph.add_physical_node() + zx_graph.add_physical_edge(old_input_node, output_node) + zx_graph.register_input(old_input_node, 0) + zx_graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + zx_graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.XY, old_input_angle)) + + beta = 0.0 + gamma = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + zx_graph.apply_local_clifford(old_input_node, lc) + zx_graph.expand_local_cliffords() + + new_input_node, new_node = (2, 3) + assert zx_graph.input_node_indices == {new_input_node: 0} + for node in zx_graph.physical_nodes - set(zx_graph.output_node_indices): + assert zx_graph.meas_bases[node].plane == Plane.XY + assert is_close_angle(zx_graph.meas_bases[new_node].angle, lc.beta) + assert is_close_angle(zx_graph.meas_bases[new_input_node].angle, lc.gamma) + assert is_close_angle(zx_graph.meas_bases[old_input_node].angle, old_input_angle - lc.alpha) + + +@pytest.mark.parametrize( + "alpha", + [0, np.pi, np.pi / 2, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_yz_plane(zx_graph: ZXGraphState, alpha: float) -> None: + """Test expanding local Clifford operators on an input node with YZ measurement plane.""" + old_input_node = zx_graph.add_physical_node() + output_node = zx_graph.add_physical_node() + zx_graph.add_physical_edge(old_input_node, output_node) + zx_graph.register_input(old_input_node, 0) + zx_graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + zx_graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.YZ, old_input_angle)) + + beta = 0.0 + gamma = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + zx_graph.apply_local_clifford(old_input_node, lc) + zx_graph.expand_local_cliffords() + + exp_angle: float = old_input_angle + if is_close_angle(2 * alpha, 0): + assert zx_graph.meas_bases[old_input_node].plane == Plane.YZ + exp_angle = old_input_angle if is_close_angle(alpha, 0) else -old_input_angle + elif is_close_angle(2 * (alpha - np.pi / 2), 0): + assert zx_graph.meas_bases[old_input_node].plane == Plane.XZ + exp_angle = old_input_angle if is_close_angle(alpha - np.pi / 2, 0) else -old_input_angle + assert is_close_angle(zx_graph.meas_bases[old_input_node].angle, exp_angle) + new_input_node = 2 + assert zx_graph.meas_bases[new_input_node].plane == Plane.XY + assert is_close_angle(zx_graph.meas_bases[new_input_node].angle, 0.0) + + +@pytest.mark.parametrize( + "alpha", + [0, np.pi / 2, np.pi, 3 * np.pi / 2], +) +def test_expand_input_local_cliffords_xz_plane(zx_graph: ZXGraphState, alpha: float) -> None: + """Test expanding local Clifford operators on an input node with XZ measurement plane.""" + old_input_node = zx_graph.add_physical_node() + output_node = zx_graph.add_physical_node() + zx_graph.add_physical_edge(old_input_node, output_node) + zx_graph.register_input(old_input_node, 0) + zx_graph.register_output(output_node, 0) + old_input_angle = np.pi / 3 + zx_graph.assign_meas_basis(old_input_node, PlannerMeasBasis(Plane.XZ, old_input_angle)) + + beta = 0.0 + gamma = 0.0 + lc = LocalClifford(alpha=alpha, beta=beta, gamma=gamma) + zx_graph.apply_local_clifford(old_input_node, lc) + zx_graph.expand_local_cliffords() + + exp_angle: float = old_input_angle + if is_close_angle(2 * alpha, 0): + assert zx_graph.meas_bases[old_input_node].plane == Plane.XZ + exp_angle = old_input_angle if is_close_angle(alpha, 0) else -old_input_angle + elif is_close_angle(2 * (alpha - np.pi / 2), 0): + assert zx_graph.meas_bases[old_input_node].plane == Plane.YZ + exp_angle = old_input_angle if is_close_angle(alpha + np.pi / 2, 0) else -old_input_angle + assert is_close_angle(zx_graph.meas_bases[old_input_node].angle, exp_angle) + new_input_node = 2 + assert zx_graph.meas_bases[new_input_node].plane == Plane.XY + assert is_close_angle(zx_graph.meas_bases[new_input_node].angle, 0.0) + + +def test_expand_output_local_cliffords(zx_graph: ZXGraphState) -> None: + """Test expanding local Clifford operators on an output node.""" + input_node = zx_graph.add_physical_node() + old_output_node = zx_graph.add_physical_node() + zx_graph.add_physical_edge(input_node, old_output_node) + zx_graph.register_input(input_node, 0) + zx_graph.register_output(old_output_node, 0) + zx_graph.assign_meas_basis(input_node, PlannerMeasBasis(Plane.XY, 0.0)) + lc = LocalClifford(alpha=np.pi / 2, beta=np.pi, gamma=3 * np.pi / 2) + zx_graph.apply_local_clifford(old_output_node, lc) + zx_graph.expand_local_cliffords() + new_output_node = 5 + assert zx_graph.output_node_indices == {new_output_node: 0} + for node in zx_graph.physical_nodes - set(zx_graph.output_node_indices): + assert zx_graph.meas_bases[node].plane == Plane.XY + assert is_close_angle(zx_graph.meas_bases[old_output_node].angle, -lc.alpha) + assert is_close_angle(zx_graph.meas_bases[old_output_node + 1].angle, -lc.beta) + assert is_close_angle(zx_graph.meas_bases[old_output_node + 2].angle, -lc.gamma) + assert is_close_angle(zx_graph.meas_bases[old_output_node + 3].angle, 0.0) + + +def test_local_clifford_expansion() -> None: + graph, flow = generate_random_flow_graph(width=1, depth=3, edge_p=0.5) + zx_graph, _ = ZXGraphState.from_base_graph_state(graph) + + pattern = qompile(zx_graph, flow) + sim = PatternSimulator(pattern, backend=SimulatorBackend.StateVector) + sim.simulate() + psi_original = sim.state.state() + + zx_graph_cp = deepcopy(zx_graph) + lc = LocalClifford(2 * np.pi, 2 * np.pi, 2 * np.pi) + zx_graph_cp.apply_local_clifford(0, lc) + lc = LocalClifford(2 * np.pi, 0.0, 0.0) + zx_graph_cp.apply_local_clifford(2, lc) + zx_graph_cp.expand_local_cliffords() + gflow_cp = gflow_wrapper(zx_graph_cp) + pattern_cp = qompile(zx_graph_cp, gflow_cp) + sim_cp = PatternSimulator(pattern_cp, backend=SimulatorBackend.StateVector) + sim_cp.simulate() + psi_cp = sim_cp.state.state() + assert np.isclose(np.abs(np.vdot(psi_original, psi_cp)), 1.0) + + +def test_pivot_fails_with_nonexistent_nodes(zx_graph: ZXGraphState) -> None: + """Test pivot fails with nonexistent nodes.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.pivot(0, 1) + zx_graph.add_physical_node() + with pytest.raises(ValueError, match="Node does not exist node=1"): + zx_graph.pivot(0, 1) + + +def test_pivot_fails_with_input_node(zx_graph: ZXGraphState) -> None: + """Test pivot fails with input node.""" + node1 = zx_graph.add_physical_node() + node2 = zx_graph.add_physical_node() + zx_graph.register_input(node1, q_index=0) + with pytest.raises(ValueError, match="Cannot apply pivot to input node"): + zx_graph.pivot(node1, node2) + + +def test_pivot_with_obvious_graph(zx_graph: ZXGraphState) -> None: + """Test pivot with an obvious graph.""" + # 0---1---2 -> 0---2---1 + for _ in range(3): + zx_graph.add_physical_node() + + for i, j in [(0, 1), (1, 2)]: + zx_graph.add_physical_edge(i, j) + + measurements = [ + (0, PlannerMeasBasis(Plane.XY, np.pi)), + (1, PlannerMeasBasis(Plane.XZ, 1.4 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 0.4 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + + zx_graph.pivot(1, 2) + assert zx_graph.physical_edges == {(0, 2), (1, 2)} + ref_plane1, ref_angle_func1 = MEAS_ACTION_PV_TARGET[Plane.XZ] + ref_plane2, ref_angle_func2 = MEAS_ACTION_PV_TARGET[Plane.YZ] + assert zx_graph.meas_bases[0].plane == Plane.XY + assert zx_graph.meas_bases[1].plane == ref_plane1 + assert zx_graph.meas_bases[2].plane == ref_plane2 + assert is_close_angle(zx_graph.meas_bases[0].angle, np.pi) + assert is_close_angle(zx_graph.meas_bases[1].angle, ref_angle_func1(1.4 * np.pi)) + assert is_close_angle(zx_graph.meas_bases[2].angle, ref_angle_func2(0.4 * np.pi)) + + +@pytest.mark.parametrize("planes", plane_combinations(5)) +def test_pivot_with_minimal_graph( + zx_graph: ZXGraphState, planes: tuple[Plane, Plane, Plane, Plane, Plane], rng: np.random.Generator +) -> None: + """Test pivot with a minimal graph.""" + # 0---1---2---4 + # \ / + # 3 + for _ in range(5): + zx_graph.add_physical_node() + + for i, j in [(0, 1), (1, 2), (1, 3), (2, 3), (2, 4)]: + zx_graph.add_physical_edge(i, j) + + angles = [rng.random() * 2 * np.pi for _ in range(5)] + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(5)] + _apply_measurements(zx_graph, measurements) + zx_graph_cp = deepcopy(zx_graph) + + zx_graph.pivot(1, 2) + zx_graph_cp.local_complement(1) + zx_graph_cp.local_complement(2) + zx_graph_cp.local_complement(1) + assert zx_graph.physical_edges == zx_graph_cp.physical_edges + assert zx_graph.meas_bases[1].plane == zx_graph_cp.meas_bases[1].plane + assert zx_graph.meas_bases[2].plane == zx_graph_cp.meas_bases[2].plane + + _, ref_angle_func1 = MEAS_ACTION_PV_TARGET[planes[1]] + _, ref_angle_func2 = MEAS_ACTION_PV_TARGET[planes[2]] + _, ref_angle_func3 = MEAS_ACTION_PV_NEIGHBORS[planes[3]] + ref_angle1 = ref_angle_func1(angles[1]) + ref_angle2 = ref_angle_func2(angles[2]) + ref_angle3 = ref_angle_func3(angles[3]) + assert is_close_angle(zx_graph.meas_bases[1].angle, ref_angle1) + assert is_close_angle(zx_graph.meas_bases[2].angle, ref_angle2) + assert is_close_angle(zx_graph.meas_bases[3].angle, ref_angle3) + + +def test_remove_clifford_fails_if_nonexistent_node(zx_graph: ZXGraphState) -> None: + """Test remove_clifford raises an error if the node does not exist.""" + with pytest.raises(ValueError, match="Node does not exist node=0"): + zx_graph.remove_clifford(0) + + +def test_remove_clifford_fails_with_input_node(zx_graph: ZXGraphState) -> None: + node = zx_graph.add_physical_node() + zx_graph.register_input(node, q_index=0) + with pytest.raises(ValueError, match="Clifford node removal not allowed for input or output nodes"): + zx_graph.remove_clifford(node) + + +def test_remove_clifford_fails_with_invalid_plane(zx_graph: ZXGraphState) -> None: + """Test remove_clifford fails if the measurement plane is invalid.""" + zx_graph.add_physical_node() + zx_graph.assign_meas_basis( + 0, + PlannerMeasBasis("test_plane", 0.5 * np.pi), # type: ignore[reportArgumentType, arg-type, unused-ignore] + ) + with pytest.raises(ValueError, match="This node is not a Clifford node"): + zx_graph.remove_clifford(0) + + +def test_remove_clifford_fails_for_non_clifford_node(zx_graph: ZXGraphState) -> None: + zx_graph.add_physical_node() + zx_graph.assign_meas_basis(0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)) + with pytest.raises(ValueError, match="This node is not a Clifford node"): + zx_graph.remove_clifford(0) + + +def graph_1(zx_graph: ZXGraphState) -> None: + # _is_trivial_meas + # 3---0---1 3 1 + # | -> + # 2 2 + _initialize_graph(zx_graph, nodes=range(4), edges={(0, 1), (0, 2), (0, 3)}) + + +def graph_2(zx_graph: ZXGraphState) -> None: + # _needs_lc + # 0---1---2 -> 0---2 + _initialize_graph(zx_graph, nodes=range(3), edges={(0, 1), (1, 2)}) + + +def graph_3(zx_graph: ZXGraphState) -> None: + # _needs_pivot_1 on (1, 2) + # 3(I) 3(I) + # / \ / | \ + # 0(I) - 1 - 2 - 5 -> 0(I) - 2 5 - 0(I) + # \ / \ | / + # 4(I) 4(I) + _initialize_graph( + zx_graph, nodes=range(6), edges={(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)}, inputs=(0, 3, 4) + ) + + +def _test_remove_clifford( + zx_graph: ZXGraphState, + node: int, + measurements: Measurements, + exp_graph: tuple[set[int], set[tuple[int, int]]], + exp_measurements: Measurements, +) -> None: + _apply_measurements(zx_graph, measurements) + zx_graph.remove_clifford(node) + exp_nodes = exp_graph[0] + exp_edges = exp_graph[1] + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +@pytest.mark.parametrize( + "planes", + list(itertools.product(list(Plane), [Plane.XZ, Plane.YZ], list(Plane))), +) +def test_remove_clifford( + zx_graph: ZXGraphState, + planes: tuple[Plane, Plane, Plane], + rng: np.random.Generator, +) -> None: + graph_2(zx_graph) + angles = [rng.random() * 2 * np.pi for _ in range(3)] + epsilon = 1e-10 + angles[1] = rng.choice([0.0, np.pi, 2 * np.pi - epsilon]) + measurements = [(i, PlannerMeasBasis(planes[i], angles[i])) for i in range(3)] + ref_plane0, ref_angle_func0 = MEAS_ACTION_RC[planes[0]] + ref_plane2, ref_angle_func2 = MEAS_ACTION_RC[planes[2]] + ref_angle0 = ref_angle_func0(angles[1], angles[0]) + ref_angle2 = ref_angle_func2(angles[1], angles[2]) + exp_measurements = [ + (0, PlannerMeasBasis(ref_plane0, ref_angle0)), + (2, PlannerMeasBasis(ref_plane2, ref_angle2)), + ] + _test_remove_clifford( + zx_graph, node=1, measurements=measurements, exp_graph=({0, 2}, set()), exp_measurements=exp_measurements + ) + + +def test_unremovable_clifford_node(zx_graph: ZXGraphState) -> None: + _initialize_graph(zx_graph, nodes=range(3), edges={(0, 1), (1, 2)}, inputs=(0, 2)) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + with pytest.raises(ValueError, match=r"This Clifford node is unremovable."): + zx_graph.remove_clifford(1) + + +def test_remove_cliffords(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes.""" + _initialize_graph(zx_graph, nodes=range(4), edges={(0, 1), (0, 2), (0, 3)}) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {2}, set(), []) + + +def test_remove_cliffords_graph1(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes.""" + graph_1(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.YZ, np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + ] + exp_measurements = [ + (1, PlannerMeasBasis(Plane.XY, 1.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 1.7 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {1, 2, 3}, set(), exp_measurements=exp_measurements) + + +def test_remove_cliffords_graph2(zx_graph: ZXGraphState) -> None: + graph_2(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + ] + exp_measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.6 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 0.2 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test(zx_graph, {0, 2}, {(0, 2)}, exp_measurements=exp_measurements) + + +def test_remove_cliffords_graph3(zx_graph: ZXGraphState) -> None: + graph_3(zx_graph) + measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XZ, 1.5 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (5, PlannerMeasBasis(Plane.XZ, 0.5 * np.pi)), + ] + exp_measurements = [ + (0, PlannerMeasBasis(Plane.XY, 0.1 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 1.7 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.3 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (5, PlannerMeasBasis(Plane.XZ, 1.5 * np.pi)), + ] + _apply_measurements(zx_graph, measurements) + zx_graph.remove_cliffords() + _test( + zx_graph, + {0, 2, 3, 4, 5}, + {(0, 2), (0, 3), (0, 4), (0, 5), (2, 3), (2, 4), (3, 5), (4, 5)}, + exp_measurements=exp_measurements, + ) + + +def test_random_graph(zx_graph: ZXGraphState) -> None: + """Test removing multiple Clifford nodes from a random graph.""" + random_graph, _ = generate_random_flow_graph(5, 5) + zx_graph, _ = ZXGraphState.from_base_graph_state(random_graph) + + for i in zx_graph.physical_nodes - set(zx_graph.output_node_indices): + rng = np.random.default_rng(seed=0) + rnd = rng.random() + if 0 <= rnd < 0.33: + pass + elif 0.33 <= rnd < 0.66: + angle = zx_graph.meas_bases[i].angle + zx_graph.assign_meas_basis(i, PlannerMeasBasis(Plane.XZ, angle)) + else: + angle = zx_graph.meas_bases[i].angle + zx_graph.assign_meas_basis(i, PlannerMeasBasis(Plane.YZ, angle)) + + zx_graph.remove_cliffords() + atol = 1e-9 + nodes = zx_graph.physical_nodes - set(zx_graph.input_node_indices) - set(zx_graph.output_node_indices) + clifford_nodes = [node for node in nodes if zx_graph.is_removable_clifford(node, atol)] + assert clifford_nodes == [] + + +@pytest.mark.parametrize( + ("measurements", "exp_measurements", "exp_edges"), + [ + # no pair of adjacent nodes with YZ measurements + # and no node with XZ measurement + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.55 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.66 * np.pi)), + ], + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.XY, 0.55 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.66 * np.pi)), + ], + {(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)}, + ), + ], +) +def test_convert_to_phase_gadget( + zx_graph: ZXGraphState, + measurements: Measurements, + exp_measurements: Measurements, + exp_edges: set[tuple[int, int]], +) -> None: + initial_edges = {(0, 1), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)} + _initialize_graph(zx_graph, nodes=range(6), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.convert_to_phase_gadget() + _test(zx_graph, exp_nodes={0, 1, 2, 3, 4, 5}, exp_edges=exp_edges, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + ("initial_edges", "measurements", "exp_measurements", "exp_edges"), + [ + # 3(XY) 3(XY) + # | -> | + # 0(YZ) - 1(XY) - 2(XY) 1(XY) - 2(XY) + ( + {(0, 1), (1, 2), (1, 3)}, + [ + (0, PlannerMeasBasis(Plane.YZ, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + ], + [ + (1, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.44 * np.pi)), + ], + {(1, 2), (1, 3)}, + ), + # 3(YZ) 3(YZ) + # | \ -> | \ + # 0(YZ) - 1(XY) - 2(XY) 1(XY) - 2(XY) + ( + {(0, 1), (1, 2), (1, 3), (2, 3)}, + [ + (0, PlannerMeasBasis(Plane.YZ, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + ], + [ + (1, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + ], + {(1, 2), (1, 3), (2, 3)}, + ), + ], +) +def test_merge_yz_to_xy( + zx_graph: ZXGraphState, + initial_edges: set[tuple[int, int]], + measurements: Measurements, + exp_measurements: Measurements, + exp_edges: set[tuple[int, int]], +) -> None: + _initialize_graph(zx_graph, nodes=range(4), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.merge_yz_to_xy() + _test(zx_graph, exp_nodes={1, 2, 3}, exp_edges=exp_edges, exp_measurements=exp_measurements) + + +@pytest.mark.parametrize( + ("initial_edges", "measurements", "exp_zxgraph"), + [ + # 3(YZ) 3(YZ) + # / \ / \ + # 0(XY) - 1(XY) - 2(XY) -> 0(XY) - 1(XY) - 2(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.99 * np.pi)), + ], + {(0, 1), (0, 3), (1, 2), (2, 3)}, + {0, 1, 2, 3}, + ), + ), + # 3(YZ) + # / \ + # 0(XY) - 1(YZ) - 2(XY) -> 0(XY) - 1(YZ) - 2(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.21 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + ], + {(0, 1), (1, 2)}, + {0, 1, 2}, + ), + ), + # 3(YZ) + # / \ + # 0(XY) - 1(YZ) - 2(XY) - 0(XY) -> 0(XY) - 1(YZ) - 2(XY) - 0(XY) + # \ / + # 4(YZ) + ( + {(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (2, 3), (2, 4)}, + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 0.22 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + (3, PlannerMeasBasis(Plane.YZ, 0.44 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 0.55 * np.pi)), + ], + ( + [ + (0, PlannerMeasBasis(Plane.XY, 0.11 * np.pi)), + (1, PlannerMeasBasis(Plane.YZ, 1.21 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.33 * np.pi)), + ], + {(0, 1), (0, 2), (1, 2)}, + {0, 1, 2}, + ), + ), + ], +) +def test_merge_yz_nodes( + zx_graph: ZXGraphState, + initial_edges: set[tuple[int, int]], + measurements: Measurements, + exp_zxgraph: tuple[Measurements, set[tuple[int, int]], set[int]], +) -> None: + _initialize_graph(zx_graph, nodes=range(5), edges=initial_edges) + _apply_measurements(zx_graph, measurements) + zx_graph.merge_yz_nodes() + exp_measurements, exp_edges, exp_nodes = exp_zxgraph + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +@pytest.mark.parametrize( + ("initial_zxgraph", "measurements", "exp_zxgraph"), + [ + # test for a phase gadget: apply merge_yz_to_xy then remove_cliffords + ( + (range(4), {(0, 1), (1, 2), (1, 3)}), + [ + (0, PlannerMeasBasis(Plane.YZ, -0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + (2, PlannerMeasBasis(Plane.XY, 0.3 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 1.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + # apply convert_to_phase_gadget, merge_yz_to_xy, then remove_cliffords + ( + (range(4), {(0, 1), (1, 2), (1, 3)}), + [ + (0, PlannerMeasBasis(Plane.YZ, -0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + (2, PlannerMeasBasis(Plane.XZ, 0.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.4 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 0.2 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + # apply remove_cliffords, convert_to_phase_gadget, merge_yz_to_xy, then remove_cliffords + ( + (range(6), {(0, 1), (1, 2), (1, 3), (2, 5), (3, 4)}), + [ + (0, PlannerMeasBasis(Plane.YZ, -0.1 * np.pi)), + (1, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + (2, PlannerMeasBasis(Plane.YZ, 1.2 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 1.4 * np.pi)), + (4, PlannerMeasBasis(Plane.YZ, 1.0 * np.pi)), + (5, PlannerMeasBasis(Plane.XY, 0.5 * np.pi)), + ], + ( + [ + (2, PlannerMeasBasis(Plane.XY, 1.8 * np.pi)), + (3, PlannerMeasBasis(Plane.XY, 0.9 * np.pi)), + ], + {(2, 3)}, + {2, 3}, + ), + ), + ], +) +def test_full_reduce( + zx_graph: ZXGraphState, + initial_zxgraph: tuple[range, set[tuple[int, int]]], + measurements: Measurements, + exp_zxgraph: tuple[Measurements, set[tuple[int, int]], set[int]], +) -> None: + nodes, edges = initial_zxgraph + _initialize_graph(zx_graph, nodes, edges) + exp_measurements, exp_edges, exp_nodes = exp_zxgraph + _apply_measurements(zx_graph, measurements) + zx_graph.full_reduce() + _test(zx_graph, exp_nodes, exp_edges, exp_measurements) + + +if __name__ == "__main__": + pytest.main()