Files
Maintainarr/internal/services/node.go

657 lines
22 KiB
Go

package services
import (
"context"
"database/sql"
"fmt"
"net"
"strings"
"time"
"github.com/robfig/cron/v3"
"golang.org/x/crypto/ssh"
"maintainarr/internal/models"
)
type NodeService struct {
db *sql.DB
crypto *CryptoService
}
func NewNodeService(database *sql.DB, crypto *CryptoService) *NodeService {
return &NodeService{db: database, crypto: crypto}
}
func (s *NodeService) ListNodes(ctx context.Context, orgID int64) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ?
ORDER BY n.name ASC
`, orgID)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) ListNodesByGroup(ctx context.Context, orgID, groupID int64) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.group_id = ?
ORDER BY n.name ASC
`, orgID, groupID)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) ListNodesByTag(ctx context.Context, orgID int64, tag string) ([]models.Node, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.tag = ?
ORDER BY n.name ASC
`, orgID, tag)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanNodes(rows)
}
func (s *NodeService) scanNodes(rows *sql.Rows) ([]models.Node, error) {
var nodes []models.Node
for rows.Next() {
var node models.Node
if err := rows.Scan(
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
&node.PackageManager, &node.Architecture, &node.KernelVersion, &node.CPUModel, &node.MemoryTotalMB, &node.DiskTotalGB,
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
); err != nil {
return nil, err
}
if node.SSHPassword != "" {
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
node.SSHPassword = decrypted
}
}
nodes = append(nodes, node)
}
return nodes, rows.Err()
}
func (s *NodeService) GetNode(ctx context.Context, orgID, nodeID int64) (*models.Node, error) {
node := &models.Node{}
err := s.db.QueryRowContext(ctx, `
SELECT n.id, n.organization_id, n.group_id, COALESCE(g.name, ''), n.tag, n.name, n.distro, n.hostname, n.ip_address, n.mac_address,
n.ssh_port, n.ssh_username, n.ssh_password, n.package_manager, n.architecture, n.kernel_version, n.cpu_model, n.memory_total_mb, n.disk_total_gb,
n.cpu_usage, n.ram_usage, n.disk_usage,
n.uptime_seconds, n.last_seen_at, n.auto_updates_enabled, n.notes, n.created_at, n.updated_at
FROM nodes n
LEFT JOIN vm_groups g ON g.id = n.group_id
WHERE n.organization_id = ? AND n.id = ?
`, orgID, nodeID).Scan(
&node.ID, &node.OrganizationID, &node.GroupID, &node.GroupName, &node.Tag, &node.Name, &node.Distro, &node.Hostname,
&node.IPAddress, &node.MACAddress, &node.SSHPort, &node.SSHUsername, &node.SSHPassword,
&node.PackageManager, &node.Architecture, &node.KernelVersion, &node.CPUModel, &node.MemoryTotalMB, &node.DiskTotalGB,
&node.CPUUsage, &node.RAMUsage, &node.DiskUsage, &node.UptimeSeconds, &node.LastSeenAt,
&node.AutoUpdatesEnabled, &node.Notes, &node.CreatedAt, &node.UpdatedAt,
)
if err != nil {
return nil, err
}
if node.SSHPassword != "" {
if decrypted, err := s.crypto.Decrypt(node.SSHPassword); err == nil {
node.SSHPassword = decrypted
}
}
return node, nil
}
func (s *NodeService) SaveNode(ctx context.Context, node *models.Node) error {
encryptedPassword := ""
if node.SSHPassword != "" {
token, err := s.crypto.Encrypt(node.SSHPassword)
if err != nil {
return err
}
encryptedPassword = token
}
if node.ID == 0 {
result, err := s.db.ExecContext(ctx, `
INSERT INTO nodes (
organization_id, group_id, tag, name, distro, hostname, ip_address, mac_address,
ssh_port, ssh_username, ssh_password, package_manager, architecture, kernel_version, cpu_model, memory_total_mb, disk_total_gb,
auto_updates_enabled, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, node.OrganizationID, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress,
node.MACAddress, node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes)
if err != nil {
return err
}
node.ID, _ = result.LastInsertId()
return nil
}
_, err := s.db.ExecContext(ctx, `
UPDATE nodes
SET group_id = ?, tag = ?, name = ?, distro = ?, hostname = ?, ip_address = ?, mac_address = ?,
ssh_port = ?, ssh_username = ?, ssh_password = ?, package_manager = ?, architecture = ?, kernel_version = ?, cpu_model = ?, memory_total_mb = ?, disk_total_gb = ?,
auto_updates_enabled = ?, notes = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.GroupID, node.Tag, node.Name, node.Distro, node.Hostname, node.IPAddress, node.MACAddress,
node.SSHPort, node.SSHUsername, encryptedPassword, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.AutoUpdatesEnabled, node.Notes,
node.ID, node.OrganizationID)
return err
}
func (s *NodeService) DeleteNode(ctx context.Context, orgID, nodeID int64) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `DELETE FROM command_runs WHERE node_id = ?`, nodeID); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `DELETE FROM automation_jobs WHERE organization_id = ? AND node_id = ?`, orgID, nodeID); err != nil {
return err
}
result, err := tx.ExecContext(ctx, `DELETE FROM nodes WHERE organization_id = ? AND id = ?`, orgID, nodeID)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return err
}
if affected == 0 {
return sql.ErrNoRows
}
return tx.Commit()
}
func (s *NodeService) RefreshNodeStats(ctx context.Context, node *models.Node) (string, error) {
const statsScript = `
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total1=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle1=$((idle + iowait))
sleep 1
read _ user nice system idle iowait irq softirq steal _ _ < /proc/stat
total2=$((user + nice + system + idle + iowait + irq + softirq + steal))
idle2=$((idle + iowait))
diff_total=$((total2 - total1))
diff_idle=$((idle2 - idle1))
if [ "$diff_total" -gt 0 ]; then
cpu="$(awk -v total="$diff_total" -v idle="$diff_idle" 'BEGIN { printf "%.2f", (total - idle) * 100 / total }')"
else
cpu="0.00"
fi
ram="$(awk '
/MemTotal:/ { total=$2 }
/MemAvailable:/ { available=$2 }
END {
if (total > 0) {
printf "%.2f", ((total - available) * 100) / total
} else {
printf "0.00"
}
}
' /proc/meminfo)"
disk="$(df / --output=pcent 2>/dev/null | tail -1 | tr -dc '0-9')"
uptime="$(cut -d. -f1 /proc/uptime 2>/dev/null)"
echo "CPU=${cpu}"
echo "RAM=${ram}"
echo "DISK=${disk:-0}"
echo "UPTIME=${uptime:-0}"
`
output, err := s.RunSSHCommand(ctx, node, strings.TrimSpace(statsScript))
if err != nil {
return "", err
}
stats := map[string]float64{}
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "=", 2)
if len(parts) != 2 {
continue
}
switch parts[0] {
case "CPU", "RAM", "DISK", "UPTIME":
var value float64
fmt.Sscanf(parts[1], "%f", &value)
stats[parts[0]] = value
}
}
now := time.Now()
_, err = s.db.ExecContext(ctx, `
UPDATE nodes
SET cpu_usage = ?, ram_usage = ?, disk_usage = ?, uptime_seconds = ?, last_seen_at = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
`, stats["CPU"], stats["RAM"], stats["DISK"], int64(stats["UPTIME"]), now, node.ID)
if err != nil {
return output, err
}
node.CPUUsage = stats["CPU"]
node.RAMUsage = stats["RAM"]
node.DiskUsage = stats["DISK"]
node.UptimeSeconds = int64(stats["UPTIME"])
node.LastSeenAt = &now
return output, nil
}
func (s *NodeService) StatsStale(node *models.Node, maxAge time.Duration) bool {
if node.LastSeenAt == nil {
return true
}
return time.Since(*node.LastSeenAt) >= maxAge
}
func (s *NodeService) RefreshNodeInventory(ctx context.Context, node *models.Node) (string, error) {
output, err := s.RunSSHCommand(ctx, node, strings.Join([]string{
`if [ -r /etc/os-release ]; then . /etc/os-release; echo DISTRO="${PRETTY_NAME:-$ID}"; else echo DISTRO="$(uname -s)"; fi`,
`echo HOSTNAME="$(hostname 2>/dev/null || uname -n)"`,
`echo ARCH="$(uname -m 2>/dev/null)"`,
`echo KERNEL="$(uname -r 2>/dev/null)"`,
`if command -v apt >/dev/null 2>&1; then echo PKG_MGR=apt; elif command -v dnf >/dev/null 2>&1; then echo PKG_MGR=dnf; elif command -v yum >/dev/null 2>&1; then echo PKG_MGR=yum; elif command -v pacman >/dev/null 2>&1; then echo PKG_MGR=pacman; elif command -v zypper >/dev/null 2>&1; then echo PKG_MGR=zypper; elif command -v apk >/dev/null 2>&1; then echo PKG_MGR=apk; elif command -v nix-env >/dev/null 2>&1; then echo PKG_MGR=nix; elif command -v emerge >/dev/null 2>&1; then echo PKG_MGR=emerge; else echo PKG_MGR=unknown; fi`,
`CPU_MODEL="$( (command -v lscpu >/dev/null 2>&1 && lscpu | awk -F: '/Model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}') || awk -F: '/model name/ {gsub(/^[ \t]+/, "", $2); print $2; exit}' /proc/cpuinfo )"; echo CPU_MODEL="${CPU_MODEL}"`,
`MEMORY_MB="$(awk '/MemTotal/ {printf "%d", $2/1024}' /proc/meminfo 2>/dev/null)"; echo MEMORY_MB="${MEMORY_MB:-0}"`,
`DISK_GB="$(df -BG / 2>/dev/null | awk 'NR==2 {gsub(/G/, "", $2); print $2}')"; echo DISK_GB="${DISK_GB:-0}"`,
}, " ; "))
if err != nil {
return "", err
}
values := map[string]string{}
for _, line := range strings.Split(output, "\n") {
line = strings.TrimSpace(line)
parts := strings.SplitN(line, "=", 2)
if len(parts) != 2 {
continue
}
values[parts[0]] = strings.Trim(strings.TrimSpace(parts[1]), `"`)
}
var memoryMB int64
var diskGB int64
fmt.Sscanf(values["MEMORY_MB"], "%d", &memoryMB)
fmt.Sscanf(values["DISK_GB"], "%d", &diskGB)
node.Distro = fallbackInventoryValue(values["DISTRO"], node.Distro, "Linux")
node.Hostname = fallbackInventoryValue(values["HOSTNAME"], node.Hostname, node.IPAddress)
node.PackageManager = fallbackInventoryValue(values["PKG_MGR"], node.PackageManager, "")
node.Architecture = fallbackInventoryValue(values["ARCH"], node.Architecture, "")
node.KernelVersion = fallbackInventoryValue(values["KERNEL"], node.KernelVersion, "")
node.CPUModel = fallbackInventoryValue(values["CPU_MODEL"], node.CPUModel, "")
if memoryMB > 0 {
node.MemoryTotalMB = memoryMB
}
if diskGB > 0 {
node.DiskTotalGB = diskGB
}
_, err = s.db.ExecContext(ctx, `
UPDATE nodes
SET distro = ?, hostname = ?, package_manager = ?, architecture = ?, kernel_version = ?, cpu_model = ?, memory_total_mb = ?, disk_total_gb = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND organization_id = ?
`, node.Distro, node.Hostname, node.PackageManager, node.Architecture, node.KernelVersion, node.CPUModel, node.MemoryTotalMB, node.DiskTotalGB, node.ID, node.OrganizationID)
if err != nil {
return output, err
}
return output, nil
}
func fallbackInventoryValue(primary, current, fallback string) string {
if strings.TrimSpace(primary) != "" {
return strings.TrimSpace(primary)
}
if strings.TrimSpace(current) != "" {
return strings.TrimSpace(current)
}
return fallback
}
func (s *NodeService) RunSSHCommand(ctx context.Context, node *models.Node, command string) (string, error) {
config := &ssh.ClientConfig{
User: node.SSHUsername,
Auth: []ssh.AuthMethod{ssh.Password(node.SSHPassword)},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
Timeout: 15 * time.Second,
}
client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", node.IPAddress, node.SSHPort), config)
if err != nil {
return "", err
}
defer client.Close()
session, err := client.NewSession()
if err != nil {
return "", err
}
defer session.Close()
output, err := session.CombinedOutput(command)
return string(output), err
}
func (s *NodeService) RunAction(ctx context.Context, node *models.Node, action string, userID *int64) (string, error) {
command := map[string]string{
"shutdown": "sudo shutdown -h now",
"restart": "sudo reboot",
"wake": "",
"refresh": "printf 'refresh requested'",
"apt-upgrade": "sudo apt update && sudo apt upgrade -y && sudo apt autoremove -y",
"apt-install": "sudo apt update",
}[action]
var output string
var err error
if action == "wake" {
err = sendMagicPacket(node.MACAddress)
output = "Wake-on-LAN packet sent"
} else if action == "refresh" {
output, err = s.RefreshNodeStats(ctx, node)
} else if command != "" {
output, err = s.RunSSHCommand(ctx, node, command)
} else {
err = fmt.Errorf("unknown action")
}
status := "completed"
if err != nil {
status = "failed"
}
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (node_id, action, status, output, triggered_by)
VALUES (?, ?, ?, ?, ?)
`, node.ID, action, status, output, userID)
return output, err
}
func (s *NodeService) RunAdHocCommand(ctx context.Context, node *models.Node, label, command string, userID *int64) (string, error) {
output, err := s.RunSSHCommand(ctx, node, command)
status := "completed"
if err != nil {
status = "failed"
output = strings.TrimSpace(output + "\n" + err.Error())
}
now := time.Now()
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (node_id, action, status, output, triggered_by, started_at, finished_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, node.ID, label, status, output, userID, now, now)
return output, err
}
func (s *NodeService) ListAutomations(ctx context.Context, orgID int64) ([]models.AutomationJob, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT j.id, j.organization_id, j.node_id, j.group_id, j.tag, j.name, j.trigger_type, j.schedule,
j.command, j.enabled, j.last_run_at, j.next_run_at, j.created_at,
COALESCE(n.name, ''), COALESCE(g.name, '')
FROM automation_jobs j
LEFT JOIN nodes n ON n.id = j.node_id
LEFT JOIN vm_groups g ON g.id = j.group_id
WHERE j.organization_id = ?
ORDER BY j.name ASC
`, orgID)
if err != nil {
return nil, err
}
defer rows.Close()
var jobs []models.AutomationJob
for rows.Next() {
var job models.AutomationJob
if err := rows.Scan(
&job.ID, &job.OrganizationID, &job.NodeID, &job.GroupID, &job.Tag, &job.Name, &job.TriggerType,
&job.Schedule, &job.Command, &job.Enabled, &job.LastRunAt, &job.NextRunAt, &job.CreatedAt,
&job.NodeName, &job.GroupName,
); err != nil {
return nil, err
}
jobs = append(jobs, job)
}
return jobs, rows.Err()
}
func (s *NodeService) CreateAutomation(ctx context.Context, job *models.AutomationJob) error {
job.NextRunAt = nextRunForSchedule(job.Schedule)
_, err := s.db.ExecContext(ctx, `
INSERT INTO automation_jobs (organization_id, node_id, group_id, tag, name, trigger_type, schedule, command, enabled, next_run_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, job.OrganizationID, job.NodeID, job.GroupID, job.Tag, job.Name, job.TriggerType, job.Schedule, job.Command, job.Enabled, job.NextRunAt)
return err
}
func (s *NodeService) ListJobRuns(ctx context.Context, orgID int64) ([]models.CommandRun, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT cr.id, cr.job_id, cr.node_id, cr.action, cr.status, cr.output, cr.triggered_by,
cr.started_at, cr.finished_at, COALESCE(j.name, ''), COALESCE(n.name, '')
FROM command_runs cr
LEFT JOIN automation_jobs j ON j.id = cr.job_id
LEFT JOIN nodes n ON n.id = cr.node_id
WHERE j.organization_id = ? OR (j.id IS NULL AND n.organization_id = ?)
ORDER BY cr.started_at DESC
LIMIT 50
`, orgID, orgID)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []models.CommandRun
for rows.Next() {
var run models.CommandRun
if err := rows.Scan(
&run.ID, &run.JobID, &run.NodeID, &run.Action, &run.Status, &run.Output, &run.TriggeredBy,
&run.StartedAt, &run.FinishedAt, &run.JobName, &run.NodeName,
); err != nil {
return nil, err
}
run.DurationText = formatDuration(run.StartedAt, run.FinishedAt)
runs = append(runs, run)
}
return runs, rows.Err()
}
func sendMagicPacket(macAddress string) error {
hw, err := net.ParseMAC(macAddress)
if err != nil {
return err
}
packet := make([]byte, 6+16*len(hw))
for i := 0; i < 6; i++ {
packet[i] = 0xFF
}
for i := 6; i < len(packet); i += len(hw) {
copy(packet[i:], hw)
}
conn, err := net.Dial("udp", "255.255.255.255:9")
if err != nil {
return err
}
defer conn.Close()
_, err = conn.Write(packet)
return err
}
type SchedulerService struct {
cron *cron.Cron
nodeService *NodeService
db *sql.DB
}
func NewSchedulerService(database *sql.DB, nodeService *NodeService) *SchedulerService {
return &SchedulerService{
cron: cron.New(),
nodeService: nodeService,
db: database,
}
}
func (s *SchedulerService) Start(ctx context.Context, orgID int64, refreshSpec string) error {
if _, err := s.cron.AddFunc(refreshSpec, func() {
nodes, err := s.nodeService.ListNodes(ctx, orgID)
if err != nil {
return
}
for i := range nodes {
if nodes[i].SSHUsername == "" || nodes[i].SSHPassword == "" {
continue
}
_, _ = s.nodeService.RefreshNodeStats(context.Background(), &nodes[i])
}
}); err != nil {
return err
}
jobs, err := s.nodeService.ListAutomations(ctx, orgID)
if err != nil {
return err
}
for _, job := range jobs {
if !job.Enabled || job.Schedule == "" {
continue
}
job := job
_, _ = s.db.ExecContext(ctx, `UPDATE automation_jobs SET next_run_at = ? WHERE id = ?`, nextRunForSchedule(job.Schedule), job.ID)
_, err := s.cron.AddFunc(job.Schedule, func() {
s.runAutomation(context.Background(), job)
})
if err != nil {
return err
}
}
s.cron.Start()
return nil
}
func (s *SchedulerService) runAutomation(ctx context.Context, job models.AutomationJob) {
targets, err := s.resolveTargets(ctx, job)
if err != nil || len(targets) == 0 {
return
}
lastRunAt := time.Now()
for _, node := range targets {
startedAt := time.Now()
output, runErr := s.nodeService.RunSSHCommand(ctx, &node, job.Command)
status := "completed"
if runErr != nil {
status = "failed"
output = strings.TrimSpace(output + "\n" + runErr.Error())
}
finishedAt := time.Now()
_, _ = s.db.ExecContext(ctx, `
INSERT INTO command_runs (job_id, node_id, action, status, output, started_at, finished_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, job.ID, node.ID, job.Name, status, output, startedAt, finishedAt)
lastRunAt = finishedAt
}
next := nextRunForSchedule(job.Schedule)
_, _ = s.db.ExecContext(ctx, `
UPDATE automation_jobs
SET last_run_at = ?, next_run_at = ?
WHERE id = ?
`, lastRunAt, next, job.ID)
}
func (s *SchedulerService) resolveTargets(ctx context.Context, job models.AutomationJob) ([]models.Node, error) {
if job.NodeID != nil {
node, err := s.nodeService.GetNode(ctx, job.OrganizationID, *job.NodeID)
if err != nil {
return nil, err
}
return []models.Node{*node}, nil
}
if job.GroupID != nil {
return s.nodeService.ListNodesByGroup(ctx, job.OrganizationID, *job.GroupID)
}
if strings.TrimSpace(job.Tag) != "" {
return s.nodeService.ListNodesByTag(ctx, job.OrganizationID, job.Tag)
}
return nil, nil
}
func nextRunForSchedule(spec string) *time.Time {
if strings.TrimSpace(spec) == "" {
return nil
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
schedule, err := parser.Parse(spec)
if err != nil {
return nil
}
next := schedule.Next(time.Now())
return &next
}
func formatDuration(startedAt time.Time, finishedAt *time.Time) string {
if finishedAt == nil {
return "running"
}
duration := finishedAt.Sub(startedAt).Round(time.Second)
if duration < time.Minute {
return duration.String()
}
hours := int(duration.Hours())
minutes := int(duration.Minutes()) % 60
seconds := int(duration.Seconds()) % 60
if hours > 0 {
return fmt.Sprintf("%dh %dm %ds", hours, minutes, seconds)
}
return fmt.Sprintf("%dm %ds", minutes, seconds)
}