"""Flask app that accepts a transactions CSV upload and renders spending dashboards."""
import json
import os
import uuid
from datetime import datetime

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from dateutil.relativedelta import relativedelta
from flask import Flask, render_template, request, redirect, url_for, flash, session
from plotly.utils import PlotlyJSONEncoder

app = Flask(__name__)
# SECURITY: the literal fallback is a placeholder and must not be used in
# production -- anyone who knows it can forge session cookies. Set SECRET_KEY
# in the environment instead.
app.secret_key = os.environ.get("SECRET_KEY", "your-secret-key")
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['DATA_FOLDER'] = 'data'
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024

os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)

REQUIRED_COLUMNS = [
    "Date", "Transaction ID", "Transaction Type", "Currency", "Amount", "Fee",
    "Net Amount", "Asset Type", "Asset Price", "Asset Amount", "Status", "Notes",
    "Name of sender/receiver", "Account",
]

# Column holding the counterparty of each transaction.
NAME_COLUMN = "Name of sender/receiver"

# Columns that hold money values, possibly formatted like "$1,234.56".
_MONEY_COLUMNS = ("Amount", "Fee", "Net Amount")

# Payee-name fragments used to guess at small recurring subscriptions.
_SUBSCRIPTION_PATTERN = "netflix|spotify|hulu|apple|google|amazon|prime"


def allowed_file(filename):
    """Return True when *filename* has a ``.csv`` extension (case-insensitive)."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() == 'csv'


MERCHANT_MAP = {
    "fast_food": ["mcdonald", "wendy", "kfc", "taco", "popeyes", "burger", "subway", "chipotle"],
    "restaurant": ["olive garden", "applebee", "chili", "roadhouse", "ihop", "panda express", "buffalo wild wings"],
    "shopping": ["walmart", "target", "amazon", "ebay", "costco", "best buy"],
    "subscription": ["netflix", "spotify", "hulu", "apple", "youtube", "icloud", "google", "xbox"],
    "gas": ["kwik trip", "shell", "bp", "exxon", "chevron"],
    "banking": ["transfer", "add cash", "savings internal transfer", "interest"],
}


def classify_merchant(name):
    """Classify a payee name into a MERCHANT_MAP category, "person" or "unknown".

    Categories are tried in MERCHANT_MAP order and matched by substring on the
    lower-cased name. A name that matches no keyword and consists of two or
    three purely alphabetic words is treated as a person.
    """
    if not isinstance(name, str) or not name.strip():
        return "unknown"

    name = name.strip().lower()
    for category, keywords in MERCHANT_MAP.items():
        if any(keyword in name for keyword in keywords):
            return category

    words = name.title().split()
    if len(words) in [2, 3] and all(w.isalpha() and w[0].isupper() for w in words):
        return "person"
    return "unknown"


def _lowercase_names(frame):
    """Return the payee column of *frame* as lower-case strings ('' for missing)."""
    return frame[NAME_COLUMN].fillna('').astype(str).str.lower()


def generate_recommendations(df):
    """Build a list of human-readable spending tips from cleaned transactions.

    Args:
        df: DataFrame with a datetime ``Date`` column, a numeric ``Amount``
            column (negative values are expenses) and the payee column.

    Returns:
        A non-empty list of strings; a single congratulatory message is
        returned when no tip applies.

    The input frame is not modified. "This month" / "last month" are taken
    from the current wall-clock date, not from the data.
    """
    tips = []
    now = datetime.now()
    current_month = now.strftime('%Y-%m')
    last_month = (now - relativedelta(months=1)).strftime('%Y-%m')

    # assign() returns a new frame, so the caller's DataFrame is left alone.
    tagged = df.assign(Month=df['Date'].dt.strftime('%Y-%m'))
    expenses = tagged[tagged['Amount'] < 0]
    this_month = expenses[expenses['Month'] == current_month]

    # --- Payees paid in at least two distinct months ---
    months_paid = expenses.groupby(NAME_COLUMN)['Month'].nunique()
    for name, month_count in months_paid[months_paid >= 2].items():
        kind = classify_merchant(name)
        if kind == "subscription":
            tips.append(f"You're regularly paying {name}, which looks like a subscription.")
        elif kind == "person":
            tips.append(f"You've been sending money to {name} frequently — consider if it's necessary.")
        elif kind == "unknown":
            tips.append(f"You’ve made payments to {name} in {month_count} different months. Review if it's essential.")

    # --- Small charges this month that look like subscriptions ---
    # Limited to the current month because the message says "this month".
    small = this_month[this_month['Amount'].abs() <= 20]
    looks_like_sub = _lowercase_names(small).str.contains(_SUBSCRIPTION_PATTERN)
    # sort=False keeps payees in order of first appearance.
    sub_totals = small[looks_like_sub].groupby(NAME_COLUMN, sort=False)['Amount'].sum().abs()
    for name, total in sub_totals.items():
        tips.append(f"You may be subscribed to {name} — you've spent ${total:.2f} there this month.")

    # --- Fast food / Restaurant alerts (current month only) ---
    names_this_month = _lowercase_names(this_month)
    for kind in ['fast_food', 'restaurant']:
        keywords = MERCHANT_MAP[kind]
        # astype(bool) keeps the mask a valid boolean indexer even when empty.
        is_match = names_this_month.map(
            lambda payee, kws=keywords: any(k in payee for k in kws)
        ).astype(bool)
        matches = this_month[is_match]
        if matches.empty:
            continue
        totals = matches.groupby(NAME_COLUMN)['Amount'].sum().abs()
        for vendor, amt in totals.items():
            if kind == "fast_food":
                tips.append(f"You spent ${amt:.2f} at {vendor} this month. Maybe try cooking more?")
            else:
                tips.append(f"You spent ${amt:.2f} dining at {vendor} this month.")

    # --- Month-to-month trend per vendor ---
    top_this_month = (
        this_month.groupby(NAME_COLUMN)['Amount']
        .sum()
        .abs()
        .sort_values(ascending=False)
    )
    top_last_month = (
        expenses[expenses['Month'] == last_month]
        .groupby(NAME_COLUMN)['Amount']
        .sum()
        .abs()
    )
    for vendor, this_amt in top_this_month.head(5).items():
        last_amt = top_last_month.get(vendor, 0)
        if this_amt > last_amt and last_amt > 0:
            increase = this_amt - last_amt
            tips.append(f"You're spending ${increase:.2f} more at {vendor} this month than last month.")

    if not tips:
        tips.append("You're doing great this month! 🎉 Keep up the mindful spending.")
    return tips


def clean_dataframe(df):
    """Return a copy of *df* with parsed dates and numeric money columns.

    - ``Date``: a trailing 3-4 letter upper-case token (e.g. a timezone
      abbreviation) is stripped, then the value is parsed; unparsable dates
      become NaT.
    - ``Amount``, ``Fee``, ``Net Amount``: ``$`` and ``,`` are removed and the
      value is converted to float; unparsable values become NaN instead of
      raising.

    Rows lacking a valid ``Date`` or ``Amount`` are dropped.
    """
    df = df.copy()

    # Strip trailing timezone for parsing
    dates = df['Date'].astype(str).str.replace(r'\s+[A-Z]{3,4}$', '', regex=True)
    df['Date'] = pd.to_datetime(dates, errors='coerce')

    # Convert amounts to numeric
    for column in _MONEY_COLUMNS:
        stripped = df[column].astype(str).str.replace(r'[\$,]', '', regex=True)
        df[column] = pd.to_numeric(stripped, errors='coerce')

    return df.dropna(subset=['Date', 'Amount'])


def _data_path(data_id):
    """Return the stored-CSV path for *data_id*, or None if it is not a canonical UUID.

    The id comes from the session cookie; requiring the exact ``str(uuid4())``
    form keeps path separators and ``..`` out of the filesystem path.
    """
    try:
        canonical = str(uuid.UUID(str(data_id)))
    except ValueError:
        return None
    if canonical != data_id:
        return None
    return os.path.join(app.config['DATA_FOLDER'], f"{canonical}.csv")


def get_dataframe():
    """Load and clean the current session's uploaded CSV.

    Returns:
        The cleaned DataFrame, or None when the session has no (valid)
        ``data_id``, the file is missing, or a required column is absent.
    """
    path = _data_path(session.get('data_id'))
    if path is None:
        return None
    try:
        df = pd.read_csv(path)
    except FileNotFoundError:
        return None
    if not all(col in df.columns for col in REQUIRED_COLUMNS):
        return None
    return clean_dataframe(df)


def save_dataframe(df):
    """Write *df* to the session's CSV file, creating a session ``data_id`` if needed."""
    path = _data_path(session.get('data_id'))
    if path is None:
        # Missing or malformed id: start over with a fresh one.
        session['data_id'] = str(uuid.uuid4())
        path = _data_path(session['data_id'])
    df.to_csv(path, index=False)


def get_monthly_summary(df):
    """Aggregate income, expenses and savings per calendar month.

    Returns:
        DataFrame with columns ``Month`` (``YYYY-MM``), ``Total_Expenses``
        (positive), ``Total_Income``, ``Savings`` and ``Savings_Rate``
        (percent, one decimal; 0.0 for months with no income). Every month
        that has income or expenses appears. The input frame is not modified.
    """
    tagged = df.assign(Month=df['Date'].dt.strftime('%Y-%m'))
    expense_totals = tagged[tagged['Amount'] < 0].groupby('Month')['Amount'].sum().abs()
    income_totals = tagged[tagged['Amount'] > 0].groupby('Month')['Amount'].sum()

    # Building from a dict of Series unions their indexes, so a month that
    # has only income (or only expenses) is kept rather than dropped.
    monthly = pd.DataFrame({
        'Total_Expenses': expense_totals,
        'Total_Income': income_totals,
    }).fillna(0)
    monthly = monthly.rename_axis('Month').reset_index()

    monthly['Savings'] = monthly['Total_Income'] - monthly['Total_Expenses']
    # Mask zero income to NaN so the division yields NaN there, then report 0.
    income_or_nan = monthly['Total_Income'].where(monthly['Total_Income'] != 0)
    monthly['Savings_Rate'] = (monthly['Savings'] / income_or_nan * 100).round(1).fillna(0.0)
    return monthly


def get_category_spending(df):
    """Return total spend per payee, largest first.

    Returns:
        DataFrame with columns ``Category`` (the payee name) and
        ``Total_Spent`` (positive).
    """
    expenses = df[df['Amount'] < 0]
    grouped = expenses.groupby(NAME_COLUMN)['Amount'].sum().abs().reset_index()
    grouped.columns = ['Category', 'Total_Spent']
    return grouped.sort_values(by='Total_Spent', ascending=False)


def generate_monthly_trend_chart(summary):
    """Return Plotly figure JSON with grouped income/expense bars and a savings line."""
    fig = go.Figure()
    fig.add_trace(go.Bar(x=summary['Month'], y=summary['Total_Income'],
                         name='Income', marker_color='green'))
    fig.add_trace(go.Bar(x=summary['Month'], y=summary['Total_Expenses'],
                         name='Expenses', marker_color='red'))
    fig.add_trace(go.Scatter(x=summary['Month'], y=summary['Savings'],
                             mode='lines+markers', name='Net Savings',
                             line=dict(color='blue')))
    fig.update_layout(title='Monthly Trends', barmode='group', height=400)
    return json.dumps(fig, cls=PlotlyJSONEncoder)


def generate_category_pie_chart(category_spending):
    """Return Plotly donut-chart JSON of the top 10 categories plus an "Other" slice."""
    top = category_spending.head(10)
    if len(category_spending) > 10:
        other_sum = category_spending.iloc[10:]['Total_Spent'].sum()
        top = pd.concat([top, pd.DataFrame([{'Category': 'Other', 'Total_Spent': other_sum}])])
    fig = px.pie(top, names='Category', values='Total_Spent', hole=0.4)
    fig.update_layout(title='Spending by Category', height=400)
    return json.dumps(fig, cls=PlotlyJSONEncoder)


def calculate_metrics(df):
    """Compute headline numbers for the dashboard.

    Returns:
        dict with ``total_income``, ``total_expenses``, ``net_worth`` (the
        ``Net Amount`` of the last row, 0 for an empty frame),
        ``current_month_expenses``, ``last_month_expenses`` and
        ``expense_change`` (percent change versus last month, 0 when last
        month had no expenses).
    """
    amounts = df['Amount']
    metrics = {
        'total_income': amounts[amounts > 0].sum(),
        'total_expenses': abs(amounts[amounts < 0].sum()),
        'net_worth': df['Net Amount'].iloc[-1] if not df.empty else 0,
    }

    today = datetime.now()
    start = datetime(today.year, today.month, 1)
    last_month = start - relativedelta(months=1)

    current = df[df['Date'] >= start]
    prev = df[(df['Date'] >= last_month) & (df['Date'] < start)]
    current_expenses = abs(current[current['Amount'] < 0]['Amount'].sum())
    prev_expenses = abs(prev[prev['Amount'] < 0]['Amount'].sum())

    metrics['current_month_expenses'] = current_expenses
    metrics['last_month_expenses'] = prev_expenses
    metrics['expense_change'] = round(
        ((current_expenses - prev_expenses) / prev_expenses * 100) if prev_expenses else 0,
        1,
    )
    return metrics


@app.route('/')
def index():
    """Render the dashboard when data is available, otherwise the upload page."""
    df = get_dataframe()
    if df is None:
        return render_template("upload.html")

    summary = get_monthly_summary(df)
    categories = get_category_spending(df)
    trend = generate_monthly_trend_chart(summary)
    pie = generate_category_pie_chart(categories)
    metrics = calculate_metrics(df)
    recent = df.sort_values(by='Date', ascending=False).head(10).to_dict('records')
    tips = generate_recommendations(df)
    return render_template(
        "dashboard.html",
        monthly_trend_chart=trend,
        category_pie_chart=pie,
        metrics=metrics,
        monthly_summary=summary.to_dict('records'),
        recent_transactions=recent,
        recommendations=tips,
    )


@app.route('/transactions')
def transactions():
    """Render all transactions, newest first; redirect home when there is no data."""
    df = get_dataframe()
    if df is None:
        flash("No data available.")
        return redirect(url_for('index'))
    df = df.sort_values(by='Date', ascending=False)
    return render_template("transactions.html", transactions=df.to_dict('records'))


@app.route('/upload', methods=['POST'])
def upload_file():
    """Validate an uploaded CSV, store it for this session and redirect home.

    Every outcome redirects to the index page with a flashed message. This
    route only accepts POST, so redirecting back to its own URL would issue a
    GET and fail with 405.
    """
    home = url_for('index')

    if 'file' not in request.files:
        flash('No file part')
        return redirect(home)

    file = request.files['file']
    if file.filename == '':
        flash('No selected file')
        return redirect(home)

    if not (file and allowed_file(file.filename)):
        flash("Only CSVs allowed.")
        return redirect(home)

    try:
        df = pd.read_csv(file)
        if not all(col in df.columns for col in REQUIRED_COLUMNS):
            flash("CSV is missing required columns.")
            return redirect(home)
        save_dataframe(df)
    except (ValueError, OSError) as e:
        # pandas parse / empty-data / decode errors are ValueError subclasses;
        # OSError covers failures writing the stored copy.
        flash(f"Error: {e}")
        return redirect(home)

    flash("Upload successful.")
    return redirect(home)
@app.route('/clear')
def clear_data():
    """Delete this session's stored CSV (if any), forget its id and redirect home.

    The id is read from the session cookie, so it is only used to build a
    filesystem path when it is a canonical UUID string; anything else is
    dropped from the session without touching the disk.
    """
    data_id = session.pop('data_id', None)
    if data_id is not None:
        try:
            is_canonical_uuid = str(uuid.UUID(str(data_id))) == data_id
        except ValueError:
            is_canonical_uuid = False

        if is_canonical_uuid:
            path = os.path.join(app.config['DATA_FOLDER'], f"{data_id}.csv")
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (never written, or removed concurrently).
                pass

    flash("Data cleared.")
    return redirect(url_for('index'))


if __name__ == '__main__':
    # Debug mode stays on by default for local runs; set FLASK_DEBUG=0 to
    # disable it. Never expose the debugger on a public interface.
    debug_flag = os.environ.get('FLASK_DEBUG', '1').lower()
    app.run(debug=debug_flag not in ('0', 'false', 'no'))