# mypy: ignore-errors
from functools import partial
from typing import List, Literal, Optional, Tuple
import numpy as np
from numpy.typing import NDArray
from qulacs import QuantumState as QulacsQuantumState
from quri_parts.algo.optimizer import Optimizer, OptimizerStatus
from quri_parts.core.operator import pauli_label
from quri_parts.core.state import QuantumState, quantum_state
from quri_parts.qulacs.simulator import evaluate_state_to_vector
from scikit_quri.circuit import LearningCircuit
class QNNGenerator:
    """
    Class to solve generative modeling (distribution learning) problems with quantum neural networks.
    This class only works with a simulator and depends on qulacs;
    it does not work on real quantum computers.

    Args:
        circuit: The learning circuit to be used.
        solver: The optimizer to be used for training.
        kernel_type: The type of kernel to be used. ``exp_hamming`` is not implemented yet.
        gauss_sigma: The sigma parameter of the Gaussian kernel
            (it enters the kernel as ``exp(-(i - j)**2 / (2 * gauss_sigma))``).
        fitting_qubit: The number of qubits used for fitting, i.e. the learned
            distribution is defined over ``2**fitting_qubit`` basis states.
    """
def __init__(
self,
circuit: LearningCircuit,
solver: Optimizer,
kernel_type: Literal["gauss", "exp_hamming", "same"],
gauss_sigma: float,
fitting_qubit: int,
):
self.n_qubit = circuit.n_qubits
self.circuit = circuit
self.solver = solver
self.kernel_type = kernel_type
self.gauss_sigma = gauss_sigma
self.fitting_qubit = fitting_qubit
self.theta: None | List[float] = None
self.observables = [pauli_label(f"Z{i}") for i in range(self.n_qubit)]
    def fit(self, train_data: NDArray[np.float64], maxiter: int = 100):
        """
        Args:
            train_data: Training samples, given as computational-basis indices in
                ``[0, 2**fitting_qubit)``; their empirical distribution is the fitting target.
            maxiter: The maximum number of iterations for the optimizer. Default is 100.
        """
train_scaled = np.zeros(2**self.fitting_qubit)
for i in train_data:
train_scaled[i] += 1 / len(train_data)
return self.fit_direct_distribution(train_scaled, maxiter)
    def fit_direct_distribution(
        self, train_scaled: NDArray[np.float64], maxiter: int
    ) -> Tuple[float, List[float]]:
        """
        Fit the circuit parameters directly to a given target probability distribution.

        Args:
            train_scaled: Target distribution over the ``2**fitting_qubit`` basis states.
            maxiter: The maximum number of iterations for the optimizer.
        """
theta_init = 2 * np.pi * np.random.random(self.circuit.learning_params_count)
optimizer_state = self.solver.get_init_state(theta_init)
cost_func = partial(self.cost_func, train_scaled=train_scaled)
grad_func = partial(self._cost_func_grad, train_scaled=train_scaled)
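        # Bind the target distribution so the optimizer sees plain theta -> cost / gradient callables.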
c = 0
while maxiter > c:
optimizer_state = self.solver.step(optimizer_state, cost_func, grad_func)
print("\r", f"iter:{c}/{maxiter} cost:{optimizer_state.cost=}", end="")
if optimizer_state.status == OptimizerStatus.CONVERGED:
break
if optimizer_state.status == OptimizerStatus.FAILED:
break
c += 1
print("")
        self.trained_param = optimizer_state.params
        # Return the final cost and parameters, matching the declared return type.
        return optimizer_state.cost, optimizer_state.params
    def predict(self) -> NDArray[np.float64]:
        """
        Returns:
            The probability distribution over the ``2**fitting_qubit`` basis states
            produced by the circuit with the current parameters.
        """
y_pred_in = evaluate_state_to_vector(self._predict_inner()).vector
y_pred_conj = y_pred_in.conjugate()
data_per: NDArray[np.float64] = np.abs(y_pred_in * y_pred_conj)
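        # If the circuit has more qubits than fitting_qubit, marginalise out the extra
        # (highest-index) qubits by summing their probabilities away.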
if self.n_qubit != self.fitting_qubit:
data_per = data_per.reshape(
(2 ** (self.n_qubit - self.fitting_qubit), 2**self.fitting_qubit)
)
data_per = data_per.sum(axis=0)
return data_per
def _predict_and_inner(self) -> Tuple[NDArray[np.float64], QuantumState]:
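        # Same as predict(), but also returns the pre-measurement state, which the
        # analytic gradient in _cost_func_grad needs for back-propagation.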
state = self._predict_inner()
y_pred_in = evaluate_state_to_vector(state).vector
y_pred_conj = y_pred_in.conjugate()
data_per = y_pred_in * y_pred_conj
if self.n_qubit != self.fitting_qubit:
data_per = data_per.reshape(
(2 ** (self.n_qubit - self.fitting_qubit), 2**self.fitting_qubit)
)
data_per = data_per.sum(axis=0)
return (data_per, state)
def _predict_inner(self) -> QuantumState:
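        # The generator circuit takes no data input, so a dummy input value of 0 is bound
        # to any input parameters together with the current parameters self.theta.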
circuit = self.circuit.bind_input_and_parameters(np.array([0]), np.array(self.theta))
state = quantum_state(n_qubits=self.n_qubit, circuit=circuit)
return state
    def conving(self, data_diff: NDArray[np.float64]) -> NDArray[np.float64]:
        # data_diff is (current distribution) - (target distribution).
        # Computing (data_diff) @ (kernel matrix) @ (data_diff) gives cost_func;
        # here we only compute the vector (data_diff) @ (kernel matrix),
        # i.e. we apply the kernel matrix to the probability-difference vector.
if self.kernel_type == "gauss":
beta = -0.5 / self.gauss_sigma
width = int(4 * np.sqrt(self.gauss_sigma))
conv_len = width * 2 + 1
conv_target = np.zeros(conv_len)
for i in range(conv_len):
conv_target[i] = np.exp((i - width) ** 2 * beta)
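            # "same"-mode convolution requires the kernel to be no longer than the signal;
            # otherwise do a full convolution and trim `width` entries from each side so the
            # output keeps length 2**fitting_qubit.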
if conv_len <= 2**self.fitting_qubit:
conv_diff = np.convolve(data_diff, conv_target, mode="same")
else:
conv_diff = np.convolve(data_diff, conv_target)[width:-width]
return conv_diff
# elif self.kernel_type == "exp_hamming":
# beta = -0.5 / self.gauss_sigma
# swap_pena = np.exp(beta)
# # ハミング距離の畳み込み演算をします。
# # これはバタフライ演算でできます。
# # バタフライ演算を行うのにqulacsを使います。
# # 注意!ユニタリ的な量子演算ではありません!
# # この演算をすることで高速にできる。
# # TODO 実機運用
# diff_state = QuantumState(self.fitting_qubit)
# diff_state
elif self.kernel_type == "same":
return data_diff
else:
raise NotImplementedError(f"Kernel type {self.kernel_type} is not implemented yet.")
    def cost_func(
        self,
        theta: List[float],
        train_scaled: NDArray[np.float64],
    ) -> float:
self.theta = theta
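        # cost = (p - q)^T K (p - q), where p is the generated distribution and q the target.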
data_diff = self.predict() - train_scaled
conv_diff = self.conving(data_diff)
cost = np.dot(data_diff, conv_diff)
return cost
def _cost_func_grad(
self,
theta: List[float],
train_scaled: NDArray[np.float64],
) -> NDArray[np.float64]:
self.theta = theta
(pre, prein) = self._predict_and_inner()
data_diff = pre - train_scaled
conv_diff = self.conving(data_diff)
convconv_diff = np.tile(conv_diff, 2 ** (self.n_qubit - self.fitting_qubit))
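        # Tile the kernel-weighted difference across the non-fitting qubits so it matches
        # the full 2**n_qubit state vector (undoing the marginalisation in _predict_and_inner).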
state_vec = evaluate_state_to_vector(prein).vector
ret = QulacsQuantumState(self.n_qubit)
ret.load(convconv_diff * state_vec * 4)
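        # Up to the conventions (including sign) of backprop_innner_product, this realises
        # d(cost)/d(theta) = 4 * sum_i (K d)_i * Re[conj(psi_i) * d(psi_i)/d(theta)].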
grad = np.array(self.circuit.backprop_innner_product([0], self.theta, ret))
return -grad
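
# Minimal usage sketch (illustrative only, not part of the class). It assumes a
# parametric ``LearningCircuit`` built elsewhere and the ``Adam`` optimizer from
# quri_parts.algo.optimizer; the hyperparameters below are placeholders.
#
#     from quri_parts.algo.optimizer import Adam
#
#     circuit: LearningCircuit = ...                    # parametric circuit on n_qubit qubits
#     qnn = QNNGenerator(circuit, Adam(), kernel_type="gauss", gauss_sigma=2.0, fitting_qubit=3)
#     samples = np.random.randint(0, 2**3, size=1000)   # basis-state indices to learn
#     qnn.fit(samples, maxiter=50)
#     probs = qnn.predict()                             # probability vector of length 2**3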