Files
mcp-quantum/python/cudaq_bridge.py
2025-10-08 04:03:23 +02:00

573 líneas
21 KiB
Python

"""
CUDA Quantum Python Bridge
Provides a unified interface for Node.js to interact with CUDA Quantum functionality
"""
import json
import sys
import traceback
import numpy as np
from typing import Dict, List, Any, Optional, Union, Tuple
import logging
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
try:
import cudaq
from cudaq import spin
CUDAQ_AVAILABLE = True
print("INFO - CUDA Quantum successfully imported", flush=True)
except ImportError as e:
CUDAQ_AVAILABLE = False
print(f"WARNING - CUDA Quantum not available: {e}", flush=True)
print("WARNING - Running in mock mode - install CUDA Quantum for full functionality: pip install cudaq", flush=True)
class QuantumKernelManager:
"""Manages quantum kernels and their execution"""
def __init__(self):
self.kernels: Dict[str, Any] = {}
self.kernel_metadata: Dict[str, Dict] = {}
def create_kernel(self, name: str, num_qubits: int, parameters: Optional[List[Dict]] = None) -> Dict:
"""Create a new quantum kernel"""
if not CUDAQ_AVAILABLE:
# Mock mode - store kernel metadata without actual CUDA Quantum
self.kernel_metadata[name] = {
"num_qubits": num_qubits,
"parameters": parameters or [],
"operations": []
}
return {"success": True, "kernel_name": name, "mode": "mock"}
try:
# Create kernel dynamically
kernel_code = self._generate_kernel_code(name, num_qubits, parameters or [])
exec(kernel_code, globals())
self.kernels[name] = globals()[name]
self.kernel_metadata[name] = {
"num_qubits": num_qubits,
"parameters": parameters or [],
"operations": []
}
return {"success": True, "kernel_name": name}
except Exception as e:
logger.error(f"Error creating kernel {name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def _generate_kernel_code(self, name: str, num_qubits: int, parameters: List[Dict]) -> str:
"""Generate CUDA Quantum kernel code dynamically"""
param_str = ""
if parameters:
param_types = {
"int": "int",
"float": "float",
"complex": "complex",
"list[int]": "list[int]",
"list[float]": "list[float]",
"list[complex]": "list[complex]"
}
param_list = []
for p in parameters:
param_list.append(f"{p['name']}: {param_types.get(p['type'], 'float')}")
param_str = ", " + ", ".join(param_list)
code = f'''
@cudaq.kernel
def {name}({param_str.lstrip(', ')}):
"""Dynamically generated quantum kernel"""
qubits = cudaq.qvector({num_qubits})
# Placeholder - operations will be added dynamically
pass
'''
return code
def apply_gate(self, kernel_name: str, gate_name: str, qubits: List[int],
parameters: Optional[List[float]] = None,
controls: Optional[List[int]] = None,
adjoint: bool = False) -> Dict:
"""Apply a quantum gate to a kernel"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_metadata:
return {"error": f"Kernel {kernel_name} not found"}
try:
operation = {
"gate": gate_name,
"qubits": qubits,
"parameters": parameters,
"controls": controls,
"adjoint": adjoint
}
self.kernel_metadata[kernel_name]["operations"].append(operation)
# Rebuild kernel with new operations
self._rebuild_kernel(kernel_name)
return {"success": True}
except Exception as e:
logger.error(f"Error applying gate {gate_name} to {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def _rebuild_kernel(self, kernel_name: str):
"""Rebuild kernel with accumulated operations"""
metadata = self.kernel_metadata[kernel_name]
num_qubits = metadata["num_qubits"]
parameters = metadata["parameters"]
operations = metadata["operations"]
# Generate parameter string
param_str = ""
if parameters:
param_types = {
"int": "int", "float": "float", "complex": "complex",
"list[int]": "list[int]", "list[float]": "list[float]",
"list[complex]": "list[complex]"
}
param_list = []
for p in parameters:
param_list.append(f"{p['name']}: {param_types.get(p['type'], 'float')}")
param_str = ", " + ", ".join(param_list)
# Generate operations code
ops_code = []
for op in operations:
gate_code = self._generate_gate_code(op)
if gate_code:
ops_code.append(f" {gate_code}")
code = f'''
@cudaq.kernel
def {kernel_name}({param_str.lstrip(', ')}):
"""Dynamically generated quantum kernel with operations"""
qubits = cudaq.qvector({num_qubits})
{chr(10).join(ops_code) if ops_code else " pass"}
'''
# Execute and register new kernel
exec(code, globals())
self.kernels[kernel_name] = globals()[kernel_name]
def _generate_gate_code(self, operation: Dict) -> str:
"""Generate code for a single gate operation"""
gate = operation["gate"]
qubits = operation["qubits"]
parameters = operation.get("parameters", [])
controls = operation.get("controls", [])
adjoint = operation.get("adjoint", False)
if len(qubits) == 1:
target = f"qubits[{qubits[0]}]"
else:
target = f"[{', '.join([f'qubits[{q}]' for q in qubits])}]"
# Handle parameterized gates
if parameters:
if len(parameters) == 1:
gate_call = f"{gate}({parameters[0]}, {target})"
else:
params_str = ", ".join(map(str, parameters))
gate_call = f"{gate}({params_str}, {target})"
else:
gate_call = f"{gate}({target})"
# Handle controlled gates
if controls:
if len(controls) == 1:
gate_call = f"{gate}.ctrl(qubits[{controls[0]}], {target})"
else:
ctrl_str = "[" + ", ".join([f"qubits[{c}]" for c in controls]) + "]"
gate_call = f"{gate}.ctrl({ctrl_str}, {target})"
# Handle adjoint
if adjoint:
gate_call = gate_call.replace(gate, f"{gate}.adj")
return gate_call
class QuantumExecutor:
"""Handles execution of quantum kernels"""
def __init__(self, kernel_manager: QuantumKernelManager):
self.kernel_manager = kernel_manager
def set_target(self, target_name: str, **kwargs) -> Dict:
"""Set the quantum computing target/backend"""
if not CUDAQ_AVAILABLE:
return {"success": True, "target": target_name, "mode": "mock"}
try:
# Get available targets first
available_targets = cudaq.get_targets()
# Validate target exists
if target_name not in available_targets:
# Try common fallbacks
fallback_targets = ['qpp-cpu', 'density-matrix-cpu', 'default']
for fallback in fallback_targets:
if fallback in available_targets:
print(f"WARNING - Target {target_name} not available, using {fallback}", flush=True)
cudaq.set_target(fallback, **kwargs)
return {"success": True, "target": fallback, "original_target": target_name, "fallback": True}
# If no fallbacks work, return error but don't crash
return {"error": f"Invalid target name ({target_name}). Available targets: {available_targets}", "available_targets": available_targets}
cudaq.set_target(target_name, **kwargs)
return {"success": True, "target": target_name}
except Exception as e:
print(f"WARNING - Error setting target {target_name}: {e}", flush=True)
# Try to set a default target as fallback
try:
cudaq.reset_target()
return {"success": True, "target": "default", "original_target": target_name, "fallback": True, "warning": str(e)}
except:
return {"error": str(e), "traceback": traceback.format_exc()}
def sample(self, kernel_name: str, shots: int = 1000,
parameters: Optional[Dict] = None) -> Dict:
"""Sample measurement results from quantum kernel"""
if not CUDAQ_AVAILABLE:
# Mock mode - return simulated results
if kernel_name not in self.kernel_manager.kernel_metadata:
return {"error": f"Kernel {kernel_name} not found"}
import random
num_qubits = self.kernel_manager.kernel_metadata[kernel_name]["num_qubits"]
# Generate mock results for demonstration
mock_counts = {}
if num_qubits == 2: # Bell pair example
mock_counts = {"00": shots//2 + random.randint(-50, 50),
"11": shots//2 + random.randint(-50, 50)}
else:
# Random distribution
for i in range(min(4, 2**num_qubits)):
binary = format(i, f'0{num_qubits}b')
mock_counts[binary] = random.randint(shots//10, shots//3)
return {
"success": True,
"counts": mock_counts,
"shots": shots,
"total_counts": sum(mock_counts.values()),
"mode": "mock"
}
if kernel_name not in self.kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
if parameters:
# Call with parameters
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
result = cudaq.sample(kernel, *args, shots_count=shots)
else:
result = cudaq.sample(kernel, shots_count=shots)
# Convert result to serializable format
counts = {}
for bits, count in result.items():
counts[str(bits)] = count
return {
"success": True,
"counts": counts,
"shots": shots,
"total_counts": sum(counts.values())
}
except Exception as e:
logger.error(f"Error sampling kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def observe(self, kernel_name: str, hamiltonian_terms: List[Dict],
shots: int = 1000, parameters: Optional[Dict] = None) -> Dict:
"""Compute expectation value of Hamiltonian"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
# Build Hamiltonian from terms
hamiltonian = self._build_hamiltonian(hamiltonian_terms)
if parameters:
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
result = cudaq.observe(kernel, hamiltonian, *args, shots_count=shots)
else:
result = cudaq.observe(kernel, hamiltonian, shots_count=shots)
return {
"success": True,
"expectation": result.expectation(),
"variance": result.variance() if hasattr(result, 'variance') else None,
"shots": shots
}
except Exception as e:
logger.error(f"Error observing kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def get_state(self, kernel_name: str, parameters: Optional[Dict] = None) -> Dict:
"""Get quantum state vector"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
if parameters:
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
state = cudaq.get_state(kernel, *args)
else:
state = cudaq.get_state(kernel)
# Convert state to serializable format
state_array = np.array(state)
state_list = []
for amplitude in state_array:
state_list.append({
"real": float(amplitude.real),
"imag": float(amplitude.imag)
})
return {
"success": True,
"state": state_list,
"dimension": len(state_list)
}
except Exception as e:
logger.error(f"Error getting state for kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def _build_hamiltonian(self, terms: List[Dict]):
"""Build CUDA Quantum Hamiltonian from term list"""
h = None
for term in terms:
paulis = term["paulis"]
qubits = term["qubits"]
coeff_real = term["coefficient"]["real"]
coeff_imag = term["coefficient"]["imag"]
# Build Pauli string
pauli_term = None
for i, (pauli, qubit) in enumerate(zip(paulis, qubits)):
if pauli == 'I':
continue
elif pauli == 'X':
p = spin.x(qubit)
elif pauli == 'Y':
p = spin.y(qubit)
elif pauli == 'Z':
p = spin.z(qubit)
else:
raise ValueError(f"Invalid Pauli operator: {pauli}")
if pauli_term is None:
pauli_term = p
else:
pauli_term = pauli_term * p
if pauli_term is not None:
# Apply coefficient
if coeff_imag != 0:
coeff = complex(coeff_real, coeff_imag)
else:
coeff = coeff_real
term_contribution = coeff * pauli_term
if h is None:
h = term_contribution
else:
h = h + term_contribution
return h if h is not None else 0.0 * spin.i(0)
# Global instances
kernel_manager = QuantumKernelManager()
executor = QuantumExecutor(kernel_manager)
# Add missing methods to executor class
def get_target_info_standalone() -> Dict:
"""Get information about available targets"""
if not CUDAQ_AVAILABLE:
return {"target": "mock", "mode": "mock"}
try:
available_targets = cudaq.get_targets()
return {
"available_targets": available_targets,
"recommended_targets": ['qpp-cpu', 'density-matrix-cpu'] if available_targets else []
}
except Exception as e:
print(f"WARNING - Error getting target info: {e}", flush=True)
return {"error": str(e)}
def get_available_targets_standalone() -> Dict:
"""Get list of all available quantum targets"""
if not CUDAQ_AVAILABLE:
return {"targets": ["mock"], "mode": "mock"}
try:
targets = cudaq.get_targets()
return {"targets": targets, "count": len(targets)}
except Exception as e:
print(f"WARNING - Error getting available targets: {e}", flush=True)
return {"targets": [], "error": str(e)}
def dispatch_command(command: str, **kwargs) -> Dict:
"""Main dispatch function for Python bridge commands"""
try:
if command == "create_kernel":
return kernel_manager.create_kernel(
kwargs["name"],
kwargs["num_qubits"],
kwargs.get("parameters")
)
elif command == "apply_gate":
return kernel_manager.apply_gate(
kwargs["kernel_name"],
kwargs["gate_name"],
kwargs["qubits"],
kwargs.get("parameters"),
kwargs.get("controls"),
kwargs.get("adjoint", False)
)
elif command == "set_target":
return executor.set_target(
kwargs["target"], # This will be passed as target_name
**kwargs.get("configuration", {})
)
elif command == "sample":
return executor.sample(
kwargs["kernel_name"],
kwargs.get("shots", 1000),
kwargs.get("parameters")
)
elif command == "observe":
return executor.observe(
kwargs["kernel_name"],
kwargs["hamiltonian_terms"],
kwargs.get("shots", 1000),
kwargs.get("parameters")
)
elif command == "get_state":
return executor.get_state(
kwargs["kernel_name"],
kwargs.get("parameters")
)
elif command == "get_available_targets":
return get_available_targets_standalone()
elif command == "get_target_info":
return get_target_info_standalone()
elif command == "list_kernels":
return {
"success": True,
"kernels": list(kernel_manager.kernels.keys()),
"metadata": kernel_manager.kernel_metadata
}
elif command == "get_platform_info":
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
try:
platform = cudaq.get_platform()
return {
"success": True,
"platform_name": platform.name(),
"num_qpus": platform.num_qpus(),
"is_simulator": platform.is_simulator(),
"is_remote": platform.is_remote()
}
except:
return {"success": True, "platform_name": "default"}
else:
return {"error": f"Unknown command: {command}"}
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def main():
"""Main entry point for standalone execution"""
if len(sys.argv) > 1:
# Command line mode
command_json = sys.argv[1]
try:
command_data = json.loads(command_json)
result = dispatch_command(**command_data)
print(json.dumps(result))
except Exception as e:
print(json.dumps({"error": str(e), "traceback": traceback.format_exc()}))
else:
# Server mode - read from stdin for MCP communication
print("CUDA Quantum Python Bridge - Ready")
sys.stdout.flush()
try:
while True:
line = sys.stdin.readline()
if not line:
break
try:
command_data = json.loads(line.strip())
result = dispatch_command(**command_data)
print(json.dumps(result))
sys.stdout.flush()
except json.JSONDecodeError:
print(json.dumps({"error": "Invalid JSON"}))
sys.stdout.flush()
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.stdout.flush()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()