# 296 lines · 9.0 KiB · Python
from flask import Flask, render_template, redirect, url_for, request, flash, jsonify
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
import os
|
|
import json
|
|
from sqlalchemy import Column, Integer, String, Float, Boolean
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
|
|
# --- Application and extension setup ---------------------------------------
app = Flask(__name__)

# The session-signing key can be supplied through the SECRET_KEY environment
# variable; the previous hard-coded placeholder remains the fallback so
# existing setups keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

db = SQLAlchemy(app)
login_manager = LoginManager(app)
# Endpoint name that @login_required redirects anonymous users to.
login_manager.login_view = 'login'

# Separate declarative base used for the tables created at runtime through
# the /create_table route (the User model uses db.Model instead).
Base = declarative_base()

# Table name -> dynamically generated model class; filled by create_table().
dynamic_models = {}

# Table name -> {'read': [group, ...], 'write': [group, ...]}.
# The API table endpoints look permissions up in this mapping; it was never
# defined, which made those endpoints fail with NameError.  An empty mapping
# means no group has API access to any table until entries are added.
table_permissions = {}
|
|
|
|
class User(UserMixin, db.Model):
    """Account row used for both browser logins and token-based API access."""

    # Identity and credentials.
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)

    # Optional contact details.
    email = db.Column(db.String(150), unique=True, nullable=True)
    phone = db.Column(db.String(20))

    # Status / role flags, all off by default.
    verified = db.Column(db.Boolean, default=False)
    is_admin = db.Column(db.Boolean, default=False)
    is_mod = db.Column(db.Boolean, default=False)
    is_premium = db.Column(db.Boolean, default=False)

    # Token presented by API clients; unset until generate_api_token() runs.
    api_token = db.Column(db.String(256), unique=True, nullable=True)

    # Group name compared against per-table permission lists by the API.
    group = db.Column(db.String(50), default="default")

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        is_match = check_password_hash(self.password_hash, password)
        return is_match

    def generate_api_token(self):
        """Assign a fresh random token (32 random bytes, hex-encoded)."""
        from secrets import token_hex

        self.api_token = token_hex(32)
|
|
|
|
|
|
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for a session-stored id, or ``None`` if unknown.

    Flask-Login calls this with the id it saved in the session.  A stale or
    tampered session may carry a value that is not an integer; in that case
    the loader returns ``None`` (treated as "not logged in") instead of
    raising and turning the request into a server error.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
|
|
|
|
# Home route
|
|
@app.route('/')
def home():
    """Send visitors of the site root to the dashboard page."""
    dashboard_url = url_for('dashboard')
    return redirect(dashboard_url)
|
|
|
|
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Render the sign-up form (GET) or create a new account (POST).

    On POST the form fields ``username``, ``password`` and optional ``email``
    are read.  The very first account ever created is made an admin.  Every
    failure flashes a message and redirects back to the form; success
    redirects to the login page.
    """
    # Local import: only this handler needs the exception type.
    from sqlalchemy.exc import IntegrityError

    if request.method != 'POST':
        return render_template('register.html')

    username = request.form.get('username', '')
    password = request.form.get('password', '')
    # A blank email must be stored as NULL: the column is unique, so a second
    # account with an empty-string email would violate the constraint.
    email = request.form.get('email', '').strip() or None

    if not username or not password:
        flash('Username and password are required.')
        return redirect(url_for('register'))

    # Check if the username already exists
    if User.query.filter_by(username=username).first():
        flash('Username already exists. Please choose a different one.')
        return redirect(url_for('register'))

    if email is not None and User.query.filter_by(email=email).first():
        flash('Email already registered. Please use a different one.')
        return redirect(url_for('register'))

    # The first registered user becomes the administrator.
    is_admin = User.query.first() is None
    new_user = User(username=username, is_admin=is_admin, email=email)
    new_user.set_password(password)

    db.session.add(new_user)
    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request may have taken the same username/email between
        # the checks above and the commit; reset the session and report it.
        db.session.rollback()
        flash('Username or email already exists. Please choose a different one.')
        return redirect(url_for('register'))

    flash('Registration successful. You can now log in.')
    return redirect(url_for('login'))
|
|
|
|
# Login route
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form, or authenticate the submitted credentials.

    A successful POST logs the user in and redirects to the dashboard; a
    failed one flashes an error and re-renders the form.
    """
    if request.method != 'POST':
        return render_template('login.html')

    submitted_name = request.form['username']
    submitted_password = request.form['password']
    account = User.query.filter_by(username=submitted_name).first()

    # Unknown username and wrong password share one message on purpose.
    if not account or not account.check_password(submitted_password):
        flash('Invalid username or password.')
        return render_template('login.html')

    login_user(account)
    return redirect(url_for('dashboard'))
|
|
|
|
# Logout route
|
|
@app.route('/logout')
@login_required
def logout():
    """End the current user's session and return to the login page."""
    logout_user()
    login_url = url_for('login')
    return redirect(login_url)
|
|
|
|
# Dashboard route
|
|
@app.route('/dashboard')
@login_required
def dashboard():
    """Render the dashboard listing the names of all dynamic tables."""
    table_names = [name for name in dynamic_models]
    return render_template('dashboard.html', tables=table_names)
|
|
|
|
# Create table route
|
|
# Column type keyword accepted in the form -> factory producing a fresh Column.
_COLUMN_FACTORIES = {
    'text': lambda: Column(String(255)),
    'number': lambda: Column(Integer),
    'boolean': lambda: Column(Boolean),
}

# Attribute names a user-defined column must not take: 'id' is the implicit
# primary key, the others are reserved by SQLAlchemy's declarative classes.
_RESERVED_COLUMN_NAMES = frozenset({'id', 'metadata', 'registry'})


def _build_column_attrs(columns_list):
    """Turn ``[{'name': ..., 'type': ...}, ...]`` into ``{name: Column}``.

    Raises:
        ValueError: if the structure is malformed, a name is empty, reserved
            or duplicated, or a type keyword is not supported.  The message
            is suitable for showing to the user.
    """
    if not isinstance(columns_list, list):
        raise ValueError('Columns must be a JSON list.')

    attrs = {}
    for col in columns_list:
        if not isinstance(col, dict):
            raise ValueError('Each column must be an object with "name" and "type".')
        col_name = col.get('name')
        col_type = col.get('type')

        if not isinstance(col_name, str) or not col_name:
            raise ValueError(f'Invalid column name: {col_name}')
        # Dunder names would overwrite __tablename__ and similar class hooks.
        if col_name in _RESERVED_COLUMN_NAMES or col_name.startswith('__'):
            raise ValueError(f'Reserved column name: {col_name}')
        if col_name in attrs:
            raise ValueError(f'Duplicate column name: {col_name}')

        factory = _COLUMN_FACTORIES.get(col_type) if isinstance(col_type, str) else None
        if factory is None:
            raise ValueError(f'Unsupported column type: {col_type}')
        attrs[col_name] = factory()
    return attrs


@app.route('/create_table', methods=['GET', 'POST'])
@login_required
def create_table():
    """Admin-only page that defines and creates a new table at runtime.

    GET renders the form.  POST reads ``table_name`` and ``columns`` (a JSON
    list of ``{"name", "type"}`` objects, type being ``text``, ``number`` or
    ``boolean``), builds a model class on ``Base``, creates the table and
    registers the model in ``dynamic_models``.  Every failure flashes a
    message and redirects back to the form.
    """
    if not current_user.is_admin:
        flash('Access denied.')
        return redirect(url_for('dashboard'))

    if request.method != 'POST':
        return render_template('create_table.html')

    table_name = request.form.get('table_name', '')
    columns_json = request.form.get('columns', '')

    if not table_name:
        flash('Table name is required.')
        return redirect(url_for('create_table'))

    try:
        columns_list = json.loads(columns_json)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        flash(f'Error creating table: {e}')
        return redirect(url_for('create_table'))

    if table_name in dynamic_models:
        flash('Table already exists.')
        return redirect(url_for('create_table'))

    # The primary key is created first so it stays the first column.
    attrs = {
        '__tablename__': table_name,
        'id': Column(Integer, primary_key=True),
    }
    try:
        attrs.update(_build_column_attrs(columns_list))
    except ValueError as e:
        flash(str(e))
        return redirect(url_for('create_table'))

    try:
        model = type(table_name.capitalize(), (Base,), attrs)
        model.__table__.create(bind=db.engine)
    except Exception as e:
        # Request boundary: report any mapping/DDL failure to the admin.
        # Drop the half-registered table from the metadata so that the same
        # name can be retried after the problem is fixed.
        stale_table = Base.metadata.tables.get(table_name)
        if stale_table is not None:
            Base.metadata.remove(stale_table)
        flash(f'Error creating table: {e}')
        return redirect(url_for('create_table'))

    # Register only after the table really exists in the database.
    dynamic_models[table_name] = model
    flash(f'Table {table_name} created successfully.')
    return redirect(url_for('dashboard'))
|
|
|
|
|
|
# View table route
|
|
@app.route('/view_table/<table_name>')
@login_required
def view_table(table_name):
    """Render every row of the dynamic table ``table_name``.

    Redirects to the dashboard with a flash message when no such table is
    registered in ``dynamic_models``.
    """
    model = dynamic_models.get(table_name)
    if not model:
        flash('Table not found.')
        return redirect(url_for('dashboard'))

    # The context manager returns the connection to the pool even when the
    # query raises; the explicit close() was skipped on errors.
    with db.engine.connect() as conn:
        result = conn.execute(model.__table__.select())
        # Materialise both pieces before the connection is released.
        columns = list(result.keys())
        rows = result.fetchall()

    return render_template('view_table.html', table_name=table_name, columns=columns, rows=rows)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## ! API
|
|
|
|
|
|
@app.route('/api/register', methods=['POST'])
def api_register():
    """Create an account from a JSON body and return its API token.

    Expected body: ``{"username": str, "password": str, "email": str?}``.

    Returns:
        200 with ``message`` and ``api_token`` on success; 400 with an
        ``error`` message when the body is not a JSON object, a required
        field is missing, or the username/email is already taken.
    """
    # Local import: only needed for the commit guard below.
    from sqlalchemy.exc import IntegrityError

    # silent=True yields None for a missing/invalid JSON body instead of
    # aborting, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid data format.'}), 400

    username = data.get('username')
    password = data.get('password')
    # Blank email is stored as NULL so it cannot collide on the unique column.
    email = data.get('email') or None

    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': 'Username and password required.'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': 'Username already exists.'}), 400

    if email is not None and User.query.filter_by(email=email).first():
        return jsonify({'error': 'Email already exists.'}), 400

    user = User(username=username, email=email)
    user.set_password(password)
    user.generate_api_token()
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # Lost a race against a concurrent registration with the same values.
        db.session.rollback()
        return jsonify({'error': 'Username or email already exists.'}), 400

    return jsonify({'message': 'Registration successful.', 'api_token': user.api_token})
|
|
|
|
|
|
|
|
@app.route('/api/login', methods=['POST'])
def api_login():
    """Exchange JSON credentials for the account's API token.

    Expected body: ``{"username": str, "password": str}``.  A token is
    generated and saved the first time an account without one logs in.

    Returns:
        200 with ``api_token`` on success, 400 when the body is not a JSON
        object, 401 when the credentials do not match.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid data format.'}), 400

    username = data.get('username')
    password = data.get('password')

    # Non-string or missing credentials can never match; reject them before
    # they reach the hash comparison, which expects a string.
    if not isinstance(username, str) or not isinstance(password, str):
        return jsonify({'error': 'Invalid credentials'}), 401

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': 'Invalid credentials'}), 401

    if not user.api_token:
        user.generate_api_token()
        db.session.commit()
    return jsonify({'api_token': user.api_token})
|
|
|
|
|
|
@app.route('/api/get_table/<table_name>', methods=['GET'])
def api_get_table(table_name):
    """Return all rows of a dynamic table as JSON for an authorised token.

    The raw ``Authorization`` header value is matched against
    ``User.api_token``.  The user's ``group`` must appear in the table's
    ``read`` list in the module-level ``table_permissions`` mapping
    (``{table: {'read': [...], 'write': [...]}}``, shape inferred from the
    lookups below).

    Returns:
        200 ``{"columns": [...], "rows": [{column: value}, ...]}``;
        401 for a missing/unknown token, 404 for an unknown table,
        403 when the user's group lacks read access.
    """
    token = request.headers.get('Authorization')
    if not token:
        return jsonify({'error': 'Missing API token.'}), 401

    user = User.query.filter_by(api_token=token).first()
    if not user:
        return jsonify({'error': 'Invalid API token.'}), 401

    model = dynamic_models.get(table_name)
    if not model:
        return jsonify({'error': 'Table not found.'}), 404

    allowed_groups = table_permissions.get(table_name, {}).get('read', [])
    if user.group not in allowed_groups:
        return jsonify({'error': 'Access denied.'}), 403

    # Context manager releases the connection even if the query fails.
    with db.engine.connect() as conn:
        result = conn.execute(model.__table__.select())
        # result.keys() is a view object that jsonify cannot serialise;
        # copy it into a plain list before building the payload.
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]

    return jsonify({'columns': columns, 'rows': rows})
|
|
|
|
|
|
@app.route('/api/set_table/<table_name>', methods=['POST'])
def api_set_table(table_name):
    """Insert one row, given as a JSON object, into a dynamic table.

    The raw ``Authorization`` header value is matched against
    ``User.api_token``.  The user's ``group`` must appear in the table's
    ``write`` list in the module-level ``table_permissions`` mapping
    (``{table: {'read': [...], 'write': [...]}}``, shape inferred from the
    lookups below).

    Returns:
        200 ``{"message": "Insert successful."}``; 401 for a missing/unknown
        token, 404 for an unknown table, 403 when the group lacks write
        access, 400 for a malformed body or a failed insert.
    """
    # Local import: only needed for the insert guard below.
    from sqlalchemy.exc import SQLAlchemyError

    token = request.headers.get('Authorization')
    if not token:
        return jsonify({'error': 'Missing API token.'}), 401

    user = User.query.filter_by(api_token=token).first()
    if not user:
        return jsonify({'error': 'Invalid API token.'}), 401

    model = dynamic_models.get(table_name)
    if not model:
        return jsonify({'error': 'Table not found.'}), 404

    allowed_groups = table_permissions.get(table_name, {}).get('write', [])
    if user.group not in allowed_groups:
        return jsonify({'error': 'Access denied.'}), 403

    # silent=True: a missing/invalid JSON body becomes None and is reported
    # through the same 400 response as any other wrong shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid data format.'}), 400

    # Name unknown fields explicitly instead of surfacing a driver error.
    known_columns = set(model.__table__.columns.keys())
    unknown = sorted(str(key) for key in data if key not in known_columns)
    if unknown:
        return jsonify({'error': f'Unknown columns: {", ".join(unknown)}'}), 400

    try:
        # engine.begin() commits on success and rolls back on error; a plain
        # connect() + close() discards the insert on SQLAlchemy versions
        # that do not autocommit.
        with db.engine.begin() as conn:
            conn.execute(model.__table__.insert(), [data])
    except SQLAlchemyError as e:
        return jsonify({'error': str(e)}), 400

    return jsonify({'message': 'Insert successful.'})
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry point: prepare the database, then start the development server.
if __name__ == '__main__':
    with app.app_context():
        # Creates the tables for models declared on `db` (the User model).
        # NOTE(review): dynamic tables are built on the separate `Base`
        # metadata, so presumably they are not covered here — confirm.
        db.create_all()
    # NOTE(review): the original indentation was lost in the pasted source;
    # this call is placed after the app-context block (the usual Flask
    # layout) — confirm it was not meant to run inside the `with`.
    # debug=True enables the interactive debugger/reloader; development only.
    app.run(debug=True)
|