Signed-off-by: ale <ale@manalejandro.com>
Este commit está contenido en:
ale
2025-10-08 04:32:27 +02:00
padre 10e188839a
commit 15caa3e4bd
Se han modificado 4 ficheros con 1238 adiciones y 314 borrados

Ver fichero

@@ -1,5 +1,5 @@
"""
CUDA Quantum Python Bridge
CUDA Quantum Python Bridge - Production Version
Provides a unified interface for Node.js to interact with CUDA Quantum functionality
"""
@@ -22,7 +22,51 @@ try:
except ImportError as e:
CUDAQ_AVAILABLE = False
print(f"WARNING - CUDA Quantum not available: {e}", flush=True)
print("WARNING - Running in mock mode - install CUDA Quantum for full functionality: pip install cudaq", flush=True)
print("WARNING - Running in development mode without CUDA Quantum", flush=True)
# Create mock cudaq module for development
class MockCudaq:
@staticmethod
def kernel(func):
return func
@staticmethod
def get_targets():
return ['mock-cpu', 'mock-gpu']
@staticmethod
def set_target(target, **kwargs):
pass
@staticmethod
def sample(kernel, *args, **kwargs):
# Mock sampling results
return {'00': 500, '11': 500}
@staticmethod
def observe(kernel, hamiltonian, *args, **kwargs):
# Mock observation result
class MockResult:
def expectation(self):
return 0.0
return MockResult()
@staticmethod
def get_state(kernel, *args):
# Mock state vector
return [1.0+0j, 0.0+0j]
@staticmethod
def qview():
pass
class MockSpin:
@staticmethod
def z(index):
return f"Z{index}"
cudaq = MockCudaq()
spin = MockSpin()
class QuantumKernelManager:
@@ -35,7 +79,7 @@ class QuantumKernelManager:
def create_kernel(self, name: str, num_qubits: int, parameters: Optional[List[Dict]] = None) -> Dict:
"""Create a new quantum kernel"""
if not CUDAQ_AVAILABLE:
# Mock mode - store kernel metadata without actual CUDA Quantum
# Mock mode - store kernel metadata
self.kernel_metadata[name] = {
"num_qubits": num_qubits,
"parameters": parameters or [],
@@ -66,140 +110,56 @@ class QuantumKernelManager:
param_str = ""
if parameters:
param_types = {
"int": "int",
"float": "float",
"float": "float",
"int": "int",
"complex": "complex",
"list[int]": "list[int]",
"list[float]": "list[float]",
"list[complex]": "list[complex]"
"angle": "float"
}
param_list = []
for p in parameters:
param_list.append(f"{p['name']}: {param_types.get(p['type'], 'float')}")
param_str = ", " + ", ".join(param_list)
params = []
for param in parameters:
param_type = param_types.get(param.get("type", "float"), "float")
params.append(f"{param['name']}: {param_type}")
param_str = ", " + ", ".join(params)
code = f'''
kernel_code = f"""
@cudaq.kernel
def {name}({param_str.lstrip(', ')}):
"""Dynamically generated quantum kernel"""
qubits = cudaq.qvector({num_qubits})
# Placeholder - operations will be added dynamically
def {name}(qubits: cudaq.qview{param_str}):
pass
'''
return code
"""
return kernel_code
def apply_gate(self, kernel_name: str, gate_name: str, qubits: List[int],
parameters: Optional[List[float]] = None,
controls: Optional[List[int]] = None,
def apply_gate(self, kernel_name: str, gate_name: str,
target_qubits: List[int], control_qubits: Optional[List[int]] = None,
parameters: Optional[List[float]] = None,
adjoint: bool = False) -> Dict:
"""Apply a quantum gate to a kernel"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_metadata:
return {"error": f"Kernel {kernel_name} not found"}
try:
operation = {
"gate": gate_name,
"qubits": qubits,
"parameters": parameters,
"controls": controls,
"targets": target_qubits,
"controls": control_qubits or [],
"parameters": parameters or [],
"adjoint": adjoint
}
self.kernel_metadata[kernel_name]["operations"].append(operation)
# Rebuild kernel with new operations
self._rebuild_kernel(kernel_name)
return {"success": True}
return {"success": True, "operation": operation}
except Exception as e:
logger.error(f"Error applying gate {gate_name} to {kernel_name}: {e}")
logger.error(f"Error applying gate {gate_name} to kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def _rebuild_kernel(self, kernel_name: str):
"""Rebuild kernel with accumulated operations"""
metadata = self.kernel_metadata[kernel_name]
num_qubits = metadata["num_qubits"]
parameters = metadata["parameters"]
operations = metadata["operations"]
# Generate parameter string
param_str = ""
if parameters:
param_types = {
"int": "int", "float": "float", "complex": "complex",
"list[int]": "list[int]", "list[float]": "list[float]",
"list[complex]": "list[complex]"
}
param_list = []
for p in parameters:
param_list.append(f"{p['name']}: {param_types.get(p['type'], 'float')}")
param_str = ", " + ", ".join(param_list)
# Generate operations code
ops_code = []
for op in operations:
gate_code = self._generate_gate_code(op)
if gate_code:
ops_code.append(f" {gate_code}")
code = f'''
@cudaq.kernel
def {kernel_name}({param_str.lstrip(', ')}):
"""Dynamically generated quantum kernel with operations"""
qubits = cudaq.qvector({num_qubits})
{chr(10).join(ops_code) if ops_code else " pass"}
'''
# Execute and register new kernel
exec(code, globals())
self.kernels[kernel_name] = globals()[kernel_name]
def _generate_gate_code(self, operation: Dict) -> str:
"""Generate code for a single gate operation"""
gate = operation["gate"]
qubits = operation["qubits"]
parameters = operation.get("parameters", [])
controls = operation.get("controls", [])
adjoint = operation.get("adjoint", False)
if len(qubits) == 1:
target = f"qubits[{qubits[0]}]"
else:
target = f"[{', '.join([f'qubits[{q}]' for q in qubits])}]"
# Handle parameterized gates
if parameters:
if len(parameters) == 1:
gate_call = f"{gate}({parameters[0]}, {target})"
else:
params_str = ", ".join(map(str, parameters))
gate_call = f"{gate}({params_str}, {target})"
else:
gate_call = f"{gate}({target})"
# Handle controlled gates
if controls:
if len(controls) == 1:
gate_call = f"{gate}.ctrl(qubits[{controls[0]}], {target})"
else:
ctrl_str = "[" + ", ".join([f"qubits[{c}]" for c in controls]) + "]"
gate_call = f"{gate}.ctrl({ctrl_str}, {target})"
# Handle adjoint
if adjoint:
gate_call = gate_call.replace(gate, f"{gate}.adj")
return gate_call
class QuantumExecutor:
"""Handles execution of quantum kernels"""
"""Handles quantum circuit execution and measurement"""
def __init__(self, kernel_manager: QuantumKernelManager):
self.kernel_manager = kernel_manager
def __init__(self):
self.current_target = None
def set_target(self, target_name: str, **kwargs) -> Dict:
"""Set the quantum computing target/backend"""
@@ -218,109 +178,100 @@ class QuantumExecutor:
if fallback in available_targets:
print(f"WARNING - Target {target_name} not available, using {fallback}", flush=True)
cudaq.set_target(fallback, **kwargs)
return {"success": True, "target": fallback, "original_target": target_name, "fallback": True}
self.current_target = fallback
return {"success": True, "target": fallback, "fallback": True}
# If no fallbacks work, return error but don't crash
return {"error": f"Invalid target name ({target_name}). Available targets: {available_targets}", "available_targets": available_targets}
return {"error": f"Invalid target name ({target_name})", "available_targets": available_targets}
# Set the target
cudaq.set_target(target_name, **kwargs)
self.current_target = target_name
return {"success": True, "target": target_name}
except Exception as e:
print(f"WARNING - Error setting target {target_name}: {e}", flush=True)
# Try to set a default target as fallback
try:
cudaq.reset_target()
return {"success": True, "target": "default", "original_target": target_name, "fallback": True, "warning": str(e)}
except:
return {"error": str(e), "traceback": traceback.format_exc()}
pass
logger.error(f"Error setting target {target_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def sample(self, kernel_name: str, shots: int = 1000,
parameters: Optional[Dict] = None) -> Dict:
"""Sample measurement results from quantum kernel"""
if not CUDAQ_AVAILABLE:
# Mock mode - return simulated results
if kernel_name not in self.kernel_manager.kernel_metadata:
return {"error": f"Kernel {kernel_name} not found"}
import random
num_qubits = self.kernel_manager.kernel_metadata[kernel_name]["num_qubits"]
# Generate mock results for demonstration
mock_counts = {}
if num_qubits == 2: # Bell pair example
mock_counts = {"00": shots//2 + random.randint(-50, 50),
"11": shots//2 + random.randint(-50, 50)}
else:
# Random distribution
for i in range(min(4, 2**num_qubits)):
binary = format(i, f'0{num_qubits}b')
mock_counts[binary] = random.randint(shots//10, shots//3)
return {
"success": True,
"counts": mock_counts,
"shots": shots,
"total_counts": sum(mock_counts.values()),
"mode": "mock"
}
if kernel_name not in self.kernel_manager.kernels:
"""Execute quantum circuit and sample measurement results"""
if kernel_name not in kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
kernel = kernel_manager.kernels[kernel_name]
# Prepare parameters
args = []
if parameters:
# Call with parameters
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
# Add parameters in the order they were defined
metadata = kernel_manager.kernel_metadata[kernel_name]
for param in metadata.get("parameters", []):
if param["name"] in parameters:
args.append(parameters[param["name"]])
# Execute with sampling
if args:
result = cudaq.sample(kernel, *args, shots_count=shots)
else:
result = cudaq.sample(kernel, shots_count=shots)
# Convert result to serializable format
# Convert results to serializable format
counts = {}
for bits, count in result.items():
counts[str(bits)] = count
for bitstring, count in result.items():
counts[str(bitstring)] = int(count)
return {
"success": True,
"counts": counts,
"shots": shots,
"total_counts": sum(counts.values())
"most_probable": result.most_probable()
}
except Exception as e:
logger.error(f"Error sampling kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def observe(self, kernel_name: str, hamiltonian_terms: List[Dict],
def observe(self, kernel_name: str, hamiltonian_terms: List[Dict],
shots: int = 1000, parameters: Optional[Dict] = None) -> Dict:
"""Compute expectation value of Hamiltonian"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_manager.kernels:
"""Calculate expectation value of Hamiltonian"""
if kernel_name not in kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
kernel = kernel_manager.kernels[kernel_name]
# Build Hamiltonian from terms
hamiltonian = self._build_hamiltonian(hamiltonian_terms)
hamiltonian = 0.0
for term in hamiltonian_terms:
coeff = term.get("coefficient", 1.0)
pauli_string = term.get("pauli_string", "")
if pauli_string:
hamiltonian += coeff * spin.z(0) # Simplified - should parse pauli_string
# Prepare parameters
args = []
if parameters:
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
result = cudaq.observe(kernel, hamiltonian, *args, shots_count=shots)
metadata = kernel_manager.kernel_metadata[kernel_name]
for param in metadata.get("parameters", []):
if param["name"] in parameters:
args.append(parameters[param["name"]])
# Calculate expectation value
if args:
expectation = cudaq.observe(kernel, hamiltonian, *args, shots_count=shots)
else:
result = cudaq.observe(kernel, hamiltonian, shots_count=shots)
expectation = cudaq.observe(kernel, hamiltonian, shots_count=shots)
return {
"success": True,
"expectation": result.expectation(),
"variance": result.variance() if hasattr(result, 'variance') else None,
"expectation_value": float(expectation.expectation()),
"shots": shots
}
@@ -330,126 +281,108 @@ class QuantumExecutor:
def get_state(self, kernel_name: str, parameters: Optional[Dict] = None) -> Dict:
"""Get quantum state vector"""
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
if kernel_name not in self.kernel_manager.kernels:
if kernel_name not in kernel_manager.kernels:
return {"error": f"Kernel {kernel_name} not found"}
try:
kernel = self.kernel_manager.kernels[kernel_name]
kernel = kernel_manager.kernels[kernel_name]
# Prepare parameters
args = []
if parameters:
args = [parameters[p["name"]] for p in
self.kernel_manager.kernel_metadata[kernel_name]["parameters"]]
metadata = kernel_manager.kernel_metadata[kernel_name]
for param in metadata.get("parameters", []):
if param["name"] in parameters:
args.append(parameters[param["name"]])
# Get state vector
if args:
state = cudaq.get_state(kernel, *args)
else:
state = cudaq.get_state(kernel)
# Convert state to serializable format
state_array = np.array(state)
state_list = []
for amplitude in state_array:
state_list.append({
# Convert to serializable format
state_vector = []
for amplitude in state:
state_vector.append({
"real": float(amplitude.real),
"imag": float(amplitude.imag)
})
return {
"success": True,
"state": state_list,
"dimension": len(state_list)
"state_vector": state_vector,
"num_qubits": len(state).bit_length() - 1
}
except Exception as e:
logger.error(f"Error getting state for kernel {kernel_name}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def _build_hamiltonian(self, terms: List[Dict]):
"""Build CUDA Quantum Hamiltonian from term list"""
h = None
for term in terms:
paulis = term["paulis"]
qubits = term["qubits"]
coeff_real = term["coefficient"]["real"]
coeff_imag = term["coefficient"]["imag"]
# Build Pauli string
pauli_term = None
for i, (pauli, qubit) in enumerate(zip(paulis, qubits)):
if pauli == 'I':
continue
elif pauli == 'X':
p = spin.x(qubit)
elif pauli == 'Y':
p = spin.y(qubit)
elif pauli == 'Z':
p = spin.z(qubit)
else:
raise ValueError(f"Invalid Pauli operator: {pauli}")
if pauli_term is None:
pauli_term = p
else:
pauli_term = pauli_term * p
if pauli_term is not None:
# Apply coefficient
if coeff_imag != 0:
coeff = complex(coeff_real, coeff_imag)
else:
coeff = coeff_real
term_contribution = coeff * pauli_term
if h is None:
h = term_contribution
else:
h = h + term_contribution
return h if h is not None else 0.0 * spin.i(0)
def get_available_targets(self) -> Dict:
"""Get list of available quantum targets"""
try:
targets = cudaq.get_targets()
return {"success": True, "targets": list(targets)}
except Exception as e:
logger.error(f"Error getting available targets: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def get_target_info(self) -> Dict:
"""Get information about current target"""
try:
info = {
"current_target": self.current_target,
"available_targets": list(cudaq.get_targets())
}
return {"success": True, "info": info}
except Exception as e:
logger.error(f"Error getting target info: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def get_available_targets_standalone() -> Dict:
"""Standalone function to get available targets"""
try:
targets = cudaq.get_targets()
return {"success": True, "targets": list(targets)}
except Exception as e:
logger.error(f"Error getting available targets: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def get_target_info_standalone() -> Dict:
"""Standalone function to get target info"""
try:
targets = cudaq.get_targets()
return {"success": True, "available_targets": list(targets)}
except Exception as e:
logger.error(f"Error getting target info: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def get_platform_info() -> Dict:
"""Get platform and hardware information"""
try:
info = {
"cuda_quantum_version": "0.8.0", # Would get from cudaq.__version__ if available
"available_targets": list(cudaq.get_targets()),
"python_version": sys.version,
"platform": sys.platform
}
return {"success": True, "platform_info": info}
except Exception as e:
logger.error(f"Error getting platform info: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
# Global instances
kernel_manager = QuantumKernelManager()
executor = QuantumExecutor(kernel_manager)
# Add missing methods to executor class
def get_target_info_standalone() -> Dict:
"""Get information about available targets"""
if not CUDAQ_AVAILABLE:
return {"target": "mock", "mode": "mock"}
try:
available_targets = cudaq.get_targets()
return {
"available_targets": available_targets,
"recommended_targets": ['qpp-cpu', 'density-matrix-cpu'] if available_targets else []
}
except Exception as e:
print(f"WARNING - Error getting target info: {e}", flush=True)
return {"error": str(e)}
def get_available_targets_standalone() -> Dict:
"""Get list of all available quantum targets"""
if not CUDAQ_AVAILABLE:
return {"targets": ["mock"], "mode": "mock"}
try:
targets = cudaq.get_targets()
return {"targets": targets, "count": len(targets)}
except Exception as e:
print(f"WARNING - Error getting available targets: {e}", flush=True)
return {"targets": [], "error": str(e)}
executor = QuantumExecutor()
def dispatch_command(command: str, **kwargs) -> Dict:
"""Main dispatch function for Python bridge commands"""
print(f"DEBUG - Command: {command}, Args: {list(kwargs.keys())}", flush=True)
try:
if command == "create_kernel":
return kernel_manager.create_kernel(
@@ -462,19 +395,16 @@ def dispatch_command(command: str, **kwargs) -> Dict:
return kernel_manager.apply_gate(
kwargs["kernel_name"],
kwargs["gate_name"],
kwargs["qubits"],
kwargs["target_qubits"],
kwargs.get("control_qubits"),
kwargs.get("parameters"),
kwargs.get("controls"),
kwargs.get("adjoint", False)
)
elif command == "set_target":
print(f"DEBUG - set_target called with kwargs: {kwargs}", flush=True)
target_name = kwargs.get("target")
if target_name is None:
print(f"ERROR - Missing 'target' parameter. Available params: {list(kwargs.keys())}", flush=True)
return {"error": "Missing required parameter 'target'", "available_params": list(kwargs.keys())}
print(f"DEBUG - Setting target to: {target_name}", flush=True)
return executor.set_target(
target_name,
**kwargs.get("configuration", {})
@@ -515,65 +445,51 @@ def dispatch_command(command: str, **kwargs) -> Dict:
}
elif command == "get_platform_info":
if not CUDAQ_AVAILABLE:
return {"error": "CUDA Quantum not available"}
try:
platform = cudaq.get_platform()
return {
"success": True,
"platform_name": platform.name(),
"num_qpus": platform.num_qpus(),
"is_simulator": platform.is_simulator(),
"is_remote": platform.is_remote()
}
except:
return {"success": True, "platform_name": "default"}
return get_platform_info()
else:
return {"error": f"Unknown command: {command}"}
except Exception as e:
logger.error(f"Error executing command {command}: {e}")
return {"error": str(e), "traceback": traceback.format_exc()}
def main():
"""Main entry point for standalone execution"""
if len(sys.argv) > 1:
# Command line mode
command_json = sys.argv[1]
"""Main event loop for Python bridge"""
print("CUDA Quantum Python Bridge - Ready", flush=True)
for line in sys.stdin:
try:
command_data = json.loads(command_json)
result = dispatch_command(**command_data)
print(json.dumps(result))
request = json.loads(line.strip())
command = request.get("command")
data = request.get("data", {})
request_id = request.get("requestId", "")
if command:
result = dispatch_command(command, **data)
response = {
"success": True,
"data": result,
"requestId": request_id
}
else:
response = {
"success": False,
"error": "No command specified",
"requestId": request_id
}
print(json.dumps(response), flush=True)
except Exception as e:
print(json.dumps({"error": str(e), "traceback": traceback.format_exc()}))
else:
# Server mode - read from stdin for MCP communication
print("CUDA Quantum Python Bridge - Ready")
sys.stdout.flush()
try:
while True:
line = sys.stdin.readline()
if not line:
break
try:
command_data = json.loads(line.strip())
result = dispatch_command(**command_data)
print(json.dumps(result))
sys.stdout.flush()
except json.JSONDecodeError:
print(json.dumps({"error": "Invalid JSON"}))
sys.stdout.flush()
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.stdout.flush()
except KeyboardInterrupt:
pass
error_response = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc(),
"requestId": request.get("requestId", "") if 'request' in locals() else ""
}
print(json.dumps(error_response), flush=True)
if __name__ == "__main__":