Add main.py
This commit is contained in:
206
main.py
Normal file
206
main.py
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import logging
|
||||||
|
import datetime
|
||||||
|
import subprocess
|
||||||
|
import re
|
||||||
|
|
||||||
|
# --- CONFIGURATION ---
|
||||||
|
FILE_PATH = "/var/www/invoiceninja/app/Models/Account.php"
|
||||||
|
BACKUP_PATH = "/var/www/invoiceninja/app/Models/Account.php.bak"
|
||||||
|
LOG_FILE = "/var/log/ninja_patcher.log"
|
||||||
|
APP_DIR = "/var/www/invoiceninja"
|
||||||
|
|
||||||
|
# --- ANSI TERMINAL COLORS ---
|
||||||
|
CLR_RESET = "\033[0m"
|
||||||
|
CLR_BOLD = "\033[1m"
|
||||||
|
CLR_GREEN = "\033[32m"
|
||||||
|
CLR_BLUE = "\033[34m"
|
||||||
|
CLR_YELLOW = "\033[33m"
|
||||||
|
CLR_RED = "\033[31m"
|
||||||
|
CLR_CYAN = "\033[36m"
|
||||||
|
CLR_MAGENTA = "\033[35m"
|
||||||
|
CLR_GRAY = "\033[90m"
|
||||||
|
|
||||||
|
# --- LOGGING SETUP ---
|
||||||
|
logger = logging.getLogger("NinjaPatcher")
|
||||||
|
logger.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
class SimpleConsoleFormatter(logging.Formatter):
|
||||||
|
def format(self, record):
|
||||||
|
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
|
||||||
|
prefix = f"{CLR_GRAY}[{timestamp}]{CLR_RESET}"
|
||||||
|
|
||||||
|
if record.levelno == logging.INFO:
|
||||||
|
return f"{prefix} {CLR_GREEN}✔{CLR_RESET} {record.getMessage()}"
|
||||||
|
elif record.levelno == logging.DEBUG:
|
||||||
|
return f"{prefix} {CLR_BLUE}➔{CLR_RESET} {record.getMessage()}"
|
||||||
|
elif record.levelno == logging.WARNING:
|
||||||
|
return f"{prefix} {CLR_YELLOW}⚠{CLR_RESET} {CLR_YELLOW}{record.getMessage()}{CLR_RESET}"
|
||||||
|
elif record.levelno == logging.ERROR:
|
||||||
|
return f"{prefix} {CLR_RED}✖{CLR_RESET} {CLR_BOLD}{CLR_RED}{record.getMessage()}{CLR_RESET}"
|
||||||
|
return f"{prefix} {record.getMessage()}"
|
||||||
|
|
||||||
|
c_handler = logging.StreamHandler(sys.stdout)
|
||||||
|
c_handler.setLevel(logging.INFO) # Keeps the console output to clean info lines
|
||||||
|
c_handler.setFormatter(SimpleConsoleFormatter())
|
||||||
|
logger.addHandler(c_handler)
|
||||||
|
|
||||||
|
f_handler = logging.FileHandler(LOG_FILE)
|
||||||
|
f_handler.setLevel(logging.DEBUG)
|
||||||
|
f_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
|
||||||
|
logger.addHandler(f_handler)
|
||||||
|
|
||||||
|
# --- TARGETS ---
|
||||||
|
TARGET_CLASS = "class Account extends BaseModel"
|
||||||
|
PD_SIG = "public function getPlanDetails($include_inactive = false, $include_trial = true)"
|
||||||
|
HF_SIG = "public function hasFeature($feature)"
|
||||||
|
|
||||||
|
MOCK_PLAN_DETAILS = """{
|
||||||
|
return [
|
||||||
|
'account_id' => $this->id,
|
||||||
|
'num_users' => 100,
|
||||||
|
'plan_price' => 0,
|
||||||
|
'trial' => false,
|
||||||
|
'plan' => self::PLAN_ENTERPRISE,
|
||||||
|
'started' => now(),
|
||||||
|
'expires' => false,
|
||||||
|
'paid' => now(),
|
||||||
|
'term' => 'yearly',
|
||||||
|
'active' => true,
|
||||||
|
];
|
||||||
|
}"""
|
||||||
|
|
||||||
|
MOCK_HAS_FEATURE = """{
|
||||||
|
return true;
|
||||||
|
}"""
|
||||||
|
|
||||||
|
class StageValidationError(Exception): """Custom Exception"""
|
||||||
|
|
||||||
|
def highlight_php_code(code_str):
|
||||||
|
highlighted = code_str
|
||||||
|
keywords = ["public", "function", "class", "extends", "return", "self", "false", "true"]
|
||||||
|
for kw in keywords:
|
||||||
|
highlighted = re.sub(rf"\b{kw}\b", f"{CLR_MAGENTA}{kw}{CLR_RESET}", highlighted)
|
||||||
|
highlighted = re.sub(r"(\$[a-zA-Z_][a-zA-Z0-9_]*)", f"{CLR_CYAN}\\1{CLR_RESET}", highlighted)
|
||||||
|
highlighted = highlighted.replace("->", f"{CLR_YELLOW}->{CLR_RESET}")
|
||||||
|
highlighted = highlighted.replace("=>", f"{CLR_YELLOW}=>{CLR_RESET}")
|
||||||
|
highlighted = re.sub(r"\b([A-Z_][A-Z0-9_]{3,})\b", f"{CLR_BLUE}\\1{CLR_RESET}", highlighted)
|
||||||
|
return highlighted
|
||||||
|
|
||||||
|
def get_line_number(content, char_index):
|
||||||
|
return content.count("\n", 0, char_index) + 1
|
||||||
|
|
||||||
|
def extract_functional_scope(content, signature):
|
||||||
|
sig_idx = content.find(signature)
|
||||||
|
if sig_idx == -1:
|
||||||
|
raise StageValidationError(f"Could not find function: '{signature}'")
|
||||||
|
|
||||||
|
line_num = get_line_number(content, sig_idx)
|
||||||
|
formatted_sig = highlight_php_code(signature)
|
||||||
|
|
||||||
|
logger.info(f"Found function at line {line_num}:\n {formatted_sig}")
|
||||||
|
|
||||||
|
open_brace_idx = content.find("{", sig_idx + len(signature))
|
||||||
|
if open_brace_idx == -1:
|
||||||
|
raise StageValidationError(f"Missing opening brace for function at line {line_num}")
|
||||||
|
|
||||||
|
bracket_balance = 0
|
||||||
|
close_brace_idx = -1
|
||||||
|
|
||||||
|
for idx in range(open_brace_idx, len(content)):
|
||||||
|
if content[idx] == "{":
|
||||||
|
bracket_balance += 1
|
||||||
|
elif content[idx] == "}":
|
||||||
|
bracket_balance -= 1
|
||||||
|
if bracket_balance == 0:
|
||||||
|
close_brace_idx = idx
|
||||||
|
break
|
||||||
|
|
||||||
|
if close_brace_idx == -1:
|
||||||
|
raise StageValidationError(f"Mismatched braces in function at line {line_num}")
|
||||||
|
|
||||||
|
return open_brace_idx, close_brace_idx
|
||||||
|
|
||||||
|
def execute_pipeline():
|
||||||
|
print(f"\n{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}")
|
||||||
|
print(f"{CLR_BOLD}{CLR_CYAN} INVOICE NINJA PATCHER {CLR_RESET}")
|
||||||
|
print(f"{CLR_BOLD}{CLR_CYAN}======================================================================{CLR_RESET}")
|
||||||
|
|
||||||
|
# STAGE 1
|
||||||
|
logger.info("Checking environment and permissions...")
|
||||||
|
if os.getuid() != 0:
|
||||||
|
logger.error("Must be run as root.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
if not os.path.exists(FILE_PATH):
|
||||||
|
logger.error(f"File missing: {FILE_PATH}")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
with open(FILE_PATH, "r") as f:
|
||||||
|
original_content = f.read()
|
||||||
|
|
||||||
|
if TARGET_CLASS not in original_content:
|
||||||
|
logger.error("Invalid Account.php file structure.")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# STAGE 2
|
||||||
|
logger.info("Locating target code functions...")
|
||||||
|
try:
|
||||||
|
pd_open, pd_close = extract_functional_scope(original_content, PD_SIG)
|
||||||
|
hf_open, hf_close = extract_functional_scope(original_content, HF_SIG)
|
||||||
|
except StageValidationError as e:
|
||||||
|
logger.error(str(e))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
# STAGE 3
|
||||||
|
logger.info("Applying code modifications...")
|
||||||
|
if "return true;\n // Surgical bypass" in original_content or "return [\n 'account_id' => $this->id," in original_content:
|
||||||
|
print(f"\n{CLR_BOLD}{CLR_GREEN}✅ System is already fully patched. No changes needed.{CLR_RESET}\n")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
shutil.copy2(FILE_PATH, BACKUP_PATH)
|
||||||
|
|
||||||
|
if pd_open > hf_open:
|
||||||
|
patched = original_content[:pd_open] + MOCK_PLAN_DETAILS + original_content[pd_close + 1:]
|
||||||
|
new_hf_open, new_hf_close = extract_functional_scope(patched, HF_SIG)
|
||||||
|
patched = patched[:new_hf_open] + MOCK_HAS_FEATURE + patched[new_hf_close + 1:]
|
||||||
|
else:
|
||||||
|
patched = original_content[:hf_open] + MOCK_HAS_FEATURE + original_content[hf_close + 1:]
|
||||||
|
new_pd_open, new_pd_close = extract_functional_scope(patched, PD_SIG)
|
||||||
|
patched = patched[:new_pd_open] + MOCK_PLAN_DETAILS + patched[new_pd_close + 1:]
|
||||||
|
|
||||||
|
# STAGE 4
|
||||||
|
logger.info("Verifying PHP syntax...")
|
||||||
|
temp_target_file = FILE_PATH + ".tmp"
|
||||||
|
with open(temp_target_file, "w") as f:
|
||||||
|
f.write(patched)
|
||||||
|
|
||||||
|
lint_verification = subprocess.run(["php", "-l", temp_target_file], capture_output=True, text=True)
|
||||||
|
if lint_verification.returncode != 0:
|
||||||
|
logger.error("PHP syntax check failed! Rolling back changes.")
|
||||||
|
os.remove(temp_target_file)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
shutil.move(temp_target_file, FILE_PATH)
|
||||||
|
|
||||||
|
# STAGE 5
|
||||||
|
logger.info("Clearing application caches...")
|
||||||
|
try:
|
||||||
|
subprocess.run(
|
||||||
|
["sudo", "-u", "www-data", "php", "artisan", "optimize:clear"],
|
||||||
|
cwd=APP_DIR, capture_output=True, text=True, check=True
|
||||||
|
)
|
||||||
|
print(f"\n{CLR_BOLD}{CLR_GREEN}🎉 Patch successfully applied! Features unlocked.{CLR_RESET}\n")
|
||||||
|
except subprocess.CalledProcessError:
|
||||||
|
logger.warning("Code updated, but failed to clear artisan cache automatically.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
execute_pipeline()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"An unexpected error occurred: {str(e)}")
|
||||||
|
sys.exit(1)
|
||||||
Reference in New Issue
Block a user