1165 lines
38 KiB
Go
1165 lines
38 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"net"
|
|
"regexp"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/robfig/cron/v3"
|
|
"golang.org/x/crypto/ssh"
|
|
"maintainarr/internal/models"
|
|
)
|
|
|
|
type NodeService struct {
|
|
db *sql.DB
|
|
crypto *CryptoService
|
|
}
|
|
|
|
func NewNodeService(database *sql.DB, crypto *CryptoService) *NodeService {
|
|
return &NodeService{db: database, crypto: crypto}
|
|
}
|
|
|
|
func (s *NodeService) ListNodes(ctx context.Context, orgID int64) ([]models.Node, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
|
|
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
|
|
n.cpu_usage, n.ram_usage, n.disk_usage,
|
|
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
|
|
FROM nodes n
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE n.organization_id = ?
|
|
ORDER BY n.name ASC
|
|
`, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return s.scanNodes(rows)
|
|
}
|
|
|
|
func (s *NodeService) ListNodesByGroup(ctx context.Context, orgID, groupID int64) ([]models.Node, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
|
|
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
|
|
n.cpu_usage, n.ram_usage, n.disk_usage,
|
|
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
|
|
FROM nodes n
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE n.organization_id = ? AND n.group_id = ?
|
|
ORDER BY n.name ASC
|
|
`, orgID, groupID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return s.scanNodes(rows)
|
|
}
|
|
|
|
func (s *NodeService) ListNodesByTag(ctx context.Context, orgID int64, tag string) ([]models.Node, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
|
|
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
|
|
n.cpu_usage, n.ram_usage, n.disk_usage,
|
|
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
|
|
FROM nodes n
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE n.organization_id = ? AND n.tag = ?
|
|
ORDER BY n.name ASC
|
|
`, orgID, tag)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return s.scanNodes(rows)
|
|
}
|
|
|
|
func (s *NodeService) scanNodes(rows *sql.Rows) ([]models.Node, error) {
|
|
var nodes []models.Node
|
|
for rows.Next() {
|
|
var node models.Node
|
|
if err := rows.Scan(
|
|
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
|
|
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
|
|
&node.PackageManager, &node.Architecture, &node.HostModel, &node.KernelVersion, &node.CPUModel, &node.GPUModel, &node.DefaultShell, &node.PackageCount, &node.MemoryTotalMB, &node.DiskTotalGB,
|
|
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
|
|
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
if node.SSHPassword != "" {
|
|
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
|
|
node.SSHPassword = decrypted
|
|
}
|
|
}
|
|
nodes = append(nodes, node)
|
|
}
|
|
|
|
return nodes, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) GetNode(ctx context.Context, orgID, nodeID int64) (*models.Node, error) {
|
|
node := &models.Node{}
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
|
|
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.host_model, n.kernel_version, n.cpu_model, n.gpu_model, n.default_shell, n.package_count, n.memory_total_mb, n.disk_total_gb,
|
|
n.cpu_usage, n.ram_usage, n.disk_usage,
|
|
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
|
|
FROM nodes n
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE n.organization_id = ? AND n.id = ?
|
|
`, orgID, nodeID).Scan(
|
|
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
|
|
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
|
|
&node.PackageManager, &node.Architecture, &node.HostModel, &node.KernelVersion, &node.CPUModel, &node.GPUModel, &node.DefaultShell, &node.PackageCount, &node.MemoryTotalMB, &node.DiskTotalGB,
|
|
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
|
|
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if node.SSHPassword != "" {
|
|
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
|
|
node.SSHPassword = decrypted
|
|
}
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
func (s *NodeService) SaveNode(ctx context.Context, node *models.Node) error {
|
|
encryptedPassword := ""
|
|
if node.SSHPassword != "" {
|
|
token, err := s.crypto.Encrypt(node.SSHPassword)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
encryptedPassword = token
|
|
}
|
|
|
|
if node.ID == 0 {
|
|
result, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO nodes (
|
|
organization_id, group_id, tag, name, distro, hostname, ip_address, mac_address,
|
|
ssh_port, ssh_username, ssh_password, package_manager, architecture, host_model, kernel_version, cpu_model, gpu_model, default_shell, package_count, memory_total_mb, disk_total_gb,
|
|
auto_updates_enabled, notes
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, node.OrganizationID, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress,
|
|
node.MACAddress, node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
node.ID, _ = result.LastInsertId()
|
|
return nil
|
|
}
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
UPDATE nodes
|
|
SET group_id = ?, tag = ?, name = ?, distro = ?, hostname = ?, ip_address = ?, mac_address = ?,
|
|
ssh_port = ?, ssh_username = ?, ssh_password = ?, package_manager = ?, architecture = ?, host_model = ?, kernel_version = ?, cpu_model = ?, gpu_model = ?, default_shell = ?, package_count = ?, memory_total_mb = ?, disk_total_gb = ?,
|
|
auto_updates_enabled = ?, notes = ?,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ? AND organization_id = ?
|
|
`, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress, node.MACAddress,
|
|
node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes,
|
|
node.ID, node.OrganizationID)
|
|
return err
|
|
}
|
|
|
|
func (s *NodeService) EnsureUptimeMonitorForNode(ctx context.Context, node *models.Node) error {
|
|
target := fmt.Sprintf("%s:%d", strings.TrimSpace(node.IPAddress), node.SSHPort)
|
|
name := strings.TrimSpace(node.Name)
|
|
if name == "" {
|
|
name = target
|
|
}
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO uptime_monitors (
|
|
organization_id, node_id, name, target, monitor_type, interval_seconds, enabled
|
|
) VALUES (?, ?, ?, ?, 'ssh', 60, 1)
|
|
ON CONFLICT(node_id) DO UPDATE SET
|
|
organization_id = excluded.organization_id,
|
|
name = excluded.name,
|
|
target = excluded.target,
|
|
interval_seconds = 60,
|
|
enabled = 1,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
`, node.OrganizationID, node.ID, name, target)
|
|
return err
|
|
}
|
|
|
|
func (s *NodeService) EnsureUptimeMonitors(ctx context.Context, orgID int64) error {
|
|
nodes, err := s.ListNodes(ctx, orgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range nodes {
|
|
if err := s.EnsureUptimeMonitorForNode(ctx, &nodes[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *NodeService) DeleteNode(ctx context.Context, orgID, nodeID int64) error {
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
if _, err := tx.ExecContext(ctx, `DELETE FROM command_runs WHERE node_id = ?`, nodeID); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.ExecContext(ctx, `DELETE FROM automation_jobs WHERE organization_id = ? AND node_id = ?`, orgID, nodeID); err != nil {
|
|
return err
|
|
}
|
|
result, err := tx.ExecContext(ctx, `DELETE FROM nodes WHERE organization_id = ? AND id = ?`, orgID, nodeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
affected, err := result.RowsAffected()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if affected == 0 {
|
|
return sql.ErrNoRows
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
func (s *NodeService) RefreshNodeStats(ctx context.Context, node *models.Node) (string, error) {
|
|
const statsScript = `
|
|
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
|
|
total1=$((user + nice + system + idle + iowait + irq + softirq + steal))
|
|
idle1=$((idle + iowait))
|
|
sleep 1
|
|
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
|
|
total2=$((user + nice + system + idle + iowait + irq + softirq + steal))
|
|
idle2=$((idle + iowait))
|
|
diff_total=$((total2 - total1))
|
|
diff_idle=$((idle2 - idle1))
|
|
if [ "$diff_total" -gt 0 ]; then
|
|
cpu="$(awk -v total="$diff_total" -v idle="$diff_idle" 'BEGIN { printf "%.2f", (total - idle) * 100 / total }')"
|
|
else
|
|
cpu="0.00"
|
|
fi
|
|
ram="$(awk '
|
|
/MemTotal:/ { total=$2 }
|
|
/MemAvailable:/ { available=$2 }
|
|
END {
|
|
if (total > 0) {
|
|
printf "%.2f", ((total - available) * 100) / total
|
|
} else {
|
|
printf "0.00"
|
|
}
|
|
}
|
|
' /proc/meminfo)"
|
|
disk="$(df / --output=pcent 2>/dev/null | tail -1 | tr -dc '0-9')"
|
|
uptime="$(cut -d. -f1 /proc/uptime 2>/dev/null)"
|
|
echo "CPU=${cpu}"
|
|
echo "RAM=${ram}"
|
|
echo "DISK=${disk:-0}"
|
|
echo "UPTIME=${uptime:-0}"
|
|
`
|
|
|
|
output, err := s.RunSSHCommand(ctx, node, strings.TrimSpace(statsScript))
|
|
if err != nil {
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: "refresh-stats",
|
|
CommandText: sanitizeCommand(statsScript),
|
|
Status: "failed",
|
|
Output: strings.TrimSpace(output + "\n" + err.Error()),
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
stats := map[string]float64{}
|
|
for _, line := range strings.Split(output, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
parts := strings.SplitN(line, "=", 2)
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
switch parts[0] {
|
|
case "CPU", "RAM", "DISK", "UPTIME":
|
|
var value float64
|
|
fmt.Sscanf(parts[1], "%f", &value)
|
|
stats[parts[0]] = value
|
|
}
|
|
}
|
|
|
|
now := time.Now()
|
|
_, err = s.db.ExecContext(ctx, `
|
|
UPDATE nodes
|
|
SET cpu_usage = ?, ram_usage = ?, disk_usage = ?, uptime_seconds = ?, last_seen_at = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
`, stats["CPU"], stats["RAM"], stats["DISK"], int64(stats["UPTIME"]), now, node.ID)
|
|
if err != nil {
|
|
return output, err
|
|
}
|
|
|
|
node.CPUUsage = stats["CPU"]
|
|
node.RAMUsage = stats["RAM"]
|
|
node.DiskUsage = stats["DISK"]
|
|
node.UptimeSeconds = int64(stats["UPTIME"])
|
|
node.LastSeenAt = &now
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: "refresh-stats",
|
|
CommandText: sanitizeCommand(statsScript),
|
|
Status: "completed",
|
|
Output: output,
|
|
})
|
|
|
|
return output, nil
|
|
}
|
|
|
|
func (s *NodeService) StatsStale(node *models.Node, maxAge time.Duration) bool {
|
|
if node.LastSeenAt == nil {
|
|
return true
|
|
}
|
|
return time.Since(*node.LastSeenAt) >= maxAge
|
|
}
|
|
|
|
func (s *NodeService) RefreshNodeInventory(ctx context.Context, node *models.Node) (string, error) {
|
|
output, err := s.RunSSHCommand(ctx, node, strings.Join([]string{
|
|
`if [ -r /etc/os-release ]; then . /etc/os-release; echo DISTRO="${PRETTY_NAME:-$ID}"; else echo DISTRO="$(uname -s)"; fi`,
|
|
`echo HOSTNAME="$(hostname 2>/dev/null || uname -n)"`,
|
|
`echo ARCH="$(uname -m 2>/dev/null)"`,
|
|
`HOST_MODEL="$(cat /sys/devices/virtual/dmi/id/product_name 2>/dev/null || hostnamectl 2>/dev/null | awk -F: '/Chassis|Hardware Model/ {gsub(/^[ \t]+/, "", $2); print $2; exit}')"; echo HOST_MODEL="${HOST_MODEL}"`,
|
|
`echo KERNEL="$(uname -r 2>/dev/null)"`,
|
|
`if command -v apt >/dev/null 2>&1; then echo PKG_MGR=apt; elif command -v dnf >/dev/null 2>&1; then echo PKG_MGR=dnf; elif command -v yum >/dev/null 2>&1; then echo PKG_MGR=yum; elif command -v pacman >/dev/null 2>&1; then echo PKG_MGR=pacman; elif command -v zypper >/dev/null 2>&1; then echo PKG_MGR=zypper; elif command -v apk >/dev/null 2>&1; then echo PKG_MGR=apk; elif command -v nix-env >/dev/null 2>&1; then echo PKG_MGR=nix; elif command -v emerge >/dev/null 2>&1; then echo PKG_MGR=emerge; else echo PKG_MGR=unknown; fi`,
|
|
`CPU_MODEL="$( (command -v lscpu >/dev/null 2>&1 && lscpu | awk -F: '/Model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}') || awk -F: '/model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}' /proc/cpuinfo )"; echo CPU_MODEL="${CPU_MODEL}"`,
|
|
`GPU_MODEL="$( (command -v lspci >/dev/null 2>&1 && lspci | awk -F': ' '/VGA compatible controller|3D controller|Display controller/ {print $2; exit}') || true )"; echo GPU_MODEL="${GPU_MODEL}"`,
|
|
`DEFAULT_SHELL="${SHELL:-$(getent passwd "$(id -un 2>/dev/null)" 2>/dev/null | cut -d: -f7)}"; echo DEFAULT_SHELL="${DEFAULT_SHELL}"`,
|
|
`PACKAGE_COUNT="$(
|
|
if command -v dpkg-query >/dev/null 2>&1; then dpkg-query -f '.' -W 2>/dev/null | wc -c;
|
|
elif command -v rpm >/dev/null 2>&1; then rpm -qa 2>/dev/null | wc -l;
|
|
elif command -v pacman >/dev/null 2>&1; then pacman -Qq 2>/dev/null | wc -l;
|
|
elif command -v apk >/dev/null 2>&1; then apk info 2>/dev/null | wc -l;
|
|
elif command -v nix-store >/dev/null 2>&1; then nix-store --gc --print-live 2>/dev/null | wc -l;
|
|
else printf 0; fi
|
|
)"; echo PACKAGE_COUNT="${PACKAGE_COUNT:-0}"`,
|
|
`MEMORY_MB="$(awk '/MemTotal/ {printf "%d", $2/1024}' /proc/meminfo 2>/dev/null)"; echo MEMORY_MB="${MEMORY_MB:-0}"`,
|
|
`DISK_GB="$(df -BG / 2>/dev/null | awk 'NR==2 {gsub(/G/, "", $2); print $2}')"; echo DISK_GB="${DISK_GB:-0}"`,
|
|
}, " ; "))
|
|
if err != nil {
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: "refresh-inventory",
|
|
CommandText: sanitizeCommand(strings.Join([]string{
|
|
`read /etc/os-release, hostname, kernel, package manager, cpu, gpu, shell, package count, memory, disk`,
|
|
}, "")),
|
|
Status: "failed",
|
|
Output: strings.TrimSpace(output + "\n" + err.Error()),
|
|
})
|
|
return "", err
|
|
}
|
|
|
|
values := map[string]string{}
|
|
for _, line := range strings.Split(output, "\n") {
|
|
line = strings.TrimSpace(line)
|
|
parts := strings.SplitN(line, "=", 2)
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
values[parts[0]] = strings.Trim(strings.TrimSpace(parts[1]), `"`)
|
|
}
|
|
|
|
var memoryMB int64
|
|
var diskGB int64
|
|
var packageCount int64
|
|
fmt.Sscanf(values["MEMORY_MB"], "%d", &memoryMB)
|
|
fmt.Sscanf(values["DISK_GB"], "%d", &diskGB)
|
|
fmt.Sscanf(values["PACKAGE_COUNT"], "%d", &packageCount)
|
|
|
|
node.Distro = fallbackInventoryValue(values["DISTRO"], node.Distro, "Linux")
|
|
node.Hostname = fallbackInventoryValue(values["HOSTNAME"], node.Hostname, node.IPAddress)
|
|
node.PackageManager = fallbackInventoryValue(values["PKG_MGR"], node.PackageManager, "")
|
|
node.Architecture = fallbackInventoryValue(values["ARCH"], node.Architecture, "")
|
|
node.HostModel = fallbackInventoryValue(values["HOST_MODEL"], node.HostModel, "")
|
|
node.KernelVersion = fallbackInventoryValue(values["KERNEL"], node.KernelVersion, "")
|
|
node.CPUModel = fallbackInventoryValue(values["CPU_MODEL"], node.CPUModel, "")
|
|
node.GPUModel = fallbackInventoryValue(values["GPU_MODEL"], node.GPUModel, "")
|
|
node.DefaultShell = fallbackInventoryValue(values["DEFAULT_SHELL"], node.DefaultShell, "")
|
|
if packageCount > 0 {
|
|
node.PackageCount = packageCount
|
|
}
|
|
if memoryMB > 0 {
|
|
node.MemoryTotalMB = memoryMB
|
|
}
|
|
if diskGB > 0 {
|
|
node.DiskTotalGB = diskGB
|
|
}
|
|
|
|
_, err = s.db.ExecContext(ctx, `
|
|
UPDATE nodes
|
|
SET distro = ?, hostname = ?, package_manager = ?, architecture = ?, host_model = ?, kernel_version = ?, cpu_model = ?, gpu_model = ?, default_shell = ?, package_count = ?, memory_total_mb = ?, disk_total_gb = ?,
|
|
updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ? AND organization_id = ?
|
|
`, node.Distro, node.Hostname, node.PackageManager, node.Architecture, node.HostModel, node.KernelVersion, node.CPUModel, node.GPUModel, node.DefaultShell, node.PackageCount, node.MemoryTotalMB, node.DiskTotalGB, node.ID, node.OrganizationID)
|
|
if err != nil {
|
|
return output, err
|
|
}
|
|
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: "refresh-inventory",
|
|
CommandText: "inventory probe",
|
|
Status: "completed",
|
|
Output: output,
|
|
})
|
|
|
|
return output, nil
|
|
}
|
|
|
|
// fallbackInventoryValue picks the first non-blank value among primary and
// current, returning it with surrounding whitespace trimmed. When both are
// blank it returns fallback unchanged (not trimmed).
func fallbackInventoryValue(primary, current, fallback string) string {
	for _, candidate := range [...]string{primary, current} {
		if v := strings.TrimSpace(candidate); v != "" {
			return v
		}
	}
	return fallback
}
|
|
|
|
func (s *NodeService) RunSSHCommand(ctx context.Context, node *models.Node, command string) (string, error) {
|
|
config := &ssh.ClientConfig{
|
|
User: node.SSHUsername,
|
|
Auth: []ssh.AuthMethod{ssh.Password(node.SSHPassword)},
|
|
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
|
|
Timeout: 15 * time.Second,
|
|
}
|
|
|
|
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", node.IPAddress, node.SSHPort), config)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer client.Close()
|
|
|
|
session, err := client.NewSession()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer session.Close()
|
|
|
|
output, err := session.CombinedOutput(command)
|
|
return string(output), err
|
|
}
|
|
|
|
func (s *NodeService) RunAction(ctx context.Context, node *models.Node, action string, userID *int64) (string, error) {
|
|
command := map[string]string{
|
|
"shutdown": "sudo shutdown -h now",
|
|
"restart": "sudo reboot",
|
|
"wake": "",
|
|
"refresh": "printf 'refresh requested'",
|
|
"apt-upgrade": "sudo apt update && sudo apt upgrade -y && sudo apt autoremove -y",
|
|
"apt-install": "sudo apt update",
|
|
}[action]
|
|
|
|
var output string
|
|
var err error
|
|
|
|
if action == "wake" {
|
|
err = sendMagicPacket(node.MACAddress)
|
|
output = "Wake-on-LAN packet sent"
|
|
} else if action == "refresh" {
|
|
output, err = s.RefreshNodeStats(ctx, node)
|
|
} else if command != "" {
|
|
output, err = s.RunSSHCommand(ctx, node, command)
|
|
} else {
|
|
err = fmt.Errorf("unknown action")
|
|
}
|
|
|
|
status := "completed"
|
|
if err != nil {
|
|
status = "failed"
|
|
}
|
|
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: action,
|
|
CommandText: sanitizeCommand(command),
|
|
Status: status,
|
|
Output: output,
|
|
TriggeredBy: userID,
|
|
})
|
|
|
|
return output, err
|
|
}
|
|
|
|
func (s *NodeService) RunAdHocCommand(ctx context.Context, node *models.Node, label, command string, userID *int64) (string, error) {
|
|
output, err := s.RunSSHCommand(ctx, node, command)
|
|
status := "completed"
|
|
if err != nil {
|
|
status = "failed"
|
|
output = strings.TrimSpace(output + "\n" + err.Error())
|
|
}
|
|
|
|
now := time.Now()
|
|
s.logCommandRun(ctx, commandRunParams{
|
|
NodeID: node.ID,
|
|
Action: label,
|
|
CommandText: sanitizeCommand(command),
|
|
Status: status,
|
|
Output: output,
|
|
TriggeredBy: userID,
|
|
StartedAt: &now,
|
|
FinishedAt: &now,
|
|
})
|
|
|
|
return output, err
|
|
}
|
|
|
|
func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]models.AutomationJob, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT j.id, j.organization_id, j.node_id, j.group_id, j.tag, j.name, j.trigger_type, j.schedule,
|
|
j.command, j.enabled, j.last_run_at, j.next_run_at, j.created_at,
|
|
COALESCE(n.name, ''), COALESCE(g.name, '')
|
|
FROM automation_jobs j
|
|
LEFT JOIN nodes n ON n.id = j.node_id
|
|
LEFT JOIN vm_groups g ON g.id = j.group_id
|
|
WHERE j.organization_id = ?
|
|
ORDER BY j.name ASC
|
|
`, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []models.AutomationJob
|
|
for rows.Next() {
|
|
var job models.AutomationJob
|
|
if err := rows.Scan(
|
|
&job.ID, &job.OrganizationID, &job.NodeID, &job.GroupID, &job.Tag, &job.Name, &job.TriggerType,
|
|
&job.Schedule, &job.Command, &job.Enabled, &job.LastRunAt, &job.NextRunAt, &job.CreatedAt,
|
|
&job.NodeName, &job.GroupName,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
|
|
return jobs, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) CreateAutomation(ctx context.Context, job *models.AutomationJob) error {
|
|
job.NextRunAt = nextRunForSchedule(job.Schedule)
|
|
_, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO automation_jobs (organization_id, node_id, group_id, tag, name, trigger_type, schedule, command, enabled, next_run_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, job.OrganizationID, job.NodeID, job.GroupID, job.Tag, job.Name, job.TriggerType, job.Schedule, job.Command, job.Enabled, job.NextRunAt)
|
|
return err
|
|
}
|
|
|
|
func (s *NodeService) ListJobRuns(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.command_text, cr.status, cr.output, cr.triggered_by,
|
|
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, ''), COALESCE(g.name, '')
|
|
FROM command_runs cr
|
|
LEFT JOIN automation_jobs j ON j.id = cr.job_id
|
|
LEFT JOIN nodes n ON n.id = cr.node_id
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE j.organization_id = ? OR (j.id IS NULL AND n.organization_id = ?)
|
|
ORDER BY cr.started_at DESC
|
|
LIMIT 50
|
|
`, orgID, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var runs []models.CommandRun
|
|
for rows.Next() {
|
|
var run models.CommandRun
|
|
if err := rows.Scan(
|
|
&run.ID, &run.JobID, &run.NodeID, &run.Action, &run.CommandText, &run.Status, &run.Output, &run.TriggeredBy,
|
|
&run.StartedAt, &run.FinishedAt, &run.JobName, &run.NodeName, &run.GroupName,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
run.DurationText = formatDuration(run.StartedAt, run.FinishedAt)
|
|
runs = append(runs, run)
|
|
}
|
|
|
|
return runs, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) ListCommandHistory(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.command_text, cr.status, cr.output, cr.triggered_by,
|
|
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, ''), COALESCE(g.name, '')
|
|
FROM command_runs cr
|
|
LEFT JOIN automation_jobs j ON j.id = cr.job_id
|
|
LEFT JOIN nodes n ON n.id = cr.node_id
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE n.organization_id = ? OR j.organization_id = ?
|
|
ORDER BY cr.started_at DESC
|
|
LIMIT 200
|
|
`, orgID, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var runs []models.CommandRun
|
|
for rows.Next() {
|
|
var run models.CommandRun
|
|
if err := rows.Scan(
|
|
&run.ID, &run.JobID, &run.NodeID, &run.Action, &run.CommandText, &run.Status, &run.Output, &run.TriggeredBy,
|
|
&run.StartedAt, &run.FinishedAt, &run.JobName, &run.NodeName, &run.GroupName,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
run.DurationText = formatDuration(run.StartedAt, run.FinishedAt)
|
|
runs = append(runs, run)
|
|
}
|
|
return runs, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) ListUptimeMonitors(ctx context.Context, orgID int64) ([]models.UptimeMonitor, error) {
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT m.id, m.organization_id, m.node_id, COALESCE(n.name, ''), COALESCE(g.name, ''),
|
|
m.name, m.target, m.monitor_type, m.interval_seconds, m.enabled, m.last_status,
|
|
m.last_latency_ms, m.last_checked_at, m.last_error, m.up_since_at, m.current_outage_started_at,
|
|
m.created_at, m.updated_at
|
|
FROM uptime_monitors m
|
|
LEFT JOIN nodes n ON n.id = m.node_id
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE m.organization_id = ?
|
|
ORDER BY m.name ASC
|
|
`, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var monitors []models.UptimeMonitor
|
|
for rows.Next() {
|
|
var monitor models.UptimeMonitor
|
|
if err := rows.Scan(
|
|
&monitor.ID, &monitor.OrganizationID, &monitor.NodeID, &monitor.NodeName, &monitor.GroupName,
|
|
&monitor.Name, &monitor.Target, &monitor.MonitorType, &monitor.IntervalSeconds, &monitor.Enabled, &monitor.LastStatus,
|
|
&monitor.LastLatencyMS, &monitor.LastCheckedAt, &monitor.LastError, &monitor.UpSinceAt, &monitor.CurrentOutageStartedAt,
|
|
&monitor.CreatedAt, &monitor.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
monitors = append(monitors, monitor)
|
|
}
|
|
return monitors, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) ListRecentUptimeChecks(ctx context.Context, orgID int64, limitPerMonitor int) (map[int64][]models.UptimeCheck, error) {
|
|
if limitPerMonitor <= 0 {
|
|
limitPerMonitor = 24
|
|
}
|
|
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT c.id, c.monitor_id, c.status, c.latency_ms, c.error_message, c.checked_at
|
|
FROM uptime_checks c
|
|
INNER JOIN uptime_monitors m ON m.id = c.monitor_id
|
|
WHERE m.organization_id = ?
|
|
ORDER BY c.checked_at DESC
|
|
LIMIT 500
|
|
`, orgID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
results := map[int64][]models.UptimeCheck{}
|
|
for rows.Next() {
|
|
var check models.UptimeCheck
|
|
if err := rows.Scan(&check.ID, &check.MonitorID, &check.Status, &check.LatencyMS, &check.ErrorMessage, &check.CheckedAt); err != nil {
|
|
return nil, err
|
|
}
|
|
if len(results[check.MonitorID]) >= limitPerMonitor {
|
|
continue
|
|
}
|
|
results[check.MonitorID] = append(results[check.MonitorID], check)
|
|
}
|
|
return results, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) ListRecentUptimeIncidents(ctx context.Context, orgID int64, limit int) ([]models.UptimeIncident, error) {
|
|
if limit <= 0 {
|
|
limit = 20
|
|
}
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT i.id, i.monitor_id, COALESCE(m.name, ''), COALESCE(n.name, ''), COALESCE(g.name, ''),
|
|
i.error_message, i.started_at, i.ended_at
|
|
FROM uptime_incidents i
|
|
INNER JOIN uptime_monitors m ON m.id = i.monitor_id
|
|
LEFT JOIN nodes n ON n.id = m.node_id
|
|
LEFT JOIN vm_groups g ON g.id = n.group_id
|
|
WHERE m.organization_id = ?
|
|
ORDER BY i.started_at DESC
|
|
LIMIT ?
|
|
`, orgID, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var incidents []models.UptimeIncident
|
|
for rows.Next() {
|
|
var incident models.UptimeIncident
|
|
if err := rows.Scan(
|
|
&incident.ID, &incident.MonitorID, &incident.MonitorName, &incident.NodeName, &incident.GroupName,
|
|
&incident.ErrorMessage, &incident.StartedAt, &incident.EndedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
incident.DurationSeconds = incidentDurationSeconds(incident.StartedAt, incident.EndedAt)
|
|
incident.DurationText = humanDuration(time.Duration(incident.DurationSeconds) * time.Second)
|
|
incidents = append(incidents, incident)
|
|
}
|
|
return incidents, rows.Err()
|
|
}
|
|
|
|
func (s *NodeService) UptimePeriodSummary(ctx context.Context, orgID int64, since *time.Time) (models.UptimePeriodSummary, error) {
|
|
var summary models.UptimePeriodSummary
|
|
args := []any{orgID}
|
|
filter := ""
|
|
if since != nil {
|
|
filter = " AND c.checked_at >= ?"
|
|
args = append(args, *since)
|
|
}
|
|
|
|
if err := s.db.QueryRowContext(ctx, `
|
|
SELECT
|
|
COUNT(*),
|
|
COALESCE(SUM(CASE WHEN c.status = 'up' THEN 1 ELSE 0 END), 0),
|
|
COALESCE(SUM(CASE WHEN c.status = 'down' THEN 1 ELSE 0 END), 0),
|
|
COALESCE(CAST(AVG(CASE WHEN c.status = 'up' THEN c.latency_ms END) AS INTEGER), 0),
|
|
COALESCE(SUM(CASE WHEN c.status = 'down' THEN m.interval_seconds ELSE 0 END), 0)
|
|
FROM uptime_checks c
|
|
INNER JOIN uptime_monitors m ON m.id = c.monitor_id
|
|
WHERE m.organization_id = ?`+filter, args...).Scan(
|
|
&summary.TotalChecks, &summary.UpChecks, &summary.DownChecks, &summary.AvgLatencyMS, &summary.DowntimeSeconds,
|
|
); err != nil {
|
|
return summary, err
|
|
}
|
|
|
|
incidentArgs := []any{orgID}
|
|
incidentFilter := ""
|
|
if since != nil {
|
|
incidentFilter = " AND i.started_at >= ?"
|
|
incidentArgs = append(incidentArgs, *since)
|
|
}
|
|
|
|
var longest sql.NullInt64
|
|
var avg sql.NullFloat64
|
|
if err := s.db.QueryRowContext(ctx, `
|
|
SELECT
|
|
COUNT(*),
|
|
MAX(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER)),
|
|
AVG(CAST((strftime('%s', COALESCE(i.ended_at, CURRENT_TIMESTAMP)) - strftime('%s', i.started_at)) AS INTEGER))
|
|
FROM uptime_incidents i
|
|
INNER JOIN uptime_monitors m ON m.id = i.monitor_id
|
|
WHERE m.organization_id = ?`+incidentFilter, incidentArgs...).Scan(
|
|
&summary.IncidentCount, &longest, &avg,
|
|
); err != nil {
|
|
return summary, err
|
|
}
|
|
if longest.Valid {
|
|
summary.LongestIncidentSeconds = longest.Int64
|
|
}
|
|
if avg.Valid {
|
|
summary.AvgIncidentSeconds = int64(avg.Float64)
|
|
}
|
|
return summary, nil
|
|
}
|
|
|
|
func (s *NodeService) RunAllUptimeChecks(ctx context.Context, orgID int64) error {
|
|
monitors, err := s.ListUptimeMonitors(ctx, orgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for i := range monitors {
|
|
if !monitors[i].Enabled {
|
|
continue
|
|
}
|
|
_ = s.RunUptimeCheck(ctx, &monitors[i])
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *NodeService) RunUptimeCheck(ctx context.Context, monitor *models.UptimeMonitor) error {
|
|
target := strings.TrimSpace(monitor.Target)
|
|
if target == "" {
|
|
return fmt.Errorf("empty monitor target")
|
|
}
|
|
|
|
startedAt := time.Now()
|
|
timeout := 5 * time.Second
|
|
conn, err := net.DialTimeout("tcp", target, timeout)
|
|
latencyMS := int64(time.Since(startedAt).Milliseconds())
|
|
status := "up"
|
|
errorMessage := ""
|
|
if err != nil {
|
|
status = "down"
|
|
errorMessage = err.Error()
|
|
} else {
|
|
_ = conn.Close()
|
|
}
|
|
|
|
if latencyMS < 0 {
|
|
latencyMS = 0
|
|
}
|
|
if _, execErr := s.db.ExecContext(ctx, `
|
|
INSERT INTO uptime_checks (monitor_id, status, latency_ms, error_message, checked_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
`, monitor.ID, status, latencyMS, errorMessage, startedAt); execErr != nil {
|
|
return execErr
|
|
}
|
|
|
|
if status == "down" && monitor.LastStatus != "down" {
|
|
if _, execErr := s.db.ExecContext(ctx, `
|
|
INSERT INTO uptime_incidents (monitor_id, error_message, started_at)
|
|
VALUES (?, ?, ?)
|
|
`, monitor.ID, errorMessage, startedAt); execErr != nil {
|
|
return execErr
|
|
}
|
|
}
|
|
|
|
if status == "up" && monitor.LastStatus == "down" {
|
|
if _, execErr := s.db.ExecContext(ctx, `
|
|
UPDATE uptime_incidents
|
|
SET ended_at = ?
|
|
WHERE id = (
|
|
SELECT id
|
|
FROM uptime_incidents
|
|
WHERE monitor_id = ? AND ended_at IS NULL
|
|
ORDER BY started_at DESC
|
|
LIMIT 1
|
|
)
|
|
`, startedAt, monitor.ID); execErr != nil {
|
|
return execErr
|
|
}
|
|
}
|
|
|
|
var upSinceAt any
|
|
var outageStartedAt any
|
|
if status == "up" {
|
|
if monitor.LastStatus == "up" && monitor.UpSinceAt != nil {
|
|
upSinceAt = *monitor.UpSinceAt
|
|
} else {
|
|
upSinceAt = startedAt
|
|
}
|
|
outageStartedAt = nil
|
|
} else {
|
|
if monitor.LastStatus == "down" && monitor.CurrentOutageStartedAt != nil {
|
|
outageStartedAt = *monitor.CurrentOutageStartedAt
|
|
} else {
|
|
outageStartedAt = startedAt
|
|
}
|
|
upSinceAt = nil
|
|
}
|
|
|
|
_, err = s.db.ExecContext(ctx, `
|
|
UPDATE uptime_monitors
|
|
SET last_status = ?, last_latency_ms = ?, last_checked_at = ?, last_error = ?,
|
|
up_since_at = ?, current_outage_started_at = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
`, status, latencyMS, startedAt, errorMessage, upSinceAt, outageStartedAt, monitor.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
monitor.LastStatus = status
|
|
monitor.LastLatencyMS = latencyMS
|
|
monitor.LastCheckedAt = &startedAt
|
|
monitor.LastError = errorMessage
|
|
if status == "up" {
|
|
if upTime, ok := upSinceAt.(time.Time); ok {
|
|
monitor.UpSinceAt = &upTime
|
|
}
|
|
monitor.CurrentOutageStartedAt = nil
|
|
} else {
|
|
if downTime, ok := outageStartedAt.(time.Time); ok {
|
|
monitor.CurrentOutageStartedAt = &downTime
|
|
}
|
|
monitor.UpSinceAt = nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// sendMagicPacket sends a Wake-on-LAN magic packet for macAddress as a UDP
// datagram to 255.255.255.255, port 9.
//
// The packet is 6 bytes of 0xFF followed by 16 copies of the MAC address.
// macAddress may use any notation accepted by net.ParseMAC, but it must be a
// 6-byte (EUI-48) address. net.ParseMAC also accepts 8- and 20-byte addresses,
// which have no magic-packet encoding, so those are rejected instead of being
// sent as a malformed packet.
//
// It returns an error if the address is invalid or the UDP send fails.
func sendMagicPacket(macAddress string) error {
	hw, err := net.ParseMAC(macAddress)
	if err != nil {
		return fmt.Errorf("parse MAC address %q: %w", macAddress, err)
	}
	if len(hw) != 6 {
		return fmt.Errorf("wake-on-lan needs a 6-byte MAC address, got %d bytes", len(hw))
	}

	packet := make([]byte, 0, 6+16*len(hw))
	for i := 0; i < 6; i++ {
		packet = append(packet, 0xFF)
	}
	for i := 0; i < 16; i++ {
		packet = append(packet, hw...)
	}

	conn, err := net.Dial("udp", "255.255.255.255:9")
	if err != nil {
		return fmt.Errorf("open broadcast socket: %w", err)
	}
	defer conn.Close()

	if _, err := conn.Write(packet); err != nil {
		return fmt.Errorf("send magic packet: %w", err)
	}
	return nil
}
|
|
|
|
// SchedulerService runs recurring background work on a cron runner: the
// periodic node statistics refresh, the uptime checks, and enabled automation
// jobs. Jobs are registered, and the runner started, by Start.
type SchedulerService struct {
	// cron is the job runner that Start registers jobs on and then starts.
	cron *cron.Cron
	// nodeService performs the node, uptime-monitor and command operations the jobs invoke.
	nodeService *NodeService
	// db is used directly to update automation_jobs bookkeeping (next_run_at, last_run_at).
	db *sql.DB
}
|
|
|
|
func NewSchedulerService(database *sql.DB, nodeService *NodeService) *SchedulerService {
|
|
return &SchedulerService{
|
|
cron: cron.New(),
|
|
nodeService: nodeService,
|
|
db: database,
|
|
}
|
|
}
|
|
|
|
func (s *SchedulerService) Start(ctx context.Context, orgID int64, refreshSpec string) error {
|
|
if err := s.nodeService.EnsureUptimeMonitors(ctx, orgID); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := s.cron.AddFunc(refreshSpec, func() {
|
|
nodes, err := s.nodeService.ListNodes(ctx, orgID)
|
|
if err != nil {
|
|
return
|
|
}
|
|
for i := range nodes {
|
|
if nodes[i].SSHUsername == "" || nodes[i].SSHPassword == "" {
|
|
continue
|
|
}
|
|
_, _ = s.nodeService.RefreshNodeStats(context.Background(), &nodes[i])
|
|
}
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
if _, err := s.cron.AddFunc("@every 1m", func() {
|
|
_ = s.nodeService.EnsureUptimeMonitors(context.Background(), orgID)
|
|
_ = s.nodeService.RunAllUptimeChecks(context.Background(), orgID)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
jobs, err := s.nodeService.ListAutomations(ctx, orgID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, job := range jobs {
|
|
if !job.Enabled || job.Schedule == "" {
|
|
continue
|
|
}
|
|
job := job
|
|
_, _ = s.db.ExecContext(ctx, `UPDATE automation_jobs SET next_run_at = ? WHERE id = ?`, nextRunForSchedule(job.Schedule), job.ID)
|
|
_, err := s.cron.AddFunc(job.Schedule, func() {
|
|
s.runAutomation(context.Background(), job)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.cron.Start()
|
|
return nil
|
|
}
|
|
|
|
func (s *SchedulerService) runAutomation(ctx context.Context, job models.AutomationJob) {
|
|
targets, err := s.resolveTargets(ctx, job)
|
|
if err != nil || len(targets) == 0 {
|
|
return
|
|
}
|
|
|
|
lastRunAt := time.Now()
|
|
for _, node := range targets {
|
|
startedAt := time.Now()
|
|
output, runErr := s.nodeService.RunSSHCommand(ctx, &node, job.Command)
|
|
status := "completed"
|
|
if runErr != nil {
|
|
status = "failed"
|
|
output = strings.TrimSpace(output + "\n" + runErr.Error())
|
|
}
|
|
finishedAt := time.Now()
|
|
|
|
s.nodeService.logCommandRun(ctx, commandRunParams{
|
|
JobID: &job.ID,
|
|
NodeID: node.ID,
|
|
Action: job.Name,
|
|
CommandText: sanitizeCommand(job.Command),
|
|
Status: status,
|
|
Output: output,
|
|
StartedAt: &startedAt,
|
|
FinishedAt: &finishedAt,
|
|
})
|
|
|
|
lastRunAt = finishedAt
|
|
}
|
|
|
|
next := nextRunForSchedule(job.Schedule)
|
|
_, _ = s.db.ExecContext(ctx, `
|
|
UPDATE automation_jobs
|
|
SET last_run_at = ?, next_run_at = ?
|
|
WHERE id = ?
|
|
`, lastRunAt, next, job.ID)
|
|
}
|
|
|
|
func (s *SchedulerService) resolveTargets(ctx context.Context, job models.AutomationJob) ([]models.Node, error) {
|
|
if job.NodeID != nil {
|
|
node, err := s.nodeService.GetNode(ctx, job.OrganizationID, *job.NodeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return []models.Node{*node}, nil
|
|
}
|
|
if job.GroupID != nil {
|
|
return s.nodeService.ListNodesByGroup(ctx, job.OrganizationID, *job.GroupID)
|
|
}
|
|
if strings.TrimSpace(job.Tag) != "" {
|
|
return s.nodeService.ListNodesByTag(ctx, job.OrganizationID, job.Tag)
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
func nextRunForSchedule(spec string) *time.Time {
|
|
if strings.TrimSpace(spec) == "" {
|
|
return nil
|
|
}
|
|
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
|
|
schedule, err := parser.Parse(spec)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
next := schedule.Next(time.Now())
|
|
return &next
|
|
}
|
|
|
|
// formatDuration renders the time between startedAt and finishedAt, rounded to
// the nearest second, for display.
//
// A nil finishedAt yields "running". Durations under a minute use
// time.Duration's own format (e.g. "45s"). Longer ones are written as "Mm Ss",
// or as "Hh Mm Ss" from one hour up. A finish time earlier than the start
// (e.g. after a clock adjustment) is shown as "0s" rather than a negative
// duration, matching how humanDuration and incidentDurationSeconds clamp.
func formatDuration(startedAt time.Time, finishedAt *time.Time) string {
	if finishedAt == nil {
		return "running"
	}

	duration := finishedAt.Sub(startedAt).Round(time.Second)
	if duration < 0 {
		duration = 0
	}
	if duration < time.Minute {
		return duration.String()
	}

	hours := int(duration.Hours())
	minutes := int(duration.Minutes()) % 60
	seconds := int(duration.Seconds()) % 60
	if hours > 0 {
		return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds)
	}
	return fmt.Sprintf("%dm %ds", minutes, seconds)
}
|
|
|
|
// commandRunParams describes one row that logCommandRun inserts into
// command_runs. Nil pointer fields are stored as NULL, except StartedAt and
// FinishedAt, which logCommandRun replaces with the current time when nil.
type commandRunParams struct {
	// JobID is the automation job that produced the run; runAutomation sets it,
	// presumably nil for runs not started by an automation (confirm with callers).
	JobID *int64
	// NodeID is the node the command ran on.
	NodeID int64
	// Action is a display label for the run; runAutomation uses the job name.
	Action string
	// CommandText is the command as it should be logged; runAutomation passes it
	// through sanitizeCommand first.
	CommandText string
	// Status is the outcome; runAutomation writes "completed" or "failed".
	Status string
	// Output is the captured command output, plus the error text on failure.
	Output string
	// TriggeredBy presumably identifies the user who started the run — TODO confirm.
	TriggeredBy *int64
	// StartedAt is when execution began; nil means "now" at logging time.
	StartedAt *time.Time
	// FinishedAt is when execution ended; nil means "now" at logging time.
	FinishedAt *time.Time
}
|
|
|
|
func (s *NodeService) logCommandRun(ctx context.Context, params commandRunParams) {
|
|
startedAt := time.Now()
|
|
if params.StartedAt != nil {
|
|
startedAt = *params.StartedAt
|
|
}
|
|
finishedAt := params.FinishedAt
|
|
if finishedAt == nil {
|
|
value := time.Now()
|
|
finishedAt = &value
|
|
}
|
|
|
|
_, _ = s.db.ExecContext(ctx, `
|
|
INSERT INTO command_runs (job_id, node_id, action, command_text, status, output, triggered_by, started_at, finished_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
`, params.JobID, params.NodeID, params.Action, params.CommandText, params.Status, params.Output, params.TriggeredBy, startedAt, finishedAt)
|
|
}
|
|
|
|
// sensitiveValuePattern matches one secret value. This is either a double- or
// single-quoted string, so that values containing spaces are redacted whole,
// or a bare run of non-space characters.
const sensitiveValuePattern = `(?:"[^"]*"|'[^']*'|\S+)`

// commandSecretPatterns locate secrets in a shell command line. Group 1 is the
// part that is kept (the flag or key plus its separator); the value that follows
// is replaced. They are compiled once at package initialisation rather than on
// every call.
var commandSecretPatterns = []*regexp.Regexp{
	// --password=VALUE, --token VALUE, --secret 'VALUE' ...
	regexp.MustCompile(`(?i)(--(?:password|token|secret)(?:=|\s+))` + sensitiveValuePattern),
	// KEY=VALUE assignments whose key contains a sensitive word, including
	// prefixed names such as DB_PASSWORD or MYSQL_API_KEY. A plain \b before the
	// keyword would miss those, because '_' is a word character.
	regexp.MustCompile(`(?i)\b([\w-]*(?:password|passwd|token|secret|api[_-]?key)\s*=\s*)` + sensitiveValuePattern),
}

// sanitizeCommand returns command with surrounding whitespace trimmed and
// secret values replaced by "[REDACTED]", so it can be stored in logs.
//
// It redacts the values of --password/--token/--secret flags (with "=" or
// whitespace before the value) and of KEY=VALUE assignments whose key contains
// password, passwd, token, secret or api_key/api-key/apikey
// (case-insensitive). The flag or key and its separator are kept, e.g.
// "DB_PASSWORD=[REDACTED]". This is pattern-based masking; secrets passed in
// other shapes are not detected.
func sanitizeCommand(command string) string {
	sanitized := strings.TrimSpace(command)
	if sanitized == "" {
		return ""
	}

	for _, pattern := range commandSecretPatterns {
		sanitized = pattern.ReplaceAllString(sanitized, `${1}[REDACTED]`)
	}
	return sanitized
}
|
|
|
|
// incidentDurationSeconds reports how long an incident lasted, in whole seconds
// (fractions are truncated). An incident with a nil endedAt is treated as still
// open and measured up to the current time. An end earlier than the start
// yields 0.
func incidentDurationSeconds(startedAt time.Time, endedAt *time.Time) int64 {
	if endedAt == nil {
		now := time.Now()
		endedAt = &now
	}
	if endedAt.Before(startedAt) {
		return 0
	}
	elapsed := endedAt.Sub(startedAt)
	return int64(elapsed.Seconds())
}
|
|
|
|
// humanDuration formats duration compactly for display, e.g. "3d 4h", "5m 12s"
// or "0s".
//
// Negative values are treated as zero and the value is rounded to the nearest
// second. It is then split into days, hours, minutes and seconds. Zero-valued
// units are omitted, except that "0s" is produced when every unit is zero. At
// most the two largest non-zero units are kept.
func humanDuration(duration time.Duration) string {
	if duration < 0 {
		duration = 0
	}
	remaining := duration.Round(time.Second)

	units := [...]struct {
		size   time.Duration
		format string
	}{
		{24 * time.Hour, "%dd"},
		{time.Hour, "%dh"},
		{time.Minute, "%dm"},
		{time.Second, "%ds"},
	}

	parts := make([]string, 0, len(units))
	for _, unit := range units {
		count := int(remaining / unit.size)
		remaining -= time.Duration(count) * unit.size
		// Seconds are emitted even when zero if nothing larger was, so the
		// result is never empty.
		if count > 0 || (unit.size == time.Second && len(parts) == 0) {
			parts = append(parts, fmt.Sprintf(unit.format, count))
		}
	}

	if len(parts) > 2 {
		parts = parts[:2]
	}
	return strings.Join(parts, " ")
}
|