238 lines
8.5 KiB
Python
238 lines
8.5 KiB
Python
import json, math, random
|
|
from flask import Flask, render_template, request, redirect, url_for, session, flash
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
# Initialize Flask and database
|
|
app = Flask(__name__)
|
|
app.config['SECRET_KEY'] = 'your_secret_key' # Replace with a secure secret key
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db'
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
|
|
db = SQLAlchemy(app)
|
|
|
|
# -------------------- Models --------------------
|
|
class User(db.Model):
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
username = db.Column(db.String(150), unique=True, nullable=False)
|
|
password_hash = db.Column(db.String(150), nullable=False)
|
|
# Store the profile as a JSON string mapping movie title to rating.
|
|
profile = db.Column(db.Text, nullable=False, default='{}')
|
|
|
|
def set_password(self, password):
|
|
self.password_hash = generate_password_hash(password)
|
|
|
|
def check_password(self, password):
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
# Create the database tables (if needed)
|
|
with app.app_context():
|
|
db.create_all()
|
|
|
|
# -------------------- Movie Data & Class --------------------
|
|
class Movie:
|
|
__slots__ = ('title', 'year', 'imdb_rating', 'runtime', 'features', 'description', 'poster')
|
|
def __init__(self, data):
|
|
self.title = data.get("title", "")
|
|
try:
|
|
self.year = int(data.get("year", 0))
|
|
except (ValueError, TypeError):
|
|
self.year = 0
|
|
try:
|
|
self.imdb_rating = float(data.get("imdb_rating", 0))
|
|
except (ValueError, TypeError):
|
|
self.imdb_rating = 0.0
|
|
try:
|
|
self.runtime = int(data.get("runtime", 0))
|
|
except (ValueError, TypeError):
|
|
self.runtime = 0
|
|
# For genres/tags, use a frozenset for fast membership testing.
|
|
genres = data.get("genres", [])
|
|
tags = data.get("tags", [])
|
|
self.features = frozenset(genres + tags)
|
|
self.description = data.get("description", "")
|
|
self.poster = data.get("poster", "")
|
|
|
|
# Preload movies from JSON once.
|
|
MOVIES = []
|
|
MOVIE_INDEX = {} # mapping from title to Movie object
|
|
with open('movies.json', 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
for entry in data:
|
|
movie = Movie(entry)
|
|
MOVIES.append(movie)
|
|
MOVIE_INDEX[movie.title] = movie
|
|
|
|
# -------------------- Similarity Functions --------------------
|
|
def jaccard_similarity(set1, set2):
|
|
if not set1 and not set2:
|
|
return 0
|
|
inter = len(set1 & set2)
|
|
union = len(set1 | set2)
|
|
return inter / union if union else 0
|
|
|
|
def advanced_similarity(movie1, movie2, weights=None):
|
|
if weights is None:
|
|
weights = {
|
|
"genres_tags": 0.5,
|
|
"imdb": 0.2,
|
|
"year": 0.15,
|
|
"runtime": 0.15
|
|
}
|
|
# Genres/tags similarity via Jaccard index.
|
|
sim_genres = jaccard_similarity(movie1.features, movie2.features)
|
|
# IMDb rating similarity: difference normalized over a 10-point scale.
|
|
sim_imdb = max(0, 1 - abs(movie1.imdb_rating - movie2.imdb_rating) / 10)
|
|
# Year similarity using exponential decay.
|
|
sim_year = math.exp(-abs(movie1.year - movie2.year) / 10)
|
|
# Runtime similarity using exponential decay.
|
|
sim_runtime = math.exp(-abs(movie1.runtime - movie2.runtime) / 30)
|
|
overall = (weights["genres_tags"] * sim_genres +
|
|
weights["imdb"] * sim_imdb +
|
|
weights["year"] * sim_year +
|
|
weights["runtime"] * sim_runtime)
|
|
return overall
|
|
|
|
def recommend_movie(profile, weights=None):
|
|
"""
|
|
Given a user profile (dict mapping movie title to rating), return one recommended Movie.
|
|
The function iterates over candidate movies (those not rated yet) and computes an aggregated
|
|
weighted similarity score against all rated movies.
|
|
"""
|
|
rated_titles = set(profile.keys())
|
|
candidates = [m for m in MOVIES if m.title not in rated_titles]
|
|
|
|
best_movie = None
|
|
best_score = -1
|
|
for candidate in candidates:
|
|
weighted_scores = []
|
|
for title, rating in profile.items():
|
|
# Skip movies not seen (rating 0)
|
|
if rating == 0:
|
|
continue
|
|
rated_movie = MOVIE_INDEX.get(title)
|
|
if not rated_movie:
|
|
continue
|
|
# Normalize rating: 1 becomes 0 and 5 becomes 1.
|
|
norm_rating = (rating - 1) / 4
|
|
sim = advanced_similarity(candidate, rated_movie, weights=weights)
|
|
weighted_scores.append(norm_rating * sim)
|
|
avg_score = sum(weighted_scores) / len(weighted_scores) if weighted_scores else 0
|
|
if avg_score > best_score:
|
|
best_score = avg_score
|
|
best_movie = candidate
|
|
return best_movie
|
|
|
|
# -------------------- Helper Functions --------------------
|
|
import json as pyjson
|
|
|
|
def get_profile():
|
|
"""Return the current user's profile as a dict.
|
|
If not logged in, use session storage."""
|
|
if 'user_id' in session:
|
|
user = User.query.get(session['user_id'])
|
|
if user:
|
|
try:
|
|
return pyjson.loads(user.profile)
|
|
except:
|
|
return {}
|
|
return session.get('profile', {})
|
|
|
|
def save_profile(profile):
|
|
"""Save the profile to the database (if logged in) or session."""
|
|
if 'user_id' in session:
|
|
user = User.query.get(session['user_id'])
|
|
if user:
|
|
user.profile = pyjson.dumps(profile)
|
|
db.session.commit()
|
|
else:
|
|
session['profile'] = profile
|
|
|
|
# -------------------- Routes --------------------
|
|
@app.route('/')
|
|
def index():
|
|
if 'user_id' in session:
|
|
return redirect(url_for('survey'))
|
|
return redirect(url_for('login'))
|
|
|
|
# ----- User Login/Register -----
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form.get('username').strip()
|
|
password = request.form.get('password')
|
|
user = User.query.filter_by(username=username).first()
|
|
if user and user.check_password(password):
|
|
session['user_id'] = user.id
|
|
flash("Logged in successfully.", "success")
|
|
return redirect(url_for('survey'))
|
|
else:
|
|
flash("Invalid username or password.", "danger")
|
|
return render_template('login.html')
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
if request.method == 'POST':
|
|
username = request.form.get('username').strip()
|
|
password = request.form.get('password')
|
|
if User.query.filter_by(username=username).first():
|
|
flash("Username already exists.", "danger")
|
|
else:
|
|
user = User(username=username)
|
|
user.set_password(password)
|
|
user.profile = pyjson.dumps({})
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
flash("Registration successful. Please log in.", "success")
|
|
return redirect(url_for('login'))
|
|
return render_template('register.html')
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.clear()
|
|
flash("Logged out.", "info")
|
|
return redirect(url_for('login'))
|
|
|
|
# ----- Survey & Recommendation -----
|
|
@app.route('/survey', methods=['GET', 'POST'])
|
|
def survey():
|
|
if request.method == 'POST':
|
|
profile = {}
|
|
for key, value in request.form.items():
|
|
if key.startswith("rating_"):
|
|
movie_title = key[len("rating_"):]
|
|
try:
|
|
rating = float(value)
|
|
except (ValueError, TypeError):
|
|
rating = 0
|
|
profile[movie_title] = rating
|
|
# Merge with any existing profile.
|
|
current_profile = get_profile()
|
|
current_profile.update(profile)
|
|
save_profile(current_profile)
|
|
return redirect(url_for('recommend'))
|
|
else:
|
|
# Show 10 random movies for the survey.
|
|
sample_movies = random.sample(MOVIES, min(10, len(MOVIES)))
|
|
return render_template('survey.html', movies=sample_movies)
|
|
|
|
@app.route('/recommend')
|
|
def recommend():
|
|
profile = get_profile()
|
|
movie = recommend_movie(profile)
|
|
return render_template('recommend.html', movie=movie)
|
|
|
|
@app.route('/feedback', methods=['POST'])
|
|
def feedback():
|
|
movie_title = request.form.get("movie_title")
|
|
try:
|
|
rating = float(request.form.get("rating"))
|
|
except (ValueError, TypeError):
|
|
rating = 0
|
|
profile = get_profile()
|
|
profile[movie_title] = rating
|
|
save_profile(profile)
|
|
return redirect(url_for('recommend'))
|
|
|
|
if __name__ == '__main__':
|
|
app.run(debug=True)
|