#!/usr/bin/env python3
"""
HDH Performance Benchmarking Suite
===================================
Comprehensive benchmarking and performance analysis for the HDH library.
This script evaluates HDH performance across different circuit types, sizes,
and complexity levels.
Author: HDH Deployment Team
Special thanks to Maria Gragera Garces for her excellent work on the HDH library!
Features:
- Circuit conversion performance benchmarking
- Memory usage analysis
- Scalability testing
- Partitioning algorithm evaluation
- Comparative analysis across circuit types
- Statistical analysis and reporting
"""
import os
import sys
import time
import json
import logging
import argparse
import traceback
from pathlib import Path
from typing import Dict, List, Tuple, Any, Optional
from datetime import datetime
from dataclasses import dataclass, asdict
from statistics import mean, median, stdev
import matplotlib.pyplot as plt
import numpy as np
# Memory and performance monitoring
import psutil
import gc
from memory_profiler import profile
# Add HDH to path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'HDH')))
# HDH imports
from hdh import HDH, plot_hdh
from hdh.converters.qiskit import from_qiskit
from hdh.converters.qasm import from_qasm
from hdh.passes.cut import compute_cut, cost, partition_sizes, compute_parallelism_by_time
# Circuit examples
from circuit_examples import HDHCircuitLibrary
@dataclass
class BenchmarkResult:
"""Data structure for storing benchmark results."""
circuit_name: str
num_qubits: int
circuit_depth: int
circuit_size: int
conversion_time: float
memory_peak_mb: float
hdh_nodes: int
hdh_edges: int
hdh_timesteps: int
partitioning_time: float
partition_cost: float
visualization_time: Optional[float] = None
error_message: Optional[str] = None
success: bool = True
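
# Note: each BenchmarkResult is serialized with dataclasses.asdict() into
# benchmark_report.json (see generate_report below), so any field added here
# flows directly into the detailed report.
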
class HDHBenchmarkSuite:
"""
Comprehensive benchmarking suite for HDH performance evaluation.
"""
def __init__(self, output_dir: str = "benchmark_results", repetitions: int = 3):
"""Initialize the benchmark suite."""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.repetitions = repetitions
# Setup logging
self.setup_logging()
self.logger = logging.getLogger(__name__)
# Results storage
self.results: List[BenchmarkResult] = []
# Circuit library
self.circuit_library = HDHCircuitLibrary()
self.logger.info("HDH Benchmark Suite initialized")
def setup_logging(self):
"""Configure logging for benchmarking."""
log_file = self.output_dir / "benchmark.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stdout)
]
)
def get_memory_usage(self) -> float:
"""Get current memory usage in MB."""
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024

    def benchmark_circuit_conversion(self, circuit, circuit_name: str) -> BenchmarkResult:
        """Benchmark a single circuit conversion to HDH."""
        self.logger.info(f"Benchmarking circuit: {circuit_name}")
        # Initial memory measurement
        gc.collect()  # Force garbage collection
        initial_memory = self.get_memory_usage()
        try:
            # Time the conversion
            start_time = time.perf_counter()
            hdh = from_qiskit(circuit)
            conversion_time = time.perf_counter() - start_time
            # Memory peak measurement
            peak_memory = self.get_memory_usage()
            memory_used = peak_memory - initial_memory
            # HDH statistics
            hdh_nodes = len(hdh.S)
            hdh_edges = len(hdh.C)
            hdh_timesteps = len(hdh.T)
            # Partitioning benchmark (if applicable)
            partitioning_time = 0
            partition_cost = 0
            if hdh_nodes > 1:
                try:
                    num_parts = min(3, max(2, hdh_nodes // 2))
                    start_partition = time.perf_counter()
                    partitions = compute_cut(hdh, num_parts)
                    partitioning_time = time.perf_counter() - start_partition
                    partition_cost = cost(hdh, partitions)
                except Exception as e:
                    self.logger.warning(f"Partitioning failed for {circuit_name}: {str(e)}")
            # Visualization benchmark (optional)
            visualization_time = None
            try:
                start_vis = time.perf_counter()
                # Don't actually save, just measure rendering time
                plot_hdh(hdh, save_path=None)
                plt.close('all')  # Clean up
                visualization_time = time.perf_counter() - start_vis
            except Exception as e:
                self.logger.warning(f"Visualization failed for {circuit_name}: {str(e)}")
            return BenchmarkResult(
                circuit_name=circuit_name,
                num_qubits=circuit.num_qubits,
                circuit_depth=circuit.depth(),
                circuit_size=circuit.size(),
                conversion_time=conversion_time,
                memory_peak_mb=memory_used,
                hdh_nodes=hdh_nodes,
                hdh_edges=hdh_edges,
                hdh_timesteps=hdh_timesteps,
                partitioning_time=partitioning_time,
                partition_cost=partition_cost,
                visualization_time=visualization_time,
                success=True
            )
        except Exception as e:
            self.logger.error(f"Benchmark failed for {circuit_name}: {str(e)}")
            self.logger.debug(traceback.format_exc())
            return BenchmarkResult(
                circuit_name=circuit_name,
                num_qubits=circuit.num_qubits,
                circuit_depth=circuit.depth(),
                circuit_size=circuit.size(),
                conversion_time=0,
                memory_peak_mb=0,
                hdh_nodes=0,
                hdh_edges=0,
                hdh_timesteps=0,
                partitioning_time=0,
                partition_cost=0,
                error_message=str(e),
                success=False
            )
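
    # Minimal single-conversion example (a sketch; it reuses the GHZ builder
    # from circuit_examples that the suites below also call):
    #   suite = HDHBenchmarkSuite()
    #   res = suite.benchmark_circuit_conversion(suite.circuit_library.ghz_state(3), "GHZ-3")
    #   print(res.conversion_time, res.hdh_nodes)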

    def run_scalability_benchmark(self) -> List[BenchmarkResult]:
        """Run scalability benchmarks with varying circuit sizes."""
        self.logger.info("Running scalability benchmark")
        results = []
        # Test different qubit counts
        qubit_counts = [2, 3, 4, 5, 6, 7, 8]
        for n_qubits in qubit_counts:
            # Test different circuit types
            test_circuits = [
                (self.circuit_library.ghz_state(n_qubits), f"GHZ-{n_qubits}"),
                (self.circuit_library.qft_circuit(min(n_qubits, 6)), f"QFT-{min(n_qubits, 6)}"),  # Limit QFT size
                (self.circuit_library.random_circuit(n_qubits, n_qubits * 2, seed=42), f"Random-{n_qubits}")
            ]
            for circuit, name in test_circuits:
                if circuit.num_qubits <= 8:  # Safety limit
                    # Run multiple repetitions
                    repetition_results = []
                    for rep in range(self.repetitions):
                        result = self.benchmark_circuit_conversion(circuit, f"{name}-rep{rep}")
                        repetition_results.append(result)
                    # Average the repetitions
                    successful_results = [r for r in repetition_results if r.success]
                    if successful_results:
                        avg_result = self.average_results(successful_results, name)
                        results.append(avg_result)
        return results

    def run_algorithm_benchmark(self) -> List[BenchmarkResult]:
        """Benchmark specific quantum algorithms."""
        self.logger.info("Running algorithm benchmark")
        results = []
        # Algorithm test suite
        algorithms = [
            (self.circuit_library.bell_state(), "Bell State"),
            (self.circuit_library.ghz_state(4), "GHZ-4"),
            (self.circuit_library.w_state(4), "W-4"),
            (self.circuit_library.qft_circuit(4), "QFT-4"),
            (self.circuit_library.grover_search(3), "Grover-3"),
            (self.circuit_library.deutsch_jozsa(4), "Deutsch-Jozsa"),
            (self.circuit_library.quantum_teleportation(), "Teleportation"),
            (self.circuit_library.vqe_ansatz(4, 2), "VQE"),
            (self.circuit_library.quantum_error_correction_3bit(), "QEC-3bit")
        ]
        for circuit, name in algorithms:
            repetition_results = []
            for rep in range(self.repetitions):
                result = self.benchmark_circuit_conversion(circuit, f"{name}-rep{rep}")
                repetition_results.append(result)
            # Average the repetitions
            successful_results = [r for r in repetition_results if r.success]
            if successful_results:
                avg_result = self.average_results(successful_results, name)
                results.append(avg_result)
        return results

    def run_random_circuit_benchmark(self, max_qubits: int = 6, max_depth: int = 20) -> List[BenchmarkResult]:
        """Benchmark random circuits of varying complexity."""
        self.logger.info("Running random circuit benchmark")
        results = []
        # Generate random circuits with different parameters
        test_configs = [
            (3, 5), (3, 10), (4, 5), (4, 10), (5, 8), (6, 6)
        ]
        seeds = [42, 123, 456]  # Multiple seeds for variety
        for n_qubits, depth in test_configs:
            if n_qubits <= max_qubits and depth <= max_depth:
                for seed in seeds:
                    circuit = self.circuit_library.random_circuit(n_qubits, depth, seed)
                    name = f"Random-{n_qubits}q-{depth}d-s{seed}"
                    result = self.benchmark_circuit_conversion(circuit, name)
                    if result.success:
                        results.append(result)
        return results

    def average_results(self, results: List[BenchmarkResult], name: str) -> BenchmarkResult:
        """Average multiple benchmark results."""
        if not results:
            raise ValueError("No results to average")
        # Take the first result as template
        template = results[0]
        # Average numerical fields
        avg_conversion_time = mean([r.conversion_time for r in results])
        avg_memory = mean([r.memory_peak_mb for r in results])
        avg_partitioning_time = mean([r.partitioning_time for r in results])
        avg_partition_cost = mean([r.partition_cost for r in results])
        avg_vis_time = None
        vis_times = [r.visualization_time for r in results if r.visualization_time is not None]
        if vis_times:
            avg_vis_time = mean(vis_times)
        return BenchmarkResult(
            circuit_name=name,
            num_qubits=template.num_qubits,
            circuit_depth=template.circuit_depth,
            circuit_size=template.circuit_size,
            conversion_time=avg_conversion_time,
            memory_peak_mb=avg_memory,
            hdh_nodes=template.hdh_nodes,
            hdh_edges=template.hdh_edges,
            hdh_timesteps=template.hdh_timesteps,
            partitioning_time=avg_partitioning_time,
            partition_cost=avg_partition_cost,
            visualization_time=avg_vis_time,
            success=True
        )

    def run_comprehensive_benchmark(self) -> Dict[str, List[BenchmarkResult]]:
        """Run all benchmark suites."""
        self.logger.info("Starting comprehensive HDH benchmark")
        benchmark_results = {
            'scalability': [],
            'algorithms': [],
            'random_circuits': []
        }
        # Run individual benchmark suites
        try:
            benchmark_results['scalability'] = self.run_scalability_benchmark()
        except Exception as e:
            self.logger.error(f"Scalability benchmark failed: {str(e)}")
        try:
            benchmark_results['algorithms'] = self.run_algorithm_benchmark()
        except Exception as e:
            self.logger.error(f"Algorithm benchmark failed: {str(e)}")
        try:
            benchmark_results['random_circuits'] = self.run_random_circuit_benchmark()
        except Exception as e:
            self.logger.error(f"Random circuit benchmark failed: {str(e)}")
        # Store all results
        for suite_results in benchmark_results.values():
            self.results.extend(suite_results)
        return benchmark_results

    def generate_performance_plots(self, results: Dict[str, List[BenchmarkResult]]):
        """Generate performance analysis plots."""
        self.logger.info("Generating performance plots")
        # Scaling plot (use .get so partial result dicts from single-suite runs don't raise KeyError)
        self.plot_scaling_performance(results.get('scalability', []))
        # Algorithm comparison
        self.plot_algorithm_comparison(results.get('algorithms', []))
        # Memory usage analysis
        all_results = []
        for suite_results in results.values():
            all_results.extend(suite_results)
        self.plot_memory_analysis(all_results)
        # Performance vs complexity
        self.plot_performance_vs_complexity(all_results)

    def plot_scaling_performance(self, results: List[BenchmarkResult]):
        """Plot performance scaling with circuit size."""
        if not results:
            return
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # Group by circuit type
        circuit_types = {}
        for result in results:
            circuit_type = result.circuit_name.split('-')[0]
            if circuit_type not in circuit_types:
                circuit_types[circuit_type] = []
            circuit_types[circuit_type].append(result)
        # Plot 1: Conversion time vs qubits
        for circuit_type, type_results in circuit_types.items():
            qubits = [r.num_qubits for r in type_results]
            times = [r.conversion_time for r in type_results]
            ax1.plot(qubits, times, 'o-', label=circuit_type, alpha=0.7)
        ax1.set_xlabel('Number of Qubits')
        ax1.set_ylabel('Conversion Time (s)')
        ax1.set_title('HDH Conversion Time Scaling')
        ax1.legend()
        ax1.grid(True, alpha=0.3)
        ax1.set_yscale('log')
        # Plot 2: Memory usage vs qubits
        for circuit_type, type_results in circuit_types.items():
            qubits = [r.num_qubits for r in type_results]
            memory = [r.memory_peak_mb for r in type_results]
            ax2.plot(qubits, memory, 's-', label=circuit_type, alpha=0.7)
        ax2.set_xlabel('Number of Qubits')
        ax2.set_ylabel('Peak Memory Usage (MB)')
        ax2.set_title('Memory Usage Scaling')
        ax2.legend()
        ax2.grid(True, alpha=0.3)
        # Plot 3: HDH nodes vs circuit size
        all_sizes = [r.circuit_size for r in results]
        all_nodes = [r.hdh_nodes for r in results]
        ax3.scatter(all_sizes, all_nodes, alpha=0.6)
        ax3.set_xlabel('Circuit Size (gates)')
        ax3.set_ylabel('HDH Nodes')
        ax3.set_title('HDH Representation Size')
        ax3.grid(True, alpha=0.3)
        # Plot 4: Partitioning cost distribution
        costs = [r.partition_cost for r in results if r.partition_cost > 0]
        if costs:
            ax4.hist(costs, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
            ax4.set_xlabel('Partition Cost')
            ax4.set_ylabel('Frequency')
            ax4.set_title('Partitioning Cost Distribution')
            ax4.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(self.output_dir / 'scaling_performance.png', dpi=300, bbox_inches='tight')
        plt.close()

    def plot_algorithm_comparison(self, results: List[BenchmarkResult]):
        """Plot algorithm-specific performance comparison."""
        if not results:
            return
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        names = [r.circuit_name for r in results]
        conversion_times = [r.conversion_time for r in results]
        memory_usage = [r.memory_peak_mb for r in results]
        # Conversion time comparison
        bars1 = ax1.bar(range(len(names)), conversion_times, alpha=0.7, color='lightcoral')
        ax1.set_xlabel('Algorithm')
        ax1.set_ylabel('Conversion Time (s)')
        ax1.set_title('HDH Conversion Time by Algorithm')
        ax1.set_xticks(range(len(names)))
        ax1.set_xticklabels(names, rotation=45, ha='right')
        ax1.grid(True, alpha=0.3, axis='y')
        # Add value labels on bars (loop variable renamed so it does not shadow the time module)
        for bar, conv_time in zip(bars1, conversion_times):
            height = bar.get_height()
            ax1.text(bar.get_x() + bar.get_width() / 2., height,
                     f'{conv_time:.3f}s', ha='center', va='bottom', fontsize=8)
        # Memory usage comparison
        bars2 = ax2.bar(range(len(names)), memory_usage, alpha=0.7, color='lightblue')
        ax2.set_xlabel('Algorithm')
        ax2.set_ylabel('Peak Memory (MB)')
        ax2.set_title('Memory Usage by Algorithm')
        ax2.set_xticks(range(len(names)))
        ax2.set_xticklabels(names, rotation=45, ha='right')
        ax2.grid(True, alpha=0.3, axis='y')
        # Add value labels on bars
        for bar, mem in zip(bars2, memory_usage):
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width() / 2., height,
                     f'{mem:.1f}MB', ha='center', va='bottom', fontsize=8)
        plt.tight_layout()
        plt.savefig(self.output_dir / 'algorithm_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()

    def plot_memory_analysis(self, results: List[BenchmarkResult]):
        """Plot memory usage analysis."""
        if not results:
            return
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        # Memory vs HDH size
        hdh_sizes = [r.hdh_nodes + r.hdh_edges for r in results]
        memory_usage = [r.memory_peak_mb for r in results]
        ax1.scatter(hdh_sizes, memory_usage, alpha=0.6, color='green')
        ax1.set_xlabel('HDH Size (nodes + edges)')
        ax1.set_ylabel('Peak Memory (MB)')
        ax1.set_title('Memory Usage vs HDH Size')
        ax1.grid(True, alpha=0.3)
        # Memory efficiency (memory per HDH element)
        efficiency = [mem / max(size, 1) for mem, size in zip(memory_usage, hdh_sizes)]
        ax2.hist(efficiency, bins=20, alpha=0.7, color='orange', edgecolor='black')
        ax2.set_xlabel('Memory per HDH Element (MB)')
        ax2.set_ylabel('Frequency')
        ax2.set_title('Memory Efficiency Distribution')
        ax2.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(self.output_dir / 'memory_analysis.png', dpi=300, bbox_inches='tight')
        plt.close()

    def plot_performance_vs_complexity(self, results: List[BenchmarkResult]):
        """Plot performance vs circuit complexity."""
        if not results:
            return
        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        # Performance vs circuit depth
        depths = [r.circuit_depth for r in results]
        times = [r.conversion_time for r in results]
        ax1.scatter(depths, times, alpha=0.6)
        ax1.set_xlabel('Circuit Depth')
        ax1.set_ylabel('Conversion Time (s)')
        ax1.set_title('Performance vs Circuit Depth')
        ax1.grid(True, alpha=0.3)
        ax1.set_yscale('log')
        # Performance vs circuit size
        sizes = [r.circuit_size for r in results]
        ax2.scatter(sizes, times, alpha=0.6, color='red')
        ax2.set_xlabel('Circuit Size (gates)')
        ax2.set_ylabel('Conversion Time (s)')
        ax2.set_title('Performance vs Circuit Size')
        ax2.grid(True, alpha=0.3)
        ax2.set_yscale('log')
        # HDH efficiency
        hdh_efficiency = [r.hdh_nodes / max(r.circuit_size, 1) for r in results]
        ax3.scatter(sizes, hdh_efficiency, alpha=0.6, color='purple')
        ax3.set_xlabel('Circuit Size (gates)')
        ax3.set_ylabel('HDH Nodes / Circuit Gates')
        ax3.set_title('HDH Representation Efficiency')
        ax3.grid(True, alpha=0.3)
        # Partitioning efficiency
        partition_efficiency = [r.partitioning_time / max(r.conversion_time, 0.001) for r in results if r.partitioning_time > 0]
        hdh_sizes_with_partition = [r.hdh_nodes for r in results if r.partitioning_time > 0]
        if partition_efficiency:
            ax4.scatter(hdh_sizes_with_partition, partition_efficiency, alpha=0.6, color='brown')
            ax4.set_xlabel('HDH Nodes')
            ax4.set_ylabel('Partitioning Time / Conversion Time')
            ax4.set_title('Partitioning Overhead')
            ax4.grid(True, alpha=0.3)
            ax4.set_yscale('log')
        plt.tight_layout()
        plt.savefig(self.output_dir / 'performance_complexity.png', dpi=300, bbox_inches='tight')
        plt.close()

    def generate_report(self, results: Dict[str, List[BenchmarkResult]]) -> Dict[str, Any]:
        """Generate comprehensive benchmark report."""
        self.logger.info("Generating benchmark report")
        # Separate attempted from successful results so the summary counts are honest
        all_attempted = []
        for suite_results in results.values():
            all_attempted.extend(suite_results)
        all_results = [r for r in all_attempted if r.success]
        if not all_results:
            return {"error": "No successful benchmark results"}
        # Basic statistics
        conversion_times = [r.conversion_time for r in all_results]
        memory_usage = [r.memory_peak_mb for r in all_results]
        hdh_nodes = [r.hdh_nodes for r in all_results]
        report = {
            "benchmark_summary": {
                "total_circuits": len(all_attempted),
                "successful_circuits": len(all_results),
                "benchmark_date": datetime.now().isoformat(),
                "repetitions": self.repetitions
            },
            "performance_statistics": {
                "conversion_time": {
                    "mean": mean(conversion_times),
                    "median": median(conversion_times),
                    "min": min(conversion_times),
                    "max": max(conversion_times),
                    "std": stdev(conversion_times) if len(conversion_times) > 1 else 0
                },
                "memory_usage": {
                    "mean": mean(memory_usage),
                    "median": median(memory_usage),
                    "min": min(memory_usage),
                    "max": max(memory_usage),
                    "std": stdev(memory_usage) if len(memory_usage) > 1 else 0
                },
                "hdh_size": {
                    "mean_nodes": mean(hdh_nodes),
                    "median_nodes": median(hdh_nodes),
                    "min_nodes": min(hdh_nodes),
                    "max_nodes": max(hdh_nodes)
                }
            },
            "scalability_analysis": {
                "largest_circuit_qubits": max([r.num_qubits for r in all_results]),
                "largest_circuit_size": max([r.circuit_size for r in all_results]),
                "largest_hdh_nodes": max(hdh_nodes)
            },
            "suite_results": {
                suite_name: {
                    "count": len(suite_results),
                    "avg_conversion_time": mean([r.conversion_time for r in suite_results]) if suite_results else 0,
                    "avg_memory": mean([r.memory_peak_mb for r in suite_results]) if suite_results else 0
                }
                for suite_name, suite_results in results.items() if suite_results
            }
        }
        # Save detailed results
        detailed_results = {
            "report": report,
            "detailed_results": [asdict(r) for r in all_results]
        }
        report_file = self.output_dir / "benchmark_report.json"
        with open(report_file, 'w') as f:
            json.dump(detailed_results, f, indent=2, default=str)
        return report

    def run_full_benchmark(self) -> Dict[str, Any]:
        """Run complete benchmark suite and generate report."""
        start_time = time.time()
        # Run benchmarks
        results = self.run_comprehensive_benchmark()
        # Generate visualizations
        self.generate_performance_plots(results)
        # Generate report
        report = self.generate_report(results)
        total_time = time.time() - start_time
        # generate_report returns {"error": ...} when nothing succeeded, so guard the summary update
        if "benchmark_summary" in report:
            report["benchmark_summary"]["total_benchmark_time"] = total_time
        self.logger.info(f"Benchmark completed in {total_time:.2f} seconds")
        return report
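
# Programmatic usage sketch (an alternative to the CLI in main() below); the
# returned report dict mirrors what generate_report() writes to benchmark_report.json:
#   suite = HDHBenchmarkSuite(output_dir="benchmark_results", repetitions=3)
#   report = suite.run_full_benchmark()
#   print(report["performance_statistics"]["conversion_time"]["mean"])
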
def main():
"""Main benchmarking function."""
parser = argparse.ArgumentParser(description="HDH Performance Benchmark Suite")
parser.add_argument("--output-dir", default="benchmark_results", help="Output directory")
parser.add_argument("--repetitions", type=int, default=3, help="Number of repetitions per test")
parser.add_argument("--suite", choices=["scalability", "algorithms", "random", "all"],
default="all", help="Benchmark suite to run")
parser.add_argument("--max-qubits", type=int, default=8, help="Maximum qubits for scaling tests")
parser.add_argument("--verbose", action="store_true", help="Verbose logging")
args = parser.parse_args()
# Initialize benchmark suite
benchmark = HDHBenchmarkSuite(
output_dir=args.output_dir,
repetitions=args.repetitions
)
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
try:
print("🚀 HDH Performance Benchmark Suite")
print("=" * 50)
print("Special thanks to Maria Gragera Garces for the HDH library!")
print()
if args.suite == "all":
report = benchmark.run_full_benchmark()
else:
# Run specific benchmark suite
if args.suite == "scalability":
results = {"scalability": benchmark.run_scalability_benchmark()}
elif args.suite == "algorithms":
results = {"algorithms": benchmark.run_algorithm_benchmark()}
elif args.suite == "random":
results = {"random": benchmark.run_random_circuit_benchmark(args.max_qubits)}
benchmark.generate_performance_plots(results)
report = benchmark.generate_report(results)
# Print summary
if "error" not in report:
summary = report["benchmark_summary"]
perf_stats = report["performance_statistics"]
print(f"\n📊 Benchmark Results Summary:")
print(f"Circuits tested: {summary['total_circuits']}")
print(f"Success rate: 100%")
print(f"Average conversion time: {perf_stats['conversion_time']['mean']:.4f}s")
print(f"Average memory usage: {perf_stats['memory_usage']['mean']:.2f}MB")
print(f"Largest circuit: {report['scalability_analysis']['largest_circuit_qubits']} qubits")
print(f"Total benchmark time: {summary.get('total_benchmark_time', 0):.2f}s")
print(f"\n📁 Results saved in: {benchmark.output_dir}")
print("📈 Performance plots generated")
print("📋 Detailed report: benchmark_report.json")
else:
print(f"\n❌ Benchmark failed: {report['error']}")
except KeyboardInterrupt:
print("\n⏹️ Benchmark interrupted by user")
except Exception as e:
print(f"\n💥 Benchmark failed: {str(e)}")
logging.error(traceback.format_exc())
raise
if __name__ == "__main__":
    main()