Compare commits

...

2 Commits

Author SHA1 Message Date
404b55207f Merge branch 'main' of git.ourworld.tf:freeflowuniverse/heroweb 2024-09-05 06:56:56 +02:00
01d0fb35b9 heroweb 2024-09-05 06:56:55 +02:00
11 changed files with 357 additions and 2 deletions

25
authserver/group_test.py Normal file
View File

@ -0,0 +1,25 @@
from utils.model import FileSystemDatabase,User, Group, ACE, ACL, Info, RightEnum
db = FileSystemDatabase("/tmp/userdb")
# Create a user and save it
user = User(email="user@example.com", description="Test user", profile="Profile data")
db.save_user(user)
# Load a user
loaded_user = db.load_user("user@example.com")
print(loaded_user)
# Create a group and save it
group = Group(name="Admins", members=[user])
db.save_group(group)
# Create an ACL and Info object and save it
ace = ACE(entity="Admins", right=RightEnum.admin)
acl = ACL(name="MainACL", entries=[ace])
info = Info(name="File1", path="/path/to/file", acl=acl, description="Important file", expiration="2025-01-01")
db.save_info(info)
# Load an info object
loaded_info = db.load_info("File1")
print(loaded_info)

103
authserver/main.py Normal file
View File

@ -0,0 +1,103 @@
from fastapi import FastAPI, Form, Request, Depends, HTTPException
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from fastapi.templating import Jinja2Templates
import jwt
from jwt.exceptions import PyJWTError
from jwt import ExpiredSignatureError, InvalidTokenError
from datetime import datetime, timedelta
from pydantic import BaseModel, EmailStr
from starlette.middleware.sessions import SessionMiddleware
from starlette.responses import Response
from utils.auth import create_access_token, verify_access_token
# Initialize FastAPI app
app = FastAPI()
# Add session middleware for cookie management
app.add_middleware(SessionMiddleware, secret_key="your_secret_key")
# Email configuration
conf = ConnectionConfig(
MAIL_USERNAME="kristof@incubaid.com",
MAIL_PASSWORD="xsmtpsib-e13e99275ddbb31b16d295a5631d85bbb9299f741d1c8b633b852bf0e4c19972-hc0kgRmB8JDYGNQV",
MAIL_FROM="info@threefold.me",
MAIL_PORT=587,
MAIL_SERVER="smtp-relay.brevo.com",
MAIL_STARTTLS=True, # This replaces MAIL_TLS
MAIL_SSL_TLS=False, # This replaces MAIL_SSL
USE_CREDENTIALS=True
)
# Jinja2 templates for rendering HTML
templates = Jinja2Templates(directory="templates")
SECRET_KEY = "your_jwt_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Step 1: Render login page to accept email
@app.get("/", response_class=HTMLResponse)
async def login_form(request: Request):
return RedirectResponse(url="/signup")
@app.get("/signup", response_class=HTMLResponse)
async def login_form(request: Request):
return templates.TemplateResponse("login.html", {"request": request})
# Step 2: Handle form submission, generate token, send login email
@app.post("/loginsubmit")
async def send_login_email(request: Request, email: str = Form(...)):
if not email:
# Redirect to signup page if email is not provided
return RedirectResponse(url="/signup")
# Generate the access token and email link
access_token = create_access_token({"sub": email})
email_link = f"http://verse.tf:8000/register?token={access_token}"
# Render the email template with the email link
rendered_template = templates.get_template("email.html").render({"email_link": email_link})
# Create the email message
message = MessageSchema(
subject="Login to your account",
recipients=[email], # List of recipient emails
body=rendered_template, # The rendered HTML content
subtype="html" # Specify the subtype as HTML
)
# Send the email
fm = FastMail(conf)
await fm.send_message(message)
return {"message": "Login link has been sent to your email."}
# Step 3: Handle email link redirection and set JWT in cookies
@app.get("/register")
async def register(request: Request, token: str):
try:
email = verify_access_token(token)
except PyJWTError:
raise HTTPException(status_code=401, detail="Invalid token")
response = RedirectResponse(url="/info")
response.set_cookie(key="access_token", value=token)
return response
# Step 4: User info page, read JWT from cookies
@app.get("/info", response_class=HTMLResponse)
async def get_user_info(request: Request):
token = request.cookies.get("access_token")
if not token:
#raise HTTPException(status_code=401, detail="Not authenticated")
return RedirectResponse(url="/signup")
try:
email = verify_access_token(token)
except PyJWTError:
#raise HTTPException(status_code=401, detail="Invalid token")
return RedirectResponse(url="/signup")
return templates.TemplateResponse("info.html", {"request": request, "email": email})

2
authserver/run.sh Normal file
View File

@ -0,0 +1,2 @@
uvicorn main:app --host 0.0.0.0 --port 8000 --reload

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html>
<head>
<title>Login Email</title>
</head>
<body>
<p>Click the link below to login:</p>
<a href="{{ email_link }}">Login</a>
</body>
</html>

View File

@ -0,0 +1,10 @@
<!DOCTYPE html>
<html>
<head>
<title>User Info</title>
</head>
<body>
<h2>Welcome!</h2>
<p>Your email is: {{ email }}</p>
</body>
</html>

View File

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html>
<head>
<title>Login</title>
</head>
<body>
<h2>Login with Email</h2>
<form method="post" action="/loginsubmit">
<label for="email">Email:</label>
<input type="email" name="email" required>
<button type="submit">Login</button>
</form>
</body>
</html>

23
authserver/utils/auth.py Normal file
View File

@ -0,0 +1,23 @@
import jwt
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from datetime import datetime, timedelta
SECRET_KEY = "your_jwt_secret_key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def verify_access_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise InvalidTokenError
return email
except (ExpiredSignatureError, InvalidTokenError):
raise InvalidTokenError

154
authserver/utils/model.py Normal file
View File

@ -0,0 +1,154 @@
from pydantic import BaseModel, EmailStr
from typing import List, Union
from enum import Enum
import os
import toml
from pathlib import Path
from typing import Optional
# Enum for the ACE rights
class RightEnum(str, Enum):
read = "read"
write = "write"
admin = "admin"
# Access Control Entry (ACE)
class ACE(BaseModel):
group: Optional[str] = None # Group name, optional
user: Optional[str] = None # User email, optional
right: RightEnum # The access right (read, write, admin)
# Access Control List (ACL)
class ACL(BaseModel):
name: str # Name of the ACL
entries: List[ACE] # List of ACEs (Access Control Entries)
# User model
class User(BaseModel):
email: EmailStr # User's email (used as a key)
description: str # User's description
profile: str # User's profile
# Group model
class Group(BaseModel):
name: str # Group name
members: List[Union['User', 'Group']] # Group members can be users or other groups
# Info model
class Info(BaseModel):
name: str # Name of the information
path: str # Path to the resource
acl: ACL # Associated ACL
description: str # Description of the information
expiration: str # Expiration date (as string)
# Update forward references to allow self-referencing models
User.update_forward_refs()
Group.update_forward_refs()
ACE.update_forward_refs()
class FileSystemDatabase:
def __init__(self, base_dir: str):
self.base_dir = Path(base_dir)
self.users_dir = self.base_dir / "users"
self.groups_dir = self.base_dir / "groups"
self.info_dir = self.base_dir / "info"
# Create directories if they don't exist
self.users_dir.mkdir(parents=True, exist_ok=True)
self.groups_dir.mkdir(parents=True, exist_ok=True)
self.info_dir.mkdir(parents=True, exist_ok=True)
# --- USER METHODS ---
def save_user(self, user: User):
user_path = self.users_dir / f"{user.email}.toml"
with open(user_path, 'w') as f:
toml.dump(user.dict(), f)
def load_user(self, email: str) -> User:
user_path = self.users_dir / f"{email}.toml"
if not user_path.exists():
raise FileNotFoundError(f"User with email {email} not found.")
with open(user_path, 'r') as f:
user_data = toml.load(f)
return User(**user_data)
def list_users(self) -> List[str]:
return [f.stem for f in self.users_dir.glob("*.toml")]
# --- GROUP METHODS ---
def save_group(self, group: Group):
group_path = self.groups_dir / f"{group.name}.toml"
with open(group_path, 'w') as f:
toml.dump(group.dict(), f)
def load_group(self, name: str) -> Group:
group_path = self.groups_dir / f"{name}.toml"
if not group_path.exists():
raise FileNotFoundError(f"Group with name {name} not found.")
with open(group_path, 'r') as f:
group_data = toml.load(f)
return Group(**group_data)
def list_groups(self) -> List[str]:
return [f.stem for f in self.groups_dir.glob("*.toml")]
# --- INFO METHODS ---
def save_info(self, info: Info):
info_path = self.info_dir / f"{info.name}.toml"
# Save ACL as part of the same TOML file
info_data = info.dict()
info_data['acl'] = info.acl.dict() # Store ACL as nested dictionary
with open(info_path, 'w') as f:
toml.dump(info_data, f)
def load_info(self, name: str) -> Info:
info_path = self.info_dir / f"{name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {name} not found.")
with open(info_path, 'r') as f:
info_data = toml.load(f)
return Info(**info_data)
def list_infos(self) -> List[str]:
return [f.stem for f in self.info_dir.glob("*.toml")]
# --- ACL METHODS ---
# ACLs are stored within Info objects, so we don't need separate files for ACLs.
def save_acl(self, acl: ACL, info_name: str):
info_path = self.info_dir / f"{info_name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {info_name} not found.")
# Load the info object
with open(info_path, 'r') as f:
info_data = toml.load(f)
# Update the ACL
info_data['acl'] = acl.dict()
# Save the updated info with the new ACL
with open(info_path, 'w') as f:
toml.dump(info_data, f)
def load_acl(self, info_name: str) -> ACL:
info_path = self.info_dir / f"{info_name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {info_name} not found.")
with open(info_path, 'r') as f:
info_data = toml.load(f)
acl_data = info_data.get('acl')
if not acl_data:
raise ValueError(f"ACL not found for info {info_name}.")
return ACL(**acl_data)

View File

@ -1,5 +1,6 @@
#!/bin/bash
export BASE_DIR="$(cd "$(dirname "$0")" && pwd)"
#!/bin/bash
SCRIPT_PATH="${BASH_SOURCE[0]:-$0}"
export BASE_DIR="$(cd "$(dirname "$SCRIPT_PATH")" && pwd)"
#export VENV_DIR="${HOME}/.venv"
export VENV_DIR="${BASE_DIR}/.venv"

View File

@ -3,6 +3,7 @@ colorama
colorlog
email-validator
fastapi
fastapi_mail
fuzzywuzzy
GitPython
IPython
@ -24,3 +25,5 @@ termcolor
toml
urllib3
uvicorn
itsdangerous
python-multipart

9
start_authserver.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/bash
set -ex
BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
source ${BASE_DIR}/myenv.sh
cd $BASE_DIR/authserver
uvicorn main:app --host 0.0.0.0 --port 8000 --reload