Movie-Me-Now/app.py
2025-04-03 15:13:40 -05:00

238 lines
8.5 KiB
Python

import json
import math
import os
import random

from flask import Flask, render_template, request, redirect, url_for, session, flash
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
# Initialize Flask and database.
app = Flask(__name__)
# The session-signing key is taken from the SECRET_KEY environment variable so
# that a deployment does not have to keep the real key in source control. The
# placeholder is only a development fallback: anyone who knows it can forge
# session cookies, so set SECRET_KEY to a long random value in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or 'your_secret_key'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
# Modification-tracking signals are not used anywhere in this app.
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
# -------------------- Models --------------------
class User(db.Model):
    """Registered account together with its stored movie-rating profile."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Wide enough for the hashes Werkzeug produces by default: its scrypt
    # format is longer than 150 characters, which a length-enforcing database
    # would reject or truncate.
    password_hash = db.Column(db.String(255), nullable=False)
    # Store the profile as a JSON string mapping movie title to rating.
    profile = db.Column(db.Text, nullable=False, default='{}')

    def set_password(self, password):
        """Hash *password* and keep only the hash on the instance."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when *password* matches the stored hash."""
        return check_password_hash(self.password_hash, password)
# Create the tables for the declared models if they are missing. The call
# runs inside an application context because it happens at import time,
# outside any request.
with app.app_context():
    db.create_all()
# -------------------- Movie Data & Class --------------------
def _coerce_number(value, caster, default):
    """Return ``caster(value)``, or *default* when the value cannot be converted."""
    try:
        return caster(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float('inf')), which JSON "Infinity" produces.
        return default


def _as_label_list(value):
    """Normalise a genres/tags entry to a list of labels.

    ``None`` (JSON null or a missing key) becomes an empty list, a bare string
    is treated as a single label rather than a sequence of characters, and
    anything that cannot be iterated is ignored.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    try:
        return list(value)
    except TypeError:
        return []


class Movie:
    """Read-only view of one movie record used by the similarity functions.

    Built from a dict (one entry of the movies JSON). Numeric fields that are
    missing or malformed fall back to ``0`` / ``0.0`` instead of raising, and
    genres and tags are merged into a single ``features`` frozenset.
    """

    __slots__ = ('title', 'year', 'imdb_rating', 'runtime', 'features', 'description', 'poster')

    def __init__(self, data):
        """Populate the movie from *data*, a mapping of raw field values."""
        self.title = data.get("title", "")
        self.year = _coerce_number(data.get("year", 0), int, 0)
        self.imdb_rating = _coerce_number(data.get("imdb_rating", 0), float, 0.0)
        self.runtime = _coerce_number(data.get("runtime", 0), int, 0)
        # For genres/tags, use a frozenset for fast membership testing.
        genres = _as_label_list(data.get("genres"))
        tags = _as_label_list(data.get("tags"))
        self.features = frozenset(genres + tags)
        self.description = data.get("description", "")
        self.poster = data.get("poster", "")
# Load the whole catalogue from movies.json once, at import time.
# MOVIES keeps the file order; MOVIE_INDEX maps title -> Movie, and when two
# entries share a title the later one is the one kept in the index.
with open('movies.json', 'r', encoding='utf-8') as f:
    MOVIES = [Movie(entry) for entry in json.load(f)]
MOVIE_INDEX = {movie.title: movie for movie in MOVIES}
# -------------------- Similarity Functions --------------------
def jaccard_similarity(set1, set2):
    """Return |set1 & set2| / |set1 | set2|, or 0 when there is nothing to compare.

    Two empty (or otherwise falsy) inputs yield 0 rather than a division by zero.
    """
    if not (set1 or set2):
        return 0
    shared = set1 & set2
    combined = set1 | set2
    if not combined:
        return 0
    return len(shared) / len(combined)
def advanced_similarity(movie1, movie2, weights=None):
    """Return a weighted similarity score between two movies.

    Four component scores are combined:

    * ``genres_tags`` - Jaccard index of the two ``features`` sets;
    * ``imdb``        - 1 minus the rating gap over a 10-point scale, floored at 0;
    * ``year``        - exp(-gap / 10) on the release years;
    * ``runtime``     - exp(-gap / 30) on the runtimes.

    *weights* maps those four names to their weights. When it is None the
    defaults 0.5 / 0.2 / 0.15 / 0.15 are used; a mapping that lacks one of
    the four keys raises KeyError.
    """
    if weights is None:
        weights = {"genres_tags": 0.5, "imdb": 0.2, "year": 0.15, "runtime": 0.15}

    feature_score = jaccard_similarity(movie1.features, movie2.features)

    rating_gap = abs(movie1.imdb_rating - movie2.imdb_rating)
    rating_score = max(0, 1 - rating_gap / 10)

    year_gap = abs(movie1.year - movie2.year)
    year_score = math.exp(-year_gap / 10)

    runtime_gap = abs(movie1.runtime - movie2.runtime)
    runtime_score = math.exp(-runtime_gap / 30)

    # Accumulate in the fixed order genres -> imdb -> year -> runtime.
    overall = weights["genres_tags"] * feature_score
    overall += weights["imdb"] * rating_score
    overall += weights["year"] * year_score
    overall += weights["runtime"] * runtime_score
    return overall
def recommend_movie(profile, weights=None):
    """Return the unrated Movie that best matches *profile*, or None.

    *profile* maps movie title to rating. A rating of 0 means "not seen" and
    contributes nothing; other ratings are normalised so that 1 becomes 0 and
    5 becomes 1. Each candidate (a movie whose title is not in the profile)
    is scored by the average of ``normalised rating * similarity`` over the
    rated movies, and the first candidate with the highest score wins.

    When the profile holds no usable ratings every candidate scores 0, so the
    first unrated movie in MOVIES is returned. None is returned only when
    there is no candidate at all. *weights* is passed through to
    ``advanced_similarity``.
    """
    # Resolve the rated movies and normalise their ratings once, instead of
    # repeating the lookup and arithmetic for every candidate.
    rated = []
    for title, rating in profile.items():
        # Skip movies not seen (rating 0) and values that are not numbers.
        if not isinstance(rating, (int, float)) or rating == 0:
            continue
        rated_movie = MOVIE_INDEX.get(title)
        if rated_movie is None:
            continue
        # Normalize rating: 1 becomes 0 and 5 becomes 1.
        rated.append((rated_movie, (rating - 1) / 4))

    best_movie = None
    best_score = -1
    for candidate in MOVIES:
        if candidate.title in profile:
            continue
        if rated:
            total = sum(
                norm_rating * advanced_similarity(candidate, rated_movie, weights=weights)
                for rated_movie, norm_rating in rated
            )
            avg_score = total / len(rated)
        else:
            avg_score = 0
        # Strict comparison keeps the earliest candidate on ties.
        if avg_score > best_score:
            best_score = avg_score
            best_movie = candidate
    return best_movie
# -------------------- Helper Functions --------------------
import json as pyjson
def get_profile():
    """Return the current user's profile as a dict of movie title -> rating.

    For a logged-in user the profile is decoded from the JSON stored on the
    User row; an undecodable value, or one that is not a JSON object, yields
    an empty dict. When nobody is logged in, or the session's user id no
    longer matches a row, the profile kept in the session is returned
    (empty dict if there is none).
    """
    if 'user_id' in session:
        user = User.query.get(session['user_id'])
        if user:
            try:
                stored = pyjson.loads(user.profile)
            except (ValueError, TypeError):
                # ValueError covers malformed JSON, TypeError a non-string value.
                return {}
            # Callers index and update the result, so never hand back a list
            # or scalar that happened to be valid JSON.
            return stored if isinstance(stored, dict) else {}
    return session.get('profile', {})
def save_profile(profile):
    """Persist *profile* to the database (if logged in) or to the session.

    If the session carries a user id that no longer matches a User row, the
    profile is stored in the session instead of being dropped. That is the
    same place get_profile reads from in that situation.
    """
    if 'user_id' in session:
        user = User.query.get(session['user_id'])
        if user:
            user.profile = pyjson.dumps(profile)
            db.session.commit()
            return
    session['profile'] = profile
# -------------------- Routes --------------------
@app.route('/')
def index():
    """Redirect signed-in visitors to the survey and everyone else to login."""
    endpoint = 'survey' if 'user_id' in session else 'login'
    return redirect(url_for(endpoint))
# ----- User Login/Register -----
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate the submitted credentials.

    A successful login stores the user's id in the session and redirects to
    the survey. A failed one flashes an error and shows the form again.
    """
    if request.method == 'POST':
        # Default to '' so a request without these fields is handled as a
        # failed login rather than raising on None.
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            session['user_id'] = user.id
            flash("Logged in successfully.", "success")
            return redirect(url_for('survey'))
        flash("Invalid username or password.", "danger")
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and, on POST, create a new account.

    The username is stripped of surrounding whitespace. A submission with an
    empty username or password, or with a username that is already taken, is
    rejected with a flashed message and the form is shown again. On success
    the user is created with an empty profile and redirected to the login page.
    """
    if request.method == 'POST':
        # Default to '' so a request without these fields is rejected by the
        # validation below rather than raising on None.
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        if not username or not password:
            flash("Username and password are required.", "danger")
        elif User.query.filter_by(username=username).first():
            flash("Username already exists.", "danger")
        else:
            user = User(username=username)
            user.set_password(password)
            user.profile = pyjson.dumps({})
            db.session.add(user)
            db.session.commit()
            flash("Registration successful. Please log in.", "success")
            return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/logout')
def logout():
    """Drop everything held in the session and return to the login page."""
    login_page = url_for('login')
    session.clear()
    # Flash only after clearing, so the message is not wiped with the rest.
    flash("Logged out.", "info")
    return redirect(login_page)
# ----- Survey & Recommendation -----
@app.route('/survey', methods=['GET', 'POST'])
def survey():
    """Show a rating survey (GET) or merge submitted ratings into the profile (POST).

    Submitted form fields named ``rating_<movie title>`` carry the ratings.
    Only titles present in MOVIE_INDEX are kept. A value that is not a number,
    or is outside the scale (0 for "not seen", otherwise 1 to 5), is stored
    as 0. After saving, the user is redirected to the recommendation page.
    """
    if request.method == 'POST':
        prefix = "rating_"
        profile = {}
        for key, value in request.form.items():
            if not key.startswith(prefix):
                continue
            movie_title = key[len(prefix):]
            # The recommender ignores unknown titles, so storing them would
            # only let a client bloat the profile.
            if movie_title not in MOVIE_INDEX:
                continue
            try:
                rating = float(value)
            except (ValueError, TypeError):
                rating = 0
            # float() accepts "nan", "inf" and out-of-scale numbers; the
            # comparison below is False for all of them.
            if not (rating == 0 or 1 <= rating <= 5):
                rating = 0
            profile[movie_title] = rating
        # Merge with any existing profile.
        current_profile = get_profile()
        current_profile.update(profile)
        save_profile(current_profile)
        return redirect(url_for('recommend'))
    # Show 10 random movies for the survey.
    sample_movies = random.sample(MOVIES, min(10, len(MOVIES)))
    return render_template('survey.html', movies=sample_movies)
@app.route('/recommend')
def recommend():
    """Render the recommendation page for the current profile.

    The template receives ``movie``, which is whatever recommend_movie returns
    (None when there is no unrated movie left).
    """
    suggestion = recommend_movie(get_profile())
    return render_template('recommend.html', movie=suggestion)
@app.route('/feedback', methods=['POST'])
def feedback():
    """Record the rating submitted for one movie, then show the next recommendation.

    Reads ``movie_title`` and ``rating`` from the form. A rating that is not a
    number, or is outside the scale (0 for "not seen", otherwise 1 to 5), is
    stored as 0. A missing or unknown title leaves the profile untouched.
    """
    movie_title = request.form.get("movie_title")
    try:
        rating = float(request.form.get("rating"))
    except (ValueError, TypeError):
        rating = 0
    # float() accepts "nan", "inf" and out-of-scale numbers; the comparison
    # below is False for all of them.
    if not (rating == 0 or 1 <= rating <= 5):
        rating = 0
    # A missing title arrives as None; never store it as a profile key.
    if movie_title in MOVIE_INDEX:
        profile = get_profile()
        profile[movie_title] = rating
        save_profile(profile)
    return redirect(url_for('recommend'))
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which must never be
    # reachable in production. It stays on by default for local development
    # and is switched off by setting FLASK_DEBUG to 0, false or no.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').lower() not in ('0', 'false', 'no')
    app.run(debug=debug_enabled)