"""Flask application with user accounts and admin-defined dynamic tables.

Browser routes use session login (Flask-Login); ``/api/*`` routes use a
per-user API token sent verbatim in the ``Authorization`` header.
"""
import json
import os
import re
import secrets

from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import Column, Integer, String, Float, Boolean
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.declarative import declarative_base

app = Flask(__name__)
# Prefer a secret supplied by the environment; the literal is only a
# development fallback and must not be relied on in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# Dynamic tables live on their own declarative base, separate from db.Model.
Base = declarative_base()

# Registry of runtime-created models: table name -> model class.
# NOTE: kept in memory only, so it starts empty on every process start.
dynamic_models = {}

# API access control: table name -> {'read': [groups], 'write': [groups]}.
# A table with no entry grants API access to no group.
table_permissions = {}

# Names accepted for dynamic tables and columns: a letter followed by
# letters, digits or underscores (this also rules out dunder attributes).
_IDENTIFIER_RE = re.compile(r'^[A-Za-z][A-Za-z0-9_]*$')

# Column names that would clash with attributes of the generated model class.
_RESERVED_COLUMN_NAMES = frozenset({'id', 'metadata', 'registry'})

# Supported column type keywords -> factory producing a fresh Column.
_COLUMN_FACTORIES = {
    'text': lambda: Column(String(255)),
    'number': lambda: Column(Integer),
    'boolean': lambda: Column(Boolean),
}


class User(UserMixin, db.Model):
    """Application account with role flags, a group and an optional API token."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    email = db.Column(db.String(150), unique=True, nullable=True)
    phone = db.Column(db.String(20))
    verified = db.Column(db.Boolean, default=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_mod = db.Column(db.Boolean, default=False)
    is_premium = db.Column(db.Boolean, default=False)
    api_token = db.Column(db.String(256), unique=True, nullable=True)
    group = db.Column(db.String(50), default="default")

    def set_password(self, password):
        """Store a salted hash of ``password``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True if ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def generate_api_token(self):
        """Assign a new random 64-hex-character API token to this user."""
        self.api_token = secrets.token_hex(32)


@login_manager.user_loader
def load_user(user_id):
    """Flask-Login callback: load the user for the id stored in the session."""
    return User.query.get(int(user_id))


def _json_body():
    """Return the request's JSON object, or None if absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _build_model_attrs(table_name, columns_list):
    """Validate a column specification and build declarative class attributes.

    ``columns_list`` must be a list of ``{'name': ..., 'type': ...}`` dicts.
    Raises ValueError with a user-presentable message on invalid input.
    """
    if not isinstance(columns_list, list):
        raise ValueError('Columns must be a JSON list.')

    attrs = {
        '__tablename__': table_name,
        'id': Column(Integer, primary_key=True),
    }
    for col in columns_list:
        if not isinstance(col, dict) or 'name' not in col or 'type' not in col:
            raise ValueError('Each column needs a name and a type.')
        col_name = col['name']
        col_type = col['type']
        if not isinstance(col_name, str) or not _IDENTIFIER_RE.match(col_name):
            raise ValueError(f'Invalid column name: {col_name}')
        if col_name in _RESERVED_COLUMN_NAMES or col_name in attrs:
            raise ValueError(f'Reserved or duplicate column name: {col_name}')
        factory = _COLUMN_FACTORIES.get(col_type) if isinstance(col_type, str) else None
        if factory is None:
            raise ValueError(f'Unsupported column type: {col_type}')
        attrs[col_name] = factory()
    return attrs


def _authorize_table_request(table_name, action):
    """Authenticate the API caller and authorize ``action`` on ``table_name``.

    ``action`` is ``'read'`` or ``'write'``. Returns ``(model, None)`` on
    success or ``(None, (response, status))`` describing the failure.
    """
    token = request.headers.get('Authorization')
    if not token:
        return None, (jsonify({'error': 'Missing API token.'}), 401)

    user = User.query.filter_by(api_token=token).first()
    if not user:
        return None, (jsonify({'error': 'Invalid API token.'}), 401)

    model = dynamic_models.get(table_name)
    if not model:
        return None, (jsonify({'error': 'Table not found.'}), 404)

    allowed_groups = table_permissions.get(table_name, {}).get(action, [])
    if user.group not in allowed_groups:
        return None, (jsonify({'error': 'Access denied.'}), 403)

    return model, None


# Home route
@app.route('/')
def home():
    """Redirect the site root to the dashboard."""
    return redirect(url_for('dashboard'))


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the registration form and create an account on POST.

    The very first account created becomes an administrator.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # A blank email is stored as NULL so it cannot collide on the
        # unique constraint with other blank emails.
        email = request.form.get('email') or None

        # Check if the username already exists
        if User.query.filter_by(username=username).first():
            flash('Username already exists. Please choose a different one.')
            return redirect(url_for('register'))

        if email and User.query.filter_by(email=email).first():
            flash('Email already registered. Please use a different one.')
            return redirect(url_for('register'))

        is_admin = User.query.first() is None
        new_user = User(username=username, is_admin=is_admin, email=email)
        new_user.set_password(password)

        # Add and commit the new user to the database
        db.session.add(new_user)
        db.session.commit()

        flash('Registration successful. You can now log in.')
        return redirect(url_for('login'))

    return render_template('register.html')


# Login route
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and start a session on valid credentials."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            login_user(user)
            return redirect(url_for('dashboard'))
        flash('Invalid username or password.')
    return render_template('login.html')


# Logout route
@app.route('/logout')
@login_required
def logout():
    """End the current session and return to the login page."""
    logout_user()
    return redirect(url_for('login'))


# Dashboard route
@app.route('/dashboard')
@login_required
def dashboard():
    """List the names of all registered dynamic tables."""
    tables = list(dynamic_models.keys())
    return render_template('dashboard.html', tables=tables)


# Create table route
@app.route('/create_table', methods=['GET', 'POST'])
@login_required
def create_table():
    """Admin-only: create a new table from a name and a JSON column list."""
    if not current_user.is_admin:
        flash('Access denied.')
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('create_table.html')

    table_name = request.form['table_name']
    columns_json = request.form['columns']

    if not _IDENTIFIER_RE.match(table_name):
        flash('Invalid table name.')
        return redirect(url_for('create_table'))

    if table_name in dynamic_models:
        flash('Table already exists.')
        return redirect(url_for('create_table'))

    try:
        columns_list = json.loads(columns_json)
    except ValueError as e:
        flash(f'Error creating table: {e}')
        return redirect(url_for('create_table'))

    try:
        attrs = _build_model_attrs(table_name, columns_list)
    except ValueError as e:
        flash(str(e))
        return redirect(url_for('create_table'))

    model = None
    try:
        model = type(table_name.capitalize(), (Base,), attrs)
        model.__table__.create(bind=db.engine)
    except SQLAlchemyError as e:
        # Drop the half-registered table from the metadata so a later
        # attempt with the same name is not rejected as already defined.
        if model is not None:
            Base.metadata.remove(model.__table__)
        flash(f'Error creating table: {e}')
        return redirect(url_for('create_table'))

    # Register only once the table really exists in the database.
    dynamic_models[table_name] = model
    flash(f'Table {table_name} created successfully.')
    return redirect(url_for('dashboard'))


# View table route
@app.route('/view_table/<table_name>')
@login_required
def view_table(table_name):
    """Render every row of the dynamic table ``table_name``."""
    model = dynamic_models.get(table_name)
    if not model:
        flash('Table not found.')
        return redirect(url_for('dashboard'))

    # The context manager closes the connection even if the query fails.
    with db.engine.connect() as conn:
        result = conn.execute(model.__table__.select())
        columns = list(result.keys())
        rows = result.fetchall()

    return render_template('view_table.html', table_name=table_name, columns=columns, rows=rows)


# API routes
@app.route('/api/register', methods=['POST'])
def api_register():
    """Create an account from a JSON body and return its API token."""
    data = _json_body()
    if data is None:
        return jsonify({'error': 'Username and password required.'}), 400

    username = data.get('username')
    password = data.get('password')
    email = data.get('email') or None

    if not username or not password:
        return jsonify({'error': 'Username and password required.'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username already exists.'}), 400

    if email and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email already exists.'}), 400

    user = User(username=username, email=email)
    user.set_password(password)
    user.generate_api_token()
    db.session.add(user)
    db.session.commit()

    return jsonify({'message': 'Registration successful.', 'api_token': user.api_token})


@app.route('/api/login', methods=['POST'])
def api_login():
    """Exchange JSON credentials for the user's API token, minting one if needed."""
    data = _json_body() or {}
    username = data.get('username')
    password = data.get('password')

    user = User.query.filter_by(username=username).first() if username else None
    if not user or not password or not user.check_password(password):
        return jsonify({'error': 'Invalid credentials'}), 401

    if not user.api_token:
        user.generate_api_token()
        db.session.commit()
    return jsonify({'api_token': user.api_token})


@app.route('/api/get_table/<table_name>', methods=['GET'])
def api_get_table(table_name):
    """Return the columns and rows of ``table_name`` as JSON (read permission)."""
    model, error = _authorize_table_request(table_name, 'read')
    if error is not None:
        return error

    with db.engine.connect() as conn:
        result = conn.execute(model.__table__.select())
        # Materialize the key view: jsonify cannot serialize it directly.
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]

    return jsonify({'columns': columns, 'rows': rows})


@app.route('/api/set_table/<table_name>', methods=['POST'])
def api_set_table(table_name):
    """Insert one row, given as a JSON object, into ``table_name`` (write permission)."""
    model, error = _authorize_table_request(table_name, 'write')
    if error is not None:
        return error

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid data format.'}), 400

    unknown = sorted(set(data) - set(model.__table__.columns.keys()))
    if unknown:
        names = ', '.join(unknown)
        return jsonify({'error': f'Unknown columns: {names}'}), 400

    try:
        # engine.begin() commits on success and rolls back on error; a
        # plain connect() would discard the insert when the connection closes.
        with db.engine.begin() as conn:
            conn.execute(model.__table__.insert(), [data])
    except SQLAlchemyError as e:
        return jsonify({'error': str(e)}), 400

    return jsonify({'message': 'Insert successful.'})


if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    # The interactive debugger allows arbitrary code execution, so it is
    # only enabled when explicitly requested via FLASK_DEBUG=1.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1')