diff --git a/main.py b/main.py new file mode 100644 index 0000000..9cddcc4 --- /dev/null +++ b/main.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +import os +import sys +import json +import shutil +import logging +import datetime +import subprocess +import re + +# --- CONFIGURATION --- +FILE_PATH = "/var/www/invoiceninja/app/Models/Account.php" +BACKUP_PATH = "/var/www/invoiceninja/app/Models/Account.php.bak" +LOG_FILE = "/var/log/ninja_patcher.log" +APP_DIR = "/var/www/invoiceninja" + +# --- ANSI TERMINAL COLORS --- +CLR_RESET = "\033[0m" +CLR_BOLD = "\033[1m" +CLR_GREEN = "\033[32m" +CLR_BLUE = "\033[34m" +CLR_YELLOW = "\033[33m" +CLR_RED = "\033[31m" +CLR_CYAN = "\033[36m" +CLR_MAGENTA = "\033[35m" +CLR_GRAY = "\033[90m" + +# --- LOGGING SETUP --- +logger = logging.getLogger("NinjaPatcher") +logger.setLevel(logging.DEBUG) + +class SimpleConsoleFormatter(logging.Formatter): + def format(self, record): + timestamp = datetime.datetime.now().strftime("%H:%M:%S") + prefix = f"{CLR_GRAY}[{timestamp}]{CLR_RESET}" + + if record.levelno == logging.INFO: + return f"{prefix} {CLR_GREEN}✔{CLR_RESET} {record.getMessage()}" + elif record.levelno == logging.DEBUG: + return f"{prefix} {CLR_BLUE}➔{CLR_RESET} {record.getMessage()}" + elif record.levelno == logging.WARNING: + return f"{prefix} {CLR_YELLOW}⚠{CLR_RESET} {CLR_YELLOW}{record.getMessage()}{CLR_RESET}" + elif record.levelno == logging.ERROR: + return f"{prefix} {CLR_RED}✖{CLR_RESET} {CLR_BOLD}{CLR_RED}{record.getMessage()}{CLR_RESET}" + return f"{prefix} {record.getMessage()}" + +c_handler = logging.StreamHandler(sys.stdout) +c_handler.setLevel(logging.INFO) # Keeps the console output to clean info lines +c_handler.setFormatter(SimpleConsoleFormatter()) +logger.addHandler(c_handler) + +f_handler = logging.FileHandler(LOG_FILE) +f_handler.setLevel(logging.DEBUG) +f_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s')) +logger.addHandler(f_handler) + +# --- TARGETS --- +TARGET_CLASS = "class Account extends BaseModel" +PD_SIG = "public function getPlanDetails($include_inactive = false, $include_trial = true)" +HF_SIG = "public function hasFeature($feature)" + +MOCK_PLAN_DETAILS = """{ + return [ + 'account_id' => $this->id, + 'num_users' => 100, + 'plan_price' => 0, + 'trial' => false, + 'plan' => self::PLAN_ENTERPRISE, + 'started' => now(), + 'expires' => false, + 'paid' => now(), + 'term' => 'yearly', + 'active' => true, + ]; + }""" + +MOCK_HAS_FEATURE = """{ + return true; + }""" + +class StageValidationError(Exception): """Custom Exception""" + +def highlight_php_code(code_str): + highlighted = code_str + keywords = ["public", "function", "class", "extends", "return", "self", "false", "true"] + for kw in keywords: + highlighted = re.sub(rf"\b{kw}\b", f"{CLR_MAGENTA}{kw}{CLR_RESET}", highlighted) + highlighted = re.sub(r"(\$[a-zA-Z_][a-zA-Z0-9_]*)", f"{CLR_CYAN}\\1{CLR_RESET}", highlighted) + highlighted = highlighted.replace("->", f"{CLR_YELLOW}->{CLR_RESET}") + highlighted = highlighted.replace("=>", f"{CLR_YELLOW}=>{CLR_RESET}") + highlighted = re.sub(r"\b([A-Z_][A-Z0-9_]{3,})\b", f"{CLR_BLUE}\\1{CLR_RESET}", highlighted) + return highlighted + +def get_line_number(content, char_index): + return content.count("\n", 0, char_index) + 1 + +def extract_functional_scope(content, signature): + sig_idx = content.find(signature) + if sig_idx == -1: + raise StageValidationError(f"Could not find function: '{signature}'") + + line_num = get_line_number(content, sig_idx) + formatted_sig = highlight_php_code(signature) + + logger.info(f"Found function at line {line_num}:\n {formatted_sig}") + + open_brace_idx = content.find("{", sig_idx + len(signature)) + if open_brace_idx == -1: + raise StageValidationError(f"Missing opening brace for function at line {line_num}") + + bracket_balance = 0 + close_brace_idx = -1 + + for idx in range(open_brace_idx, len(content)): + if content[idx] == "{": + bracket_balance += 1 + elif content[idx] == "}": + bracket_balance -= 1 + if bracket_balance == 0: + close_brace_idx = idx + break + + if close_brace_idx == -1: + raise StageValidationError(f"Mismatched braces in function at line {line_num}") + + return open_brace_idx, close_brace_idx + +def execute_pipeline(): + print(f"\n{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}") + print(f"{CLR_BOLD}{CLR_CYAN} INVOICE NINJA PATCHER {CLR_RESET}") + print(f"{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}") + + # STAGE 1 + logger.info("Checking environment and permissions...") + if os.getuid() != 0: + logger.error("Must be run as root.") + sys.exit(1) + + if not os.path.exists(FILE_PATH): + logger.error(f"File missing: {FILE_PATH}") + sys.exit(1) + + with open(FILE_PATH, "r") as f: + original_content = f.read() + + if TARGET_CLASS not in original_content: + logger.error("Invalid Account.php file structure.") + sys.exit(1) + + # STAGE 2 + logger.info("Locating target code functions...") + try: + pd_open, pd_close = extract_functional_scope(original_content, PD_SIG) + hf_open, hf_close = extract_functional_scope(original_content, HF_SIG) + except StageValidationError as e: + logger.error(str(e)) + sys.exit(1) + + # STAGE 3 + logger.info("Applying code modifications...") + if "return true;\n // Surgical bypass" in original_content or "return [\n 'account_id' => $this->id," in original_content: + print(f"\n{CLR_BOLD}{CLR_GREEN}✅ System is already fully patched. No changes needed.{CLR_RESET}\n") + sys.exit(0) + + shutil.copy2(FILE_PATH, BACKUP_PATH) + + if pd_open > hf_open: + patched = original_content[:pd_open] + MOCK_PLAN_DETAILS + original_content[pd_close + 1:] + new_hf_open, new_hf_close = extract_functional_scope(patched, HF_SIG) + patched = patched[:new_hf_open] + MOCK_HAS_FEATURE + patched[new_hf_close + 1:] + else: + patched = original_content[:hf_open] + MOCK_HAS_FEATURE + original_content[hf_close + 1:] + new_pd_open, new_pd_close = extract_functional_scope(patched, PD_SIG) + patched = patched[:new_pd_open] + MOCK_PLAN_DETAILS + patched[new_pd_close + 1:] + + # STAGE 4 + logger.info("Verifying PHP syntax...") + temp_target_file = FILE_PATH + ".tmp" + with open(temp_target_file, "w") as f: + f.write(patched) + + lint_verification = subprocess.run(["php", "-l", temp_target_file], capture_output=True, text=True) + if lint_verification.returncode != 0: + logger.error("PHP syntax check failed! Rolling back changes.") + os.remove(temp_target_file) + sys.exit(1) + + shutil.move(temp_target_file, FILE_PATH) + + # STAGE 5 + logger.info("Clearing application caches...") + try: + subprocess.run( + ["sudo", "-u", "www-data", "php", "artisan", "optimize:clear"], + cwd=APP_DIR, capture_output=True, text=True, check=True + ) + print(f"\n{CLR_BOLD}{CLR_GREEN}🎉 Patch successfully applied! Features unlocked.{CLR_RESET}\n") + except subprocess.CalledProcessError: + logger.warning("Code updated, but failed to clear artisan cache automatically.") + +if __name__ == "__main__": + try: + execute_pipeline() + except Exception as e: + logger.error(f"An unexpected error occurred: {str(e)}") + sys.exit(1) \ No newline at end of file