# mypy: ignore-errors
from dataclasses import dataclass, field
import numpy as np
from numpy.typing import NDArray
from quri_parts.algo.optimizer import Optimizer, OptimizerStatus, Params
from quri_parts.core.estimator import (
ConcurrentQuantumEstimator,
Estimatable,
GradientEstimator,
)
from quri_parts.core.estimator.gradient import _ParametricStateT
from quri_parts.core.state import ParametricCircuitQuantumState, quantum_state
from quri_parts.qulacs import QulacsStateT
from scikit_quri.circuit import LearningCircuit
from scikit_quri.backend import BaseEstimator
from typing import List, Optional, Dict, Tuple
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import log_loss
from quri_parts.core.operator import Operator, pauli_label
from quri_parts.circuit import ParametricQuantumCircuitProtocol
from typing_extensions import TypeAlias
EstimatorType: TypeAlias = ConcurrentQuantumEstimator[QulacsStateT]
GradientEstimatorType: TypeAlias = GradientEstimator[_ParametricStateT]
@dataclass
class QNNClassifier:
"""Class to solve classification problems by quantum neural networks.
The prediction is made by making a vector which predicts one-hot encoding of labels.
The prediction is made by
1. taking expectation values of Pauli Z operator of each qubit ``<Z_i>``,
2. taking softmax function of the vector (``<Z_0>, <Z_1>, ..., <Z_{n-1}>``).
Args:
ansatz: Circuit to use in the learning.
num_class: The number of classes; the number of qubits to measure. must be n_qubits >= num_class .
estimator: Estimator to use. It must be a concurrent estimator.
gradient_estimator: Gradient estimator to use.
optimizer: Solver to use. use :py:class:`~quri_parts.algo.optimizer.Adam` or :py:class:`~quri_parts.algo.optimizer.LBFGS` method.

    Example:
        >>> from scikit_quri.qnn.classifier import QNNClassifier
        >>> from scikit_quri.circuit import create_qcl_ansatz
        >>> from quri_parts.core.estimator.gradient import (
        ...     create_numerical_gradient_estimator,
        ... )
        >>> from quri_parts.qulacs.estimator import (
        ...     create_qulacs_vector_concurrent_estimator,
        ...     create_qulacs_vector_concurrent_parametric_estimator,
        ... )
        >>> from quri_parts.algo.optimizer import Adam
        >>> num_class = 3
        >>> nqubit = 5
        >>> c_depth = 3
        >>> time_step = 0.5
        >>> circuit = create_qcl_ansatz(nqubit, c_depth, time_step, 0)
        >>> adam = Adam()
        >>> estimator = create_qulacs_vector_concurrent_estimator()
        >>> gradient_estimator = create_numerical_gradient_estimator(
        ...     create_qulacs_vector_concurrent_parametric_estimator(), delta=1e-10
        ... )
        >>> qnn = QNNClassifier(circuit, num_class, estimator, gradient_estimator, adam)
        >>> qnn.fit(x_train, y_train, maxiter)
        >>> y_pred = qnn.predict(x_test).argmax(axis=1)
    """
ansatz: LearningCircuit
num_class: int
estimator: BaseEstimator
gradient_estimator: GradientEstimatorType
optimizer: Optimizer
operator: List[Estimatable] = field(default_factory=list)
x_norm_range: float = field(default=1.0)
y_norm_range: float = field(default=0.7)
do_x_scale: bool = field(default=True)
do_y_scale: bool = field(default=True)
n_outputs: int = field(default=1)
y_exp_ratio: float = field(default=2.2)
trained_param: Optional[Params] = field(default=None)
n_qubit: int = field(init=False)
predict_inner_cache: Dict[Tuple[bytes, bytes], NDArray[np.float64]] = field(
default_factory=dict
)
def __post_init__(self) -> None:
        if not isinstance(self.estimator, BaseEstimator):
            raise TypeError("estimator must be an instance of BaseEstimator")
self.n_qubit = self.ansatz.n_qubits
if self.do_x_scale:
self.scale_x_scaler = MinMaxScaler(
feature_range=(-self.x_norm_range, self.x_norm_range) # type: ignore
)
    def _softmax(
        self, x: NDArray[np.float64], axis: Optional[int] = None
    ) -> NDArray[np.float64]:
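        # Numerically stable softmax: shifting by the max before exponentiating
        # prevents overflow and does not change the result.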
x_max = np.amax(x, axis=axis, keepdims=True)
exp_x_shifted = np.exp(x - x_max)
return exp_x_shifted / np.sum(exp_x_shifted, axis=axis, keepdims=True)
    def _cost(
        self,
        x_train: NDArray[np.float64],
        y_train: NDArray[np.int64],
        params: NDArray[np.float64],
    ) -> float:
        """Compute the loss for raw (unscaled) training inputs."""
if x_train.ndim == 1:
x_train = x_train.reshape(-1, 1)
if self.do_x_scale:
x_scaled = self.scale_x_scaler.fit_transform(x_train)
else:
x_scaled = x_train
cost = self.cost_func(x_scaled, y_train, params)
return cost
    def fit(
self,
x_train: NDArray[np.float64],
y_train: NDArray[np.int64],
maxiter: int = 100,
    ) -> None:
"""
Args:
x_train: List of training data inputs whose shape is (n_samples, n_features).
y_train: List of labels to fit. Labels must be represented as integers. Shape is (n_samples,).
maxiter: The number of maximum iterations for the optimizer.
Returns:
None
"""
if x_train.ndim == 1:
x_train = x_train.reshape(-1, 1)
if self.do_x_scale:
x_scaled = self.scale_x_scaler.fit_transform(x_train)
else:
x_scaled = x_train
        # Build one Pauli Z observable per class; class i is scored by <Z_i>.
        operators = []
        for i in range(self.num_class):
            operators.append(Operator({pauli_label(f"Z {i}"): 1.0}))
        self.operator = operators
parameter_count = self.ansatz.learning_params_count
if self.trained_param is None:
init_params = 2 * np.pi * np.random.random(parameter_count)
else:
init_params = self.trained_param
# print(f"{init_params=}")
optimizer_state = self.optimizer.get_init_state(init_params)
        cost_func = lambda params: self.cost_func(x_scaled, y_train, params)
        grad_func = lambda params: self.cost_func_grad(x_scaled, y_train, params)
        for c in range(maxiter):
            optimizer_state = self.optimizer.step(optimizer_state, cost_func, grad_func)
            print("\r", f"iter: {c}/{maxiter} cost: {optimizer_state.cost}", end="")
            if optimizer_state.status in (
                OptimizerStatus.CONVERGED,
                OptimizerStatus.FAILED,
            ):
                break
        print("")
self.trained_param = optimizer_state.params
    def predict(self, x_test: NDArray[np.float64]) -> NDArray[np.float64]:
        """Predict the outcome for each input in ``x_test``.

        Returns one score per class for every sample (the scaled ``<Z_i>``
        expectation values); take ``argmax`` over the last axis for labels.

        Args:
            x_test: Input data whose shape is ``(n_samples, n_features)``.

        Returns:
            y_pred: Predicted outcome whose shape is ``(n_samples, num_class)``.
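
        Example:
            ``qnn`` and ``x_test`` are assumed to be set up as in the class
            docstring; this mirrors that example.

            >>> y_score = qnn.predict(x_test)     # shape (n_samples, num_class)
            >>> y_label = y_score.argmax(axis=1)  # predicted class labels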
"""
if self.trained_param is None:
raise ValueError("Model is not trained.")
if x_test.ndim == 1:
x_test = x_test.reshape(-1, 1)
if self.do_x_scale:
x_scaled = self.scale_x_scaler.transform(x_test)
else:
x_scaled = x_test
        y_pred = self._predict_inner(x_scaled, self.trained_param)
return y_pred
def _predict_inner(
self, x_scaled: NDArray[np.float64], params: NDArray[np.float64]
) -> NDArray[np.float64]:
"""
Predict inner function.
Parameters:
x_scaled: Input data whose shape is (batch_size, n_features).
params: Parameters for the quantum circuit.
Returns:
res: Predicted outcome.
"""
        # Results are cached on the raw bytes of the inputs and parameters so that
        # repeated cost/gradient evaluations at the same point reuse the estimates.
        key = (x_scaled.tobytes(), params.tobytes())
        cache = self.predict_inner_cache.get(key)
        if cache is not None:
            return cache
res = np.zeros((len(x_scaled), self.num_class))
        circuit_states = []
        # Bind each input sample (plus the learnable parameters) into a circuit state.
        for x in x_scaled:
            circuit_params = self.ansatz.generate_bound_params(x, params)
            circuit: ParametricQuantumCircuitProtocol = self.ansatz.circuit
            # The overloads of quantum_state defeat the type annotation here.
            param_circuit_state: ParametricCircuitQuantumState = quantum_state(  # type: ignore
                n_qubits=self.n_qubit, circuit=circuit
            )
            circuit_state = param_circuit_state.bind_parameters(circuit_params)
            circuit_states.append(circuit_state)
        for i in range(self.num_class):
            op = self.operator[i]
            estimates = self.estimator.estimate([op], circuit_states)
            res[:, i] = [e.value.real * self.y_exp_ratio for e in estimates]
        self.predict_inner_cache[key] = res
return res
    def cost_func(
self,
x_scaled: NDArray[np.float64],
y_train: NDArray[np.int64],
params: NDArray[np.float64],
) -> float:
        y_pred = self._predict_inner(x_scaled, params)
        # Apply softmax so the scores can be fed to the cross-entropy (log) loss.
        y_pred_sm = self._softmax(y_pred, axis=1)
        loss = float(log_loss(y_train, y_pred_sm))
        return loss
    def cost_func_grad(
self, x_scaled: NDArray[np.float64], y_train: NDArray[np.int64], params: Params
) -> NDArray[np.float64]:
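        # Sketch of the math behind the loop below (read off from the code, not
        # from external documentation): with logits z_c = y_exp_ratio * <Z_c> and
        # p = softmax(z), the cross-entropy gradient for a single sample is
        #     dL/dtheta = sum_c (p_c - onehot(y)_c) * dz_c/dtheta
        #               = sum_c y_exp_ratio * (p_c - onehot(y)_c) * d<Z_c>/dtheta,
        # and the result is averaged over the batch.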
        y_pred = self._predict_inner(x_scaled, params)
        y_pred_sm = self._softmax(y_pred, axis=1)
        raw_grads = self._estimate_grad(x_scaled, params)
        grads = np.zeros(self.ansatz.learning_params_count)
for sample_index in range(len(x_scaled)):
for current_class in range(self.num_class):
expected = 1.0 if current_class == y_train[sample_index] else 0.0
coef = self.y_exp_ratio * (-expected + y_pred_sm[sample_index][current_class])
grads += coef * raw_grads[sample_index][current_class]
grads /= len(x_scaled)
# print(f"{time.perf_counter()-start=}")
return grads
def _estimate_grad(
self, x_scaled: NDArray[np.float64], params: NDArray[np.float64]
) -> NDArray[np.float64]:
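        # Returns an array of shape (n_samples, num_class, n_learning_params):
        # one gradient vector per (sample, class-observable) pair.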
grads = []
        learning_param_indexes = self.ansatz.get_minimum_learning_param_indexes()
for x in x_scaled:
_grads = []
for op in self.operator:
circuit_params = self.ansatz.generate_bound_params(x, params)
param_state = quantum_state(n_qubits=self.n_qubit, circuit=self.ansatz.circuit)
estimate = self.gradient_estimator(op, param_state, circuit_params)
                # Keep only the learnable parameters, dropping the input-encoding ones.
                grad = np.array(estimate.values)[learning_param_indexes]
_grads.append([g.real for g in grad])
grads.append(_grads)
return np.asarray(grads)
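

# ---------------------------------------------------------------------------
# Minimal usage sketch, mirroring the class docstring example. This is not a
# definitive recipe: it assumes scikit-learn's iris dataset purely as
# illustrative data, and that the estimator produced below satisfies the
# ``BaseEstimator`` check in ``__post_init__`` for your installed
# scikit_quri/quri-parts versions.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from quri_parts.algo.optimizer import Adam
    from quri_parts.core.estimator.gradient import create_numerical_gradient_estimator
    from quri_parts.qulacs.estimator import (
        create_qulacs_vector_concurrent_estimator,
        create_qulacs_vector_concurrent_parametric_estimator,
    )
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    from scikit_quri.circuit import create_qcl_ansatz

    x, y = load_iris(return_X_y=True)
    x_train, x_test, y_train, y_test = train_test_split(x, y, random_state=0)

    # nqubit=5, c_depth=3, time_step=0.5, seed=0, as in the docstring example.
    circuit = create_qcl_ansatz(5, 3, 0.5, 0)
    estimator = create_qulacs_vector_concurrent_estimator()
    gradient_estimator = create_numerical_gradient_estimator(
        create_qulacs_vector_concurrent_parametric_estimator(), delta=1e-10
    )
    qnn = QNNClassifier(circuit, 3, estimator, gradient_estimator, Adam())
    qnn.fit(x_train, y_train, maxiter=20)
    accuracy = (qnn.predict(x_test).argmax(axis=1) == y_test).mean()
    print(f"test accuracy: {accuracy:.3f}")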