Files
buque/internal/stats/collector.go
2025-11-02 01:41:51 +01:00

199 líneas
5.1 KiB
Go

package stats
import (
"context"
"fmt"
"sort"
"time"
"github.com/manalejandro/buque/internal/docker"
"github.com/manalejandro/buque/internal/models"
)
// Collector collects and manages container statistics
type Collector struct {
dockerClient *docker.Client
}
// NewCollector creates a new statistics collector
func NewCollector() (*Collector, error) {
client, err := docker.NewClient()
if err != nil {
return nil, err
}
return &Collector{
dockerClient: client,
}, nil
}
// Close closes the statistics collector
func (sc *Collector) Close() error {
return sc.dockerClient.Close()
}
// CollectAll collects statistics for all running containers
func (sc *Collector) CollectAll(ctx context.Context) ([]models.ContainerStats, error) {
containers, err := sc.dockerClient.ListContainers(ctx, false)
if err != nil {
return nil, err
}
stats := make([]models.ContainerStats, 0, len(containers))
for _, container := range containers {
stat, err := sc.dockerClient.GetContainerStats(ctx, container.ID)
if err != nil {
// Log error but continue with other containers
fmt.Printf("Warning: failed to get stats for container %s: %v\n", container.Names[0], err)
continue
}
// Extract environment name from labels
if envName, ok := container.Labels["buque.environment"]; ok {
stat.Environment = envName
}
// Clean up container name (remove leading /)
if len(container.Names) > 0 && len(container.Names[0]) > 0 {
stat.Name = container.Names[0][1:]
}
stats = append(stats, *stat)
}
return stats, nil
}
// CollectForEnvironment collects statistics for containers in a specific environment
func (sc *Collector) CollectForEnvironment(ctx context.Context, envName string) ([]models.ContainerStats, error) {
containers, err := sc.dockerClient.GetContainersByLabel(ctx, "buque.environment", envName)
if err != nil {
return nil, err
}
stats := make([]models.ContainerStats, 0, len(containers))
for _, container := range containers {
stat, err := sc.dockerClient.GetContainerStats(ctx, container.ID)
if err != nil {
fmt.Printf("Warning: failed to get stats for container %s: %v\n", container.Names[0], err)
continue
}
stat.Environment = envName
if len(container.Names) > 0 && len(container.Names[0]) > 0 {
stat.Name = container.Names[0][1:]
}
stats = append(stats, *stat)
}
return stats, nil
}
// GetAggregatedStats returns aggregated statistics for all containers
func (sc *Collector) GetAggregatedStats(ctx context.Context) (*AggregatedStats, error) {
stats, err := sc.CollectAll(ctx)
if err != nil {
return nil, err
}
agg := &AggregatedStats{
TotalContainers: len(stats),
CollectedAt: time.Now(),
}
for _, stat := range stats {
agg.TotalCPUPercent += stat.CPUPercentage
agg.TotalMemoryUsage += stat.MemoryUsage
agg.TotalMemoryLimit += stat.MemoryLimit
agg.TotalNetworkRx += stat.NetworkRx
agg.TotalNetworkTx += stat.NetworkTx
agg.TotalBlockRead += stat.BlockRead
agg.TotalBlockWrite += stat.BlockWrite
}
if agg.TotalMemoryLimit > 0 {
agg.TotalMemoryPercent = (float64(agg.TotalMemoryUsage) / float64(agg.TotalMemoryLimit)) * 100.0
}
return agg, nil
}
// SortStats sorts container statistics by the specified field
func (sc *Collector) SortStats(stats []models.ContainerStats, sortBy string, descending bool) []models.ContainerStats {
sort.Slice(stats, func(i, j int) bool {
var less bool
switch sortBy {
case "cpu":
less = stats[i].CPUPercentage < stats[j].CPUPercentage
case "memory":
less = stats[i].MemoryUsage < stats[j].MemoryUsage
case "network":
less = (stats[i].NetworkRx + stats[i].NetworkTx) < (stats[j].NetworkRx + stats[j].NetworkTx)
case "name":
less = stats[i].Name < stats[j].Name
default:
less = stats[i].Name < stats[j].Name
}
if descending {
return !less
}
return less
})
return stats
}
// MonitorContinuously monitors container statistics continuously
func (sc *Collector) MonitorContinuously(ctx context.Context, interval time.Duration, callback func([]models.ContainerStats)) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
stats, err := sc.CollectAll(ctx)
if err != nil {
return err
}
callback(stats)
}
}
}
// AggregatedStats represents aggregated statistics for all containers
type AggregatedStats struct {
TotalContainers int
TotalCPUPercent float64
TotalMemoryUsage uint64
TotalMemoryLimit uint64
TotalMemoryPercent float64
TotalNetworkRx uint64
TotalNetworkTx uint64
TotalBlockRead uint64
TotalBlockWrite uint64
CollectedAt time.Time
}
// FormatBytes formats bytes to human-readable format
func FormatBytes(bytes uint64) string {
const unit = 1024
if bytes < unit {
return fmt.Sprintf("%d B", bytes)
}
div, exp := uint64(unit), 0
for n := bytes / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.2f %ciB", float64(bytes)/float64(div), "KMGTPE"[exp])
}
// FormatPercent formats a percentage value
func FormatPercent(percent float64) string {
return fmt.Sprintf("%.2f%%", percent)
}