Source code for skqulacs.qnn.regressor

from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import numpy as np
from numpy.typing import NDArray
from qulacs import Observable
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from typing_extensions import Literal

from skqulacs.circuit import LearningCircuit
from skqulacs.qnn.solver import Solver


[docs]@dataclass(eq=False) class QNNRegressor: """Class to solve regression problems with quantum neural networks The output is taken as expectation values of pauli Z operator acting on the first qubit, i.e., output is <Z_0>. Args: circuit: Circuit to use in the learning. solver: Solver to use(Nelder-Mead is not recommended). n_output: Dimentionality of each output data. cost: Cost function. MSE only for now. MSE computes squared sum after normalization. do_x_scale: Whether to scale x. do_x_scale: Whether to scale y. x_norm_range: Normalize x in [+-xy_norm_range]. y_norm_range: Normalize y in [+-y_norm_range]. Setting y_norm_range to 0.7 improves performance. Examples: >>> from skqulacs.qnn import QNNRegressor >>> from skqulacs.circuit import create_qcl_ansatz >>> n_qubits = 4 >>> depth = 3 >>> evo_time = 0.5 >>> circuit = create_qcl_ansatz(n_qubits, depth, evo_time) >>> model = QNNRegressor(circuit) >>> _, theta = model.fit(x_train, y_train, maxiter=1000) >>> x_list = np.arange(x_min, x_max, 0.02) >>> y_pred = qnn.predict(theta, x_list) """ circuit: LearningCircuit solver: Solver cost: Literal["mse"] = field(default="mse") do_x_scale: bool = field(default=True) do_y_scale: bool = field(default=True) x_norm_range: float = field(default=1.0) y_norm_range: float = field(default=0.7) observables_str: List[str] = field(init=True, default_factory=list) observables: List[Observable] = field(init=False, default_factory=list) n_qubit: int = field(init=False) n_outputs: int = field(default=1) x_scaler: MinMaxScaler = field(init=False) y_scaler: MinMaxScaler = field(init=False) def __post_init__(self) -> None: self.n_qubit = self.circuit.n_qubit if self.do_x_scale: self.scale_x_scaler = MinMaxScaler( feature_range=(-self.x_norm_range, self.x_norm_range) ) if self.do_y_scale: self.scale_y_scaler = MinMaxScaler( feature_range=(-self.y_norm_range, self.y_norm_range) ) if self.observables_str != []: # add the right observables for i in self.observables_str: observable = Observable(self.n_qubit) observable.add_operator(1.0, i) self.observables.append(observable)
[docs] def fit( self, x_train: NDArray[np.float_], y_train: NDArray[np.float_], maxiter_or_lr: Optional[int] = None, ) -> Tuple[float, List[float]]: """ Args: x_list: List of training data inputs whose shape is (n_sample, n_features). y_list: List of training data outputs whose shape is (n_sample, n_output_dims). maxiter: The number of iterations to pass scipy.optimize.minimize Returns: loss: Loss after learning. theta: Parameter theta after learning. """ if x_train.ndim == 1: x_train = x_train.reshape((-1, 1)) if y_train.ndim == 1: y_train = y_train.reshape((-1, 1)) if self.do_x_scale: x_scaled = self.scale_x_scaler.fit_transform(x_train) else: x_scaled = x_train if self.do_y_scale: y_scaled = self.scale_y_scaler.fit_transform(y_train) else: y_scaled = y_train self.n_outputs = y_scaled.shape[1] if ( self.observables_str == [] ): # if this was originally 0, it was not said that the observables are correct. But a eclarations also above allows to not fit in many cases. for i in range(self.n_outputs): observable = Observable(self.n_qubit) observable.add_operator(1.0, f"Z {i}") self.observables.append(observable) ob = "Z " + str(i) self.observables_str.append(ob) theta_init = self.circuit.get_parameters() return self.solver.run( self.cost_func, self._cost_func_grad, theta_init, x_scaled, y_scaled, maxiter_or_lr, )
[docs] def predict(self, x_test: NDArray[np.float_]) -> NDArray[np.float_]: """Predict outcome for each input data in `x_test`. Arguments: x_test: Input data whose shape is (n_samples, n_features). Returns: y_pred: Predicted outcome. """ if x_test.ndim == 1: x_test = x_test.reshape((-1, 1)) if self.do_x_scale: x_scaled: NDArray[np.float_] = self.scale_x_scaler.transform(x_test) else: x_scaled = x_test if self.do_y_scale: y_pred: NDArray[np.float_] = self.scale_y_scaler.inverse_transform( self._predict_inner(x_scaled) ) else: y_pred = self._predict_inner(x_scaled) return y_pred
def _predict_inner(self, x_scaled: NDArray[np.float_]) -> NDArray[np.float_]: res = [] for x in x_scaled: state = self.circuit.run(x) r = [ self.observables[i].get_expectation_value(state) for i in range(self.n_outputs) ] res.append(r) return np.array(res)
[docs] def cost_func( self, theta: List[float], x_scaled: NDArray[np.float_], y_scaled: NDArray[np.float_], ) -> float: if self.cost == "mse": self.circuit.update_parameters(theta) y_pred = self._predict_inner(x_scaled) cost = mean_squared_error(y_pred, y_scaled) return cost else: raise NotImplementedError( f"Cost function {self.cost} is not implemented yet." )
def _cost_func_grad( self, theta: List[float], x_scaled: NDArray[np.float_], y_scaled: NDArray[np.float_], ) -> NDArray[np.float_]: self.circuit.update_parameters(theta) mto = self._predict_inner(x_scaled).copy() grad = np.zeros(len(theta)) for h in range(len(x_scaled)): backobs = Observable(self.n_qubit) if self.n_outputs >= 2: for i in range(len(self.observables_str)): backobs.add_operator( 2 * (-y_scaled[h][i] + mto[h][i]) / self.n_outputs, self.observables_str[ i ], # 2* because of the derivative of the RMSE error ) else: backobs.add_operator( 2 * (-y_scaled[h] + mto[h][0]) / self.n_outputs, self.observables_str[0], ) grad = grad + self.circuit.backprop(x_scaled[h], backobs) grad /= len(x_scaled) return grad
[docs] def func_grad( self, theta: List[float], x_scaled: NDArray[np.float_], ) -> NDArray[np.float_]: self.circuit.update_parameters(theta) grad = np.zeros(len(theta)) for h in range(len(x_scaled)): backobs = Observable(self.n_qubit) for i in self.observables_str: backobs.add_operator(1.0, i) grad += self.circuit.backprop(x_scaled[h], backobs) grad /= len(x_scaled) return grad