#!/usr/bin/env python3 """ HDH Performance Benchmarking Suite =================================== Comprehensive benchmarking and performance analysis for the HDH library. This script evaluates HDH performance across different circuit types, sizes, and complexity levels. Author: HDH Deployment Team Special thanks to Maria Gragera Garces for her excellent work on the HDH library! Features: - Circuit conversion performance benchmarking - Memory usage analysis - Scalability testing - Partitioning algorithm evaluation - Comparative analysis across circuit types - Statistical analysis and reporting """ import os import sys import time import json import logging import argparse import traceback from pathlib import Path from typing import Dict, List, Tuple, Any, Optional from datetime import datetime from dataclasses import dataclass, asdict from statistics import mean, median, stdev import matplotlib.pyplot as plt import numpy as np # Memory and performance monitoring import psutil import gc import tracemalloc from memory_profiler import profile, memory_usage # Add HDH to path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'HDH'))) # HDH imports from hdh import HDH, plot_hdh from hdh.converters.qiskit import from_qiskit from hdh.converters.qasm import from_qasm from hdh.passes.cut import compute_cut, cost, partition_size, compute_parallelism_by_time # Circuit examples from circuit_examples import HDHCircuitLibrary @dataclass class BenchmarkResult: """Data structure for storing benchmark results.""" circuit_name: str num_qubits: int circuit_depth: int circuit_size: int conversion_time: float memory_peak_mb: float hdh_nodes: int hdh_edges: int hdh_timesteps: int partitioning_time: float partition_cost: float visualization_time: Optional[float] = None error_message: Optional[str] = None success: bool = True class HDHBenchmarkSuite: """ Comprehensive benchmarking suite for HDH performance evaluation. """ def __init__(self, output_dir: str = "benchmark_results", repetitions: int = 3): """Initialize the benchmark suite.""" self.output_dir = Path(output_dir) self.output_dir.mkdir(exist_ok=True) self.repetitions = repetitions # Setup logging self.setup_logging() self.logger = logging.getLogger(__name__) # Results storage self.results: List[BenchmarkResult] = [] # Circuit library self.circuit_library = HDHCircuitLibrary() self.logger.info("HDH Benchmark Suite initialized") def setup_logging(self): """Configure logging for benchmarking.""" log_file = self.output_dir / "benchmark.log" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler(sys.stdout) ] ) def get_memory_usage(self) -> float: """Get current memory usage in MB.""" process = psutil.Process() return process.memory_info().rss / 1024 / 1024 def benchmark_circuit_conversion(self, circuit, circuit_name: str) -> BenchmarkResult: """Benchmark a single circuit conversion to HDH.""" self.logger.info(f"Benchmarking circuit: {circuit_name}") # Initial memory measurement gc.collect() # Force garbage collection # Start memory tracking with tracemalloc tracemalloc.start() initial_snapshot = tracemalloc.take_snapshot() initial_memory = self.get_memory_usage() try: # Time the conversion and track memory start_time = time.perf_counter() hdh = from_qiskit(circuit) conversion_time = time.perf_counter() - start_time # Memory peak measurement using tracemalloc current_snapshot = tracemalloc.take_snapshot() top_stats = current_snapshot.compare_to(initial_snapshot, 'lineno') # Calculate memory difference in MB tracemalloc_memory = sum(stat.size_diff for stat in top_stats) / 1024 / 1024 # Also get psutil measurement as backup peak_memory = self.get_memory_usage() psutil_memory = peak_memory - initial_memory # Use the larger of the two measurements (tracemalloc is usually more accurate) memory_used = max(abs(tracemalloc_memory), abs(psutil_memory)) # Stop tracemalloc tracemalloc.stop() # HDH statistics hdh_nodes = len(hdh.S) hdh_edges = len(hdh.C) hdh_timesteps = len(hdh.T) # Partitioning benchmark (if applicable) partitioning_time = 0 partition_cost = 0 if hdh_nodes > 1: try: num_parts = min(3, max(2, hdh_nodes // 2)) # Calculate capacity: distribute qubits evenly across partitions with some buffer num_qubits = circuit.num_qubits capacity = max(1, (num_qubits + num_parts - 1) // num_parts + 1) # Ceiling division + buffer start_partition = time.perf_counter() partitions, _ = compute_cut(hdh, num_parts, capacity) partitioning_time = time.perf_counter() - start_partition cost_q, cost_c = cost(hdh, partitions) partition_cost = cost_q + cost_c # Total cost except Exception as e: self.logger.warning(f"Partitioning failed for {circuit_name}: {str(e)}") # Visualization benchmark (optional) visualization_time = None try: start_vis = time.perf_counter() # Don't actually save, just measure rendering time plot_hdh(hdh, save_path=None) plt.close('all') # Clean up visualization_time = time.perf_counter() - start_vis except Exception as e: self.logger.warning(f"Visualization failed for {circuit_name}: {str(e)}") return BenchmarkResult( circuit_name=circuit_name, num_qubits=circuit.num_qubits, circuit_depth=circuit.depth(), circuit_size=circuit.size(), conversion_time=conversion_time, memory_peak_mb=memory_used, hdh_nodes=hdh_nodes, hdh_edges=hdh_edges, hdh_timesteps=hdh_timesteps, partitioning_time=partitioning_time, partition_cost=partition_cost, visualization_time=visualization_time, success=True ) except Exception as e: # Make sure to stop tracemalloc even on error if tracemalloc.is_tracing(): tracemalloc.stop() self.logger.error(f"Benchmark failed for {circuit_name}: {str(e)}") self.logger.debug(traceback.format_exc()) return BenchmarkResult( circuit_name=circuit_name, num_qubits=circuit.num_qubits, circuit_depth=circuit.depth(), circuit_size=circuit.size(), conversion_time=0, memory_peak_mb=0, hdh_nodes=0, hdh_edges=0, hdh_timesteps=0, partitioning_time=0, partition_cost=0, error_message=str(e), success=False ) def run_scalability_benchmark(self) -> List[BenchmarkResult]: """Run scalability benchmarks with varying circuit sizes.""" self.logger.info("Running scalability benchmark") results = [] # Test different qubit counts qubit_counts = [2, 3, 4, 5, 6, 7, 8] for n_qubits in qubit_counts: # Test different circuit types test_circuits = [ (self.circuit_library.ghz_state(n_qubits), f"GHZ-{n_qubits}"), (self.circuit_library.qft_circuit(min(n_qubits, 6)), f"QFT-{min(n_qubits, 6)}"), # Limit QFT size (self.circuit_library.random_circuit(n_qubits, n_qubits * 2, seed=42), f"Random-{n_qubits}") ] for circuit, name in test_circuits: if circuit.num_qubits <= 8: # Safety limit # Run multiple repetitions repetition_results = [] for rep in range(self.repetitions): result = self.benchmark_circuit_conversion(circuit, f"{name}-rep{rep}") repetition_results.append(result) # Average the repetitions if repetition_results and any(r.success for r in repetition_results): successful_results = [r for r in repetition_results if r.success] if successful_results: avg_result = self.average_results(successful_results, name) results.append(avg_result) return results def run_algorithm_benchmark(self) -> List[BenchmarkResult]: """Benchmark specific quantum algorithms.""" self.logger.info("Running algorithm benchmark") results = [] # Algorithm test suite algorithms = [ (self.circuit_library.bell_state(), "Bell State"), (self.circuit_library.ghz_state(4), "GHZ-4"), (self.circuit_library.w_state(4), "W-4"), (self.circuit_library.qft_circuit(4), "QFT-4"), (self.circuit_library.grover_search(3), "Grover-3"), (self.circuit_library.deutsch_jozsa(4), "Deutsch-Jozsa"), (self.circuit_library.quantum_teleportation(), "Teleportation"), (self.circuit_library.vqe_ansatz(4, 2), "VQE"), (self.circuit_library.quantum_error_correction_3bit(), "QEC-3bit") ] for circuit, name in algorithms: repetition_results = [] for rep in range(self.repetitions): result = self.benchmark_circuit_conversion(circuit, f"{name}-rep{rep}") repetition_results.append(result) # Average the repetitions if repetition_results and any(r.success for r in repetition_results): successful_results = [r for r in repetition_results if r.success] if successful_results: avg_result = self.average_results(successful_results, name) results.append(avg_result) return results def run_random_circuit_benchmark(self, max_qubits: int = 6, max_depth: int = 20) -> List[BenchmarkResult]: """Benchmark random circuits of varying complexity.""" self.logger.info("Running random circuit benchmark") results = [] # Generate random circuits with different parameters test_configs = [ (3, 5), (3, 10), (4, 5), (4, 10), (5, 8), (6, 6) ] seeds = [42, 123, 456] # Multiple seeds for variety for n_qubits, depth in test_configs: if n_qubits <= max_qubits and depth <= max_depth: for seed in seeds: circuit = self.circuit_library.random_circuit(n_qubits, depth, seed) name = f"Random-{n_qubits}q-{depth}d-s{seed}" result = self.benchmark_circuit_conversion(circuit, name) if result.success: results.append(result) return results def average_results(self, results: List[BenchmarkResult], name: str) -> BenchmarkResult: """Average multiple benchmark results.""" if not results: raise ValueError("No results to average") # Take the first result as template template = results[0] # Average numerical fields avg_conversion_time = mean([r.conversion_time for r in results]) avg_memory = mean([r.memory_peak_mb for r in results]) avg_partitioning_time = mean([r.partitioning_time for r in results]) avg_partition_cost = mean([r.partition_cost for r in results]) avg_vis_time = None vis_times = [r.visualization_time for r in results if r.visualization_time is not None] if vis_times: avg_vis_time = mean(vis_times) return BenchmarkResult( circuit_name=name, num_qubits=template.num_qubits, circuit_depth=template.circuit_depth, circuit_size=template.circuit_size, conversion_time=avg_conversion_time, memory_peak_mb=avg_memory, hdh_nodes=template.hdh_nodes, hdh_edges=template.hdh_edges, hdh_timesteps=template.hdh_timesteps, partitioning_time=avg_partitioning_time, partition_cost=avg_partition_cost, visualization_time=avg_vis_time, success=True ) def run_comprehensive_benchmark(self) -> Dict[str, List[BenchmarkResult]]: """Run all benchmark suites.""" self.logger.info("Starting comprehensive HDH benchmark") benchmark_results = { 'scalability': [], 'algorithms': [], 'random_circuits': [] } # Run individual benchmark suites try: benchmark_results['scalability'] = self.run_scalability_benchmark() except Exception as e: self.logger.error(f"Scalability benchmark failed: {str(e)}") try: benchmark_results['algorithms'] = self.run_algorithm_benchmark() except Exception as e: self.logger.error(f"Algorithm benchmark failed: {str(e)}") try: benchmark_results['random_circuits'] = self.run_random_circuit_benchmark() except Exception as e: self.logger.error(f"Random circuit benchmark failed: {str(e)}") # Store all results for suite_results in benchmark_results.values(): self.results.extend(suite_results) return benchmark_results def generate_performance_plots(self, results: Dict[str, List[BenchmarkResult]]): """Generate performance analysis plots.""" self.logger.info("Generating performance plots") # Scaling plot self.plot_scaling_performance(results['scalability']) # Algorithm comparison self.plot_algorithm_comparison(results['algorithms']) # Memory usage analysis all_results = [] for suite_results in results.values(): all_results.extend(suite_results) self.plot_memory_analysis(all_results) # Performance vs complexity self.plot_performance_vs_complexity(all_results) def plot_scaling_performance(self, results: List[BenchmarkResult]): """Plot performance scaling with circuit size.""" if not results: return fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12)) # Group by circuit type circuit_types = {} for result in results: circuit_type = result.circuit_name.split('-')[0] if circuit_type not in circuit_types: circuit_types[circuit_type] = [] circuit_types[circuit_type].append(result) # Plot 1: Conversion time vs qubits for circuit_type, type_results in circuit_types.items(): qubits = [r.num_qubits for r in type_results] times = [r.conversion_time for r in type_results] ax1.plot(qubits, times, 'o-', label=circuit_type, alpha=0.7) ax1.set_xlabel('Number of Qubits') ax1.set_ylabel('Conversion Time (s)') ax1.set_title('HDH Conversion Time Scaling') ax1.legend() ax1.grid(True, alpha=0.3) ax1.set_yscale('log') # Plot 2: Memory usage vs qubits for circuit_type, type_results in circuit_types.items(): qubits = [r.num_qubits for r in type_results] memory = [r.memory_peak_mb for r in type_results] ax2.plot(qubits, memory, 's-', label=circuit_type, alpha=0.7) ax2.set_xlabel('Number of Qubits') ax2.set_ylabel('Peak Memory Usage (MB)') ax2.set_title('Memory Usage Scaling') ax2.legend() ax2.grid(True, alpha=0.3) # Plot 3: HDH nodes vs circuit size all_sizes = [r.circuit_size for r in results] all_nodes = [r.hdh_nodes for r in results] ax3.scatter(all_sizes, all_nodes, alpha=0.6) ax3.set_xlabel('Circuit Size (gates)') ax3.set_ylabel('HDH Nodes') ax3.set_title('HDH Representation Size') ax3.grid(True, alpha=0.3) # Plot 4: Partitioning cost distribution costs = [r.partition_cost for r in results if r.partition_cost > 0] if costs: ax4.hist(costs, bins=20, alpha=0.7, color='skyblue', edgecolor='black') ax4.set_xlabel('Partition Cost') ax4.set_ylabel('Frequency') ax4.set_title('Partitioning Cost Distribution') ax4.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(self.output_dir / 'scaling_performance.png', dpi=300, bbox_inches='tight') plt.close() def plot_algorithm_comparison(self, results: List[BenchmarkResult]): """Plot algorithm-specific performance comparison.""" if not results: return fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) names = [r.circuit_name for r in results] conversion_times = [r.conversion_time for r in results] memory_usage = [r.memory_peak_mb for r in results] # Conversion time comparison bars1 = ax1.bar(range(len(names)), conversion_times, alpha=0.7, color='lightcoral') ax1.set_xlabel('Algorithm') ax1.set_ylabel('Conversion Time (s)') ax1.set_title('HDH Conversion Time by Algorithm') ax1.set_xticks(range(len(names))) ax1.set_xticklabels(names, rotation=45, ha='right') ax1.grid(True, alpha=0.3, axis='y') # Add value labels on bars for bar, time in zip(bars1, conversion_times): height = bar.get_height() ax1.text(bar.get_x() + bar.get_width()/2., height, f'{time:.3f}s', ha='center', va='bottom', fontsize=8) # Memory usage comparison bars2 = ax2.bar(range(len(names)), memory_usage, alpha=0.7, color='lightblue') ax2.set_xlabel('Algorithm') ax2.set_ylabel('Peak Memory (MB)') ax2.set_title('Memory Usage by Algorithm') ax2.set_xticks(range(len(names))) ax2.set_xticklabels(names, rotation=45, ha='right') ax2.grid(True, alpha=0.3, axis='y') # Add value labels on bars for bar, mem in zip(bars2, memory_usage): height = bar.get_height() ax2.text(bar.get_x() + bar.get_width()/2., height, f'{mem:.1f}MB', ha='center', va='bottom', fontsize=8) plt.tight_layout() plt.savefig(self.output_dir / 'algorithm_comparison.png', dpi=300, bbox_inches='tight') plt.close() def plot_memory_analysis(self, results: List[BenchmarkResult]): """Plot memory usage analysis.""" if not results: return fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6)) # Memory vs HDH size hdh_sizes = [r.hdh_nodes + r.hdh_edges for r in results] memory_usage = [r.memory_peak_mb for r in results] ax1.scatter(hdh_sizes, memory_usage, alpha=0.6, color='green') ax1.set_xlabel('HDH Size (nodes + edges)') ax1.set_ylabel('Peak Memory (MB)') ax1.set_title('Memory Usage vs HDH Size') ax1.grid(True, alpha=0.3) # Memory efficiency (memory per HDH element) efficiency = [mem / max(size, 1) for mem, size in zip(memory_usage, hdh_sizes)] ax2.hist(efficiency, bins=20, alpha=0.7, color='orange', edgecolor='black') ax2.set_xlabel('Memory per HDH Element (MB)') ax2.set_ylabel('Frequency') ax2.set_title('Memory Efficiency Distribution') ax2.grid(True, alpha=0.3) plt.tight_layout() plt.savefig(self.output_dir / 'memory_analysis.png', dpi=300, bbox_inches='tight') plt.close() def plot_performance_vs_complexity(self, results: List[BenchmarkResult]): """Plot performance vs circuit complexity.""" if not results: return fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12)) # Performance vs circuit depth depths = [r.circuit_depth for r in results] times = [r.conversion_time for r in results] ax1.scatter(depths, times, alpha=0.6) ax1.set_xlabel('Circuit Depth') ax1.set_ylabel('Conversion Time (s)') ax1.set_title('Performance vs Circuit Depth') ax1.grid(True, alpha=0.3) ax1.set_yscale('log') # Performance vs circuit size sizes = [r.circuit_size for r in results] ax2.scatter(sizes, times, alpha=0.6, color='red') ax2.set_xlabel('Circuit Size (gates)') ax2.set_ylabel('Conversion Time (s)') ax2.set_title('Performance vs Circuit Size') ax2.grid(True, alpha=0.3) ax2.set_yscale('log') # HDH efficiency hdh_efficiency = [r.hdh_nodes / max(r.circuit_size, 1) for r in results] ax3.scatter(sizes, hdh_efficiency, alpha=0.6, color='purple') ax3.set_xlabel('Circuit Size (gates)') ax3.set_ylabel('HDH Nodes / Circuit Gates') ax3.set_title('HDH Representation Efficiency') ax3.grid(True, alpha=0.3) # Partitioning efficiency partition_efficiency = [r.partitioning_time / max(r.conversion_time, 0.001) for r in results if r.partitioning_time > 0] hdh_sizes_with_partition = [r.hdh_nodes for r in results if r.partitioning_time > 0] if partition_efficiency: ax4.scatter(hdh_sizes_with_partition, partition_efficiency, alpha=0.6, color='brown') ax4.set_xlabel('HDH Nodes') ax4.set_ylabel('Partitioning Time / Conversion Time') ax4.set_title('Partitioning Overhead') ax4.grid(True, alpha=0.3) ax4.set_yscale('log') plt.tight_layout() plt.savefig(self.output_dir / 'performance_complexity.png', dpi=300, bbox_inches='tight') plt.close() def generate_report(self, results: Dict[str, List[BenchmarkResult]]) -> Dict[str, Any]: """Generate comprehensive benchmark report.""" self.logger.info("Generating benchmark report") all_results = [] for suite_results in results.values(): all_results.extend([r for r in suite_results if r.success]) if not all_results: return {"error": "No successful benchmark results"} # Basic statistics conversion_times = [r.conversion_time for r in all_results] memory_usage = [r.memory_peak_mb for r in all_results] hdh_nodes = [r.hdh_nodes for r in all_results] report = { "benchmark_summary": { "total_circuits": len(all_results), "successful_circuits": len(all_results), "benchmark_date": datetime.now().isoformat(), "repetitions": self.repetitions }, "performance_statistics": { "conversion_time": { "mean": mean(conversion_times), "median": median(conversion_times), "min": min(conversion_times), "max": max(conversion_times), "std": stdev(conversion_times) if len(conversion_times) > 1 else 0 }, "memory_usage": { "mean": mean(memory_usage), "median": median(memory_usage), "min": min(memory_usage), "max": max(memory_usage), "std": stdev(memory_usage) if len(memory_usage) > 1 else 0 }, "hdh_size": { "mean_nodes": mean(hdh_nodes), "median_nodes": median(hdh_nodes), "min_nodes": min(hdh_nodes), "max_nodes": max(hdh_nodes) } }, "scalability_analysis": { "largest_circuit_qubits": max([r.num_qubits for r in all_results]), "largest_circuit_size": max([r.circuit_size for r in all_results]), "largest_hdh_nodes": max(hdh_nodes) }, "suite_results": { suite_name: { "count": len(suite_results), "avg_conversion_time": mean([r.conversion_time for r in suite_results]) if suite_results else 0, "avg_memory": mean([r.memory_peak_mb for r in suite_results]) if suite_results else 0 } for suite_name, suite_results in results.items() if suite_results } } # Save detailed results detailed_results = { "report": report, "detailed_results": [asdict(r) for r in all_results] } report_file = self.output_dir / "benchmark_report.json" with open(report_file, 'w') as f: json.dump(detailed_results, f, indent=2, default=str) return report def run_full_benchmark(self) -> Dict[str, Any]: """Run complete benchmark suite and generate report.""" start_time = time.time() # Run benchmarks results = self.run_comprehensive_benchmark() # Generate visualizations self.generate_performance_plots(results) # Generate report report = self.generate_report(results) total_time = time.time() - start_time report["benchmark_summary"]["total_benchmark_time"] = total_time self.logger.info(f"Benchmark completed in {total_time:.2f} seconds") return report def main(): """Main benchmarking function.""" parser = argparse.ArgumentParser(description="HDH Performance Benchmark Suite") parser.add_argument("--output-dir", default="benchmark_results", help="Output directory") parser.add_argument("--repetitions", type=int, default=3, help="Number of repetitions per test") parser.add_argument("--suite", choices=["scalability", "algorithms", "random", "all"], default="all", help="Benchmark suite to run") parser.add_argument("--max-qubits", type=int, default=8, help="Maximum qubits for scaling tests") parser.add_argument("--verbose", action="store_true", help="Verbose logging") args = parser.parse_args() # Initialize benchmark suite benchmark = HDHBenchmarkSuite( output_dir=args.output_dir, repetitions=args.repetitions ) if args.verbose: logging.getLogger().setLevel(logging.DEBUG) try: print("šŸš€ HDH Performance Benchmark Suite") print("=" * 50) print("Special thanks to Maria Gragera Garces for the HDH library!") print() if args.suite == "all": report = benchmark.run_full_benchmark() else: # Run specific benchmark suite if args.suite == "scalability": results = {"scalability": benchmark.run_scalability_benchmark()} elif args.suite == "algorithms": results = {"algorithms": benchmark.run_algorithm_benchmark()} elif args.suite == "random": results = {"random": benchmark.run_random_circuit_benchmark(args.max_qubits)} benchmark.generate_performance_plots(results) report = benchmark.generate_report(results) # Print summary if "error" not in report: summary = report["benchmark_summary"] perf_stats = report["performance_statistics"] print(f"\nšŸ“Š Benchmark Results Summary:") print(f"Circuits tested: {summary['total_circuits']}") print(f"Success rate: 100%") print(f"Average conversion time: {perf_stats['conversion_time']['mean']:.4f}s") print(f"Average memory usage: {perf_stats['memory_usage']['mean']:.2f}MB") print(f"Largest circuit: {report['scalability_analysis']['largest_circuit_qubits']} qubits") print(f"Total benchmark time: {summary.get('total_benchmark_time', 0):.2f}s") print(f"\nšŸ“ Results saved in: {benchmark.output_dir}") print("šŸ“ˆ Performance plots generated") print("šŸ“‹ Detailed report: benchmark_report.json") else: print(f"\nāŒ Benchmark failed: {report['error']}") except KeyboardInterrupt: print("\nā¹ļø Benchmark interrupted by user") except Exception as e: print(f"\nšŸ’„ Benchmark failed: {str(e)}") logging.error(traceback.format_exc()) raise if __name__ == "__main__": main()