#!/usr/bin/env python3 import os import sys import json import shutil import logging import datetime import subprocess import re # --- CONFIGURATION --- FILE_PATH = "/var/www/invoiceninja/app/Models/Account.php" BACKUP_PATH = "/var/www/invoiceninja/app/Models/Account.php.bak" LOG_FILE = "/var/log/ninja_patcher.log" APP_DIR = "/var/www/invoiceninja" # --- ANSI TERMINAL COLORS --- CLR_RESET = "\033[0m" CLR_BOLD = "\033[1m" CLR_GREEN = "\033[32m" CLR_BLUE = "\033[34m" CLR_YELLOW = "\033[33m" CLR_RED = "\033[31m" CLR_CYAN = "\033[36m" CLR_MAGENTA = "\033[35m" CLR_GRAY = "\033[90m" # --- LOGGING SETUP --- logger = logging.getLogger("NinjaPatcher") logger.setLevel(logging.DEBUG) class SimpleConsoleFormatter(logging.Formatter): def format(self, record): timestamp = datetime.datetime.now().strftime("%H:%M:%S") prefix = f"{CLR_GRAY}[{timestamp}]{CLR_RESET}" if record.levelno == logging.INFO: return f"{prefix} {CLR_GREEN}✔{CLR_RESET} {record.getMessage()}" elif record.levelno == logging.DEBUG: return f"{prefix} {CLR_BLUE}➔{CLR_RESET} {record.getMessage()}" elif record.levelno == logging.WARNING: return f"{prefix} {CLR_YELLOW}⚠{CLR_RESET} {CLR_YELLOW}{record.getMessage()}{CLR_RESET}" elif record.levelno == logging.ERROR: return f"{prefix} {CLR_RED}✖{CLR_RESET} {CLR_BOLD}{CLR_RED}{record.getMessage()}{CLR_RESET}" return f"{prefix} {record.getMessage()}" c_handler = logging.StreamHandler(sys.stdout) c_handler.setLevel(logging.INFO) # Keeps the console output to clean info lines c_handler.setFormatter(SimpleConsoleFormatter()) logger.addHandler(c_handler) f_handler = logging.FileHandler(LOG_FILE) f_handler.setLevel(logging.DEBUG) f_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) logger.addHandler(f_handler) # --- TARGETS --- TARGET_CLASS = "class Account extends BaseModel" PD_SIG = "public function getPlanDetails($include_inactive = false, $include_trial = true)" HF_SIG = "public function hasFeature($feature)" MOCK_PLAN_DETAILS = """{ return [ 'account_id' => $this->id, 'num_users' => 100, 'plan_price' => 0, 'trial' => false, 'plan' => self::PLAN_ENTERPRISE, 'started' => now(), 'expires' => false, 'paid' => now(), 'term' => 'yearly', 'active' => true, ]; }""" MOCK_HAS_FEATURE = """{ return true; }""" class StageValidationError(Exception): """Custom Exception""" def highlight_php_code(code_str): highlighted = code_str keywords = ["public", "function", "class", "extends", "return", "self", "false", "true"] for kw in keywords: highlighted = re.sub(rf"\b{kw}\b", f"{CLR_MAGENTA}{kw}{CLR_RESET}", highlighted) highlighted = re.sub(r"(\$[a-zA-Z_][a-zA-Z0-9_]*)", f"{CLR_CYAN}\\1{CLR_RESET}", highlighted) highlighted = highlighted.replace("->", f"{CLR_YELLOW}->{CLR_RESET}") highlighted = highlighted.replace("=>", f"{CLR_YELLOW}=>{CLR_RESET}") highlighted = re.sub(r"\b([A-Z_][A-Z0-9_]{3,})\b", f"{CLR_BLUE}\\1{CLR_RESET}", highlighted) return highlighted def get_line_number(content, char_index): return content.count("\n", 0, char_index) + 1 def extract_functional_scope(content, signature): sig_idx = content.find(signature) if sig_idx == -1: raise StageValidationError(f"Could not find function: '{signature}'") line_num = get_line_number(content, sig_idx) formatted_sig = highlight_php_code(signature) logger.info(f"Found function at line {line_num}:\n {formatted_sig}") open_brace_idx = content.find("{", sig_idx + len(signature)) if open_brace_idx == -1: raise StageValidationError(f"Missing opening brace for function at line {line_num}") bracket_balance = 0 close_brace_idx = -1 for idx in range(open_brace_idx, len(content)): if content[idx] == "{": bracket_balance += 1 elif content[idx] == "}": bracket_balance -= 1 if bracket_balance == 0: close_brace_idx = idx break if close_brace_idx == -1: raise StageValidationError(f"Mismatched braces in function at line {line_num}") return open_brace_idx, close_brace_idx def execute_pipeline(): print(f"\n{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}") print(f"{CLR_BOLD}{CLR_CYAN} INVOICE NINJA PATCHER {CLR_RESET}") print(f"{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}") # STAGE 1 logger.info("Checking environment and permissions...") if os.getuid() != 0: logger.error("Must be run as root.") sys.exit(1) if not os.path.exists(FILE_PATH): logger.error(f"File missing: {FILE_PATH}") sys.exit(1) with open(FILE_PATH, "r") as f: original_content = f.read() if TARGET_CLASS not in original_content: logger.error("Invalid Account.php file structure.") sys.exit(1) # STAGE 2 logger.info("Locating target code functions...") try: pd_open, pd_close = extract_functional_scope(original_content, PD_SIG) hf_open, hf_close = extract_functional_scope(original_content, HF_SIG) except StageValidationError as e: logger.error(str(e)) sys.exit(1) # STAGE 3 logger.info("Applying code modifications...") if "return true;\n // Surgical bypass" in original_content or "return [\n 'account_id' => $this->id," in original_content: print(f"\n{CLR_BOLD}{CLR_GREEN}✅ System is already fully patched. No changes needed.{CLR_RESET}\n") sys.exit(0) shutil.copy2(FILE_PATH, BACKUP_PATH) if pd_open > hf_open: patched = original_content[:pd_open] + MOCK_PLAN_DETAILS + original_content[pd_close + 1:] new_hf_open, new_hf_close = extract_functional_scope(patched, HF_SIG) patched = patched[:new_hf_open] + MOCK_HAS_FEATURE + patched[new_hf_close + 1:] else: patched = original_content[:hf_open] + MOCK_HAS_FEATURE + original_content[hf_close + 1:] new_pd_open, new_pd_close = extract_functional_scope(patched, PD_SIG) patched = patched[:new_pd_open] + MOCK_PLAN_DETAILS + patched[new_pd_close + 1:] # STAGE 4 logger.info("Verifying PHP syntax...") temp_target_file = FILE_PATH + ".tmp" with open(temp_target_file, "w") as f: f.write(patched) lint_verification = subprocess.run(["php", "-l", temp_target_file], capture_output=True, text=True) if lint_verification.returncode != 0: logger.error("PHP syntax check failed! Rolling back changes.") os.remove(temp_target_file) sys.exit(1) shutil.move(temp_target_file, FILE_PATH) # STAGE 5 logger.info("Clearing application caches...") try: subprocess.run( ["sudo", "-u", "www-data", "php", "artisan", "optimize:clear"], cwd=APP_DIR, capture_output=True, text=True, check=True ) print(f"\n{CLR_BOLD}{CLR_GREEN}🎉 Patch successfully applied! Features unlocked.{CLR_RESET}\n") except subprocess.CalledProcessError: logger.warning("Code updated, but failed to clear artisan cache automatically.") if __name__ == "__main__": try: execute_pipeline() except Exception as e: logger.error(f"An unexpected error occurred: {str(e)}") sys.exit(1)