198
internal/stats/collector.go
Archivo normal
198
internal/stats/collector.go
Archivo normal
@@ -0,0 +1,198 @@
|
||||
package stats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/yourusername/buque/internal/docker"
|
||||
"github.com/yourusername/buque/internal/models"
|
||||
)
|
||||
|
||||
// Collector collects and manages container statistics
|
||||
type Collector struct {
|
||||
dockerClient *docker.Client
|
||||
}
|
||||
|
||||
// NewCollector creates a new statistics collector
|
||||
func NewCollector() (*Collector, error) {
|
||||
client, err := docker.NewClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Collector{
|
||||
dockerClient: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close closes the statistics collector
|
||||
func (sc *Collector) Close() error {
|
||||
return sc.dockerClient.Close()
|
||||
}
|
||||
|
||||
// CollectAll collects statistics for all running containers
|
||||
func (sc *Collector) CollectAll(ctx context.Context) ([]models.ContainerStats, error) {
|
||||
containers, err := sc.dockerClient.ListContainers(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats := make([]models.ContainerStats, 0, len(containers))
|
||||
for _, container := range containers {
|
||||
stat, err := sc.dockerClient.GetContainerStats(ctx, container.ID)
|
||||
if err != nil {
|
||||
// Log error but continue with other containers
|
||||
fmt.Printf("Warning: failed to get stats for container %s: %v\n", container.Names[0], err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract environment name from labels
|
||||
if envName, ok := container.Labels["buque.environment"]; ok {
|
||||
stat.Environment = envName
|
||||
}
|
||||
|
||||
// Clean up container name (remove leading /)
|
||||
if len(container.Names) > 0 && len(container.Names[0]) > 0 {
|
||||
stat.Name = container.Names[0][1:]
|
||||
}
|
||||
|
||||
stats = append(stats, *stat)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// CollectForEnvironment collects statistics for containers in a specific environment
|
||||
func (sc *Collector) CollectForEnvironment(ctx context.Context, envName string) ([]models.ContainerStats, error) {
|
||||
containers, err := sc.dockerClient.GetContainersByLabel(ctx, "buque.environment", envName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stats := make([]models.ContainerStats, 0, len(containers))
|
||||
for _, container := range containers {
|
||||
stat, err := sc.dockerClient.GetContainerStats(ctx, container.ID)
|
||||
if err != nil {
|
||||
fmt.Printf("Warning: failed to get stats for container %s: %v\n", container.Names[0], err)
|
||||
continue
|
||||
}
|
||||
|
||||
stat.Environment = envName
|
||||
if len(container.Names) > 0 && len(container.Names[0]) > 0 {
|
||||
stat.Name = container.Names[0][1:]
|
||||
}
|
||||
|
||||
stats = append(stats, *stat)
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// GetAggregatedStats returns aggregated statistics for all containers
|
||||
func (sc *Collector) GetAggregatedStats(ctx context.Context) (*AggregatedStats, error) {
|
||||
stats, err := sc.CollectAll(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
agg := &AggregatedStats{
|
||||
TotalContainers: len(stats),
|
||||
CollectedAt: time.Now(),
|
||||
}
|
||||
|
||||
for _, stat := range stats {
|
||||
agg.TotalCPUPercent += stat.CPUPercentage
|
||||
agg.TotalMemoryUsage += stat.MemoryUsage
|
||||
agg.TotalMemoryLimit += stat.MemoryLimit
|
||||
agg.TotalNetworkRx += stat.NetworkRx
|
||||
agg.TotalNetworkTx += stat.NetworkTx
|
||||
agg.TotalBlockRead += stat.BlockRead
|
||||
agg.TotalBlockWrite += stat.BlockWrite
|
||||
}
|
||||
|
||||
if agg.TotalMemoryLimit > 0 {
|
||||
agg.TotalMemoryPercent = (float64(agg.TotalMemoryUsage) / float64(agg.TotalMemoryLimit)) * 100.0
|
||||
}
|
||||
|
||||
return agg, nil
|
||||
}
|
||||
|
||||
// SortStats sorts container statistics by the specified field
|
||||
func (sc *Collector) SortStats(stats []models.ContainerStats, sortBy string, descending bool) []models.ContainerStats {
|
||||
sort.Slice(stats, func(i, j int) bool {
|
||||
var less bool
|
||||
switch sortBy {
|
||||
case "cpu":
|
||||
less = stats[i].CPUPercentage < stats[j].CPUPercentage
|
||||
case "memory":
|
||||
less = stats[i].MemoryUsage < stats[j].MemoryUsage
|
||||
case "network":
|
||||
less = (stats[i].NetworkRx + stats[i].NetworkTx) < (stats[j].NetworkRx + stats[j].NetworkTx)
|
||||
case "name":
|
||||
less = stats[i].Name < stats[j].Name
|
||||
default:
|
||||
less = stats[i].Name < stats[j].Name
|
||||
}
|
||||
|
||||
if descending {
|
||||
return !less
|
||||
}
|
||||
return less
|
||||
})
|
||||
|
||||
return stats
|
||||
}
|
||||
|
||||
// MonitorContinuously monitors container statistics continuously
|
||||
func (sc *Collector) MonitorContinuously(ctx context.Context, interval time.Duration, callback func([]models.ContainerStats)) error {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
stats, err := sc.CollectAll(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
callback(stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AggregatedStats represents aggregated statistics for all containers
|
||||
type AggregatedStats struct {
|
||||
TotalContainers int
|
||||
TotalCPUPercent float64
|
||||
TotalMemoryUsage uint64
|
||||
TotalMemoryLimit uint64
|
||||
TotalMemoryPercent float64
|
||||
TotalNetworkRx uint64
|
||||
TotalNetworkTx uint64
|
||||
TotalBlockRead uint64
|
||||
TotalBlockWrite uint64
|
||||
CollectedAt time.Time
|
||||
}
|
||||
|
||||
// FormatBytes formats bytes to human-readable format
|
||||
func FormatBytes(bytes uint64) string {
|
||||
const unit = 1024
|
||||
if bytes < unit {
|
||||
return fmt.Sprintf("%d B", bytes)
|
||||
}
|
||||
div, exp := uint64(unit), 0
|
||||
for n := bytes / unit; n >= unit; n /= unit {
|
||||
div *= unit
|
||||
exp++
|
||||
}
|
||||
return fmt.Sprintf("%.2f %ciB", float64(bytes)/float64(div), "KMGTPE"[exp])
|
||||
}
|
||||
|
||||
// FormatPercent formats a percentage value
|
||||
func FormatPercent(percent float64) string {
|
||||
return fmt.Sprintf("%.2f%%", percent)
|
||||
}
|
||||
Referencia en una nueva incidencia
Block a user