feat(nodes): add inventory stats and fleet operations

This commit is contained in:
2026-06-20 17:21:54 -05:00
parent 2fbe9c3b57
commit 22cf3a5836
2 changed files with 802 additions and 108 deletions

View File

@@ -4,9 +4,14 @@ import (
"context"
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"html/template"
"io"
"net/http"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
@@ -14,27 +19,32 @@ import (
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/skip2/go-qrcode"
localmiddleware "github.com/spenc/maintainarr/internal/middleware"
"github.com/spenc/maintainarr/internal/models"
"github.com/spenc/maintainarr/internal/services"
"github.com/spenc/maintainarr/internal/views"
"golang.org/x/crypto/ssh"
localmiddleware "maintainarr/internal/middleware"
"maintainarr/internal/models"
"maintainarr/internal/services"
"maintainarr/internal/views"
)
type Handler struct {
repo *services.Repository
auth *services.AuthService
sessions *services.SessionService
nodes *services.NodeService
renderer *views.Renderer
org models.Organization
baseURL string
upgrader websocket.Upgrader
repo *services.Repository
auth *services.AuthService
sessions *services.SessionService
nodes *services.NodeService
renderer *views.Renderer
org models.Organization
baseURL string
upgrader websocket.Upgrader
}
type dashboardData struct {
Nodes []models.Node
Automations []models.AutomationJob
Groups []dashboardNodeGroup
Ungrouped []models.Node
}
type dashboardNodeGroup struct {
Name string
Nodes []models.Node
}
type nodePageData struct {
@@ -45,6 +55,16 @@ type nodePageData struct {
type settingsData struct {
ThemeVariables template.CSS
CurrentTheme string
CurrentMode string
}
type jobsPageData struct {
Jobs []models.AutomationJob
Nodes []models.Node
Groups []models.VMGroup
Tags []string
Runs []models.CommandRun
}
func New(repo *services.Repository, auth *services.AuthService, sessions *services.SessionService, nodes *services.NodeService, renderer *views.Renderer, org models.Organization, baseURL string) *Handler {
@@ -242,12 +262,87 @@ func (h *Handler) SetupOTP(w http.ResponseWriter, r *http.Request) {
func (h *Handler) Dashboard(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
nodes, _ := h.nodes.ListNodes(r.Context(), h.org.ID)
automations, _ := h.nodes.ListAutomations(r.Context(), h.org.ID)
h.render(w, r, "dashboard", "Fleet Dashboard", dashboardData{
Nodes: nodes,
Automations: automations,
}, user)
h.render(w, r, "dashboard", "Fleet Dashboard", nil, user)
}
func (h *Handler) DashboardNodes(w http.ResponseWriter, r *http.Request) {
h.renderer.RenderPageTemplate(w, "dashboard", "dashboardNodes", views.ViewData{
User: localmiddleware.CurrentUser(r),
Content: h.dashboardData(r.Context()),
})
}
func (h *Handler) CreateNode(w http.ResponseWriter, r *http.Request) {
if err := r.ParseMultipartForm(10 << 20); err != nil {
http.Error(w, "invalid form", http.StatusBadRequest)
return
}
ip := strings.TrimSpace(r.FormValue("ip_address"))
username := strings.TrimSpace(r.FormValue("ssh_username"))
password := r.FormValue("ssh_password")
name := strings.TrimSpace(r.FormValue("name"))
if name == "" {
name = ip
}
if ip == "" || username == "" {
http.Error(w, "ip address and username are required", http.StatusBadRequest)
return
}
keyName, err := saveKeyUpload(r, "ssh_key")
if err != nil {
http.Error(w, "failed to save uploaded key", http.StatusInternalServerError)
return
}
if password == "" && keyName == "" {
http.Error(w, "provide a password or upload a key", http.StatusBadRequest)
return
}
notes := strings.TrimSpace(r.FormValue("notes"))
tag := strings.TrimSpace(r.FormValue("tag"))
var groupID *int64
if raw := strings.TrimSpace(r.FormValue("group_id")); raw != "" {
parsed, err := strconv.ParseInt(raw, 10, 64)
if err == nil {
groupID = &parsed
}
}
if keyName != "" {
if notes != "" {
notes += "\n"
}
notes += "Uploaded key: " + keyName
}
node := &models.Node{
OrganizationID: h.org.ID,
GroupID: groupID,
Tag: tag,
Name: name,
Distro: "Linux",
Hostname: ip,
IPAddress: ip,
SSHPort: 22,
SSHUsername: username,
SSHPassword: password,
AutoUpdatesEnabled: r.FormValue("auto_updates_enabled") == "on",
Notes: notes,
}
if err := h.nodes.SaveNode(r.Context(), node); err != nil {
http.Error(w, "failed to create node", http.StatusInternalServerError)
return
}
if node.SSHUsername != "" && node.SSHPassword != "" {
_, _ = h.nodes.RefreshNodeInventory(r.Context(), node)
_, _ = h.nodes.RefreshNodeStats(r.Context(), node)
}
http.Redirect(w, r, "/dashboard", http.StatusSeeOther)
}
func (h *Handler) NodeOverview(w http.ResponseWriter, r *http.Request) {
@@ -271,6 +366,33 @@ func (h *Handler) NodeOverview(w http.ResponseWriter, r *http.Request) {
}, user)
}
func (h *Handler) NodeStatsAPI(w http.ResponseWriter, r *http.Request) {
nodeID, err := strconv.ParseInt(chi.URLParam(r, "nodeID"), 10, 64)
if err != nil {
http.NotFound(w, r)
return
}
node, err := h.nodes.GetNode(r.Context(), h.org.ID, nodeID)
if err != nil {
http.NotFound(w, r)
return
}
if node.SSHUsername != "" && node.SSHPassword != "" && h.nodes.StatsStale(node, 5*time.Second) {
_, _ = h.nodes.RefreshNodeStats(r.Context(), node)
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]any{
"cpu_usage": node.CPUUsage,
"ram_usage": node.RAMUsage,
"disk_usage": node.DiskUsage,
"uptime_seconds": node.UptimeSeconds,
"last_seen_at": node.LastSeenAt,
})
}
func (h *Handler) NodeConsole(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
nodeID, err := strconv.ParseInt(chi.URLParam(r, "nodeID"), 10, 64)
@@ -394,24 +516,117 @@ func (h *Handler) NodeAction(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/nodes/"+strconv.FormatInt(nodeID, 10), http.StatusSeeOther)
}
func (h *Handler) DeleteNode(w http.ResponseWriter, r *http.Request) {
nodeID, err := strconv.ParseInt(chi.URLParam(r, "nodeID"), 10, 64)
if err != nil {
http.NotFound(w, r)
return
}
if err := h.nodes.DeleteNode(r.Context(), h.org.ID, nodeID); err != nil {
if err == sql.ErrNoRows {
http.NotFound(w, r)
return
}
http.Error(w, "failed to delete node", http.StatusInternalServerError)
return
}
http.Redirect(w, r, "/dashboard", http.StatusSeeOther)
}
func (h *Handler) NodeQuickCommand(w http.ResponseWriter, r *http.Request) {
nodeID, err := strconv.ParseInt(chi.URLParam(r, "nodeID"), 10, 64)
if err != nil {
http.NotFound(w, r)
return
}
node, err := h.nodes.GetNode(r.Context(), h.org.ID, nodeID)
if err != nil {
http.NotFound(w, r)
return
}
if err := r.ParseForm(); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
command := strings.TrimSpace(r.FormValue("command"))
label := strings.TrimSpace(r.FormValue("label"))
if command == "" {
http.Error(w, "command required", http.StatusBadRequest)
return
}
if label == "" {
label = "custom-command"
}
user := localmiddleware.CurrentUser(r)
var userID *int64
if user != nil {
userID = &user.ID
}
output, runErr := h.nodes.RunAdHocCommand(r.Context(), node, label, command, userID)
if runErr != nil {
http.Error(w, output, http.StatusBadGateway)
return
}
http.Redirect(w, r, "/nodes/"+strconv.FormatInt(nodeID, 10), http.StatusSeeOther)
}
func (h *Handler) GroupsPage(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
nodes, _ := h.nodes.ListNodes(r.Context(), h.org.ID)
h.render(w, r, "groups", "VM Groups", map[string]any{"Nodes": nodes}, user)
}
func (h *Handler) CreateGroup(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
name := strings.TrimSpace(r.FormValue("name"))
if name == "" {
http.Error(w, "group name required", http.StatusBadRequest)
return
}
group := &models.VMGroup{
OrganizationID: h.org.ID,
Name: name,
Description: strings.TrimSpace(r.FormValue("description")),
ColorToken: "primary",
}
if err := h.repo.CreateGroup(r.Context(), group); err != nil {
http.Error(w, "failed to create group", http.StatusInternalServerError)
return
}
http.Redirect(w, r, "/groups", http.StatusSeeOther)
}
func (h *Handler) AutomationsPage(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
jobs, _ := h.nodes.ListAutomations(r.Context(), h.org.ID)
nodes, _ := h.nodes.ListNodes(r.Context(), h.org.ID)
h.render(w, r, "automations", "Automations", map[string]any{
"Jobs": jobs,
"Nodes": nodes,
groups, _ := h.repo.ListGroups(r.Context(), h.org.ID)
tags, _ := h.repo.ListTags(r.Context(), h.org.ID)
runs, _ := h.nodes.ListJobRuns(r.Context(), h.org.ID)
h.render(w, r, "automations", "Jobs", jobsPageData{
Jobs: jobs,
Nodes: nodes,
Groups: groups,
Tags: tags,
Runs: runs,
}, user)
}
func (h *Handler) CreateAutomation(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
if err := r.ParseForm(); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
@@ -423,13 +638,61 @@ func (h *Handler) CreateAutomation(w http.ResponseWriter, r *http.Request) {
nodeID = &parsed
}
var groupID *int64
if raw := r.FormValue("group_id"); raw != "" {
parsed, _ := strconv.ParseInt(raw, 10, 64)
groupID = &parsed
}
tag := strings.TrimSpace(r.FormValue("tag"))
targetCount := 0
if nodeID != nil {
targetCount++
}
if groupID != nil {
targetCount++
}
if tag != "" {
targetCount++
}
if targetCount != 1 {
http.Error(w, "select exactly one target: node, group, or tag", http.StatusBadRequest)
return
}
commandSteps := r.Form["command_steps[]"]
var commands []string
for _, step := range commandSteps {
step = strings.TrimSpace(step)
if step != "" {
commands = append(commands, step)
}
}
if len(commands) == 0 {
http.Error(w, "at least one command is required", http.StatusBadRequest)
return
}
triggerType := r.FormValue("trigger_type")
schedule := buildSchedule(
triggerType,
r.FormValue("schedule_kind"),
r.FormValue("schedule_hour"),
r.FormValue("schedule_minute"),
r.FormValue("schedule_weekday"),
r.FormValue("interval_value"),
r.FormValue("interval_unit"),
)
job := &models.AutomationJob{
OrganizationID: h.org.ID,
NodeID: nodeID,
GroupID: groupID,
Tag: tag,
Name: r.FormValue("name"),
TriggerType: r.FormValue("trigger_type"),
Schedule: r.FormValue("schedule"),
Command: r.FormValue("command"),
TriggerType: triggerType,
Schedule: schedule,
Command: strings.Join(commands, "\n"),
Enabled: true,
}
if err := h.nodes.CreateAutomation(r.Context(), job); err != nil {
@@ -437,17 +700,40 @@ func (h *Handler) CreateAutomation(w http.ResponseWriter, r *http.Request) {
return
}
_ = user
http.Redirect(w, r, "/automations", http.StatusSeeOther)
}
func (h *Handler) SettingsPage(w http.ResponseWriter, r *http.Request) {
user := localmiddleware.CurrentUser(r)
org := h.currentOrganization(r.Context())
h.render(w, r, "settings", "Theme System", settingsData{
ThemeVariables: template.CSS(themePreview),
CurrentTheme: org.Theme,
CurrentMode: org.ThemeMode,
}, user)
}
func (h *Handler) UpdateTheme(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
http.Error(w, "bad request", http.StatusBadRequest)
return
}
theme := strings.TrimSpace(r.FormValue("theme"))
mode := strings.TrimSpace(r.FormValue("mode"))
if !isAllowedTheme(theme) || !isAllowedMode(mode) {
http.Error(w, "invalid theme selection", http.StatusBadRequest)
return
}
if err := h.repo.UpdateOrganizationTheme(r.Context(), theme, mode); err != nil {
http.Error(w, "failed to update theme", http.StatusInternalServerError)
return
}
http.Redirect(w, r, "/settings", http.StatusSeeOther)
}
func (h *Handler) render(w http.ResponseWriter, r *http.Request, page, title string, content any, user ...*models.User) {
var currentUser *models.User
if len(user) > 0 {
@@ -457,17 +743,64 @@ func (h *Handler) render(w http.ResponseWriter, r *http.Request, page, title str
currentUser = localmiddleware.CurrentUser(r)
}
org := h.currentOrganization(r.Context())
groups, _ := h.repo.ListGroups(r.Context(), org.ID)
tags, _ := h.repo.ListTags(r.Context(), org.ID)
h.renderer.Render(w, page, views.ViewData{
Title: title,
Shell: shellForPage(page),
ThemeClass: h.org.Theme,
CurrentPath: r.URL.Path,
User: currentUser,
Organization: h.org,
Content: content,
Title: title,
Shell: shellForPage(page),
ThemeClass: org.Theme,
ThemeMode: org.ThemeMode,
CurrentPath: r.URL.Path,
User: currentUser,
Organization: org,
AvailableGroups: groups,
AvailableTags: tags,
Content: content,
})
}
func (h *Handler) dashboardData(ctx context.Context) dashboardData {
nodes, _ := h.nodes.ListNodes(ctx, h.org.ID)
grouped := make(map[string][]models.Node)
var ungrouped []models.Node
for _, node := range nodes {
if strings.TrimSpace(node.GroupName) == "" {
ungrouped = append(ungrouped, node)
continue
}
grouped[node.GroupName] = append(grouped[node.GroupName], node)
}
groupNames := make([]string, 0, len(grouped))
for name := range grouped {
groupNames = append(groupNames, name)
}
sort.Strings(groupNames)
groups := make([]dashboardNodeGroup, 0, len(groupNames))
for _, name := range groupNames {
groups = append(groups, dashboardNodeGroup{
Name: name,
Nodes: grouped[name],
})
}
return dashboardData{
Groups: groups,
Ungrouped: ungrouped,
}
}
func (h *Handler) currentOrganization(ctx context.Context) models.Organization {
org, err := h.repo.GetOrganization(ctx)
if err != nil {
return h.org
}
return org
}
func shellForPage(page string) string {
switch page {
case "login", "login_otp", "register":
@@ -530,3 +863,78 @@ const themePreview = `
--color-primary-900: 6 78 59;
--color-primary-950: 2 44 34;
}`
func buildSchedule(triggerType, kind, hour, minute, weekday, intervalValue, intervalUnit string) string {
if triggerType == "triggered" {
return ""
}
hour = defaultIfEmpty(hour, "0")
minute = defaultIfEmpty(minute, "0")
switch kind {
case "hourly":
return fmt.Sprintf("%s * * * *", minute)
case "weekly":
weekday = defaultIfEmpty(weekday, "1")
return fmt.Sprintf("%s %s * * %s", minute, hour, weekday)
case "interval":
if intervalUnit == "hours" {
value := defaultIfEmpty(intervalValue, "1")
return fmt.Sprintf("%s */%s * * *", minute, value)
}
value := defaultIfEmpty(intervalValue, "30")
return fmt.Sprintf("*/%s * * * *", value)
default:
return fmt.Sprintf("%s %s * * *", minute, hour)
}
}
func defaultIfEmpty(value, fallback string) string {
if strings.TrimSpace(value) == "" {
return fallback
}
return value
}
func isAllowedTheme(value string) bool {
switch value {
case "dark", "light", "green", "red", "blue":
return true
default:
return false
}
}
func isAllowedMode(value string) bool {
return value == "dark" || value == "light"
}
func saveKeyUpload(r *http.Request, field string) (string, error) {
file, header, err := r.FormFile(field)
if err != nil {
if err == http.ErrMissingFile {
return "", nil
}
return "", err
}
defer file.Close()
if err := os.MkdirAll(filepath.Join("data", "keys"), 0o755); err != nil {
return "", err
}
safeName := fmt.Sprintf("%d-%s", time.Now().UnixNano(), filepath.Base(header.Filename))
targetPath := filepath.Join("data", "keys", safeName)
target, err := os.Create(targetPath)
if err != nil {
return "", err
}
defer target.Close()
if _, err := io.Copy(target, file); err != nil {
return "", err
}
return safeName, nil
}

View File

@@ -9,8 +9,8 @@ import (
"time"
"github.com/robfig/cron/v3"
"github.com/spenc/maintainarr/internal/models"
"golang.org/x/crypto/ssh"
"maintainarr/internal/models"
)
type NodeService struct {
@@ -24,39 +24,80 @@ func NewNodeService(database *sql.DB, crypto *CryptoService) *NodeService {
func (s *NodeService) ListNodes(ctx context.Context, orgID int64) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, organization_id, group_id, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, cpu_usage, ram_usage, disk_usage,
uptime_seconds, last_seen_at, auto_updates_enabled, notes, created_at, updated_at
FROM nodes
WHERE organization_id = ?
ORDER BY name ASC
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ?
ORDER BY n.name ASC
`, orgID)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) ListNodesByGroup(ctx context.Context, orgID, groupID int64) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.group_id = ?
ORDER BY n.name ASC
`, orgID, groupID)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) ListNodesByTag(ctx context.Context, orgID int64, tag string) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.tag = ?
ORDER BY n.name ASC
`, orgID, tag)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) scanNodes(rows *sql.Rows) ([]models.Node, error) {
var nodes []models.Node
for rows.Next() {
var node models.Node
if err := rows.Scan(
&node.ID, &node.OrganizationID, &node.GroupID, &node.Name, &node.Distro, &node.Hostname,
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
&node.PackageManager, &node.Architecture, &node.KernelVersion, &node.CPUModel, &node.MemoryTotalMB, &node.DiskTotalGB,
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
); err != nil {
return nil, err
}
nodes = append(nodes, node)
}
for i := range nodes {
if nodes[i].SSHPassword != "" {
decrypted, err := s.crypto.Decrypt(nodes[i].SSHPassword)
if err == nil {
nodes[i].SSHPassword = decrypted
if node.SSHPassword != "" {
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
node.SSHPassword = decrypted
}
}
nodes = append(nodes, node)
}
return nodes, rows.Err()
@@ -65,14 +106,17 @@ func (s *NodeService) ListNodes(ctx context.Context, orgID int64) ([]models.Node
func (s *NodeService) GetNode(ctx context.Context, orgID, nodeID int64) (*models.Node, error) {
node := &models.Node{}
err := s.db.QueryRowContext(ctx, `
SELECT id, organization_id, group_id, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, cpu_usage, ram_usage, disk_usage,
uptime_seconds, last_seen_at, auto_updates_enabled, notes, created_at, updated_at
FROM nodes
WHERE organization_id = ? AND id = ?
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.id = ?
`, orgID, nodeID).Scan(
&node.ID, &node.OrganizationID, &node.GroupID, &node.Name, &node.Distro, &node.Hostname,
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
&node.PackageManager, &node.Architecture, &node.KernelVersion, &node.CPUModel, &node.MemoryTotalMB, &node.DiskTotalGB,
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
)
@@ -81,8 +125,7 @@ func (s *NodeService) GetNode(ctx context.Context, orgID, nodeID int64) (*models
}
if node.SSHPassword != "" {
decrypted, err := s.crypto.Decrypt(node.SSHPassword)
if err == nil {
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
node.SSHPassword = decrypted
}
}
@@ -101,35 +144,100 @@ func (s *NodeService) SaveNode(ctx context.Context, node *models.Node) error {
}
if node.ID == 0 {
_, err := s.db.ExecContext(ctx, `
result, err := s.db.ExecContext(ctx, `
INSERT INTO nodes (
organization_id, group_id, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, auto_updates_enabled, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, node.OrganizationID, node.GroupID, node.Name, node.Distro, node.Hostname, node.IPAddress,
node.MACAddress, node.SSHPort, node.SSHUsername, encryptedPassword, node.AutoUpdatesEnabled, node.Notes)
return err
organization_id, group_id, tag, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, package_manager, architecture, kernel_version, cpu_model, memory_total_mb, disk_total_gb,
auto_updates_enabled, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, node.OrganizationID, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress,
node.MACAddress, node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes)
if err != nil {
return err
}
node.ID, _ = result.LastInsertId()
return nil
}
_, err := s.db.ExecContext(ctx, `
UPDATE nodes
SET group_id = ?, name = ?, distro = ?, hostname = ?, ip_address = ?, mac_address = ?,
ssh_port = ?, ssh_username = ?, ssh_password = ?, auto_updates_enabled = ?, notes = ?,
SET group_id = ?, tag = ?, name = ?, distro = ?, hostname = ?, ip_address = ?, mac_address = ?,
ssh_port = ?, ssh_username = ?, ssh_password = ?, package_manager = ?, architecture = ?, kernel_version = ?, cpu_model = ?, memory_total_mb = ?, disk_total_gb = ?,
auto_updates_enabled = ?, notes = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.GroupID, node.Name, node.Distro, node.Hostname, node.IPAddress, node.MACAddress,
node.SSHPort, node.SSHUsername, encryptedPassword, node.AutoUpdatesEnabled, node.Notes,
`, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress, node.MACAddress,
node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes,
node.ID, node.OrganizationID)
return err
}
func (s *NodeService) DeleteNode(ctx context.Context, orgID, nodeID int64) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `DELETE FROM command_runs WHERE node_id = ?`, nodeID); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `DELETE FROM automation_jobs WHERE organization_id = ? AND node_id = ?`, orgID, nodeID); err != nil {
return err
}
result, err := tx.ExecContext(ctx, `DELETE FROM nodes WHERE organization_id = ? AND id = ?`, orgID, nodeID)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
return sql.ErrNoRows
}
return tx.Commit()
}
func (s *NodeService) RefreshNodeStats(ctx context.Context, node *models.Node) (string, error) {
const statsScript = `
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total1=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle1=$((idle + iowait))
sleep 1
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total2=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle2=$((idle + iowait))
diff_total=$((total2 - total1))
diff_idle=$((idle2 - idle1))
if [ "$diff_total" -gt 0 ]; then
cpu="$(awk -v total="$diff_total" -v idle="$diff_idle" 'BEGIN { printf "%.2f", (total - idle) * 100 / total }')"
else
cpu="0.00"
fi
ram="$(awk '
/MemTotal:/ { total=$2 }
/MemAvailable:/ { available=$2 }
END {
if (total > 0) {
printf "%.2f", ((total - available) * 100) / total
} else {
printf "0.00"
}
}
' /proc/meminfo)"
disk="$(df / --output=pcent 2>/dev/null | tail -1 | tr -dc '0-9')"
uptime="$(cut -d. -f1 /proc/uptime 2>/dev/null)"
echo "CPU=${cpu}"
echo "RAM=${ram}"
echo "DISK=${disk:-0}"
echo "UPTIME=${uptime:-0}"
`
output, err := s.RunSSHCommand(ctx, node, strings.Join([]string{
`echo CPU=$(top -bn1 | awk '/Cpu\(s\)/ {print 100 - $8}')`,
`echo RAM=$(free | awk '/Mem:/ {printf("%.2f", $3/$2*100)}')`,
`echo DISK=$(df / --output=pcent | tail -1 | tr -dc '0-9')`,
`echo UPTIME=$(cut -d. -f1 /proc/uptime)`,
}, " ; "))
`sh -c ` + fmt.Sprintf("%q", statsScript),
}, ""))
if err != nil {
return "", err
}
@@ -168,6 +276,79 @@ func (s *NodeService) RefreshNodeStats(ctx context.Context, node *models.Node) (
return output, nil
}
func (s *NodeService) StatsStale(node *models.Node, maxAge time.Duration) bool {
if node.LastSeenAt == nil {
return true
}
return time.Since(*node.LastSeenAt) >= maxAge
}
func (s *NodeService) RefreshNodeInventory(ctx context.Context, node *models.Node) (string, error) {
output, err := s.RunSSHCommand(ctx, node, strings.Join([]string{
`if [ -r /etc/os-release ]; then . /etc/os-release; echo DISTRO="${PRETTY_NAME:-$ID}"; else echo DISTRO="$(uname -s)"; fi`,
`echo HOSTNAME="$(hostname 2>/dev/null || uname -n)"`,
`echo ARCH="$(uname -m 2>/dev/null)"`,
`echo KERNEL="$(uname -r 2>/dev/null)"`,
`if command -v apt >/dev/null 2>&1; then echo PKG_MGR=apt; elif command -v dnf >/dev/null 2>&1; then echo PKG_MGR=dnf; elif command -v yum >/dev/null 2>&1; then echo PKG_MGR=yum; elif command -v pacman >/dev/null 2>&1; then echo PKG_MGR=pacman; elif command -v zypper >/dev/null 2>&1; then echo PKG_MGR=zypper; elif command -v apk >/dev/null 2>&1; then echo PKG_MGR=apk; elif command -v nix-env >/dev/null 2>&1; then echo PKG_MGR=nix; elif command -v emerge >/dev/null 2>&1; then echo PKG_MGR=emerge; else echo PKG_MGR=unknown; fi`,
`CPU_MODEL="$( (command -v lscpu >/dev/null 2>&1 && lscpu | awk -F: '/Model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}') || awk -F: '/model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}' /proc/cpuinfo )"; echo CPU_MODEL="${CPU_MODEL}"`,
`MEMORY_MB="$(awk '/MemTotal/ {printf "%d", $2/1024}' /proc/meminfo 2>/dev/null)"; echo MEMORY_MB="${MEMORY_MB:-0}"`,
`DISK_GB="$(df -BG / 2>/dev/null | awk 'NR==2 {gsub(/G/, "", $2); print $2}')"; echo DISK_GB="${DISK_GB:-0}"`,
}, " ; "))
if err != nil {
return "", err
}
values := map[string]string{}
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "=", 2)
if len(parts) != 2 {
continue
}
values[parts[0]] = strings.Trim(strings.TrimSpace(parts[1]), `"`)
}
var memoryMB int64
var diskGB int64
fmt.Sscanf(values["MEMORY_MB"], "%d", &memoryMB)
fmt.Sscanf(values["DISK_GB"], "%d", &diskGB)
node.Distro = fallbackInventoryValue(values["DISTRO"], node.Distro, "Linux")
node.Hostname = fallbackInventoryValue(values["HOSTNAME"], node.Hostname, node.IPAddress)
node.PackageManager = fallbackInventoryValue(values["PKG_MGR"], node.PackageManager, "")
node.Architecture = fallbackInventoryValue(values["ARCH"], node.Architecture, "")
node.KernelVersion = fallbackInventoryValue(values["KERNEL"], node.KernelVersion, "")
node.CPUModel = fallbackInventoryValue(values["CPU_MODEL"], node.CPUModel, "")
if memoryMB > 0 {
node.MemoryTotalMB = memoryMB
}
if diskGB > 0 {
node.DiskTotalGB = diskGB
}
_, err = s.db.ExecContext(ctx, `
UPDATE nodes
SET distro = ?, hostname = ?, package_manager = ?, architecture = ?, kernel_version = ?, cpu_model = ?, memory_total_mb = ?, disk_total_gb = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.Distro, node.Hostname, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.ID, node.OrganizationID)
if err != nil {
return output, err
}
return output, nil
}
func fallbackInventoryValue(primary, current, fallback string) string {
if strings.TrimSpace(primary) != "" {
return strings.TrimSpace(primary)
}
if strings.TrimSpace(current) != "" {
return strings.TrimSpace(current)
}
return fallback
}
func (s *NodeService) RunSSHCommand(ctx context.Context, node *models.Node, command string) (string, error) {
config := &ssh.ClientConfig{
User: node.SSHUsername,
@@ -194,12 +375,12 @@ func (s *NodeService) RunSSHCommand(ctx context.Context, node *models.Node, comm
func (s *NodeService) RunAction(ctx context.Context, node *models.Node, action string, userID *int64) (string, error) {
command := map[string]string{
"shutdown": "sudo shutdown -h now",
"restart": "sudo reboot",
"wake": "",
"refresh": "printf 'refresh requested'",
"apt-upgrade": "sudo apt update && sudo apt upgrade -y && sudo apt autoremove -y",
"apt-install": "sudo apt update",
"shutdown": "sudo shutdown -h now",
"restart": "sudo reboot",
"wake": "",
"refresh": "printf 'refresh requested'",
"apt-upgrade": "sudo apt update && sudo apt upgrade -y && sudo apt autoremove -y",
"apt-install": "sudo apt update",
}[action]
var output string
@@ -229,13 +410,33 @@ func (s *NodeService) RunAction(ctx context.Context, node *models.Node, action s
return output, err
}
func (s *NodeService) RunAdHocCommand(ctx context.Context, node *models.Node, label, command string, userID *int64) (string, error) {
output, err := s.RunSSHCommand(ctx, node, command)
status := "completed"
if err != nil {
status = "failed"
output = strings.TrimSpace(output + "\n" + err.Error())
}
now := time.Now()
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (node_id, action, status, output, triggered_by, started_at, finished_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, node.ID, label, status, output, userID, now, now)
return output, err
}
func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]models.AutomationJob, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT id, organization_id, node_id, group_id, name, trigger_type, schedule,
command, enabled, last_run_at, next_run_at, created_at
FROM automation_jobs
WHERE organization_id = ?
ORDER BY name ASC
SELECT j.id, j.organization_id, j.node_id, j.group_id, j.tag, j.name, j.trigger_type, j.schedule,
j.command, j.enabled, j.last_run_at, j.next_run_at, j.created_at,
COALESCE(n.name, ''), COALESCE(g.name, '')
FROM automation_jobs j
LEFT JOIN nodes n ON n.id = j.node_id
LEFT JOIN vm_groups g ON g.id = j.group_id
WHERE j.organization_id = ?
ORDER BY j.name ASC
`, orgID)
if err != nil {
return nil, err
@@ -246,8 +447,9 @@ func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]model
for rows.Next() {
var job models.AutomationJob
if err := rows.Scan(
&job.ID, &job.OrganizationID, &job.NodeID, &job.GroupID, &job.Name, &job.TriggerType,
&job.ID, &job.OrganizationID, &job.NodeID, &job.GroupID, &job.Tag, &job.Name, &job.TriggerType,
&job.Schedule, &job.Command, &job.Enabled, &job.LastRunAt, &job.NextRunAt, &job.CreatedAt,
&job.NodeName, &job.GroupName,
); err != nil {
return nil, err
}
@@ -258,13 +460,46 @@ func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]model
}
func (s *NodeService) CreateAutomation(ctx context.Context, job *models.AutomationJob) error {
job.NextRunAt = nextRunForSchedule(job.Schedule)
_, err := s.db.ExecContext(ctx, `
INSERT INTO automation_jobs (organization_id, node_id, group_id, name, trigger_type, schedule, command, enabled)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`, job.OrganizationID, job.NodeID, job.GroupID, job.Name, job.TriggerType, job.Schedule, job.Command, job.Enabled)
INSERT INTO automation_jobs (organization_id, node_id, group_id, tag, name, trigger_type, schedule, command, enabled, next_run_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, job.OrganizationID, job.NodeID, job.GroupID, job.Tag, job.Name, job.TriggerType, job.Schedule, job.Command, job.Enabled, job.NextRunAt)
return err
}
func (s *NodeService) ListJobRuns(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.status, cr.output, cr.triggered_by,
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, '')
FROM command_runs cr
LEFT JOIN automation_jobs j ON j.id = cr.job_id
LEFT JOIN nodes n ON n.id = cr.node_id
WHERE j.organization_id = ? OR (j.id IS NULL AND n.organization_id = ?)
ORDER BY cr.started_at DESC
LIMIT 50
`, orgID, orgID)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []models.CommandRun
for rows.Next() {
var run models.CommandRun
if err := rows.Scan(
&run.ID, &run.JobID, &run.NodeID, &run.Action, &run.Status, &run.Output, &run.TriggeredBy,
&run.StartedAt, &run.FinishedAt, &run.JobName, &run.NodeName,
); err != nil {
return nil, err
}
run.DurationText = formatDuration(run.StartedAt, run.FinishedAt)
runs = append(runs, run)
}
return runs, rows.Err()
}
func sendMagicPacket(macAddress string) error {
hw, err := net.ParseMAC(macAddress)
if err != nil {
@@ -329,6 +564,7 @@ func (s *SchedulerService) Start(ctx context.Context, orgID int64, refreshSpec s
continue
}
job := job
_, _ = s.db.ExecContext(ctx, `UPDATE automation_jobs SET next_run_at = ? WHERE id = ?`, nextRunForSchedule(job.Schedule), job.ID)
_, err := s.cron.AddFunc(job.Schedule, func() {
s.runAutomation(context.Background(), job)
})
@@ -342,31 +578,81 @@ func (s *SchedulerService) Start(ctx context.Context, orgID int64, refreshSpec s
}
func (s *SchedulerService) runAutomation(ctx context.Context, job models.AutomationJob) {
if job.NodeID == nil {
targets, err := s.resolveTargets(ctx, job)
if err != nil || len(targets) == 0 {
return
}
node, err := s.nodeService.GetNode(ctx, job.OrganizationID, *job.NodeID)
if err != nil {
return
lastRunAt := time.Now()
for _, node := range targets {
startedAt := time.Now()
output, runErr := s.nodeService.RunSSHCommand(ctx, &node, job.Command)
status := "completed"
if runErr != nil {
status = "failed"
output = strings.TrimSpace(output + "\n" + runErr.Error())
}
finishedAt := time.Now()
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (job_id, node_id, action, status, output, started_at, finished_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, job.ID, node.ID, job.Name, status, output, startedAt, finishedAt)
lastRunAt = finishedAt
}
output, runErr := s.nodeService.RunSSHCommand(ctx, node, job.Command)
status := "completed"
if runErr != nil {
status = "failed"
}
now := time.Now()
next := s.cron.Entry(s.cron.Entries()[len(s.cron.Entries())-1].ID).Next
next := nextRunForSchedule(job.Schedule)
_, _ = s.db.ExecContext(ctx, `
UPDATE automation_jobs
SET last_run_at = ?, next_run_at = ?
WHERE id = ?
`, now, next, job.ID)
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (job_id, node_id, action, status, output)
VALUES (?, ?, ?, ?, ?)
`, job.ID, node.ID, job.Name, status, output)
`, lastRunAt, next, job.ID)
}
func (s *SchedulerService) resolveTargets(ctx context.Context, job models.AutomationJob) ([]models.Node, error) {
if job.NodeID != nil {
node, err := s.nodeService.GetNode(ctx, job.OrganizationID, *job.NodeID)
if err != nil {
return nil, err
}
return []models.Node{*node}, nil
}
if job.GroupID != nil {
return s.nodeService.ListNodesByGroup(ctx, job.OrganizationID, *job.GroupID)
}
if strings.TrimSpace(job.Tag) != "" {
return s.nodeService.ListNodesByTag(ctx, job.OrganizationID, job.Tag)
}
return nil, nil
}
func nextRunForSchedule(spec string) *time.Time {
if strings.TrimSpace(spec) == "" {
return nil
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
schedule, err := parser.Parse(spec)
if err != nil {
return nil
}
next := schedule.Next(time.Now())
return &next
}
func formatDuration(startedAt time.Time, finishedAt *time.Time) string {
if finishedAt == nil {
return "running"
}
duration := finishedAt.Sub(startedAt).Round(time.Second)
if duration < time.Minute {
return duration.String()
}
hours := int(duration.Hours())
minutes := int(duration.Minutes()) % 60
seconds := int(duration.Seconds()) % 60
if hours > 0 {
return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds)
}
return fmt.Sprintf("%dm %ds", minutes, seconds)
}