Source code for scikit_quri.backend.oqtopus_sampler

from concurrent.futures import ThreadPoolExecutor, as_completed

from quri_parts.circuit import NonParametricQuantumCircuit
from typing import Optional, Iterable
from quri_parts.core.sampling import MeasurementCounts
from quri_parts_oqtopus.backend import (
    OqtopusConfig,
    OqtopusSamplingBackend,
    OqtopusDeviceBackend,
)


[docs]class OqtopusSampler: def __init__(self, device_id: str, config: Optional[OqtopusConfig]) -> None: self.backend = OqtopusSamplingBackend(config) self.device_id = device_id self.config = config self.device_backend = OqtopusDeviceBackend(config)
[docs] def get_device_qubit_count(self) -> int: """Oqtopusのデバイスの量子ビット数を取得する関数 Returns: int: Oqtopusのデバイスの量子ビット数 Raises: BackendError: Oqtopusでの実行に失敗した場合 """ device_info = self.device_backend.get_device(self.device_id) return device_info.n_qubits
[docs] def sample(self, circuit: NonParametricQuantumCircuit, shots: int) -> MeasurementCounts: """ Raises: BackendError: Oqtopusでの実行に失敗した場合 """ result = self.backend.sample(circuit, device_id=self.device_id, shots=shots).result() return result.counts
[docs] def concurrent_sample( self, circuit_shots_tuples: Iterable[tuple[NonParametricQuantumCircuit, int]] ) -> Iterable[MeasurementCounts]: """ concurrentにsampleする関数 quri-partsのConcurrentSamplerに合わせるため、shotsはtupleの最大値を使用する devicesの量子ビット数を超える量子回路の組み合わせに対しては、複数回に分けて実行する Raises: BackendError: Oqtopusでの実行に失敗した場合 """ circuit_shots_list = list(circuit_shots_tuples) max_shots = max(shots for _, shots in circuit_shots_list) circuits = [circuit for circuit, _ in circuit_shots_list] # batched_circuits = [[]] # device_qubits = self.get_device_qubit_count() # prefix_qubits = 0 # for circuit in circuits: # prefix_qubits += circuit.qubit_count # if prefix_qubits > device_qubits: # prefix_qubits = circuit.qubit_count # batched_circuits.append([circuit]) # else: # batched_circuits[-1].append(circuit) #! qiskitのtranspilerで物理layoutを考慮してしまうので、combineすると相互作用でoverlapが1.0じゃなくなる #! qulacsのsimでも物理layoutが考慮されているっぽい? batched_circuits = [[c] for c in circuits] counts = [] transpiler_info = { "transpiler_lib": "qiskit", "transpiler_options": { "optimization_level": 2, }, } def submit_sample( batched_circuits: list[NonParametricQuantumCircuit], ) -> dict[int, MeasurementCounts]: req = self.backend.sample( batched_circuits, device_id=self.device_id, shots=max_shots, transpiler_info=transpiler_info, ).result() if req.divided_counts is None: raise RuntimeError("Expected divided_counts for list of circuits.") return req.divided_counts batched_result: list[dict[int, MeasurementCounts]] = [] # SamplerをThreadPoolExecutorで並列化して実行 with ThreadPoolExecutor() as executor: futures = [executor.submit(submit_sample, batch) for batch in batched_circuits] c = 0 for _ in as_completed(futures): c += 1 print("\r", f"succeeded: {c}/{len(batched_circuits)}", end="") batched_result = [future.result() for future in futures] print() for divided_counts in batched_result: # dictのkey(int)でsortしてから格納 counts.extend(v for _, v in sorted(divided_counts.items())) return counts
[docs]def create_oqtopus_sampler(device_id: str, config: Optional[OqtopusConfig] = None): """Oqtopus用のSamplerを生成する関数 quri-partsのSamplerとして動作 Returns: Sampler: Oqtopus用のSampler """ oqtopusSampler = OqtopusSampler(device_id, config) return oqtopusSampler.sample
[docs]def create_oqtopus_concurrent_sampler(device_id: str, config: Optional[OqtopusConfig] = None): """Oqtopus用のConcurrentSamplerを生成する関数 quri-partsのConcurrentSamplerとして動作 Returns: ConcurrentSampler: Oqtopus用のConcurrentSampler """ oqtopusSampler = OqtopusSampler(device_id, config) return oqtopusSampler.concurrent_sample