first commit

This commit is contained in:
dgsoft
2025-10-14 21:27:41 +02:00
commit 44b8667f31
40 changed files with 4619 additions and 0 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

187
app/routes/admin.py Normal file
View File

@@ -0,0 +1,187 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from functools import wraps
from app.models import User, db
from werkzeug.security import generate_password_hash
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
def admin_required(f):
@wraps(f)
@login_required
def decorated_function(*args, **kwargs):
if not current_user.is_admin:
flash('Zugriff verweigert. Administrator-Rechte erforderlich.', 'error')
return redirect(url_for('main.dashboard'))
return f(*args, **kwargs)
return decorated_function
@admin_bp.route('/')
@admin_required
def admin_dashboard():
"""Admin Dashboard mit Benutzerübersicht"""
users = User.query.order_by(User.created_at.desc()).all()
return render_template('admin/dashboard.html', users=users)
@admin_bp.route('/users')
@admin_required
def users():
"""Benutzerverwaltung"""
users = User.query.order_by(User.created_at.desc()).all()
return render_template('admin/users.html', users=users)
@admin_bp.route('/users/create', methods=['GET', 'POST'])
@admin_required
def create_user():
"""Neuen Benutzer erstellen"""
if request.method == 'POST':
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip()
password = request.form.get('password', '').strip()
is_admin = 'is_admin' in request.form
# Validierung
if not username or not email or not password:
flash('Alle Felder sind erforderlich.', 'error')
return render_template('admin/create_user.html')
# Prüfe ob Benutzername bereits existiert
if User.query.filter_by(username=username).first():
flash('Benutzername bereits vergeben.', 'error')
return render_template('admin/create_user.html')
# Prüfe ob E-Mail bereits existiert
if User.query.filter_by(email=email).first():
flash('E-Mail-Adresse bereits vergeben.', 'error')
return render_template('admin/create_user.html')
try:
# Benutzer erstellen
user = User(
username=username,
email=email,
is_admin=is_admin
)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash(f'Benutzer "{username}" erfolgreich erstellt.', 'success')
return redirect(url_for('admin.users'))
except Exception as e:
db.session.rollback()
flash(f'Fehler beim Erstellen des Benutzers: {str(e)}', 'error')
return render_template('admin/create_user.html')
@admin_bp.route('/users/<int:user_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_user(user_id):
"""Benutzer bearbeiten"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
username = request.form.get('username', '').strip()
email = request.form.get('email', '').strip()
password = request.form.get('password', '').strip()
is_admin = 'is_admin' in request.form
# Validierung
if not username or not email:
flash('Benutzername und E-Mail sind erforderlich.', 'error')
return render_template('admin/edit_user.html', user=user)
# Prüfe ob Benutzername bereits von anderem User verwendet wird
existing_user = User.query.filter_by(username=username).first()
if existing_user and existing_user.id != user_id:
flash('Benutzername bereits vergeben.', 'error')
return render_template('admin/edit_user.html', user=user)
# Prüfe ob E-Mail bereits von anderem User verwendet wird
existing_email = User.query.filter_by(email=email).first()
if existing_email and existing_email.id != user_id:
flash('E-Mail-Adresse bereits vergeben.', 'error')
return render_template('admin/edit_user.html', user=user)
try:
# Benutzer aktualisieren
user.username = username
user.email = email
user.is_admin = is_admin
# Passwort nur ändern wenn angegeben
if password:
user.set_password(password)
db.session.commit()
flash(f'Benutzer "{username}" erfolgreich aktualisiert.', 'success')
return redirect(url_for('admin.users'))
except Exception as e:
db.session.rollback()
flash(f'Fehler beim Aktualisieren des Benutzers: {str(e)}', 'error')
return render_template('admin/edit_user.html', user=user)
@admin_bp.route('/users/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
"""Benutzer löschen"""
user = User.query.get_or_404(user_id)
# Verhindere dass der Admin sich selbst löscht
if user.id == current_user.id:
flash('Sie können sich nicht selbst löschen.', 'error')
return redirect(url_for('admin.users'))
# Verhindere das Löschen des letzten Admins
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
flash('Der letzte Administrator kann nicht gelöscht werden.', 'error')
return redirect(url_for('admin.users'))
try:
username = user.username
db.session.delete(user)
db.session.commit()
flash(f'Benutzer "{username}" erfolgreich gelöscht.', 'success')
except Exception as e:
db.session.rollback()
flash(f'Fehler beim Löschen des Benutzers: {str(e)}', 'error')
return redirect(url_for('admin.users'))
@admin_bp.route('/users/<int:user_id>/toggle_admin', methods=['POST'])
@admin_required
def toggle_admin(user_id):
"""Admin-Status umschalten"""
user = User.query.get_or_404(user_id)
# Verhindere dass der Admin sich selbst die Admin-Rechte entzieht
if user.id == current_user.id and user.is_admin:
return jsonify({'error': 'Sie können sich nicht selbst die Admin-Rechte entziehen.'}), 400
# Verhindere das Entziehen der Admin-Rechte des letzten Admins
if user.is_admin:
admin_count = User.query.filter_by(is_admin=True).count()
if admin_count <= 1:
return jsonify({'error': 'Der letzte Administrator kann nicht degradiert werden.'}), 400
try:
user.is_admin = not user.is_admin
db.session.commit()
status = 'Administrator' if user.is_admin else 'Benutzer'
return jsonify({
'success': True,
'message': f'{user.username} ist jetzt {status}.',
'is_admin': user.is_admin
})
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500

186
app/routes/api.py Normal file
View File

@@ -0,0 +1,186 @@
from flask import Blueprint, jsonify, request, make_response
from flask_login import login_required, current_user
from app.models import SavedQuery, db
from app.services.database_service import DatabaseService
from app.services.database_manager import DatabaseManager
import csv
import io
import json
api_bp = Blueprint('api', __name__)
@api_bp.route('/queries', methods=['GET'])
@login_required
def get_saved_queries():
"""API-Endpunkt um alle gespeicherten Queries zu erhalten"""
try:
queries = SavedQuery.query.filter_by(user_id=current_user.id).all()
return jsonify({
'success': True,
'queries': [query.to_dict() for query in queries]
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/queries/<query_name>', methods=['GET'])
@login_required
def get_query_by_name(query_name):
"""API-Endpunkt um eine gespeicherte Query nach Namen zu erhalten"""
try:
query = SavedQuery.query.filter_by(name=query_name, user_id=current_user.id).first()
if not query:
return jsonify({'error': 'Query nicht gefunden'}), 404
return jsonify({
'success': True,
'query': query.to_dict()
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/queries/<query_name>/execute', methods=['GET'])
@login_required
def execute_saved_query(query_name):
"""API-Endpunkt um eine gespeicherte Query auszuführen und Ergebnisse zu erhalten"""
try:
# Finde gespeicherte Query
saved_query = SavedQuery.query.filter_by(name=query_name, user_id=current_user.id).first()
if not saved_query:
return jsonify({'error': 'Query nicht gefunden'}), 404
# Datenbankverbindung bestimmen
connection = request.args.get('connection', 'oracle')
# Führe Query aus
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
results = db_service.execute_query(saved_query.query_text)
# Format bestimmen (JSON oder CSV)
format_type = request.args.get('format', 'json').lower()
if format_type == 'csv':
return export_results_as_csv(results, query_name)
else:
return jsonify({
'success': True,
'query_name': query_name,
'results': results
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/queries/<query_name>/export/csv', methods=['GET'])
@login_required
def export_query_results_csv(query_name):
"""Exportiere Query-Ergebnisse als CSV"""
try:
# Finde gespeicherte Query
saved_query = SavedQuery.query.filter_by(name=query_name, user_id=current_user.id).first()
if not saved_query:
return jsonify({'error': 'Query nicht gefunden'}), 404
# Datenbankverbindung bestimmen
connection = request.args.get('connection', 'oracle')
# Führe Query aus
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
results = db_service.execute_query(saved_query.query_text)
return export_results_as_csv(results, query_name)
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/queries/<query_name>/export/json', methods=['GET'])
@login_required
def export_query_results_json(query_name):
"""Exportiere Query-Ergebnisse als JSON"""
try:
# Finde gespeicherte Query
saved_query = SavedQuery.query.filter_by(name=query_name, user_id=current_user.id).first()
if not saved_query:
return jsonify({'error': 'Query nicht gefunden'}), 404
# Datenbankverbindung bestimmen
connection = request.args.get('connection', 'oracle')
# Führe Query aus
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
results = db_service.execute_query(saved_query.query_text)
# Erstelle JSON-Response mit Download-Header
response = make_response(json.dumps({
'query_name': query_name,
'results': results
}, indent=2))
response.headers['Content-Type'] = 'application/json'
response.headers['Content-Disposition'] = f'attachment; filename="{query_name}_results.json"'
return response
except Exception as e:
return jsonify({'error': str(e)}), 500
@api_bp.route('/update_query_name/<int:query_id>', methods=['PUT'])
@login_required
def update_query_name(query_id):
"""API-Endpunkt um den Namen einer gespeicherten Query zu aktualisieren"""
try:
data = request.get_json()
new_name = data.get('name', '').strip()
if not new_name:
return jsonify({'error': 'Name ist erforderlich'}), 400
# Finde Query
query = SavedQuery.query.filter_by(id=query_id, user_id=current_user.id).first()
if not query:
return jsonify({'error': 'Query nicht gefunden'}), 404
# Prüfe ob Name bereits existiert
existing_query = SavedQuery.query.filter_by(name=new_name, user_id=current_user.id).first()
if existing_query and existing_query.id != query_id:
return jsonify({'error': 'Eine Query mit diesem Namen existiert bereits'}), 400
# Update Name
query.name = new_name
db.session.commit()
return jsonify({
'success': True,
'message': 'Query-Name erfolgreich aktualisiert',
'query': query.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({'error': str(e)}), 500
def export_results_as_csv(results, filename):
"""Hilfsfunktion um Ergebnisse als CSV zu exportieren"""
if not results or 'data' not in results or not results['data']:
return jsonify({'error': 'Keine Daten zum Exportieren'}), 400
# Erstelle CSV
output = io.StringIO()
# Headers schreiben
if results['columns']:
writer = csv.writer(output)
writer.writerow(results['columns'])
# Daten schreiben
for row in results['data']:
writer.writerow(row)
# Response erstellen
response = make_response(output.getvalue())
response.headers['Content-Type'] = 'text/csv'
response.headers['Content-Disposition'] = f'attachment; filename="{filename}_results.csv"'
return response

56
app/routes/auth.py Normal file
View File

@@ -0,0 +1,56 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask_login import login_user, logout_user, login_required, current_user
from app.models import User
from app import db
auth_bp = Blueprint('auth', __name__)
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
next_page = request.args.get('next')
return redirect(next_page) if next_page else redirect(url_for('main.dashboard'))
else:
flash('Ungültiger Benutzername oder Passwort')
return render_template('auth/login.html')
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
email = request.form['email']
password = request.form['password']
# Prüfe ob Benutzer bereits existiert
if User.query.filter_by(username=username).first():
flash('Benutzername bereits vergeben')
return render_template('auth/register.html')
if User.query.filter_by(email=email).first():
flash('E-Mail bereits vergeben')
return render_template('auth/register.html')
# Erstelle neuen Benutzer
user = User(username=username, email=email)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash('Registrierung erfolgreich')
return redirect(url_for('auth.login'))
return render_template('auth/register.html')
@auth_bp.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('auth.login'))

182
app/routes/main.py Normal file
View File

@@ -0,0 +1,182 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
from flask_login import login_required, current_user
from app.models import SavedQuery, DatabaseConnection
from app.services.database_service import DatabaseService
from app.services.database_manager import DatabaseManager
from app import db
import traceback
main_bp = Blueprint('main', __name__)
@main_bp.route('/')
@login_required
def dashboard():
# Lade gespeicherte Queries des Benutzers
saved_queries = SavedQuery.query.filter_by(user_id=current_user.id).all()
# Lade verfügbare Datenbankverbindungen
db_manager = DatabaseManager()
available_connections = db_manager.get_available_connections()
return render_template('dashboard.html',
saved_queries=saved_queries,
available_connections=available_connections)
@main_bp.route('/execute_query', methods=['POST'])
@login_required
def execute_query():
data = request.get_json()
query = data.get('query', '').strip()
connection = data.get('connection', 'oracle') # Standard: Oracle
if not query:
return jsonify({'error': 'Query ist leer'}), 400
try:
# Verwende ausgewählte Datenbankverbindung
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
results = db_service.execute_query(query)
return jsonify({
'success': True,
'results': results,
'connection': connection
})
except Exception as e:
return jsonify({
'error': f'Fehler beim Ausführen der Query: {str(e)}',
'traceback': traceback.format_exc()
}), 500
@main_bp.route('/get_tables')
@login_required
def get_tables():
connection = request.args.get('connection', 'oracle')
try:
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
tables = db_service.get_tables()
return jsonify({
'success': True,
'tables': tables,
'connection': connection
})
except Exception as e:
return jsonify({
'error': f'Fehler beim Laden der Tabellen: {str(e)}'
}), 500
@main_bp.route('/get_table_schema/<table_name>')
@login_required
def get_table_schema(table_name):
connection = request.args.get('connection', 'oracle')
try:
db_manager = DatabaseManager()
db_service = db_manager.get_database_service(connection)
schema = db_service.get_table_schema(table_name)
return jsonify({
'success': True,
'schema': schema,
'connection': connection
})
except Exception as e:
return jsonify({
'error': f'Fehler beim Laden des Tabellenschemas: {str(e)}'
}), 500
@main_bp.route('/save_query', methods=['POST'])
@login_required
def save_query():
data = request.get_json()
name = data.get('name', '').strip()
description = data.get('description', '').strip()
query = data.get('query', '').strip()
if not name or not query:
return jsonify({'error': 'Name und Query sind erforderlich'}), 400
# Prüfe ob Query-Name bereits existiert
existing = SavedQuery.query.filter_by(name=name, user_id=current_user.id).first()
if existing:
return jsonify({'error': 'Query-Name bereits vergeben'}), 400
try:
saved_query = SavedQuery(
name=name,
description=description,
query_text=query,
user_id=current_user.id
)
db.session.add(saved_query)
db.session.commit()
return jsonify({
'success': True,
'message': 'Query erfolgreich gespeichert',
'query': saved_query.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({'error': f'Fehler beim Speichern: {str(e)}'}), 500
@main_bp.route('/delete_query/<int:query_id>', methods=['DELETE'])
@login_required
def delete_query(query_id):
try:
query = SavedQuery.query.filter_by(id=query_id, user_id=current_user.id).first()
if not query:
return jsonify({'error': 'Query nicht gefunden'}), 404
db.session.delete(query)
db.session.commit()
return jsonify({
'success': True,
'message': 'Query erfolgreich gelöscht'
})
except Exception as e:
db.session.rollback()
return jsonify({'error': f'Fehler beim Löschen: {str(e)}'}), 500
@main_bp.route('/test_connection/<connection_name>')
@login_required
def test_connection(connection_name):
try:
db_manager = DatabaseManager()
result = db_manager.test_connection(connection_name)
return jsonify(result)
except Exception as e:
return jsonify({
'success': False,
'message': f'Fehler beim Testen der Verbindung: {str(e)}'
}), 500
@main_bp.route('/get_connections')
@login_required
def get_connections():
try:
db_manager = DatabaseManager()
connections = db_manager.get_available_connections()
return jsonify({
'success': True,
'connections': connections
})
except Exception as e:
return jsonify({
'error': f'Fehler beim Laden der Verbindungen: {str(e)}'
}), 500