700 lines
20 KiB
Python
700 lines
20 KiB
Python
import argparse
|
|
import sys
|
|
import os
|
|
import re
|
|
import random
|
|
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, session, abort
|
|
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
|
|
from werkzeug.security import generate_password_hash, check_password_hash
|
|
|
|
from Modules.windows import get_windows_content
|
|
from Modules.rule_creation import get_rule_creation_content
|
|
from Modules.linux import get_linux_content
|
|
from Modules.tips import get_random_tip_or_joke
|
|
from Modules.methodology import get_methodology_content
|
|
from Modules.investigate import get_investigate_content
|
|
|
|
from Modules.Investigate.threat import *
|
|
from Modules.Investigate.ip import *
|
|
from Modules.Investigate.domain import *
|
|
from Modules.Investigate.filehash import *
|
|
from Modules.Investigate.malware import *
|
|
|
|
from static.ascii_text_prompts import full_ascii_art_stripped, infinitei_stripped
|
|
from Config.config import VERSION
|
|
|
|
# Initialize Flask app
|
|
app = Flask(__name__)
|
|
app.secret_key = os.urandom(24)
|
|
|
|
# Initialize login manager for session handling
|
|
login_manager = LoginManager()
|
|
login_manager.init_app(app)
|
|
login_manager.login_view = 'login' # Redirect to login if user is not authenticated
|
|
|
|
# SQLAlchemy configuration (Replace this with your database URI)
|
|
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///users.db' # Example for SQLite
|
|
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # Disable modification tracking
|
|
|
|
# Initialize the db object
|
|
db = SQLAlchemy(app)
|
|
|
|
# In-memory database simulation
|
|
users_db = {}
|
|
|
|
# User class for SQLAlchemy (you can replace your in-memory dictionary)
|
|
class User(db.Model, UserMixin):
|
|
id = db.Column(db.String(80), primary_key=True)
|
|
username = db.Column(db.String(80), unique=True, nullable=False)
|
|
email = db.Column(db.String(120), unique=True, nullable=False)
|
|
password_hash = db.Column(db.String(120), nullable=False)
|
|
role = db.Column(db.String(50), nullable=False, default='Unknown')
|
|
theme = db.Column(db.String(50), nullable=False, default='dark')
|
|
team = db.Column(db.String(50), nullable=True, default='Unknown')
|
|
manager = db.Column(db.String(50), nullable=True, default='Unknown')
|
|
|
|
def __repr__(self):
|
|
return f"<User {self.username}>"
|
|
|
|
def check_password(self, password):
|
|
return check_password_hash(self.password_hash, password)
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id):
|
|
# Ensure that user_id corresponds to the username or unique user identifier
|
|
user = users_db.get(user_id)
|
|
if user:
|
|
# Return a User instance, not a dictionary
|
|
return User(id=user_id,
|
|
username=user["username"],
|
|
email=user["email"],
|
|
password_hash=user["password_hash"],
|
|
role=user["role"],
|
|
theme=user.get("theme", "modern"),
|
|
team=user.get("team", "Unknown"),
|
|
manager=user.get("manager", "Unknown"))
|
|
return None # Return None if user not found
|
|
|
|
@app.route('/profile', methods=['GET', 'POST'])
|
|
@login_required
|
|
def profile():
|
|
if request.method == 'POST':
|
|
new_role = request.form.get('role')
|
|
app.logger.debug(f'New role selected: {new_role}') # Log the new role
|
|
|
|
role = request.form['role'] # Capture the role from the form
|
|
theme = request.form['theme']
|
|
team = request.form['team']
|
|
manager = request.form['manager']
|
|
new_password = request.form['password']
|
|
password_confirm = request.form['password_confirm']
|
|
|
|
# Validate the role
|
|
if not role:
|
|
flash('Please select a valid role.', 'error')
|
|
return redirect(url_for('profile'))
|
|
|
|
# Validate team and manager fields
|
|
if not team or not manager:
|
|
flash('Team and Manager fields are required.', 'error')
|
|
return redirect(url_for('profile'))
|
|
|
|
# Update the password if provided
|
|
if new_password:
|
|
if new_password != password_confirm:
|
|
flash('The passwords do not match.', 'error')
|
|
return redirect(url_for('profile'))
|
|
current_user.password_hash = generate_password_hash(new_password)
|
|
|
|
# Update the current_user fields (role, theme, team, manager)
|
|
current_user.role = role
|
|
current_user.theme = theme
|
|
current_user.team = team
|
|
current_user.manager = manager
|
|
|
|
# Commit changes to the database
|
|
db.session.commit()
|
|
flash('Profile updated successfully.')
|
|
|
|
# Update the theme in the session to reflect immediately
|
|
session['theme'] = current_user.theme
|
|
session['role'] = current_user.role # Save the role to the session
|
|
|
|
return render_template('profile.html',
|
|
username=current_user.username,
|
|
email=current_user.email,
|
|
role=current_user.role,
|
|
theme=session.get('theme', 'dark'),
|
|
team=current_user.team,
|
|
manager=current_user.manager)
|
|
|
|
# Get the current theme from the session or default to 'dark'
|
|
theme = session.get('theme', current_user.theme if current_user.is_authenticated else 'dark')
|
|
|
|
return render_template('profile.html',
|
|
username=current_user.username,
|
|
email=current_user.email,
|
|
role=current_user.role,
|
|
theme=theme,
|
|
team=current_user.team,
|
|
manager=current_user.manager)
|
|
|
|
@app.route('/set_theme', methods=['POST'])
|
|
def set_theme():
|
|
theme = request.json.get('theme', 'modern') # Default to 'modern'
|
|
if theme not in ['modern', 'dark', 'light']:
|
|
return {"error": "Invalid theme"}, 400
|
|
|
|
if current_user.is_authenticated:
|
|
current_user.theme = theme
|
|
db.session.commit()
|
|
session['theme'] = theme # Update session theme for guests
|
|
return {"message": "Theme updated successfully"}, 200
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# In-memory data store for the notebook page
|
|
notebook = {
|
|
"notes": [],
|
|
"ips": [],
|
|
"domains": [],
|
|
"services": [],
|
|
"tasks": []
|
|
}
|
|
|
|
@app.context_processor
|
|
def inject_theme():
|
|
# Default to 'modern' if no theme is set in the session or user
|
|
theme = session.get('theme', current_user.theme if current_user.is_authenticated else 'modern')
|
|
return dict(theme=theme)
|
|
|
|
@app.context_processor
|
|
def inject_tip():
|
|
"""
|
|
Add a random tip or joke to every page's context.
|
|
"""
|
|
return {"random_tip": get_random_tip_or_joke()}
|
|
|
|
README_PATH = os.path.join(os.getcwd(), "README.md")
|
|
|
|
|
|
def get_readme_description():
|
|
"""
|
|
Extract the description starting with "HUNT-AI" and stopping at the next blank line from the README.md file.
|
|
"""
|
|
if not os.path.exists(README_PATH):
|
|
return "No description available."
|
|
|
|
description = []
|
|
capture = False
|
|
|
|
try:
|
|
with open(README_PATH, "r", encoding="utf-8") as readme_file:
|
|
for line in readme_file:
|
|
# Start capturing from the line containing "HUNT-AI"
|
|
if "HUNT-AI" in line:
|
|
capture = True
|
|
description.append(line.strip())
|
|
# Continue capturing until a blank line is encountered
|
|
elif capture:
|
|
if line.strip() == "":
|
|
break
|
|
description.append(line.strip())
|
|
except Exception as e:
|
|
return f"Error reading README.md: {e}"
|
|
|
|
return " ".join(description) if description else "No description available."
|
|
|
|
# Home route
|
|
@app.route('/')
|
|
def home():
|
|
cover_images_path = os.path.join(app.static_folder, 'Pictures', 'Cover_Images')
|
|
cover_images = [os.path.join('Pictures', 'Cover_Images', filename) for filename in os.listdir(cover_images_path) if filename.lower().endswith(('png', 'jpg', 'jpeg', 'gif'))]
|
|
selected_image = random.choice(cover_images) if cover_images else None
|
|
readme_description = get_readme_description()
|
|
|
|
links = [
|
|
{"name": "Visit Start.me", "url": "https://start.me/p/qbzw4e/cyber-security"},
|
|
{"name": "Visit My Website", "url": "https://infinit3i.com/"}
|
|
]
|
|
return render_template(
|
|
'index.html',
|
|
full_ascii_art=full_ascii_art_stripped,
|
|
infinitei=infinitei_stripped,
|
|
links=links,
|
|
readme_description=readme_description,
|
|
selected_image=selected_image
|
|
)
|
|
|
|
@app.route('/methodology')
|
|
def methodology():
|
|
content = get_methodology_content()
|
|
return render_template('methodology.html', content=content)
|
|
|
|
@app.route('/methodology/<title>')
|
|
def methodology_section(title):
|
|
content = get_methodology_content()
|
|
section = next((s for s in content if s["title"].replace(" ", "_").lower() == title.lower()), None)
|
|
if not section:
|
|
abort(404)
|
|
return render_template('section.html', section=section)
|
|
|
|
@app.route('/linux')
|
|
def linux():
|
|
sections = get_linux_content()
|
|
return render_template('linux.html', sections=sections)
|
|
|
|
@app.route('/linux/<title>')
|
|
def linux_section(title):
|
|
sections = get_linux_content()
|
|
section = next((s for s in sections if s["title"].replace(" ", "_").lower() == title.lower()), None)
|
|
if not section:
|
|
abort(404)
|
|
return render_template('section.html', section=section)
|
|
|
|
@app.route('/rule_creation')
|
|
def rule_creation():
|
|
content = get_rule_creation_content()
|
|
return render_template('rule_creation.html', content=content)
|
|
|
|
@app.route('/windows')
|
|
def windows():
|
|
# Load all sections
|
|
sections = get_windows_content()
|
|
return render_template('windows.html', sections=sections)
|
|
|
|
@app.route('/windows/<title>')
|
|
def windows_section(title):
|
|
# Find the section matching the title
|
|
sections = get_windows_content()
|
|
section = next((s for s in sections if s["title"].replace(" ", "_").lower() == title.lower()), None)
|
|
|
|
if not section:
|
|
abort(404) # Return a 404 if the section is not found
|
|
|
|
return render_template('section.html', section=section)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/persistence')
|
|
def persistence_submenu():
|
|
"""
|
|
Displays the submenu for all persistence methods.
|
|
"""
|
|
menu = get_persistence_menu()
|
|
return render_template('persistence_submenu.html', menu=menu)
|
|
|
|
@app.route('/persistence/<method>')
|
|
def persistence_method(method):
|
|
"""
|
|
Dynamically load content for a specific persistence method.
|
|
"""
|
|
try:
|
|
# Dynamically import the requested method module
|
|
module = __import__(f"Modules.Persistence.{method}", fromlist=["get_content"])
|
|
content = module.get_content()
|
|
return render_template('persistence_method.html', content=content)
|
|
except ModuleNotFoundError:
|
|
abort(404, description=f"Persistence method '{method}' not found.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.route('/investigate')
|
|
def investigate():
|
|
content = get_investigate_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
@app.route('/investigate/threat')
|
|
def investigate_threat():
|
|
content = get_threat_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
@app.route('/investigate/domain')
|
|
def investigate_domain():
|
|
content = get_domain_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
@app.route('/investigate/filehash')
|
|
def investigate_filehash():
|
|
content = get_filehash_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
@app.route('/investigate/ip')
|
|
def investigate_ip():
|
|
content = get_ip_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
@app.route('/investigate/malware')
|
|
def investigate_malware():
|
|
content = get_malware_content()
|
|
return render_template('investigate.html', content=content)
|
|
|
|
def display_help():
|
|
"""
|
|
Display help information for the command-line interface.
|
|
"""
|
|
help_text = f"""
|
|
Usage:
|
|
python3 app.py Start the Flask application
|
|
python3 app.py -v Display the current version
|
|
python3 app.py -h Show this help page
|
|
python3 app.py -f <file> Load the last session file for collaboration
|
|
"""
|
|
print(help_text)
|
|
|
|
def get_version():
|
|
"""
|
|
Display the version of the application.
|
|
"""
|
|
print(f"App Version: {VERSION}")
|
|
|
|
def load_session(file_path):
|
|
"""
|
|
Load a session file and display its contents.
|
|
"""
|
|
try:
|
|
with open(file_path, "r") as file:
|
|
session_data = file.read()
|
|
print(f"Session loaded successfully from {file_path}:\n")
|
|
print(session_data)
|
|
except FileNotFoundError:
|
|
print(f"Error: File '{file_path}' not found.")
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
|
|
def handle_arguments():
|
|
"""
|
|
Parse and handle command-line arguments.
|
|
"""
|
|
parser = argparse.ArgumentParser(
|
|
description="App Runner - Threat AI Command Interface",
|
|
usage="python3 app.py [-v | -h | -f <file>]"
|
|
)
|
|
parser.add_argument("-v", "--version", action="store_true", help="Display the current version")
|
|
parser.add_argument("-f", "--file", type=str, help="Load the last session file for collaboration")
|
|
args = parser.parse_args()
|
|
|
|
if args.version:
|
|
get_version()
|
|
sys.exit(0)
|
|
elif args.file:
|
|
load_session(args.file)
|
|
sys.exit(0)
|
|
|
|
|
|
|
|
# Registration route
|
|
@app.route('/register', methods=['GET', 'POST'])
|
|
def register():
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
email = request.form['email']
|
|
password = request.form['password']
|
|
confirm_password = request.form['confirm_password']
|
|
|
|
# Password validation
|
|
password_pattern = re.compile(r'^(?=.*[A-Z])(?=.*\d)(?=.*[\W_]).{10,}$')
|
|
if not password_pattern.match(password):
|
|
flash("Password must be at least 10 characters, contain 1 uppercase letter, 1 special character, and 1 number.", "error")
|
|
return redirect(url_for('register'))
|
|
|
|
if password != confirm_password:
|
|
flash("Passwords do not match.", "error")
|
|
return redirect(url_for('register'))
|
|
|
|
# Hash the password
|
|
password_hash = generate_password_hash(password)
|
|
|
|
# Create user and store in the in-memory database
|
|
user = {
|
|
'username': username,
|
|
'email': email,
|
|
'password_hash': password_hash,
|
|
'role': 'Lead Analyst', # Default role
|
|
'theme': 'dark', # Default theme
|
|
'team': 'Unknown', # Default team
|
|
'manager': 'Unknown' # Default manager
|
|
}
|
|
|
|
users_db[username] = user
|
|
|
|
# Log the user in after registration
|
|
user_obj = User(
|
|
id=username,
|
|
username=username,
|
|
email=email,
|
|
password_hash=password_hash,
|
|
role=user['role'],
|
|
theme=user['theme'],
|
|
team=user['team'],
|
|
manager=user['manager']
|
|
)
|
|
login_user(user_obj)
|
|
|
|
flash("Account created and logged in successfully!", "success")
|
|
return redirect(url_for('home')) # Redirect to home or another page after successful login
|
|
|
|
return render_template('register.html')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# User Login Route (optional if you want a login page)
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
|
|
# Get the user dictionary from users_db
|
|
user = users_db.get(username)
|
|
|
|
if user and check_password_hash(user["password_hash"], password):
|
|
# Create a User instance, not just a dictionary
|
|
user_obj = User(id=username, # Assign user ID (username)
|
|
username=user["username"],
|
|
email=user["email"],
|
|
password_hash=user["password_hash"],
|
|
role=user["role"],
|
|
theme=user.get("theme", "light"), # Default to 'light' if not found
|
|
team=user.get("team", "Unknown"), # Default to 'Unknown' if not found
|
|
manager=user.get("manager", "Unknown")) # Default to 'Unknown' if not found
|
|
|
|
# Log the user in
|
|
login_user(user_obj)
|
|
return redirect(url_for('notebook_page')) # Redirect to notebook page after login
|
|
else:
|
|
flash('Invalid username or password.')
|
|
return redirect(url_for('login'))
|
|
|
|
return render_template('login.html')
|
|
|
|
|
|
|
|
# Notebook route (Protected)
|
|
@app.route('/notebook', methods=['GET', 'POST'])
|
|
@login_required
|
|
def notebook_page():
|
|
if request.method == 'POST':
|
|
category = request.form.get('category')
|
|
entry = request.form.get('entry', '').strip() # Strip whitespace from the entry
|
|
|
|
if not category or not entry:
|
|
flash("Category and entry cannot be empty.", "error")
|
|
return redirect(url_for('notebook_page'))
|
|
|
|
if category not in notebook:
|
|
flash("Invalid category selected.", "error")
|
|
return redirect(url_for('notebook_page'))
|
|
|
|
# Input validation
|
|
if category == "ips":
|
|
# Ensure valid IP address
|
|
ip_pattern = re.compile(
|
|
r'^((25[0-5]|2[0-4][0-9]|[0-1]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[0-1]?[0-9][0-9]?)$'
|
|
)
|
|
if not ip_pattern.match(entry):
|
|
flash("Invalid IP address format.", "error")
|
|
return redirect(url_for('notebook_page'))
|
|
elif category == "notes":
|
|
# Ensure notes are over 1 character
|
|
if len(entry) < 2:
|
|
flash("Notes must be at least 2 characters long.", "error")
|
|
return redirect(url_for('notebook_page'))
|
|
|
|
# Add valid entry to the notebook
|
|
notebook[category].append(entry)
|
|
flash(f"Entry added to {category.capitalize()}!", "success")
|
|
return redirect(url_for('notebook_page'))
|
|
|
|
return render_template('notebook.html', notebook=notebook)
|
|
|
|
|
|
# Delete an entry from notebook
|
|
@app.route('/delete/<category>/<int:index>')
|
|
@login_required
|
|
def delete_entry(category, index):
|
|
if category in notebook and 0 <= index < len(notebook[category]):
|
|
deleted_entry = notebook[category].pop(index)
|
|
flash(f"Deleted entry: {deleted_entry} from {category.capitalize()}.", "info")
|
|
else:
|
|
flash("Invalid entry or category.", "error")
|
|
return redirect(url_for('notebook_page'))
|
|
|
|
# Logout Route
|
|
@app.route('/logout')
|
|
@login_required
|
|
def logout():
|
|
logout_user()
|
|
flash('You have been logged out.')
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def aggregate_content():
|
|
"""
|
|
Aggregates content from all module files under the Modules directory.
|
|
Returns a list of dictionaries with 'title' and 'content'.
|
|
"""
|
|
content_list = []
|
|
modules_dir = "Modules"
|
|
|
|
for root, _, files in os.walk(modules_dir):
|
|
for file in files:
|
|
if file.endswith(".py") and not file.startswith("__"):
|
|
module_path = os.path.join(root, file).replace("/", ".").replace("\\", ".")[:-3]
|
|
try:
|
|
module = importlib.import_module(module_path)
|
|
if hasattr(module, "get_content"):
|
|
content_list.extend(module.get_content())
|
|
except Exception as e:
|
|
print(f"Error importing {module_path}: {e}")
|
|
|
|
return content_list
|
|
|
|
|
|
|
|
def perform_search(query):
|
|
"""
|
|
Searches the aggregated content for matching titles or content.
|
|
"""
|
|
all_content = aggregate_content()
|
|
query = query.lower()
|
|
results = [
|
|
{
|
|
"title": item["title"],
|
|
"snippet": item["content"][:150] + "...", # Return the first 150 characters as a snippet
|
|
"module": item.get("module", "Unknown") # Optional: Include module info
|
|
}
|
|
for item in all_content
|
|
if query in item["title"].lower() or query in item["content"].lower()
|
|
]
|
|
return results
|
|
|
|
@app.route('/search', methods=['GET'])
|
|
def search():
|
|
query = request.args.get('query', '').strip()
|
|
if not query:
|
|
flash("Search query cannot be empty.", "error")
|
|
return redirect(url_for('home'))
|
|
|
|
# Perform search
|
|
results = perform_search(query)
|
|
|
|
# Render results
|
|
return render_template('search_results.html', query=query, results=results)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# If arguments are passed, handle them; otherwise, run the Flask app.
|
|
if len(sys.argv) > 1:
|
|
handle_arguments()
|
|
else:
|
|
app.run(debug=True, port=31337) |