# mypy: ignore-errors
from collections.abc import Sequence
from dataclasses import dataclass, field
import numpy as np
from numpy.typing import NDArray
from quri_parts.algo.optimizer import Optimizer, Params
from quri_parts.core.estimator import (
ConcurrentParametricQuantumEstimator,
Estimatable,
GradientEstimator,
ConcurrentQuantumEstimator,
)
from quri_parts.circuit import ParametricQuantumCircuitProtocol
from quri_parts.core.state import ParametricCircuitQuantumState
from quri_parts.core.estimator.gradient import _ParametricStateT
from quri_parts.algo.optimizer import OptimizerStatus
from quri_parts.core.state import quantum_state
from quri_parts.qulacs import QulacsParametricStateT, QulacsStateT
from quri_parts.core.operator import Operator, pauli_label
from scikit_quri.circuit import LearningCircuit
from typing import List, Optional
from sklearn.preprocessing import MinMaxScaler
# from sklearn.metrics import mean_squared_error
from typing_extensions import TypeAlias
# Type of the ``estimator`` field of ``QNNRegressor``: a concurrent estimator
# parameterized over Qulacs (non-parametric) states.
EstimatorType: TypeAlias = ConcurrentQuantumEstimator[QulacsStateT]
# Type of the ``gradient_estimator`` field of ``QNNRegressor``.
# NOTE(review): ``_ParametricStateT`` is imported from a private name in
# ``quri_parts.core.estimator.gradient`` -- confirm it is stable across versions.
GradientEstimatorType: TypeAlias = GradientEstimator[_ParametricStateT]
def mean_squared_error(y_true: NDArray[np.float64], y_pred: NDArray[np.float64]) -> float:
    """
    Calculate the mean squared error between true and predicted values.

    The mean is taken over every element of the arrays, i.e. over all samples
    and all output components at once.

    Parameters:
        y_true: True values (any array-like accepted by ``numpy.asarray``).
        y_pred: Predicted values, with exactly the same shape as ``y_true``.

    Returns:
        mse: Mean squared error as a built-in ``float``.

    Raises:
        ValueError: If the shapes of ``y_true`` and ``y_pred`` differ.
    """
    # Coerce so that plain sequences work too; ndarrays pass through unchanged.
    true_values = np.asarray(y_true)
    predicted_values = np.asarray(y_pred)
    # Refuse to broadcast silently: a shape mismatch is almost always a bug.
    if true_values.shape != predicted_values.shape:
        raise ValueError("Shapes of y_true and y_pred must match.")
    return float(np.mean((true_values - predicted_values) ** 2))
@dataclass
class QNNRegressor:
    """
    Class to solve regression problems with quantum neural networks.

    The i-th output is ``y_exp_ratio`` times the expectation value of the
    ``Pauli Z`` operator acting on qubit ``i``, i.e. ``y_exp_ratio * <Z_i>``.
    For a single output this is ``y_exp_ratio * <Z_0>``.

    Args:
        ansatz: Circuit to use in the learning.
        estimator: Estimator to use. use :py:func:`~quri_parts.qulacs.estimator.create_qulacs_vector_concurrent_estimator` method.
        gradient_estimator: Gradient estimator to use. use :py:func:`~quri_parts.core.estimator.gradient.create_numerical_gradient_estimator` or :py:func:`~quri_parts.core.estimator.gradient.create_parameter_shift_gradient_estimator` method.
        optimizer: Optimizer to use. use :py:class:`~quri_parts.algo.optimizer.Adam` or :py:class:`~quri_parts.algo.optimizer.LBFGS` method.
        operator: Observables measured for each output. Overwritten by :py:meth:`fit`.
        x_norm_range: Inputs are min-max scaled to ``[-x_norm_range, x_norm_range]`` when ``do_x_scale`` is true.
        y_norm_range: Targets are min-max scaled to ``[-y_norm_range, y_norm_range]`` when ``do_y_scale`` is true.
        do_x_scale: Whether to scale the inputs.
        do_y_scale: Whether to scale the targets (and un-scale the predictions).
        n_outputs: Number of output components. Overwritten by :py:meth:`fit`.
        y_exp_ratio: Factor multiplied onto the measured expectation values.
        trained_param: Learned parameters; set by :py:meth:`fit`.

    ``n_qubit`` is not a constructor argument; it is read from ``ansatz.n_qubits``.

    Example:
        >>> from quri_parts.algo.optimizer import Adam
        >>> from quri_parts.qulacs.estimator import (
        ...     create_qulacs_vector_concurrent_estimator,
        ...     create_qulacs_vector_concurrent_parametric_estimator,
        ... )
        >>> from quri_parts.core.estimator.gradient import (
        ...     create_numerical_gradient_estimator,
        ... )
        >>> n_qubit = 3
        >>> depth = 3
        >>> time_step = 0.5
        >>> estimator = create_qulacs_vector_concurrent_estimator()
        >>> gradient_estimator = create_numerical_gradient_estimator(
        ...     create_qulacs_vector_concurrent_parametric_estimator()
        ... )
        >>> circuit = create_qcl_ansatz(n_qubit, depth, time_step, 0)
        >>> qnn = QNNRegressor(circuit, estimator, gradient_estimator, Adam())
        >>> qnn.fit(x_train, y_train, maxiter)
        >>> y_pred = qnn.predict(x_test)
    """

    ansatz: LearningCircuit
    estimator: EstimatorType
    gradient_estimator: GradientEstimatorType
    optimizer: Optimizer
    operator: List[Estimatable] = field(default_factory=list)
    x_norm_range: float = field(default=1.0)
    y_norm_range: float = field(default=0.7)
    n_qubit: int = field(init=False)
    do_x_scale: bool = field(default=True)
    do_y_scale: bool = field(default=True)
    n_outputs: int = field(default=1)
    y_exp_ratio: float = field(default=2.2)
    trained_param: Optional[Params] = field(default=None)

    def __post_init__(self) -> None:
        """Derive ``n_qubit`` from the ansatz and create the requested scalers."""
        self.n_qubit = self.ansatz.n_qubits
        # The scalers only exist when the corresponding do_*_scale flag is set.
        if self.do_x_scale:
            self.scale_x_scaler = MinMaxScaler(
                feature_range=(-self.x_norm_range, self.x_norm_range)  # type: ignore
            )
        if self.do_y_scale:
            self.scale_y_scaler = MinMaxScaler(
                feature_range=(-self.y_norm_range, self.y_norm_range)  # type: ignore
            )

    def fit(
        self, x_train: NDArray[np.float64], y_train: NDArray[np.float64], maxiter: int = 20
    ) -> None:
        """
        Fit the model to the training data.

        Parameters:
            x_train: Input data whose shape is (n_samples, n_features).
                A 1-D array is treated as (n_samples, 1).
            y_train: Output data whose shape is (n_samples, n_outputs).
                A 1-D array is treated as (n_samples, 1).
            maxiter: Maximum number of optimizer steps.

        Raises:
            ValueError: If ``x_train`` and ``y_train`` disagree on the number
                of samples, or if there are more outputs than qubits.
        """
        if x_train.ndim == 1:
            x_train = x_train.reshape((-1, 1))
        if y_train.ndim == 1:
            y_train = y_train.reshape((-1, 1))

        # Validate before touching any state (scalers, operators, ...).
        if x_train.shape[0] != y_train.shape[0]:
            raise ValueError(
                "x_train and y_train must have the same number of samples: "
                f"{x_train.shape[0]} != {y_train.shape[0]}."
            )
        if y_train.shape[1] > self.n_qubit:
            # Output i is read out from Z on qubit i, so one qubit per output is needed.
            raise ValueError(
                f"n_outputs ({y_train.shape[1]}) must not exceed "
                f"the number of qubits ({self.n_qubit})."
            )

        x_scaled = self.scale_x_scaler.fit_transform(x_train) if self.do_x_scale else x_train
        y_scaled = self.scale_y_scaler.fit_transform(y_train) if self.do_y_scale else y_train

        self.n_outputs = y_scaled.shape[1]

        # Set up the observables: Z on qubit i for the i-th output.
        self.operator = [
            Operator({pauli_label(f"Z {i}"): 1.0}) for i in range(self.n_outputs)
        ]

        self.x_train = x_scaled
        self.y_train = y_scaled

        # Set initial learning parameters uniformly in [0, 2*pi).
        parameter_count = self.ansatz.learning_params_count
        init_params = 2 * np.pi * np.random.random(parameter_count)
        print(f"{init_params=}")
        optimizer_state = self.optimizer.get_init_state(init_params)

        # The objective and its gradient do not change between steps,
        # so build the closures once instead of once per iteration.
        def cost_fn(params: Params) -> float:
            return self.cost_fn(self.x_train, self.y_train, params)

        def grad_fn(params: Params) -> NDArray[np.float64]:
            return self.grad_fn(self.x_train, self.y_train, params)

        for c in range(maxiter):
            optimizer_state = self.optimizer.step(optimizer_state, cost_fn, grad_fn)
            print("\r", f"iter:{c}/{maxiter} cost:{optimizer_state.cost}", end="")
            # Stop early once the optimizer reports a terminal status.
            if optimizer_state.status in (OptimizerStatus.CONVERGED, OptimizerStatus.FAILED):
                break
        print("")

        self.trained_param = optimizer_state.params
        print(f"{optimizer_state.cost=}")

    def cost_fn(
        self, x_scaled: NDArray[np.float64], y_scaled: NDArray[np.float64], params: Params
    ) -> float:
        """
        Calculate the cost function for solver.

        Parameters:
            x_scaled: Input data whose shape is (batch_size, n_features).
            y_scaled: Output data whose shape is (batch_size, n_outputs).
            params: Parameters for the quantum circuit.

        Returns:
            cost: Mean squared error between the model output and ``y_scaled``.
        """
        y_pred = self._predict_inner(x_scaled, params)
        # Case of MSE
        return mean_squared_error(y_scaled, y_pred)

    def predict(self, x_test: NDArray[np.float64]) -> NDArray[np.float64]:
        """
        Predict outcome for each input data in `x_test`.

        Arguments:
            x_test: Input data whose shape is (batch_size, n_features).
                A 1-D array is treated as (batch_size, 1).

        Returns:
            y_pred: Predicted outcome, mapped back to the original target
                scale when ``do_y_scale`` is true.

        Raises:
            ValueError: If the model has no trained parameters yet.
        """
        if self.trained_param is None:
            raise ValueError("Model is not trained yet.")
        if x_test.ndim == 1:
            x_test = x_test.reshape((-1, 1))

        x_scaled = self.scale_x_scaler.transform(x_test) if self.do_x_scale else x_test
        y_pred: NDArray[np.float64] = self._predict_inner(x_scaled, self.trained_param)
        if self.do_y_scale:
            # Undo the target scaling that was fitted during training.
            y_pred = self.scale_y_scaler.inverse_transform(y_pred)
        return y_pred

    def grad_fn(
        self, x_scaled: NDArray[np.float64], y_scaled: NDArray[np.float64], params: Params
    ) -> NDArray[np.float64]:
        """
        Calculate the gradient of the cost function for solver.

        Parameters:
            x_scaled: Input data whose shape is (batch_size, n_features).
            y_scaled: Output data whose shape is (batch_size, n_outputs).
            params: Parameters for the quantum circuit.

        Returns:
            grads: Gradient of the MSE cost with respect to the learning
                parameters, shape (n_learning_params,).
        """
        # for MSE: cost = mean_{samples, outputs} (y_pred - y)^2
        y_pred = self._predict_inner(x_scaled, params)
        # (batch_size, n_outputs, n_learning_params): d<Z_i>/d(theta)
        expectation_grads = self._estimate_grad(x_scaled, params)
        residual = y_pred - y_scaled

        # Chain rule: y_pred = y_exp_ratio * <Z_i>, hence
        # d(y_pred)/d(theta) = y_exp_ratio * d<Z_i>/d(theta).
        # Without this factor the gradient would not match cost_fn.
        per_element = (
            2.0 * self.y_exp_ratio * residual[:, :, np.newaxis] * expectation_grads
        )
        # Average over samples and outputs, exactly like the cost does.
        return per_element.mean(axis=(0, 1))

    def _estimate_grad(self, x_scaled: NDArray[np.float64], params: Params) -> NDArray[np.float64]:
        """
        Estimate the gradients of the measured expectation values.

        Note that these are the gradients of the bare expectation values of
        ``self.operator``; neither ``y_exp_ratio`` nor the cost function is
        applied here.

        Parameters:
            x_scaled: Input data whose shape is (batch_size, n_features).
            params: Parameters for the quantum circuit.

        Returns:
            grads: Array of shape (batch_size, n_outputs, n_learning_params).
        """
        learning_params_indexes = self.ansatz.get_learning_param_indexes()
        # The parametric state does not depend on the sample; build it once.
        param_state = quantum_state(n_qubits=self.n_qubit, circuit=self.ansatz.circuit)

        grads = []
        for x in x_scaled:
            circuit_params = self.ansatz.generate_bound_params(x, params)
            grad = np.zeros((self.n_outputs, len(learning_params_indexes)), dtype=np.float64)
            # Row i holds the gradient for the i-th observable (i-th output).
            for i, operator in enumerate(self.operator):
                estimate = self.gradient_estimator(operator, param_state, circuit_params)
                # Keep only the entries that belong to learning parameters.
                _grad: NDArray[np.complex64] = np.array(estimate.values)[learning_params_indexes]
                grad[i, :] = _grad.real
            grads.append(grad)
        return np.asarray(grads)

    def _predict_inner(self, x_scaled: NDArray[np.float64], params: Params) -> NDArray[np.float64]:
        """
        Evaluate the model on already-scaled inputs.

        Parameters:
            x_scaled: Input data whose shape is (batch_size, n_features).
            params: Parameters for the quantum circuit.

        Returns:
            res: Array of shape (batch_size, n_outputs) holding
                ``y_exp_ratio`` times the expectation value of each operator.
        """
        # The parametric state does not depend on the sample; build it once
        # (same construction as in the classifier implementation).
        param_circuit_state: ParametricCircuitQuantumState = quantum_state(  # type: ignore
            n_qubits=self.n_qubit, circuit=self.ansatz.circuit
        )
        circuit_states: List[QulacsStateT] = [
            param_circuit_state.bind_parameters(self.ansatz.generate_bound_params(x, params))
            for x in x_scaled
        ]

        res = np.zeros((len(circuit_states), self.n_outputs), dtype=np.float64)
        for i, operator in enumerate(self.operator):
            # When more than one operator is passed, the number of operators
            # must equal the number of states, so evaluate one operator at a time.
            estimates = self.estimator([operator], circuit_states)
            res[:, i] = np.array([e.value.real for e in estimates])
        res *= self.y_exp_ratio
        return res