Files
GigabiteStudios 20103d9793
Some checks failed
Verify / verify (push) Failing after 17s
feat(logs): archive command history daily
2026-06-20 20:05:16 -05:00

1830 lines
61 KiB
Go

package services
import (
"compress/gzip"
"context"
"database/sql"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/robfig/cron/v3"
"golang.org/x/crypto/ssh"
"maintainarr/internal/models"
)
// NodeService bundles the node-related operations defined in this file:
// listing, saving and deleting nodes, running commands over SSH, update
// scans, automations and uptime monitors.
type NodeService struct {
// db is the SQL handle every query and statement in this service runs on.
db *sql.DB
// crypto encrypts SSH passwords before they are stored and decrypts them
// when nodes are read back.
crypto *CryptoService
}
// NewNodeService builds a NodeService that uses the given database handle and
// crypto service. Neither argument is validated here.
func NewNodeService(database *sql.DB, crypto *CryptoService) *NodeService {
	service := new(NodeService)
	service.db = database
	service.crypto = crypto
	return service
}
// ListNodes returns every node of the organization orgID, ordered by name.
// The group name is joined in from vm_groups (empty when the node has no
// matching group). Row decoding, including SSH password decryption, is
// delegated to scanNodes.
func (s *NodeService) ListNodes(ctx context.Context, orgID int64) ([]models.Node, error) {
	const query = `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.updates_scan_enabled, n.auto_updates_enabled, n.updates_available, n.updates_details, n.auto_update_window_start, n.auto_update_window_end, n.auto_update_days, n.updates_last_checked_at, n.updates_last_error, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ?
ORDER BY n.name ASC
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	return s.scanNodes(result)
}
// ListNodesByGroup returns the nodes of organization orgID that belong to the
// group groupID, ordered by name. Row decoding is delegated to scanNodes.
func (s *NodeService) ListNodesByGroup(ctx context.Context, orgID, groupID int64) ([]models.Node, error) {
	const query = `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.updates_scan_enabled, n.auto_updates_enabled, n.updates_available, n.updates_details, n.auto_update_window_start, n.auto_update_window_end, n.auto_update_days, n.updates_last_checked_at, n.updates_last_error, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.group_id = ?
ORDER BY n.name ASC
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID, groupID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	return s.scanNodes(result)
}
// ListNodesByTag returns the nodes of organization orgID whose tag column
// equals tag exactly, ordered by name. Row decoding is delegated to scanNodes.
func (s *NodeService) ListNodesByTag(ctx context.Context, orgID int64, tag string) ([]models.Node, error) {
	const query = `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.updates_scan_enabled, n.auto_updates_enabled, n.updates_available, n.updates_details, n.auto_update_window_start, n.auto_update_window_end, n.auto_update_days, n.updates_last_checked_at, n.updates_last_error, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.tag = ?
ORDER BY n.name ASC
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID, tag)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	return s.scanNodes(result)
}
// scanNodes drains rows into a slice of models.Node. The column order must
// match the 40-column SELECT used by the List* queries in this file.
//
// A non-empty stored SSH password is passed through the crypto service; when
// decryption fails the stored value is kept as-is rather than failing the
// whole listing. The caller remains responsible for closing rows.
func (s *NodeService) scanNodes(rows *sql.Rows) ([]models.Node, error) {
	var collected []models.Node
	for rows.Next() {
		var n models.Node
		dest := []interface{}{
			&n.ID, &n.OrganizationID, &n.GroupID, &n.GroupName, &n.Tag, &n.Name, &n.Distro, &n.Hostname,
			&n.IPAddress, &n.MACAddress, &n.SSHPort, &n.SSHUsername, &n.SSHPassword,
			&n.PackageManager, &n.Architecture, &n.HostModel, &n.KernelVersion, &n.CPUModel, &n.GPUModel, &n.DefaultShell, &n.PackageCount, &n.MemoryTotalMB, &n.DiskTotalGB,
			&n.CPUUsage, &n.RAMUsage, &n.DiskUsage, &n.UptimeSeconds, &n.LastSeenAt,
			&n.UpdatesScanEnabled, &n.AutoUpdatesEnabled, &n.UpdatesAvailable, &n.UpdatesDetails, &n.AutoUpdateWindowStart, &n.AutoUpdateWindowEnd, &n.AutoUpdateDays, &n.UpdatesLastChecked, &n.UpdatesLastError, &n.Notes, &n.CreatedAt, &n.UpdatedAt,
		}
		if scanErr := rows.Scan(dest...); scanErr != nil {
			return nil, scanErr
		}
		if n.SSHPassword != "" {
			plain, decryptErr := s.crypto.Decrypt(n.SSHPassword)
			if decryptErr == nil {
				n.SSHPassword = plain
			}
		}
		collected = append(collected, n)
	}
	return collected, rows.Err()
}
// GetNode loads the single node nodeID of organization orgID, including the
// joined group name. The error from Scan is returned unchanged, so a missing
// node surfaces as sql.ErrNoRows.
//
// A non-empty stored SSH password is decrypted; if decryption fails the
// stored value is left in place.
func (s *NodeService) GetNode(ctx context.Context, orgID, nodeID int64) (*models.Node, error) {
	const query = `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.updates_scan_enabled, n.auto_updates_enabled, n.updates_available, n.updates_details, n.auto_update_window_start, n.auto_update_window_end, n.auto_update_days, n.updates_last_checked_at, n.updates_last_error, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.id = ?
`
	found := new(models.Node)
	row := s.db.QueryRowContext(ctx, query, orgID, nodeID)
	if scanErr := row.Scan(
		&found.ID, &found.OrganizationID, &found.GroupID, &found.GroupName, &found.Tag, &found.Name, &found.Distro, &found.Hostname,
		&found.IPAddress, &found.MACAddress, &found.SSHPort, &found.SSHUsername, &found.SSHPassword,
		&found.PackageManager, &found.Architecture, &found.HostModel, &found.KernelVersion, &found.CPUModel, &found.GPUModel, &found.DefaultShell, &found.PackageCount, &found.MemoryTotalMB, &found.DiskTotalGB,
		&found.CPUUsage, &found.RAMUsage, &found.DiskUsage, &found.UptimeSeconds, &found.LastSeenAt,
		&found.UpdatesScanEnabled, &found.AutoUpdatesEnabled, &found.UpdatesAvailable, &found.UpdatesDetails, &found.AutoUpdateWindowStart, &found.AutoUpdateWindowEnd, &found.AutoUpdateDays, &found.UpdatesLastChecked, &found.UpdatesLastError, &found.Notes, &found.CreatedAt, &found.UpdatedAt,
	); scanErr != nil {
		return nil, scanErr
	}

	if found.SSHPassword == "" {
		return found, nil
	}
	if plain, decryptErr := s.crypto.Decrypt(found.SSHPassword); decryptErr == nil {
		found.SSHPassword = plain
	}
	return found, nil
}
// SaveNode inserts node when node.ID is zero and updates the existing row
// (matched on id and organization_id) otherwise.
//
// A non-empty node.SSHPassword is encrypted before being stored; an empty one
// is stored as the empty string. On insert, node.ID is set to the new row id.
// Any encryption, statement or id-lookup error is returned.
//
// NOTE(review): an update with an empty SSHPassword overwrites the stored
// password with ''. Confirm callers always pass the current password.
func (s *NodeService) SaveNode(ctx context.Context, node *models.Node) error {
	encryptedPassword := ""
	if node.SSHPassword != "" {
		token, err := s.crypto.Encrypt(node.SSHPassword)
		if err != nil {
			return err
		}
		encryptedPassword = token
	}

	if node.ID == 0 {
		result, err := s.db.ExecContext(ctx, `
INSERT INTO nodes (
organization_id, group_id, tag, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, package_manager, architecture, host_model, kernel_version, cpu_model, gpu_model, default_shell, package_count, memory_total_mb, disk_total_gb,
updates_scan_enabled, auto_updates_enabled, updates_available, updates_details, auto_update_window_start, auto_update_window_end, auto_update_days, updates_last_checked_at, updates_last_error, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, node.OrganizationID, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress,
			node.MACAddress, node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.UpdatesScanEnabled, node.AutoUpdatesEnabled, node.UpdatesAvailable, node.UpdatesDetails, node.AutoUpdateWindowStart, node.AutoUpdateWindowEnd, node.AutoUpdateDays, node.UpdatesLastChecked, node.UpdatesLastError, node.Notes)
		if err != nil {
			return err
		}
		// Surface a failed id lookup instead of silently leaving node.ID at 0,
		// which would make the next SaveNode insert a duplicate row.
		id, err := result.LastInsertId()
		if err != nil {
			return err
		}
		node.ID = id
		return nil
	}

	_, err := s.db.ExecContext(ctx, `
UPDATE nodes
SET group_id = ?, tag = ?, name = ?, distro = ?, hostname = ?, ip_address = ?, mac_address = ?,
ssh_port = ?, ssh_username = ?, ssh_password = ?, package_manager = ?, architecture = ?, host_model = ?, kernel_version = ?, cpu_model = ?, gpu_model = ?, default_shell = ?, package_count = ?, memory_total_mb = ?, disk_total_gb = ?,
updates_scan_enabled = ?, auto_updates_enabled = ?, updates_available = ?, updates_details = ?, auto_update_window_start = ?, auto_update_window_end = ?, auto_update_days = ?, updates_last_checked_at = ?, updates_last_error = ?, notes = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress, node.MACAddress,
		node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.UpdatesScanEnabled, node.AutoUpdatesEnabled, node.UpdatesAvailable, node.UpdatesDetails, node.AutoUpdateWindowStart, node.AutoUpdateWindowEnd, node.AutoUpdateDays, node.UpdatesLastChecked, node.UpdatesLastError, node.Notes,
		node.ID, node.OrganizationID)
	return err
}
// EnsureUptimeMonitorForNode upserts the 'ssh' uptime monitor for node.
//
// The target is "<trimmed ip>:<ssh port>". The monitor name is the trimmed
// node name, falling back to the target when the name is blank. On a node_id
// conflict the existing row is refreshed and re-enabled with a 60-second
// interval.
func (s *NodeService) EnsureUptimeMonitorForNode(ctx context.Context, node *models.Node) error {
	const upsert = `
INSERT INTO uptime_monitors (
organization_id, node_id, name, target, monitor_type, interval_seconds, enabled
) VALUES (?, ?, ?, ?, 'ssh', 60, 1)
ON CONFLICT(node_id) DO UPDATE SET
organization_id = excluded.organization_id,
name = excluded.name,
target = excluded.target,
interval_seconds = 60,
enabled = 1,
updated_at = CURRENT_TIMESTAMP
`
	endpoint := fmt.Sprintf("%s:%d", strings.TrimSpace(node.IPAddress), node.SSHPort)
	label := strings.TrimSpace(node.Name)
	if label == "" {
		label = endpoint
	}

	_, execErr := s.db.ExecContext(ctx, upsert, node.OrganizationID, node.ID, label, endpoint)
	return execErr
}
// EnsureUptimeMonitors upserts an uptime monitor for every node of the
// organization orgID. It stops at, and returns, the first error.
func (s *NodeService) EnsureUptimeMonitors(ctx context.Context, orgID int64) error {
	all, listErr := s.ListNodes(ctx, orgID)
	if listErr != nil {
		return listErr
	}
	for idx := 0; idx < len(all); idx++ {
		ensureErr := s.EnsureUptimeMonitorForNode(ctx, &all[idx])
		if ensureErr != nil {
			return ensureErr
		}
	}
	return nil
}
// DeleteNode removes the node nodeID of organization orgID together with its
// dependent rows, all inside one transaction.
//
// Deletion order: uptime checks and incidents of each of the node's monitors,
// the monitors themselves, the node's command runs, its automation jobs, and
// finally the node row. If no node row was deleted, sql.ErrNoRows is returned
// and the deferred rollback undoes the earlier deletes.
func (s *NodeService) DeleteNode(ctx context.Context, orgID, nodeID int64) error {
	tx, err := s.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is harmless; this covers every early return.
	defer tx.Rollback()

	monitorRows, err := tx.QueryContext(ctx, `
SELECT id
FROM uptime_monitors
WHERE organization_id = ? AND node_id = ?
`, orgID, nodeID)
	if err != nil {
		return err
	}
	defer monitorRows.Close()

	var ids []int64
	for monitorRows.Next() {
		var id int64
		if err := monitorRows.Scan(&id); err != nil {
			return err
		}
		ids = append(ids, id)
	}
	if err := monitorRows.Err(); err != nil {
		return err
	}

	// Per-monitor child tables, cleared in this order for each monitor.
	perMonitor := []string{
		`DELETE FROM uptime_checks WHERE monitor_id = ?`,
		`DELETE FROM uptime_incidents WHERE monitor_id = ?`,
	}
	for _, id := range ids {
		for _, stmt := range perMonitor {
			if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
				return err
			}
		}
	}

	// Rows that reference the node directly, removed before the node itself.
	dependents := []struct {
		stmt string
		args []interface{}
	}{
		{`DELETE FROM uptime_monitors WHERE organization_id = ? AND node_id = ?`, []interface{}{orgID, nodeID}},
		{`DELETE FROM command_runs WHERE node_id = ?`, []interface{}{nodeID}},
		{`DELETE FROM automation_jobs WHERE organization_id = ? AND node_id = ?`, []interface{}{orgID, nodeID}},
	}
	for _, dep := range dependents {
		if _, err := tx.ExecContext(ctx, dep.stmt, dep.args...); err != nil {
			return err
		}
	}

	outcome, err := tx.ExecContext(ctx, `DELETE FROM nodes WHERE organization_id = ? AND id = ?`, orgID, nodeID)
	if err != nil {
		return err
	}
	deleted, err := outcome.RowsAffected()
	if err != nil {
		return err
	}
	if deleted == 0 {
		return sql.ErrNoRows
	}
	return tx.Commit()
}
// UpdateNodeUpdatePolicy stores the update-scan flag, auto-update flag,
// auto-update window and allowed days for the node nodeID of organization
// orgID. The values are written as given; nothing is validated here.
func (s *NodeService) UpdateNodeUpdatePolicy(ctx context.Context, orgID, nodeID int64, scanEnabled, autoUpdate bool, windowStart, windowEnd, updateDays string) error {
	const stmt = `
UPDATE nodes
SET updates_scan_enabled = ?, auto_updates_enabled = ?, auto_update_window_start = ?, auto_update_window_end = ?, auto_update_days = ?, updated_at = CURRENT_TIMESTAMP
WHERE organization_id = ? AND id = ?
`
	if _, execErr := s.db.ExecContext(ctx, stmt, scanEnabled, autoUpdate, windowStart, windowEnd, updateDays, orgID, nodeID); execErr != nil {
		return execErr
	}
	return nil
}
// UpdateGroupUpdatePolicy applies the same update policy (scan flag,
// auto-update flag, window and days) to every node of organization orgID
// whose group_id equals groupID.
func (s *NodeService) UpdateGroupUpdatePolicy(ctx context.Context, orgID, groupID int64, scanEnabled, autoUpdate bool, windowStart, windowEnd, updateDays string) error {
	const stmt = `
UPDATE nodes
SET updates_scan_enabled = ?, auto_updates_enabled = ?, auto_update_window_start = ?, auto_update_window_end = ?, auto_update_days = ?, updated_at = CURRENT_TIMESTAMP
WHERE organization_id = ? AND group_id = ?
`
	if _, execErr := s.db.ExecContext(ctx, stmt, scanEnabled, autoUpdate, windowStart, windowEnd, updateDays, orgID, groupID); execErr != nil {
		return execErr
	}
	return nil
}
// RefreshNodeUpdates runs the package manager's update-scan command on node
// over SSH, parses the result and persists the outcome on the node row.
//
// It returns the number of available updates, the raw command output (with
// the SSH error text appended on failure) and an error.
//
//   - No scan command for node.PackageManager: the node is marked with
//     "Unsupported package manager", counters are reset, and an error is
//     returned. The database write in this branch is best-effort.
//   - SSH or parse failure: updates_details is cleared, the previous
//     updates_available value is kept, and the failure is stored in
//     updates_last_error.
//   - Success: count and the marshalled package list are stored.
//
// A command-run log entry is written unless the database update fails.
func (s *NodeService) RefreshNodeUpdates(ctx context.Context, node *models.Node) (int64, string, error) {
	command := updateScanCommand(node.PackageManager)
	if strings.TrimSpace(command) == "" {
		now := time.Now()
		node.UpdatesAvailable = 0
		node.UpdatesDetails = ""
		node.UpdatesLastError = "Unsupported package manager"
		node.UpdatesLastChecked = &now
		// Best-effort: the caller gets the "unsupported" error either way.
		_, _ = s.db.ExecContext(ctx, `
UPDATE nodes
SET updates_available = 0, updates_details = '', updates_last_checked_at = ?, updates_last_error = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, now, node.UpdatesLastError, node.ID, node.OrganizationID)
		return 0, "", fmt.Errorf("unsupported package manager")
	}

	startedAt := time.Now()
	output, runErr := s.RunSSHCommand(ctx, node, command)
	finishedAt := time.Now()

	status := "completed"
	if runErr != nil {
		status = "failed"
		output = strings.TrimSpace(output + "\n" + runErr.Error())
	}

	// The output is parsed even after an SSH failure; the SSH error takes
	// precedence when deciding what to record.
	count, packages, parseErr := parseUpdateScan(output)

	lastError := ""
	switch {
	case runErr != nil:
		lastError = runErr.Error()
		node.UpdatesDetails = ""
	case parseErr != nil:
		lastError = parseErr.Error()
		status = "failed"
		node.UpdatesDetails = ""
	default:
		node.UpdatesAvailable = count
		node.UpdatesDetails = mustMarshalUpdatePackages(packages)
	}
	node.UpdatesLastError = lastError
	node.UpdatesLastChecked = &finishedAt

	if _, dbErr := s.db.ExecContext(ctx, `
UPDATE nodes
SET updates_available = ?, updates_details = ?, updates_last_checked_at = ?, updates_last_error = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.UpdatesAvailable, node.UpdatesDetails, finishedAt, node.UpdatesLastError, node.ID, node.OrganizationID); dbErr != nil {
		return node.UpdatesAvailable, output, dbErr
	}

	s.logCommandRun(ctx, commandRunParams{
		NodeID:      node.ID,
		Action:      "scan-updates",
		CommandText: sanitizeCommand(command),
		Status:      status,
		Output:      output,
		StartedAt:   &startedAt,
		FinishedAt:  &finishedAt,
	})

	switch {
	case runErr != nil:
		return node.UpdatesAvailable, output, runErr
	case parseErr != nil:
		return node.UpdatesAvailable, output, parseErr
	}
	return count, output, nil
}
// ApplyNodeUpdates runs the package manager's apply-updates command on node
// over SSH, wrapped with sudo using the node's SSH password, and logs the run.
//
// The unwrapped base command is what gets logged. triggeredBy is recorded on
// the log entry as given. On success the node's update information is
// re-scanned best-effort (the re-scan result is ignored). The returned string
// is the command output, with the error text appended on failure.
func (s *NodeService) ApplyNodeUpdates(ctx context.Context, node *models.Node, triggeredBy *int64) (string, error) {
	baseCommand := updateApplyCommand(node.PackageManager)
	if strings.TrimSpace(baseCommand) == "" {
		return "", fmt.Errorf("unsupported package manager")
	}
	wrapped := sudoWrappedCommand(baseCommand, node.SSHPassword)

	startedAt := time.Now()
	output, runErr := s.RunSSHCommand(ctx, node, wrapped)
	finishedAt := time.Now()

	outcome := "completed"
	if runErr != nil {
		outcome = "failed"
		output = strings.TrimSpace(output + "\n" + runErr.Error())
	}

	entry := commandRunParams{
		NodeID:      node.ID,
		Action:      "apply-updates",
		CommandText: sanitizeCommand(baseCommand),
		Status:      outcome,
		Output:      output,
		TriggeredBy: triggeredBy,
		StartedAt:   &startedAt,
		FinishedAt:  &finishedAt,
	}
	s.logCommandRun(ctx, entry)

	if runErr != nil {
		return output, runErr
	}
	// Refresh the pending-update counters; failures here do not fail the apply.
	_, _, _ = s.RefreshNodeUpdates(ctx, node)
	return output, nil
}
// ScanAllNodeUpdates scans every node of organization orgID that has update
// scanning enabled and both SSH credentials set.
//
// After each scan, a node with auto-updates enabled, pending updates and an
// open auto-update window has its updates applied immediately. Per-node
// failures are ignored; only the initial listing error is returned.
func (s *NodeService) ScanAllNodeUpdates(ctx context.Context, orgID int64) error {
	nodes, err := s.ListNodes(ctx, orgID)
	if err != nil {
		return err
	}
	// A failed window lookup leaves orgWindow at its zero value.
	orgWindow, _ := s.organizationAutoUpdateWindow(ctx, orgID)

	for i := range nodes {
		node := &nodes[i]
		scannable := node.UpdatesScanEnabled && node.SSHUsername != "" && node.SSHPassword != ""
		if !scannable {
			continue
		}
		_, _, _ = s.RefreshNodeUpdates(ctx, node)

		if !node.AutoUpdatesEnabled || node.UpdatesAvailable <= 0 {
			continue
		}
		if autoUpdateWindowOpen(node, orgWindow, time.Now()) {
			_, _ = s.ApplyNodeUpdates(ctx, node, nil)
		}
	}
	return nil
}
// RefreshAllNodeInventory re-probes the inventory of every node of
// organization orgID that has both an SSH username and password. Per-node
// failures are ignored; only the listing error is returned.
func (s *NodeService) RefreshAllNodeInventory(ctx context.Context, orgID int64) error {
	nodes, err := s.ListNodes(ctx, orgID)
	if err != nil {
		return err
	}
	for i := range nodes {
		node := &nodes[i]
		hasCredentials := node.SSHUsername != "" && node.SSHPassword != ""
		if hasCredentials {
			_, _ = s.RefreshNodeInventory(ctx, node)
		}
	}
	return nil
}
// ScanGroupNodeUpdates scans for updates on every node of group groupID in
// organization orgID that has update scanning enabled and both SSH
// credentials set. Per-node failures are ignored.
func (s *NodeService) ScanGroupNodeUpdates(ctx context.Context, orgID, groupID int64) error {
	nodes, err := s.ListNodesByGroup(ctx, orgID, groupID)
	if err != nil {
		return err
	}
	for i := range nodes {
		node := &nodes[i]
		if node.UpdatesScanEnabled && node.SSHUsername != "" && node.SSHPassword != "" {
			_, _, _ = s.RefreshNodeUpdates(ctx, node)
		}
	}
	return nil
}
// ApplyAllNodeUpdates applies pending updates on every node of organization
// orgID that has both SSH credentials and a positive updates_available
// count. triggeredBy is passed through to each run's log entry. Per-node
// failures are ignored.
func (s *NodeService) ApplyAllNodeUpdates(ctx context.Context, orgID int64, triggeredBy *int64) error {
	nodes, err := s.ListNodes(ctx, orgID)
	if err != nil {
		return err
	}
	for i := range nodes {
		node := &nodes[i]
		if node.SSHUsername != "" && node.SSHPassword != "" && node.UpdatesAvailable > 0 {
			_, _ = s.ApplyNodeUpdates(ctx, node, triggeredBy)
		}
	}
	return nil
}
// ApplyEligibleAutoUpdates applies pending updates on the nodes of
// organization orgID that opted into auto-updates, have both SSH
// credentials, report at least one pending update, and whose auto-update
// window is open. The window check uses one timestamp taken before the loop.
// Per-node failures are ignored.
func (s *NodeService) ApplyEligibleAutoUpdates(ctx context.Context, orgID int64) error {
	nodes, err := s.ListNodes(ctx, orgID)
	if err != nil {
		return err
	}
	// A failed window lookup leaves orgWindow at its zero value.
	orgWindow, _ := s.organizationAutoUpdateWindow(ctx, orgID)
	now := time.Now()

	for i := range nodes {
		node := &nodes[i]
		eligible := node.AutoUpdatesEnabled && node.SSHUsername != "" && node.SSHPassword != "" && node.UpdatesAvailable > 0
		if eligible && autoUpdateWindowOpen(node, orgWindow, now) {
			_, _ = s.ApplyNodeUpdates(ctx, node, nil)
		}
	}
	return nil
}
// ApplyGroupNodeUpdates applies pending updates on every node of group
// groupID in organization orgID that has both SSH credentials and a positive
// updates_available count. Per-node failures are ignored.
func (s *NodeService) ApplyGroupNodeUpdates(ctx context.Context, orgID, groupID int64, triggeredBy *int64) error {
	nodes, err := s.ListNodesByGroup(ctx, orgID, groupID)
	if err != nil {
		return err
	}
	for i := range nodes {
		node := &nodes[i]
		if node.SSHUsername != "" && node.SSHPassword != "" && node.UpdatesAvailable > 0 {
			_, _ = s.ApplyNodeUpdates(ctx, node, triggeredBy)
		}
	}
	return nil
}
// RefreshNodeStats samples CPU, RAM, root-disk usage and uptime on node over
// SSH, stores them on the node row together with last_seen_at, and mirrors
// them onto node.
//
// The remote script reads /proc/stat twice one second apart for CPU usage,
// /proc/meminfo for RAM, `df /` for disk and /proc/uptime for uptime, and
// prints KEY=value lines. Keys that are missing or unparsable are stored as 0.
//
// It returns the raw script output. On SSH failure it logs a failed run and
// returns "" with the error. On database failure it returns the output with
// the error, leaves node untouched and logs nothing.
func (s *NodeService) RefreshNodeStats(ctx context.Context, node *models.Node) (string, error) {
	const statsScript = `
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total1=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle1=$((idle + iowait))
sleep 1
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total2=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle2=$((idle + iowait))
diff_total=$((total2 - total1))
diff_idle=$((idle2 - idle1))
if [ "$diff_total" -gt 0 ]; then
cpu="$(awk -v total="$diff_total" -v idle="$diff_idle" 'BEGIN { printf "%.2f", (total - idle) * 100 / total }')"
else
cpu="0.00"
fi
ram="$(awk '
/MemTotal:/ { total=$2 }
/MemAvailable:/ { available=$2 }
END {
if (total > 0) {
printf "%.2f", ((total - available) * 100) / total
} else {
printf "0.00"
}
}
' /proc/meminfo)"
disk="$(df / --output=pcent 2>/dev/null | tail -1 | tr -dc '0-9')"
uptime="$(cut -d. -f1 /proc/uptime 2>/dev/null)"
echo "CPU=${cpu}"
echo "RAM=${ram}"
echo "DISK=${disk:-0}"
echo "UPTIME=${uptime:-0}"
`
	output, err := s.RunSSHCommand(ctx, node, strings.TrimSpace(statsScript))
	if err != nil {
		s.logCommandRun(ctx, commandRunParams{
			NodeID:      node.ID,
			Action:      "refresh-stats",
			CommandText: sanitizeCommand(statsScript),
			Status:      "failed",
			Output:      strings.TrimSpace(output + "\n" + err.Error()),
		})
		return "", err
	}

	stats := map[string]float64{}
	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)
		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue
		}
		switch parts[0] {
		case "CPU", "RAM", "DISK", "UPTIME":
			var value float64
			// An unparsable value deliberately falls back to 0.
			_, _ = fmt.Sscanf(parts[1], "%f", &value)
			stats[parts[0]] = value
		}
	}

	now := time.Now()
	// Scope the write to the node's organization, like every other node
	// UPDATE in this service, so a stale or foreign ID cannot touch another
	// tenant's row.
	_, err = s.db.ExecContext(ctx, `
UPDATE nodes
SET cpu_usage = ?, ram_usage = ?, disk_usage = ?, uptime_seconds = ?, last_seen_at = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, stats["CPU"], stats["RAM"], stats["DISK"], int64(stats["UPTIME"]), now, node.ID, node.OrganizationID)
	if err != nil {
		return output, err
	}

	node.CPUUsage = stats["CPU"]
	node.RAMUsage = stats["RAM"]
	node.DiskUsage = stats["DISK"]
	node.UptimeSeconds = int64(stats["UPTIME"])
	node.LastSeenAt = &now

	s.logCommandRun(ctx, commandRunParams{
		NodeID:      node.ID,
		Action:      "refresh-stats",
		CommandText: sanitizeCommand(statsScript),
		Status:      "completed",
		Output:      output,
	})
	return output, nil
}
// StatsStale reports whether node's stats need refreshing: true when the
// node has never been seen (LastSeenAt is nil) or when at least maxAge has
// elapsed since it was last seen.
func (s *NodeService) StatsStale(node *models.Node, maxAge time.Duration) bool {
	lastSeen := node.LastSeenAt
	if lastSeen != nil {
		return time.Since(*lastSeen) >= maxAge
	}
	return true
}
// RefreshNodeInventory probes node over SSH for its distro, hostname, MAC
// address, architecture, host model, kernel, package manager, CPU/GPU model,
// default shell, package count, total memory and root-disk size, then stores
// the result on the node row and on node itself.
//
// The probe is a list of shell snippets joined with " ; ", each echoing a
// KEY=value line. Blank probe values keep the node's current value (or a
// fallback). Numeric values overwrite the current ones only when positive.
//
// It returns the raw probe output. On SSH failure a failed run is logged and
// "" is returned with the error. On database failure the output is returned
// with the error and nothing is logged.
func (s *NodeService) RefreshNodeInventory(ctx context.Context, node *models.Node) (string, error) {
	probes := []string{
		`if [ -r /etc/os-release ]; then . /etc/os-release; echo DISTRO="${PRETTY_NAME:-$ID}"; else echo DISTRO="$(uname -s)"; fi`,
		`echo HOSTNAME="$(hostname 2>/dev/null || uname -n)"`,
		`PRIMARY_IFACE="$(ip route get 1.1.1.1 2>/dev/null | awk '/dev/ {for (i=1;i<=NF;i++) if ($i=="dev") {print $(i+1); exit}}')"; if [ -z "$PRIMARY_IFACE" ]; then PRIMARY_IFACE="$(ip -o link show 2>/dev/null | awk -F': ' '$2 != "lo" {print $2; exit}')"; fi; MAC_ADDR="$(cat "/sys/class/net/${PRIMARY_IFACE}/address" 2>/dev/null)"; echo MAC_ADDR="${MAC_ADDR}"`,
		`echo ARCH="$(uname -m 2>/dev/null)"`,
		`HOST_MODEL="$(cat /sys/devices/virtual/dmi/id/product_name 2>/dev/null || hostnamectl 2>/dev/null | awk -F: '/Chassis|Hardware Model/ {gsub(/^[ \t]+/, "", $2); print $2; exit}')"; echo HOST_MODEL="${HOST_MODEL}"`,
		`echo KERNEL="$(uname -r 2>/dev/null)"`,
		`if command -v apt >/dev/null 2>&1; then echo PKG_MGR=apt; elif command -v dnf >/dev/null 2>&1; then echo PKG_MGR=dnf; elif command -v yum >/dev/null 2>&1; then echo PKG_MGR=yum; elif command -v pacman >/dev/null 2>&1; then echo PKG_MGR=pacman; elif command -v zypper >/dev/null 2>&1; then echo PKG_MGR=zypper; elif command -v apk >/dev/null 2>&1; then echo PKG_MGR=apk; elif command -v nix-env >/dev/null 2>&1; then echo PKG_MGR=nix; elif command -v emerge >/dev/null 2>&1; then echo PKG_MGR=emerge; else echo PKG_MGR=unknown; fi`,
		`CPU_MODEL="$( (command -v lscpu >/dev/null 2>&1 && lscpu | awk -F: '/Model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}') || awk -F: '/model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}' /proc/cpuinfo )"; echo CPU_MODEL="${CPU_MODEL}"`,
		`GPU_MODEL="$( (command -v lspci >/dev/null 2>&1 && lspci | awk -F': ' '/VGA compatible controller|3D controller|Display controller/ {print $2; exit}') || true )"; echo GPU_MODEL="${GPU_MODEL}"`,
		`DEFAULT_SHELL="${SHELL:-$(getent passwd "$(id -un 2>/dev/null)" 2>/dev/null | cut -d: -f7)}"; echo DEFAULT_SHELL="${DEFAULT_SHELL}"`,
		`PACKAGE_COUNT="$(
if command -v dpkg-query >/dev/null 2>&1; then dpkg-query -f '.' -W 2>/dev/null | wc -c;
elif command -v rpm >/dev/null 2>&1; then rpm -qa 2>/dev/null | wc -l;
elif command -v pacman >/dev/null 2>&1; then pacman -Qq 2>/dev/null | wc -l;
elif command -v apk >/dev/null 2>&1; then apk info 2>/dev/null | wc -l;
elif command -v nix-store >/dev/null 2>&1; then nix-store --gc --print-live 2>/dev/null | wc -l;
else printf 0; fi
)"; echo PACKAGE_COUNT="${PACKAGE_COUNT:-0}"`,
		`MEMORY_MB="$(awk '/MemTotal/ {printf "%d", $2/1024}' /proc/meminfo 2>/dev/null)"; echo MEMORY_MB="${MEMORY_MB:-0}"`,
		`DISK_GB="$(df -BG / 2>/dev/null | awk 'NR==2 {gsub(/G/, "", $2); print $2}')"; echo DISK_GB="${DISK_GB:-0}"`,
	}

	output, err := s.RunSSHCommand(ctx, node, strings.Join(probes, " ; "))
	if err != nil {
		// The log carries a human-readable summary instead of the full probe script.
		s.logCommandRun(ctx, commandRunParams{
			NodeID:      node.ID,
			Action:      "refresh-inventory",
			CommandText: sanitizeCommand(`read /etc/os-release, hostname, kernel, package manager, cpu, gpu, shell, package count, memory, disk`),
			Status:      "failed",
			Output:      strings.TrimSpace(output + "\n" + err.Error()),
		})
		return "", err
	}

	// Collect KEY=value lines; a later duplicate key overwrites an earlier one.
	probed := map[string]string{}
	for _, raw := range strings.Split(output, "\n") {
		entry := strings.TrimSpace(raw)
		sep := strings.Index(entry, "=")
		if sep < 0 {
			continue
		}
		probed[entry[:sep]] = strings.Trim(strings.TrimSpace(entry[sep+1:]), `"`)
	}

	// Unparsable numbers stay at zero and therefore never overwrite stored values.
	var packageCount, memoryMB, diskGB int64
	fmt.Sscanf(probed["PACKAGE_COUNT"], "%d", &packageCount)
	fmt.Sscanf(probed["MEMORY_MB"], "%d", &memoryMB)
	fmt.Sscanf(probed["DISK_GB"], "%d", &diskGB)

	node.Distro = fallbackInventoryValue(probed["DISTRO"], node.Distro, "Linux")
	node.Hostname = fallbackInventoryValue(probed["HOSTNAME"], node.Hostname, node.IPAddress)
	node.MACAddress = fallbackInventoryValue(probed["MAC_ADDR"], node.MACAddress, "")
	node.PackageManager = fallbackInventoryValue(probed["PKG_MGR"], node.PackageManager, "")
	node.Architecture = fallbackInventoryValue(probed["ARCH"], node.Architecture, "")
	node.HostModel = fallbackInventoryValue(probed["HOST_MODEL"], node.HostModel, "")
	node.KernelVersion = fallbackInventoryValue(probed["KERNEL"], node.KernelVersion, "")
	node.CPUModel = fallbackInventoryValue(probed["CPU_MODEL"], node.CPUModel, "")
	node.GPUModel = fallbackInventoryValue(probed["GPU_MODEL"], node.GPUModel, "")
	node.DefaultShell = fallbackInventoryValue(probed["DEFAULT_SHELL"], node.DefaultShell, "")
	if packageCount > 0 {
		node.PackageCount = packageCount
	}
	if memoryMB > 0 {
		node.MemoryTotalMB = memoryMB
	}
	if diskGB > 0 {
		node.DiskTotalGB = diskGB
	}

	if _, err = s.db.ExecContext(ctx, `
UPDATE nodes
SET distro = ?, hostname = ?, mac_address = ?, package_manager = ?, architecture = ?, host_model = ?, kernel_version = ?, cpu_model = ?, gpu_model = ?, default_shell = ?, package_count = ?, memory_total_mb = ?, disk_total_gb = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.Distro, node.Hostname, node.MACAddress, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.ID, node.OrganizationID); err != nil {
		return output, err
	}

	s.logCommandRun(ctx, commandRunParams{
		NodeID:      node.ID,
		Action:      "refresh-inventory",
		CommandText: "inventory probe",
		Status:      "completed",
		Output:      output,
	})
	return output, nil
}
// fallbackInventoryValue picks the first non-blank value among primary and
// current, returned with surrounding whitespace trimmed. When both are blank
// it returns fallback unchanged (not trimmed).
func fallbackInventoryValue(primary, current, fallback string) string {
	for _, candidate := range []string{primary, current} {
		if trimmed := strings.TrimSpace(candidate); trimmed != "" {
			return trimmed
		}
	}
	return fallback
}
// RunSSHCommand connects to node over SSH with password authentication, runs
// command in a single session and returns its combined stdout and stderr.
//
// ctx is honored throughout: an already-cancelled context returns
// immediately, the TCP dial is bound to ctx (and to a 15-second timeout), and
// cancelling ctx during the handshake or while the command runs closes the
// connection. In that case ctx.Err() is returned together with any output
// collected so far.
//
// NOTE(security): host keys are not verified (ssh.InsecureIgnoreHostKey), so
// the connection is open to man-in-the-middle attacks. Replacing this needs a
// known-hosts store, which is outside this function.
func (s *NodeService) RunSSHCommand(ctx context.Context, node *models.Node, command string) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}

	config := &ssh.ClientConfig{
		User:            node.SSHUsername,
		Auth:            []ssh.AuthMethod{ssh.Password(node.SSHPassword)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         15 * time.Second,
	}
	address := fmt.Sprintf("%s:%d", node.IPAddress, node.SSHPort)

	// Dial manually (instead of ssh.Dial) so the connect phase observes ctx.
	dialer := net.Dialer{Timeout: config.Timeout}
	conn, err := dialer.DialContext(ctx, "tcp", address)
	if err != nil {
		return "", err
	}

	// Watchdog: closing the transport unblocks the handshake or the running
	// command as soon as ctx is cancelled. It exits when this function returns.
	finished := make(chan struct{})
	defer close(finished)
	go func() {
		select {
		case <-ctx.Done():
			_ = conn.Close()
		case <-finished:
		}
	}()

	clientConn, chans, reqs, err := ssh.NewClientConn(conn, address, config)
	if err != nil {
		_ = conn.Close()
		if ctxErr := ctx.Err(); ctxErr != nil {
			return "", ctxErr
		}
		return "", err
	}
	client := ssh.NewClient(clientConn, chans, reqs)
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return "", ctxErr
		}
		return "", err
	}
	defer session.Close()

	output, err := session.CombinedOutput(command)
	if err != nil {
		// Report cancellation as such rather than as an opaque I/O error.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return string(output), ctxErr
		}
	}
	return string(output), err
}
// RunAction performs one of the predefined actions on node and logs the run.
//
// Supported actions:
//   - "wake": sends a Wake-on-LAN packet to node.MACAddress (no SSH).
//   - "refresh": refreshes the node's stats via RefreshNodeStats.
//   - "shutdown", "restart", "apt-upgrade", "apt-install": run the mapped
//     shell command over SSH.
//
// Any other action fails with "unknown action". userID is recorded on the log
// entry as given. The returned string is the action's output; the "wake"
// confirmation is only returned when the packet was actually sent. The logged
// output additionally carries the error text when the action failed.
//
// NOTE(review): RefreshNodeStats writes its own command-run entry, so a
// "refresh" action produces two log entries.
func (s *NodeService) RunAction(ctx context.Context, node *models.Node, action string, userID *int64) (string, error) {
	command := map[string]string{
		"shutdown":    "sudo shutdown -h now",
		"restart":     "sudo reboot",
		"wake":        "",
		"refresh":     "printf 'refresh requested'",
		"apt-upgrade": "sudo apt update && sudo apt upgrade -y && sudo apt autoremove -y",
		"apt-install": "sudo apt update",
	}[action]

	var output string
	var err error
	switch {
	case action == "wake":
		// Only claim success when the packet really went out.
		if err = sendMagicPacket(node.MACAddress); err == nil {
			output = "Wake-on-LAN packet sent"
		}
	case action == "refresh":
		output, err = s.RefreshNodeStats(ctx, node)
	case command != "":
		output, err = s.RunSSHCommand(ctx, node, command)
	default:
		err = fmt.Errorf("unknown action")
	}

	status := "completed"
	logged := output
	if err != nil {
		status = "failed"
		// Keep the failure reason in the history, as the other run loggers do.
		logged = strings.TrimSpace(output + "\n" + err.Error())
	}
	s.logCommandRun(ctx, commandRunParams{
		NodeID:      node.ID,
		Action:      action,
		CommandText: sanitizeCommand(command),
		Status:      status,
		Output:      logged,
		TriggeredBy: userID,
	})
	return output, err
}
// RunAdHocCommand runs an arbitrary command on node over SSH and logs the run
// under the action name label.
//
// The start time is taken before the command runs and the finish time after
// it returns, so the logged run carries its real duration. On failure the
// error text is appended to the output that is both logged and returned.
// userID is recorded on the log entry as given.
func (s *NodeService) RunAdHocCommand(ctx context.Context, node *models.Node, label, command string, userID *int64) (string, error) {
	startedAt := time.Now()
	output, err := s.RunSSHCommand(ctx, node, command)
	finishedAt := time.Now()

	status := "completed"
	if err != nil {
		status = "failed"
		output = strings.TrimSpace(output + "\n" + err.Error())
	}
	s.logCommandRun(ctx, commandRunParams{
		NodeID:      node.ID,
		Action:      label,
		CommandText: sanitizeCommand(command),
		Status:      status,
		Output:      output,
		TriggeredBy: userID,
		StartedAt:   &startedAt,
		FinishedAt:  &finishedAt,
	})
	return output, err
}
// ListAutomations returns the automation jobs of organization orgID ordered
// by name. Each job carries the name of its target node and group, which are
// empty strings when the join finds no match.
func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]models.AutomationJob, error) {
	const query = `
SELECT j.id, j.organization_id, j.node_id, j.group_id, j.tag, j.name, j.trigger_type, j.schedule,
j.command, j.enabled, j.last_run_at, j.next_run_at, j.created_at,
COALESCE(n.name, ''), COALESCE(g.name, '')
FROM automation_jobs j
LEFT JOIN nodes n ON n.id = j.node_id
LEFT JOIN vm_groups g ON g.id = j.group_id
WHERE j.organization_id = ?
ORDER BY j.name ASC
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var automations []models.AutomationJob
	for result.Next() {
		var entry models.AutomationJob
		scanErr := result.Scan(
			&entry.ID, &entry.OrganizationID, &entry.NodeID, &entry.GroupID, &entry.Tag, &entry.Name, &entry.TriggerType,
			&entry.Schedule, &entry.Command, &entry.Enabled, &entry.LastRunAt, &entry.NextRunAt, &entry.CreatedAt,
			&entry.NodeName, &entry.GroupName,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		automations = append(automations, entry)
	}
	return automations, result.Err()
}
// CreateAutomation inserts job as a new automation job. Before inserting,
// job.NextRunAt is overwritten with the value nextRunForSchedule derives from
// job.Schedule. The new row's id is not written back to job.
func (s *NodeService) CreateAutomation(ctx context.Context, job *models.AutomationJob) error {
	const insert = `
INSERT INTO automation_jobs (organization_id, node_id, group_id, tag, name, trigger_type, schedule, command, enabled, next_run_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
	job.NextRunAt = nextRunForSchedule(job.Schedule)
	if _, execErr := s.db.ExecContext(ctx, insert,
		job.OrganizationID, job.NodeID, job.GroupID, job.Tag, job.Name,
		job.TriggerType, job.Schedule, job.Command, job.Enabled, job.NextRunAt,
	); execErr != nil {
		return execErr
	}
	return nil
}
// ListJobRuns returns the 50 most recently started command runs visible to
// organization orgID: runs whose job belongs to the organization, plus runs
// without a matching job whose node belongs to it. Each run gets a
// DurationText computed from its start and finish times.
func (s *NodeService) ListJobRuns(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
	const query = `
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.command_text, cr.status, cr.output, cr.triggered_by,
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, ''), COALESCE(g.name, '')
FROM command_runs cr
LEFT JOIN automation_jobs j ON j.id = cr.job_id
LEFT JOIN nodes n ON n.id = cr.node_id
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE j.organization_id = ? OR (j.id IS NULL AND n.organization_id = ?)
ORDER BY cr.started_at DESC
LIMIT 50
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID, orgID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var history []models.CommandRun
	for result.Next() {
		var entry models.CommandRun
		scanErr := result.Scan(
			&entry.ID, &entry.JobID, &entry.NodeID, &entry.Action, &entry.CommandText, &entry.Status, &entry.Output, &entry.TriggeredBy,
			&entry.StartedAt, &entry.FinishedAt, &entry.JobName, &entry.NodeName, &entry.GroupName,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		entry.DurationText = formatDuration(entry.StartedAt, entry.FinishedAt)
		history = append(history, entry)
	}
	return history, result.Err()
}
// ListCommandHistory returns the 200 most recently started command runs
// whose node or job belongs to organization orgID. Each run gets a
// DurationText computed from its start and finish times.
func (s *NodeService) ListCommandHistory(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
	const query = `
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.command_text, cr.status, cr.output, cr.triggered_by,
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, ''), COALESCE(g.name, '')
FROM command_runs cr
LEFT JOIN automation_jobs j ON j.id = cr.job_id
LEFT JOIN nodes n ON n.id = cr.node_id
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? OR j.organization_id = ?
ORDER BY cr.started_at DESC
LIMIT 200
`
	result, queryErr := s.db.QueryContext(ctx, query, orgID, orgID)
	if queryErr != nil {
		return nil, queryErr
	}
	defer result.Close()

	var history []models.CommandRun
	for result.Next() {
		var entry models.CommandRun
		scanErr := result.Scan(
			&entry.ID, &entry.JobID, &entry.NodeID, &entry.Action, &entry.CommandText, &entry.Status, &entry.Output, &entry.TriggeredBy,
			&entry.StartedAt, &entry.FinishedAt, &entry.JobName, &entry.NodeName, &entry.GroupName,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		entry.DurationText = formatDuration(entry.StartedAt, entry.FinishedAt)
		history = append(history, entry)
	}
	return history, result.Err()
}
// ListUptimeMonitors returns every uptime monitor owned by orgID, ordered by
// monitor name, with the linked node and group names filled in (empty strings
// when the monitor has no matching node or group).
func (s *NodeService) ListUptimeMonitors(ctx context.Context, orgID int64) ([]models.UptimeMonitor, error) {
	const query = `
SELECT m.id, m.organization_id, m.node_id, COALESCE(n.name, ''), COALESCE(g.name, ''),
m.name, m.target, m.monitor_type, m.interval_seconds, m.enabled, m.last_status,
m.last_latency_ms, m.last_checked_at, m.last_error, m.up_since_at, m.current_outage_started_at,
m.created_at, m.updated_at
FROM uptime_monitors m
LEFT JOIN nodes n ON n.id = m.node_id
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE m.organization_id = ?
ORDER BY m.name ASC
`
	rows, err := s.db.QueryContext(ctx, query, orgID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []models.UptimeMonitor
	for rows.Next() {
		var m models.UptimeMonitor
		scanErr := rows.Scan(
			&m.ID, &m.OrganizationID, &m.NodeID, &m.NodeName, &m.GroupName,
			&m.Name, &m.Target, &m.MonitorType, &m.IntervalSeconds, &m.Enabled, &m.LastStatus,
			&m.LastLatencyMS, &m.LastCheckedAt, &m.LastError, &m.UpSinceAt, &m.CurrentOutageStartedAt,
			&m.CreatedAt, &m.UpdatedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		result = append(result, m)
	}
	return result, rows.Err()
}
// ListRecentUptimeChecks groups the organization's most recent uptime checks
// by monitor ID, newest first, keeping at most limitPerMonitor entries per
// monitor (24 when limitPerMonitor is not positive).
//
// The query itself reads only the 500 newest checks across all monitors, so a
// monitor can end up with fewer than limitPerMonitor entries (or none) when
// other monitors dominate that window.
func (s *NodeService) ListRecentUptimeChecks(ctx context.Context, orgID int64, limitPerMonitor int) (map[int64][]models.UptimeCheck, error) {
	perMonitor := limitPerMonitor
	if perMonitor <= 0 {
		perMonitor = 24
	}

	const query = `
SELECT c.id, c.monitor_id, c.status, c.latency_ms, c.error_message, c.checked_at
FROM uptime_checks c
INNER JOIN uptime_monitors m ON m.id = c.monitor_id
WHERE m.organization_id = ?
ORDER BY c.checked_at DESC
LIMIT 500
`
	rows, err := s.db.QueryContext(ctx, query, orgID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	byMonitor := map[int64][]models.UptimeCheck{}
	for rows.Next() {
		var entry models.UptimeCheck
		scanErr := rows.Scan(&entry.ID, &entry.MonitorID, &entry.Status, &entry.LatencyMS, &entry.ErrorMessage, &entry.CheckedAt)
		if scanErr != nil {
			return nil, scanErr
		}
		// Rows arrive newest first, so anything past the cap is older and dropped.
		if bucket := byMonitor[entry.MonitorID]; len(bucket) < perMonitor {
			byMonitor[entry.MonitorID] = append(bucket, entry)
		}
	}
	return byMonitor, rows.Err()
}
// ListRecentUptimeIncidents returns the organization's most recently started
// uptime incidents, newest first, capped at limit (20 when limit is not
// positive). Each incident carries its duration in seconds and as display
// text; incidents without an end time are measured up to the current time.
func (s *NodeService) ListRecentUptimeIncidents(ctx context.Context, orgID int64, limit int) ([]models.UptimeIncident, error) {
	maxRows := limit
	if maxRows <= 0 {
		maxRows = 20
	}

	const query = `
SELECT i.id, i.monitor_id, COALESCE(m.name, ''), COALESCE(n.name, ''), COALESCE(g.name, ''),
i.error_message, i.started_at, i.ended_at
FROM uptime_incidents i
INNER JOIN uptime_monitors m ON m.id = i.monitor_id
LEFT JOIN nodes n ON n.id = m.node_id
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE m.organization_id = ?
ORDER BY i.started_at DESC
LIMIT ?
`
	rows, err := s.db.QueryContext(ctx, query, orgID, maxRows)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []models.UptimeIncident
	for rows.Next() {
		var item models.UptimeIncident
		scanErr := rows.Scan(
			&item.ID, &item.MonitorID, &item.MonitorName, &item.NodeName, &item.GroupName,
			&item.ErrorMessage, &item.StartedAt, &item.EndedAt,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		seconds := incidentDurationSeconds(item.StartedAt, item.EndedAt)
		item.DurationSeconds = seconds
		item.DurationText = humanDuration(time.Duration(seconds) * time.Second)
		result = append(result, item)
	}
	return result, rows.Err()
}
// UptimePeriodSummary aggregates uptime checks and incidents for orgID. When
// since is non-nil only checks checked at or after it, and incidents started
// at or after it, are counted; otherwise the whole history is used.
//
// The check aggregate yields total/up/down counts, the average latency of
// successful checks, and an estimated downtime (one monitor interval per
// failed check). The incident aggregate yields the incident count plus the
// longest and average incident length in seconds, with open incidents
// measured up to the database's CURRENT_TIMESTAMP.
func (s *NodeService) UptimePeriodSummary(ctx context.Context, orgID int64, since *time.Time) (models.UptimePeriodSummary, error) {
	var summary models.UptimePeriodSummary

	// Pick the check aggregate: unbounded by default, time-bounded when since is set.
	checksQuery := `
SELECT
COUNT(*),
COALESCE(SUM(CASE WHEN c.status = 'up' THEN 1 ELSE 0 END), 0),
COALESCE(SUM(CASE WHEN c.status = 'down' THEN 1 ELSE 0 END), 0),
COALESCE(CAST(AVG(CASE WHEN c.status = 'up' THEN c.latency_ms END) AS INTEGER), 0),
COALESCE(SUM(CASE WHEN c.status = 'down' THEN m.interval_seconds ELSE 0 END), 0)
FROM uptime_checks c
INNER JOIN uptime_monitors m ON m.id = c.monitor_id
WHERE m.organization_id = ?
`
	checksArgs := []any{orgID}
	if since != nil {
		checksQuery = `
SELECT
COUNT(*),
COALESCE(SUM(CASE WHEN c.status = 'up' THEN 1 ELSE 0 END), 0),
COALESCE(SUM(CASE WHEN c.status = 'down' THEN 1 ELSE 0 END), 0),
COALESCE(CAST(AVG(CASE WHEN c.status = 'up' THEN c.latency_ms END) AS INTEGER), 0),
COALESCE(SUM(CASE WHEN c.status = 'down' THEN m.interval_seconds ELSE 0 END), 0)
FROM uptime_checks c
INNER JOIN uptime_monitors m ON m.id = c.monitor_id
WHERE m.organization_id = ? AND c.checked_at >= ?
`
		checksArgs = append(checksArgs, *since)
	}
	checksErr := s.db.QueryRowContext(ctx, checksQuery, checksArgs...).Scan(
		&summary.TotalChecks, &summary.UpChecks, &summary.DownChecks, &summary.AvgLatencyMS, &summary.DowntimeSeconds,
	)
	if checksErr != nil {
		return summary, checksErr
	}

	// Same selection for the incident aggregate.
	incidentsQuery := `
SELECT
COUNT(*),
MAX(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER)),
AVG(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER))
FROM uptime_incidents i
INNER JOIN uptime_monitors m ON m.id = i.monitor_id
WHERE m.organization_id = ?
`
	incidentsArgs := []any{orgID}
	if since != nil {
		incidentsQuery = `
SELECT
COUNT(*),
MAX(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER)),
AVG(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER))
FROM uptime_incidents i
INNER JOIN uptime_monitors m ON m.id = i.monitor_id
WHERE m.organization_id = ? AND i.started_at >= ?
`
		incidentsArgs = append(incidentsArgs, *since)
	}

	// MAX/AVG are NULL when no incident matches, hence the nullable holders.
	var (
		longestSeconds sql.NullInt64
		averageSeconds sql.NullFloat64
	)
	incidentsErr := s.db.QueryRowContext(ctx, incidentsQuery, incidentsArgs...).Scan(
		&summary.IncidentCount, &longestSeconds, &averageSeconds,
	)
	if incidentsErr != nil {
		return summary, incidentsErr
	}

	if longestSeconds.Valid {
		summary.LongestIncidentSeconds = longestSeconds.Int64
	}
	if averageSeconds.Valid {
		summary.AvgIncidentSeconds = int64(averageSeconds.Float64)
	}
	return summary, nil
}
// RunAllUptimeChecks runs one uptime check for every enabled monitor of
// orgID. Only a failure to list the monitors is reported; errors from
// individual checks are deliberately discarded so one bad monitor does not
// stop the rest.
func (s *NodeService) RunAllUptimeChecks(ctx context.Context, orgID int64) error {
	monitors, err := s.ListUptimeMonitors(ctx, orgID)
	if err != nil {
		return err
	}
	for idx := range monitors {
		current := &monitors[idx]
		if current.Enabled {
			_ = s.RunUptimeCheck(ctx, current)
		}
	}
	return nil
}
// RunUptimeCheck probes monitor.Target with a TCP dial (5 second timeout),
// records the result, and updates both the database and the in-memory monitor.
//
// Steps, in order: insert an uptime_checks row; open an incident when the
// monitor goes from not-down to down; close the newest open incident when it
// goes from down to up; update the monitor row; finally mirror the new state
// onto monitor. The first database error aborts the sequence and is returned.
// A blank target returns an error without touching the database.
func (s *NodeService) RunUptimeCheck(ctx context.Context, monitor *models.UptimeMonitor) error {
	address := strings.TrimSpace(monitor.Target)
	if address == "" {
		return fmt.Errorf("empty monitor target")
	}

	const dialTimeout = 5 * time.Second
	checkedAt := time.Now()
	conn, dialErr := net.DialTimeout("tcp", address, dialTimeout)
	elapsedMS := int64(time.Since(checkedAt).Milliseconds())
	if elapsedMS < 0 {
		elapsedMS = 0
	}

	result, failure := "up", ""
	if dialErr == nil {
		_ = conn.Close()
	} else {
		result, failure = "down", dialErr.Error()
	}
	previouslyDown := monitor.LastStatus == "down"

	if _, err := s.db.ExecContext(ctx, `
INSERT INTO uptime_checks (monitor_id, status, latency_ms, error_message, checked_at)
VALUES (?, ?, ?, ?, ?)
`, monitor.ID, result, elapsedMS, failure, checkedAt); err != nil {
		return err
	}

	switch {
	case result == "down" && !previouslyDown:
		// Transition into an outage: open a new incident.
		if _, err := s.db.ExecContext(ctx, `
INSERT INTO uptime_incidents (monitor_id, error_message, started_at)
VALUES (?, ?, ?)
`, monitor.ID, failure, checkedAt); err != nil {
			return err
		}
	case result == "up" && previouslyDown:
		// Recovery: close the most recently started open incident.
		if _, err := s.db.ExecContext(ctx, `
UPDATE uptime_incidents
SET ended_at = ?
WHERE id = (
SELECT id
FROM uptime_incidents
WHERE monitor_id = ? AND ended_at IS NULL
ORDER BY started_at DESC
LIMIT 1
)
`, checkedAt, monitor.ID); err != nil {
			return err
		}
	}

	// Exactly one of these is set; the other stays nil and is stored as NULL.
	// A streak keeps its original start time only when the previous status
	// matches and that start time is known.
	var upSince, outageSince any
	if result == "up" {
		upSince = checkedAt
		if monitor.LastStatus == "up" && monitor.UpSinceAt != nil {
			upSince = *monitor.UpSinceAt
		}
	} else {
		outageSince = checkedAt
		if previouslyDown && monitor.CurrentOutageStartedAt != nil {
			outageSince = *monitor.CurrentOutageStartedAt
		}
	}

	if _, err := s.db.ExecContext(ctx, `
UPDATE uptime_monitors
SET last_status = ?, last_latency_ms = ?, last_checked_at = ?, last_error = ?,
up_since_at = ?, current_outage_started_at = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`, result, elapsedMS, checkedAt, failure, upSince, outageSince, monitor.ID); err != nil {
		return err
	}

	// Mirror the persisted state onto the caller's struct.
	monitor.LastStatus = result
	monitor.LastLatencyMS = elapsedMS
	monitor.LastCheckedAt = &checkedAt
	monitor.LastError = failure
	monitor.UpSinceAt = nil
	monitor.CurrentOutageStartedAt = nil
	if ts, ok := upSince.(time.Time); ok {
		monitor.UpSinceAt = &ts
	}
	if ts, ok := outageSince.(time.Time); ok {
		monitor.CurrentOutageStartedAt = &ts
	}
	return nil
}
// sendMagicPacket broadcasts a Wake-on-LAN magic packet for macAddress to
// 255.255.255.255 on UDP port 9.
//
// The packet is six 0xFF bytes followed by sixteen repetitions of the 6-byte
// hardware address. net.ParseMAC also accepts 8-byte (EUI-64) and 20-byte
// (InfiniBand) addresses; those cannot form a valid magic packet, so they are
// rejected here instead of being sent in a malformed packet.
//
// It returns an error when the address cannot be parsed, is not 6 bytes long,
// or the packet cannot be sent.
func sendMagicPacket(macAddress string) error {
	hw, err := net.ParseMAC(macAddress)
	if err != nil {
		return err
	}
	if len(hw) != 6 {
		return fmt.Errorf("wake-on-lan requires a 6-byte MAC address, got %d bytes", len(hw))
	}

	const (
		syncBytes   = 6
		repetitions = 16
	)
	packet := make([]byte, 0, syncBytes+repetitions*len(hw))
	for i := 0; i < syncBytes; i++ {
		packet = append(packet, 0xFF)
	}
	for i := 0; i < repetitions; i++ {
		packet = append(packet, hw...)
	}

	conn, err := net.Dial("udp", "255.255.255.255:9")
	if err != nil {
		return err
	}
	defer conn.Close()

	_, err = conn.Write(packet)
	return err
}
// SchedulerService bundles a cron runner with the dependencies its scheduled
// callbacks use.
type SchedulerService struct {
// cron is the runner that Start registers recurring callbacks and automation jobs on.
cron *cron.Cron
// nodeService performs the node, uptime, update and archive work the callbacks trigger.
nodeService *NodeService
// db is used directly to update automation job run timestamps.
db *sql.DB
// archiveDir is the directory handed to ArchiveCommandRuns by the daily job.
archiveDir string
}
// NewSchedulerService builds a SchedulerService around a fresh cron runner.
// Nothing is scheduled or started until Start is called.
func NewSchedulerService(database *sql.DB, nodeService *NodeService, archiveDir string) *SchedulerService {
	svc := new(SchedulerService)
	svc.cron = cron.New()
	svc.nodeService = nodeService
	svc.db = database
	svc.archiveDir = archiveDir
	return svc
}
// Start ensures uptime monitors exist for orgID, registers the recurring
// maintenance callbacks and every enabled automation job that has a schedule,
// then starts the cron runner.
//
// Recurring callbacks, in registration order:
//   - refreshSpec: refresh stats for every node that has SSH credentials
//   - every minute: ensure uptime monitors exist and run all uptime checks
//   - daily: refresh inventory, scan for updates, archive yesterday's command runs
//   - every 15 minutes: apply eligible automatic updates
//
// Errors inside callbacks are discarded. Start itself fails on the first
// registration error or when monitors or automations cannot be loaded; the
// cron runner is not started in that case.
func (s *SchedulerService) Start(ctx context.Context, orgID int64, refreshSpec string) error {
	if err := s.nodeService.EnsureUptimeMonitors(ctx, orgID); err != nil {
		return err
	}

	refreshStats := func() {
		// NOTE(review): this listing reuses the ctx passed to Start, while every
		// other scheduled call uses context.Background(); if that ctx is ever
		// cancelled after startup the stats refresh silently stops. Confirm intent.
		nodes, listErr := s.nodeService.ListNodes(ctx, orgID)
		if listErr != nil {
			return
		}
		for idx := range nodes {
			node := &nodes[idx]
			if node.SSHUsername == "" || node.SSHPassword == "" {
				continue
			}
			_, _ = s.nodeService.RefreshNodeStats(context.Background(), node)
		}
	}
	uptimeSweep := func() {
		_ = s.nodeService.EnsureUptimeMonitors(context.Background(), orgID)
		_ = s.nodeService.RunAllUptimeChecks(context.Background(), orgID)
	}
	dailyMaintenance := func() {
		_ = s.nodeService.RefreshAllNodeInventory(context.Background(), orgID)
		_ = s.nodeService.ScanAllNodeUpdates(context.Background(), orgID)
		_ = s.nodeService.ArchiveCommandRuns(context.Background(), orgID, time.Now().AddDate(0, 0, -1), s.archiveDir)
	}
	autoUpdates := func() {
		_ = s.nodeService.ApplyEligibleAutoUpdates(context.Background(), orgID)
	}

	recurring := []struct {
		spec string
		run  func()
	}{
		{refreshSpec, refreshStats},
		{"@every 1m", uptimeSweep},
		{"@daily", dailyMaintenance},
		{"@every 15m", autoUpdates},
	}
	for _, entry := range recurring {
		if _, err := s.cron.AddFunc(entry.spec, entry.run); err != nil {
			return err
		}
	}

	jobs, err := s.nodeService.ListAutomations(ctx, orgID)
	if err != nil {
		return err
	}
	for idx := range jobs {
		// Per-iteration copy so each closure runs its own job.
		job := jobs[idx]
		if !job.Enabled || job.Schedule == "" {
			continue
		}
		// Best-effort: seed next_run_at before the job is registered.
		_, _ = s.db.ExecContext(ctx, `UPDATE automation_jobs SET next_run_at = ? WHERE id = ?`, nextRunForSchedule(job.Schedule), job.ID)
		if _, addErr := s.cron.AddFunc(job.Schedule, func() {
			s.runAutomation(context.Background(), job)
		}); addErr != nil {
			return addErr
		}
	}

	s.cron.Start()
	return nil
}
// runAutomation executes job.Command over SSH on every node the job targets,
// logs one command run per node, and then stores the job's last and next run
// times. It does nothing when the targets cannot be resolved or there are none.
// A failed command is logged with status "failed" and the error text appended
// to its output; the remaining nodes are still processed.
func (s *SchedulerService) runAutomation(ctx context.Context, job models.AutomationJob) {
	targets, err := s.resolveTargets(ctx, job)
	if err != nil || len(targets) == 0 {
		return
	}

	// Ends up as the finish time of the last node processed.
	completedAt := time.Now()
	for idx := range targets {
		node := &targets[idx]

		began := time.Now()
		output, runErr := s.nodeService.RunSSHCommand(ctx, node, job.Command)
		outcome := "completed"
		if runErr != nil {
			outcome = "failed"
			output = strings.TrimSpace(output + "\n" + runErr.Error())
		}
		ended := time.Now()

		s.nodeService.logCommandRun(ctx, commandRunParams{
			JobID:       &job.ID,
			NodeID:      node.ID,
			Action:      job.Name,
			CommandText: sanitizeCommand(job.Command),
			Status:      outcome,
			Output:      output,
			StartedAt:   &began,
			FinishedAt:  &ended,
		})
		completedAt = ended
	}

	// Best-effort bookkeeping; a failure here is not reported.
	upcoming := nextRunForSchedule(job.Schedule)
	_, _ = s.db.ExecContext(ctx, `
UPDATE automation_jobs
SET last_run_at = ?, next_run_at = ?
WHERE id = ?
`, completedAt, upcoming, job.ID)
}
// resolveTargets returns the nodes an automation job applies to. The first
// selector that is set wins, in this order: a single node, a group, a
// non-blank tag. With no selector set it returns (nil, nil).
func (s *SchedulerService) resolveTargets(ctx context.Context, job models.AutomationJob) ([]models.Node, error) {
	switch {
	case job.NodeID != nil:
		single, err := s.nodeService.GetNode(ctx, job.OrganizationID, *job.NodeID)
		if err != nil {
			return nil, err
		}
		return []models.Node{*single}, nil
	case job.GroupID != nil:
		return s.nodeService.ListNodesByGroup(ctx, job.OrganizationID, *job.GroupID)
	case strings.TrimSpace(job.Tag) != "":
		return s.nodeService.ListNodesByTag(ctx, job.OrganizationID, job.Tag)
	default:
		return nil, nil
	}
}
// nextRunForSchedule returns the next activation time of the cron spec after
// the current time, or nil when the spec is blank or cannot be parsed.
//
// The spec is parsed with cron.ParseStandard, the parser a default cron.New()
// runner uses for AddFunc. This keeps the stored next-run time consistent
// with what the scheduler actually accepts: five-field specs as well as
// descriptors such as "@hourly" or "@every 5m". The previous hand-built
// parser omitted descriptor support, so those schedules ran but had no
// next-run time.
func nextRunForSchedule(spec string) *time.Time {
	if strings.TrimSpace(spec) == "" {
		return nil
	}
	schedule, err := cron.ParseStandard(spec)
	if err != nil {
		return nil
	}
	next := schedule.Next(time.Now())
	return &next
}
// formatDuration renders the time between startedAt and finishedAt, rounded
// to the nearest second, for display.
//
// A nil finishedAt yields "running". Durations under one minute (including
// negative ones) use time.Duration's own formatting, e.g. "5s". Longer
// durations are rendered as "Xm Ys", or "Xh Ym Zs" from one hour upwards.
func formatDuration(startedAt time.Time, finishedAt *time.Time) string {
	if finishedAt == nil {
		return "running"
	}

	elapsed := finishedAt.Sub(startedAt).Round(time.Second)
	if elapsed < time.Minute {
		return elapsed.String()
	}

	// elapsed is a whole number of seconds here, so integer math is exact.
	total := int(elapsed / time.Second)
	h, m, sec := total/3600, (total/60)%60, total%60
	if h == 0 {
		return fmt.Sprintf("%dm %ds", m, sec)
	}
	return fmt.Sprintf("%dh %dm %ds", h, m, sec)
}
// commandRunParams carries the values logCommandRun writes into one
// command_runs row.
type commandRunParams struct {
// JobID is the automation job that produced the run; nil when there is none.
JobID *int64
// NodeID is the node the command ran against.
NodeID int64
// Action is a short label for the run (the scheduler passes the job name).
Action string
// CommandText is the command as it should be stored (the scheduler passes it through sanitizeCommand).
CommandText string
// Status is the outcome label (the scheduler uses "completed" or "failed").
Status string
// Output is the captured command output.
Output string
// TriggeredBy is written to the triggered_by column; nil is allowed.
TriggeredBy *int64
// StartedAt defaults to the current time in logCommandRun when nil.
StartedAt *time.Time
// FinishedAt defaults to the current time in logCommandRun when nil.
FinishedAt *time.Time
}
// logCommandRun inserts one command_runs row from params. Missing start or
// finish times default to the current time. The insert is best-effort: a
// database error is discarded and nothing is returned to the caller.
func (s *NodeService) logCommandRun(ctx context.Context, params commandRunParams) {
	began := time.Now()
	if params.StartedAt != nil {
		began = *params.StartedAt
	}

	ended := params.FinishedAt
	if ended == nil {
		now := time.Now()
		ended = &now
	}

	_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (job_id, node_id, action, command_text, status, output, triggered_by, started_at, finished_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, params.JobID, params.NodeID, params.Action, params.CommandText, params.Status, params.Output, params.TriggeredBy, began, ended)
}
// archivedCommandRun is the row shape ArchiveCommandRuns reads from
// command_runs (joined with nodes, groups and users) before writing the
// entry to an archive file.
type archivedCommandRun struct {
// ID is the command_runs primary key.
ID int64
// GroupName is the node's group name, or "" when there is none.
GroupName string
// NodeName is the node's name, or "" when the node row is missing.
NodeName string
Action string
CommandText string
Status string
Output string
StartedAt time.Time
// FinishedAt is nil when the run has no recorded finish time.
FinishedAt *time.Time
// UserName is the name of the user referenced by triggered_by, or "" when there is none.
UserName string
}
// ArchiveCommandRuns moves one calendar day of an organization's command runs
// out of the database and into a gzip-compressed text file.
//
// The day window is [midnight, next midnight) in the local time zone, built
// from day's year, month and day. The function is a no-op (returns nil) when
// archiveDir is blank, when log_archives already has a row for that
// organization and date, or when no runs fall inside the window.
//
// Otherwise it writes archiveDir/command-runs-YYYY-MM-DD.log.gz via a ".tmp"
// file and a rename, then, in one transaction, records the archive in
// log_archives and deletes the window's runs from command_runs. If anything
// fails after the rename, the archive file is removed again and the error is
// returned, leaving the database rows in place.
func (s *NodeService) ArchiveCommandRuns(ctx context.Context, orgID int64, day time.Time, archiveDir string) error {
if strings.TrimSpace(archiveDir) == "" {
return nil
}
dayStart := time.Date(day.Year(), day.Month(), day.Day(), 0, 0, 0, 0, time.Local)
dayEnd := dayStart.AddDate(0, 0, 1)
archiveDate := dayStart.Format("2006-01-02")
// Skip days that already have an archive record for this organization.
var alreadyArchived int
if err := s.db.QueryRowContext(ctx, `
SELECT COUNT(1)
FROM log_archives
WHERE organization_id = ? AND archive_date = ?
`, orgID, archiveDate).Scan(&alreadyArchived); err != nil {
return err
}
if alreadyArchived > 0 {
return nil
}
// Load every run in the window whose node or job belongs to the organization, oldest first.
rows, err := s.db.QueryContext(ctx, `
SELECT cr.id, COALESCE(g.name, ''), COALESCE(n.name, ''), cr.action, cr.command_text, cr.status, cr.output,
cr.started_at, cr.finished_at, COALESCE(u.name, '')
FROM command_runs cr
LEFT JOIN nodes n ON n.id = cr.node_id
LEFT JOIN vm_groups g ON g.id = n.group_id
LEFT JOIN automation_jobs j ON j.id = cr.job_id
LEFT JOIN users u ON u.id = cr.triggered_by
WHERE (n.organization_id = ? OR j.organization_id = ?)
AND cr.started_at >= ? AND cr.started_at < ?
ORDER BY cr.started_at ASC, cr.id ASC
`, orgID, orgID, dayStart, dayEnd)
if err != nil {
return err
}
defer rows.Close()
var runs []archivedCommandRun
for rows.Next() {
var run archivedCommandRun
if err := rows.Scan(
&run.ID, &run.GroupName, &run.NodeName, &run.Action, &run.CommandText, &run.Status, &run.Output,
&run.StartedAt, &run.FinishedAt, &run.UserName,
); err != nil {
return err
}
runs = append(runs, run)
}
if err := rows.Err(); err != nil {
return err
}
// Nothing to archive: no file and no log_archives row are created.
if len(runs) == 0 {
return nil
}
if err := os.MkdirAll(archiveDir, 0o755); err != nil {
return err
}
archivePath := filepath.Join(archiveDir, fmt.Sprintf("command-runs-%s.log.gz", archiveDate))
// Never overwrite an existing archive file.
// NOTE(review): a file left on disk without a log_archives row makes every
// later attempt for this date fail here — confirm that is intended.
if _, err := os.Stat(archivePath); err == nil {
return fmt.Errorf("log archive already exists for %s", archiveDate)
} else if err != nil && !os.IsNotExist(err) {
return err
}
// Write to a temporary file first so a partially written archive never
// appears under the final name.
tempPath := archivePath + ".tmp"
file, err := os.Create(tempPath)
if err != nil {
return err
}
success := false
// On any early return, close the file and remove the temporary file. After a
// successful rename the temporary path no longer exists and the remove is a no-op.
defer func() {
_ = file.Close()
if !success {
_ = os.Remove(tempPath)
}
}()
gzipWriter := gzip.NewWriter(file)
for _, run := range runs {
// Target is "group / node" when the node has a group, otherwise just the node name.
target := run.NodeName
if strings.TrimSpace(run.GroupName) != "" {
target = run.GroupName + " / " + target
}
// Fall back to the action label when no command text was stored.
commandText := strings.TrimSpace(run.CommandText)
if commandText == "" {
commandText = strings.TrimSpace(run.Action)
}
duration := formatDuration(run.StartedAt, run.FinishedAt)
finishedAt := ""
if run.FinishedAt != nil {
finishedAt = run.FinishedAt.Format(time.RFC3339)
}
output := strings.TrimSpace(run.Output)
if output == "" {
output = "-"
}
if _, err := fmt.Fprintf(gzipWriter,
"[%s] %d\nTarget: %s\nAction: %s\nCommand: %s\nStatus: %s\nDuration: %s\nFinished: %s\nTriggered By: %s\nOutput:\n%s\n\n",
run.StartedAt.Format(time.RFC3339),
run.ID,
serviceDefaultIfEmpty(target, "Unknown"),
serviceDefaultIfEmpty(strings.TrimSpace(run.Action), "-"),
serviceDefaultIfEmpty(commandText, "-"),
serviceDefaultIfEmpty(strings.TrimSpace(run.Status), "-"),
serviceDefaultIfEmpty(duration, "-"),
serviceDefaultIfEmpty(finishedAt, "-"),
serviceDefaultIfEmpty(strings.TrimSpace(run.UserName), "system"),
output,
); err != nil {
_ = gzipWriter.Close()
return err
}
}
// Closing the gzip writer flushes the compressed stream; the file is closed
// explicitly so a close error is reported before the rename.
if err := gzipWriter.Close(); err != nil {
return err
}
if err := file.Close(); err != nil {
return err
}
if err := os.Rename(tempPath, archivePath); err != nil {
return err
}
// From here on the archive exists under its final name, so every failure
// path removes it to keep the file and the database consistent.
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
_ = os.Remove(archivePath)
return err
}
// Rollback after a successful Commit is a harmless no-op.
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `
INSERT INTO log_archives (organization_id, archive_date, file_path, entry_count, compressed_at)
VALUES (?, ?, ?, ?, ?)
`, orgID, archiveDate, archivePath, len(runs), time.Now()); err != nil {
_ = os.Remove(archivePath)
return err
}
// NOTE(review): rows are deleted by time window, not by the IDs written to
// the archive; a run inserted into this window after the SELECT above would
// be deleted without having been archived — confirm this cannot happen.
if _, err := tx.ExecContext(ctx, `
DELETE FROM command_runs
WHERE id IN (
SELECT cr.id
FROM command_runs cr
LEFT JOIN nodes n ON n.id = cr.node_id
LEFT JOIN automation_jobs j ON j.id = cr.job_id
WHERE (n.organization_id = ? OR j.organization_id = ?)
AND cr.started_at >= ? AND cr.started_at < ?
)
`, orgID, orgID, dayStart, dayEnd); err != nil {
_ = os.Remove(archivePath)
return err
}
if err := tx.Commit(); err != nil {
_ = os.Remove(archivePath)
return err
}
success = true
return nil
}
// sensitiveCommandPatterns match secret-bearing fragments of a shell command.
// In every pattern, capture group 1 is the part to keep (flag or key plus its
// separator); the rest of the match is the secret value. A value may be
// single-quoted, double-quoted, or a bare run of non-space characters.
// Compiled once at package scope rather than on every call.
var sensitiveCommandPatterns = []*regexp.Regexp{
	regexp.MustCompile(`(?i)(--password(?:=|\s+))(?:'[^']*'|"[^"]*"|\S+)`),
	regexp.MustCompile(`(?i)(--token(?:=|\s+))(?:'[^']*'|"[^"]*"|\S+)`),
	regexp.MustCompile(`(?i)(--secret(?:=|\s+))(?:'[^']*'|"[^"]*"|\S+)`),
	regexp.MustCompile(`(?i)(\b(?:password|passwd|token|secret|api[_-]?key)\s*=\s*)(?:'[^']*'|"[^"]*"|[^'"\s]+)`),
}

// sanitizeCommand returns command with surrounding whitespace removed and
// obvious secrets replaced by "[REDACTED]", so it can be stored in logs.
//
// Values of --password/--token/--secret flags and of
// password/passwd/token/secret/api_key assignments are redacted; matching is
// case-insensitive. The flag or key and its "=" separator are preserved, and
// a quoted value is redacted as a whole, including any spaces inside it. A
// blank command yields "".
func sanitizeCommand(command string) string {
	sanitized := strings.TrimSpace(command)
	if sanitized == "" {
		return ""
	}
	for _, pattern := range sensitiveCommandPatterns {
		sanitized = pattern.ReplaceAllString(sanitized, `${1}[REDACTED]`)
	}
	return sanitized
}
// serviceDefaultIfEmpty returns value unchanged unless it is empty or
// whitespace-only, in which case fallback is returned.
func serviceDefaultIfEmpty(value, fallback string) string {
	if strings.TrimSpace(value) != "" {
		return value
	}
	return fallback
}
// updateScanCommand returns the shell pipeline that lists pending package
// updates for the given package manager, or "" when the manager is not
// supported. The name is matched case-insensitively, ignoring surrounding
// whitespace.
//
// Every pipeline prints one "PKG|name|current|available|arch" line per
// upgradable package (fields may be empty) followed by a final "UPDATES=<n>"
// line, which is the format parseUpdateScan reads.
func updateScanCommand(packageManager string) string {
	manager := strings.ToLower(strings.TrimSpace(packageManager))

	var script string
	switch manager {
	case "apt":
		script = `apt list --upgradable 2>/dev/null | awk 'BEGIN{count=0} NR > 1 && /\[upgradable from:/ { count++; pkg=$1; sub(/\/.*/, "", pkg); arch=""; split($1, parts, "/"); if (length(parts) > 1) arch=parts[2]; from=$0; sub(/^.*\[upgradable from: /, "", from); sub(/\].*$/, "", from); printf "PKG|%s|%s|%s|%s\n", pkg, from, $2, arch } END { printf "UPDATES=%d\n", count }'`
	case "dnf":
		script = `tmp="$(mktemp)"; dnf check-update -q >"$tmp" 2>/dev/null; status=$?; awk -v status="$status" 'BEGIN{count=0} status == 100 && /^[[:alnum:]_.+-]+[[:space:]]+[[:alnum:]_.:+~^-]+[[:space:]]+/ { count++; pkg=$1; ver=$2; repo=$3; arch=""; if (match(pkg, /\.([^.]+)$/, m)) { arch=m[1]; sub(/\.[^.]+$/, "", pkg) } printf "PKG|%s||%s|%s\n", pkg, ver, arch } END { printf "UPDATES=%d\n", count }' "$tmp"; rm -f "$tmp"`
	case "yum":
		script = `tmp="$(mktemp)"; yum check-update -q >"$tmp" 2>/dev/null; status=$?; awk -v status="$status" 'BEGIN{count=0} status == 100 && /^[[:alnum:]_.+-]+[[:space:]]+[[:alnum:]_.:+~^-]+[[:space:]]+/ { count++; pkg=$1; ver=$2; arch=""; if (match(pkg, /\.([^.]+)$/, m)) { arch=m[1]; sub(/\.[^.]+$/, "", pkg) } printf "PKG|%s||%s|%s\n", pkg, ver, arch } END { printf "UPDATES=%d\n", count }' "$tmp"; rm -f "$tmp"`
	case "pacman":
		script = `if command -v checkupdates >/dev/null 2>&1; then checkupdates 2>/dev/null; else pacman -Qu 2>/dev/null; fi | awk 'BEGIN{count=0} NF >= 3 {count++; printf "PKG|%s|%s|%s|\n", $1, $2, $4} END { printf "UPDATES=%d\n", count }'`
	case "zypper":
		script = `zypper list-updates 2>/dev/null | awk 'BEGIN{count=0} /^\|/ && $0 !~ /\| Repository / && $0 !~ /\| Status / && $0 !~ /\+-/ { split($0, parts, "|"); pkg=trim(parts[3]); cur=trim(parts[4]); avail=trim(parts[5]); arch=trim(parts[6]); if (pkg != "" && pkg != "Name") { count++; printf "PKG|%s|%s|%s|%s\n", pkg, cur, avail, arch } } END { printf "UPDATES=%d\n", count } function trim(value) { gsub(/^[ \t]+|[ \t]+$/, "", value); return value }'`
	case "apk":
		script = `apk version -l '<' 2>/dev/null | awk 'BEGIN{count=0} NF {count++; pkg=$1; sub(/-[0-9].*$/, "", pkg); printf "PKG|%s||%s|\n", pkg, $0} END { printf "UPDATES=%d\n", count }'`
	}
	return script
}
// updateApplyCommand returns the shell command that installs all pending
// updates non-interactively for the given package manager, or "" when the
// manager is not supported. The name is matched case-insensitively, ignoring
// surrounding whitespace. Each command runs through "sudo -n" when sudo is
// available on the host and directly otherwise.
func updateApplyCommand(packageManager string) string {
	manager := strings.ToLower(strings.TrimSpace(packageManager))

	var script string
	switch manager {
	case "apt":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n apt-get update && sudo -n DEBIAN_FRONTEND=noninteractive apt-get -y upgrade; else apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y upgrade; fi`
	case "dnf":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n dnf -y upgrade --refresh; else dnf -y upgrade --refresh; fi`
	case "yum":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n yum -y update; else yum -y update; fi`
	case "pacman":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n pacman -Syu --noconfirm; else pacman -Syu --noconfirm; fi`
	case "zypper":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n zypper --non-interactive update; else zypper --non-interactive update; fi`
	case "apk":
		script = `if command -v sudo >/dev/null 2>&1; then sudo -n apk update && sudo -n apk upgrade; else apk update && apk upgrade; fi`
	}
	return script
}
// sudoWrappedCommand wraps command in a shell snippet that runs it through
// sudo when sudo exists on the host and through plain "sh -lc" otherwise.
// A blank command yields "".
//
// With a blank password sudo is invoked non-interactively ("sudo -n"). With a
// password, the password is piped to "sudo -S" on stdin with the prompt
// suppressed. Command and password are single-quoted for the shell.
func sudoWrappedCommand(command, password string) string {
	trimmed := strings.TrimSpace(command)
	if trimmed == "" {
		return ""
	}

	quoted := shellSingleQuote(trimmed)
	if strings.TrimSpace(password) != "" {
		return fmt.Sprintf("if command -v sudo >/dev/null 2>&1; then printf '%%s\\n' %s | sudo -S -p '' sh -lc %s; else sh -lc %s; fi", shellSingleQuote(password), quoted, quoted)
	}
	return fmt.Sprintf("if command -v sudo >/dev/null 2>&1; then sudo -n sh -lc %s; else sh -lc %s; fi", quoted, quoted)
}
// shellSingleQuote wraps value in single quotes for use as one POSIX shell
// word. Each embedded single quote is emitted as '"'"' (close the quote, add
// a double-quoted quote character, reopen the quote).
func shellSingleQuote(value string) string {
	segments := strings.Split(value, "'")
	return "'" + strings.Join(segments, `'"'"'`) + "'"
}
// parseUpdateScan reads the output of an update scan command.
//
// Lines of the form "PKG|name|current|available|arch" are collected as
// packages; PKG lines that do not split into five fields are ignored. Parsing
// stops at the first "UPDATES=<n>" line, whose number is returned together
// with the packages seen before it. All other lines are skipped.
//
// It returns an error when the UPDATES line is missing (the packages seen so
// far are still returned) or when its number cannot be parsed.
func parseUpdateScan(output string) (int64, []models.NodeUpdatePackage, error) {
	var found []models.NodeUpdatePackage
	for _, raw := range strings.Split(output, "\n") {
		entry := strings.TrimSpace(raw)
		switch {
		case strings.HasPrefix(entry, "PKG|"):
			fields := strings.SplitN(entry, "|", 5)
			if len(fields) != 5 {
				continue
			}
			found = append(found, models.NodeUpdatePackage{
				Name:         strings.TrimSpace(fields[1]),
				Current:      strings.TrimSpace(fields[2]),
				Available:    strings.TrimSpace(fields[3]),
				Architecture: strings.TrimSpace(fields[4]),
			})
		case strings.HasPrefix(entry, "UPDATES="):
			var total int64
			if _, err := fmt.Sscanf(entry, "UPDATES=%d", &total); err != nil {
				return 0, nil, err
			}
			return total, found, nil
		}
	}
	return 0, found, fmt.Errorf("update count not found")
}
// mustMarshalUpdatePackages encodes packages as a JSON array string. It
// returns "" for an empty or nil slice and also when encoding fails; despite
// the name it never panics.
func mustMarshalUpdatePackages(packages []models.NodeUpdatePackage) string {
	if len(packages) > 0 {
		if encoded, err := json.Marshal(packages); err == nil {
			return string(encoded)
		}
	}
	return ""
}
// autoUpdateWindowOpen reports whether automatic updates may run on node at
// time now.
//
// Allowed days, window start and window end are taken from the node, each
// falling back to the organization's value when the node's is blank. The
// weekday of now must be allowed. If either bound is missing or unparsable,
// or both bounds are equal, the window is open all day. Otherwise both
// bounds are inclusive, and a start later than the end describes a window
// that wraps past midnight. A nil node is never eligible.
func autoUpdateWindowOpen(node *models.Node, org models.Organization, now time.Time) bool {
	if node == nil {
		return false
	}

	// Prefer the node's own setting, falling back to the organization default.
	pick := func(nodeValue, orgValue string) string {
		if own := strings.TrimSpace(nodeValue); own != "" {
			return own
		}
		return strings.TrimSpace(orgValue)
	}
	allowedDays := pick(node.AutoUpdateDays, org.AutoUpdateDays)
	windowStart := pick(node.AutoUpdateWindowStart, org.AutoUpdateWindowStart)
	windowEnd := pick(node.AutoUpdateWindowEnd, org.AutoUpdateWindowEnd)

	if !updateDayAllowed(allowedDays, now.Weekday()) {
		return false
	}

	opens, okStart := parseClockMinutes(windowStart)
	closes, okEnd := parseClockMinutes(windowEnd)
	if !okStart || !okEnd {
		return true
	}

	minuteOfDay := now.Hour()*60 + now.Minute()
	switch {
	case opens == closes:
		return true
	case opens < closes:
		return minuteOfDay >= opens && minuteOfDay <= closes
	default:
		// The window wraps past midnight.
		return minuteOfDay >= opens || minuteOfDay <= closes
	}
}
// organizationAutoUpdateWindow loads the organization row for orgID,
// including its default auto-update window and allowed days. The scan error
// is returned as-is together with whatever was populated.
func (s *NodeService) organizationAutoUpdateWindow(ctx context.Context, orgID int64) (models.Organization, error) {
	const query = `
SELECT id, name, theme, theme_mode, auto_update_window_start, auto_update_window_end, auto_update_days, created_at
FROM organizations
WHERE id = ?
`
	var org models.Organization
	row := s.db.QueryRowContext(ctx, query, orgID)
	err := row.Scan(
		&org.ID, &org.Name, &org.Theme, &org.ThemeMode,
		&org.AutoUpdateWindowStart, &org.AutoUpdateWindowEnd, &org.AutoUpdateDays, &org.CreatedAt,
	)
	return org, err
}
// updateDayAllowed reports whether weekday appears in days, a comma-separated
// list of three-letter day codes ("mon", "tue", ...). Codes are compared
// case-insensitively and may be padded with whitespace. A blank list allows
// every day.
func updateDayAllowed(days string, weekday time.Weekday) bool {
	list := strings.TrimSpace(days)
	if list == "" {
		return true
	}

	wanted := weekdayCode(weekday)
	for _, candidate := range strings.Split(list, ",") {
		candidate = strings.TrimSpace(candidate)
		if strings.EqualFold(candidate, wanted) {
			return true
		}
	}
	return false
}
// parseClockMinutes converts an "H:M" clock value into minutes since
// midnight. Surrounding whitespace is ignored. It returns (0, false) when the
// value does not scan as two integers separated by a colon, or when the hour
// is outside 0-23 or the minute outside 0-59.
func parseClockMinutes(value string) (int, bool) {
	var hh, mm int
	_, err := fmt.Sscanf(strings.TrimSpace(value), "%d:%d", &hh, &mm)
	switch {
	case err != nil:
		return 0, false
	case hh < 0, hh > 23, mm < 0, mm > 59:
		return 0, false
	}
	return hh*60 + mm, true
}
// weekdayCode returns the lowercase three-letter code for weekday ("mon"
// through "sat"). Sunday, and any value outside the Monday-Saturday range,
// maps to "sun".
func weekdayCode(weekday time.Weekday) string {
	codes := [...]string{
		time.Sunday:    "sun",
		time.Monday:    "mon",
		time.Tuesday:   "tue",
		time.Wednesday: "wed",
		time.Thursday:  "thu",
		time.Friday:    "fri",
		time.Saturday:  "sat",
	}
	if weekday < time.Sunday || weekday > time.Saturday {
		return "sun"
	}
	return codes[weekday]
}
// incidentDurationSeconds returns the whole number of seconds between
// startedAt and endedAt. An incident without an end time (nil endedAt) is
// measured up to the current time. An end before the start yields 0.
func incidentDurationSeconds(startedAt time.Time, endedAt *time.Time) int64 {
	var finish time.Time
	if endedAt == nil {
		finish = time.Now()
	} else {
		finish = *endedAt
	}

	if finish.Before(startedAt) {
		return 0
	}
	elapsed := finish.Sub(startedAt)
	return int64(elapsed.Seconds())
}
// humanDuration renders duration as at most two space-separated units chosen
// from days, hours, minutes and seconds, e.g. "1d 2h" or "1m 30s".
//
// Negative durations are treated as zero and the value is rounded to the
// nearest second first. Units with a zero count are omitted, so the two units
// shown are the two largest non-zero ones; a zero duration yields "0s".
func humanDuration(duration time.Duration) string {
	remaining := duration
	if remaining < 0 {
		remaining = 0
	}
	remaining = remaining.Round(time.Second)

	units := []struct {
		size   time.Duration
		suffix string
	}{
		{24 * time.Hour, "d"},
		{time.Hour, "h"},
		{time.Minute, "m"},
		{time.Second, "s"},
	}

	pieces := make([]string, 0, len(units))
	for _, unit := range units {
		count := int(remaining / unit.size)
		remaining -= time.Duration(count) * unit.size
		if count > 0 {
			pieces = append(pieces, fmt.Sprintf("%d%s", count, unit.suffix))
		}
	}

	if len(pieces) == 0 {
		return "0s"
	}
	if len(pieces) > 2 {
		pieces = pieces[:2]
	}
	return strings.Join(pieces, " ")
}