Compare commits
2 Commits
5e4a6adb32
...
404b55207f
Author | SHA1 | Date | |
---|---|---|---|
404b55207f | |||
01d0fb35b9 |
25
authserver/group_test.py
Normal file
25
authserver/group_test.py
Normal file
@ -0,0 +1,25 @@
|
||||
from utils.model import FileSystemDatabase,User, Group, ACE, ACL, Info, RightEnum
|
||||
|
||||
db = FileSystemDatabase("/tmp/userdb")
|
||||
|
||||
# Create a user and save it
|
||||
user = User(email="user@example.com", description="Test user", profile="Profile data")
|
||||
db.save_user(user)
|
||||
|
||||
# Load a user
|
||||
loaded_user = db.load_user("user@example.com")
|
||||
print(loaded_user)
|
||||
|
||||
# Create a group and save it
|
||||
group = Group(name="Admins", members=[user])
|
||||
db.save_group(group)
|
||||
|
||||
# Create an ACL and Info object and save it
|
||||
ace = ACE(entity="Admins", right=RightEnum.admin)
|
||||
acl = ACL(name="MainACL", entries=[ace])
|
||||
info = Info(name="File1", path="/path/to/file", acl=acl, description="Important file", expiration="2025-01-01")
|
||||
db.save_info(info)
|
||||
|
||||
# Load an info object
|
||||
loaded_info = db.load_info("File1")
|
||||
print(loaded_info)
|
103
authserver/main.py
Normal file
103
authserver/main.py
Normal file
@ -0,0 +1,103 @@
|
||||
from fastapi import FastAPI, Form, Request, Depends, HTTPException
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
|
||||
from fastapi.templating import Jinja2Templates
|
||||
import jwt
|
||||
from jwt.exceptions import PyJWTError
|
||||
from jwt import ExpiredSignatureError, InvalidTokenError
|
||||
from datetime import datetime, timedelta
|
||||
from pydantic import BaseModel, EmailStr
|
||||
from starlette.middleware.sessions import SessionMiddleware
|
||||
from starlette.responses import Response
|
||||
from utils.auth import create_access_token, verify_access_token
|
||||
|
||||
# Initialize FastAPI app
|
||||
app = FastAPI()
|
||||
|
||||
# Add session middleware for cookie management
|
||||
app.add_middleware(SessionMiddleware, secret_key="your_secret_key")
|
||||
|
||||
# Email configuration
|
||||
conf = ConnectionConfig(
|
||||
MAIL_USERNAME="kristof@incubaid.com",
|
||||
MAIL_PASSWORD="xsmtpsib-e13e99275ddbb31b16d295a5631d85bbb9299f741d1c8b633b852bf0e4c19972-hc0kgRmB8JDYGNQV",
|
||||
MAIL_FROM="info@threefold.me",
|
||||
MAIL_PORT=587,
|
||||
MAIL_SERVER="smtp-relay.brevo.com",
|
||||
MAIL_STARTTLS=True, # This replaces MAIL_TLS
|
||||
MAIL_SSL_TLS=False, # This replaces MAIL_SSL
|
||||
USE_CREDENTIALS=True
|
||||
)
|
||||
|
||||
# Jinja2 templates for rendering HTML
|
||||
templates = Jinja2Templates(directory="templates")
|
||||
|
||||
SECRET_KEY = "your_jwt_secret_key"
|
||||
ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
|
||||
# Step 1: Render login page to accept email
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def login_form(request: Request):
|
||||
return RedirectResponse(url="/signup")
|
||||
|
||||
@app.get("/signup", response_class=HTMLResponse)
|
||||
async def login_form(request: Request):
|
||||
return templates.TemplateResponse("login.html", {"request": request})
|
||||
|
||||
|
||||
# Step 2: Handle form submission, generate token, send login email
|
||||
@app.post("/loginsubmit")
|
||||
async def send_login_email(request: Request, email: str = Form(...)):
|
||||
|
||||
if not email:
|
||||
# Redirect to signup page if email is not provided
|
||||
return RedirectResponse(url="/signup")
|
||||
# Generate the access token and email link
|
||||
access_token = create_access_token({"sub": email})
|
||||
email_link = f"http://verse.tf:8000/register?token={access_token}"
|
||||
|
||||
# Render the email template with the email link
|
||||
rendered_template = templates.get_template("email.html").render({"email_link": email_link})
|
||||
|
||||
# Create the email message
|
||||
message = MessageSchema(
|
||||
subject="Login to your account",
|
||||
recipients=[email], # List of recipient emails
|
||||
body=rendered_template, # The rendered HTML content
|
||||
subtype="html" # Specify the subtype as HTML
|
||||
)
|
||||
|
||||
# Send the email
|
||||
fm = FastMail(conf)
|
||||
await fm.send_message(message)
|
||||
|
||||
return {"message": "Login link has been sent to your email."}
|
||||
|
||||
# Step 3: Handle email link redirection and set JWT in cookies
|
||||
@app.get("/register")
|
||||
async def register(request: Request, token: str):
|
||||
try:
|
||||
email = verify_access_token(token)
|
||||
except PyJWTError:
|
||||
raise HTTPException(status_code=401, detail="Invalid token")
|
||||
|
||||
response = RedirectResponse(url="/info")
|
||||
response.set_cookie(key="access_token", value=token)
|
||||
return response
|
||||
|
||||
# Step 4: User info page, read JWT from cookies
|
||||
@app.get("/info", response_class=HTMLResponse)
|
||||
async def get_user_info(request: Request):
|
||||
token = request.cookies.get("access_token")
|
||||
if not token:
|
||||
#raise HTTPException(status_code=401, detail="Not authenticated")
|
||||
return RedirectResponse(url="/signup")
|
||||
|
||||
try:
|
||||
email = verify_access_token(token)
|
||||
except PyJWTError:
|
||||
#raise HTTPException(status_code=401, detail="Invalid token")
|
||||
return RedirectResponse(url="/signup")
|
||||
|
||||
return templates.TemplateResponse("info.html", {"request": request, "email": email})
|
2
authserver/run.sh
Normal file
2
authserver/run.sh
Normal file
@ -0,0 +1,2 @@
|
||||
|
||||
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
|
10
authserver/templates/email.html
Normal file
10
authserver/templates/email.html
Normal file
@ -0,0 +1,10 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Login Email</title>
|
||||
</head>
|
||||
<body>
|
||||
<p>Click the link below to login:</p>
|
||||
<a href="{{ email_link }}">Login</a>
|
||||
</body>
|
||||
</html>
|
10
authserver/templates/info.html
Normal file
10
authserver/templates/info.html
Normal file
@ -0,0 +1,10 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>User Info</title>
|
||||
</head>
|
||||
<body>
|
||||
<h2>Welcome!</h2>
|
||||
<p>Your email is: {{ email }}</p>
|
||||
</body>
|
||||
</html>
|
15
authserver/templates/login.html
Normal file
15
authserver/templates/login.html
Normal file
@ -0,0 +1,15 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Login</title>
|
||||
</head>
|
||||
<body>
|
||||
<h2>Login with Email</h2>
|
||||
<form method="post" action="/loginsubmit">
|
||||
<label for="email">Email:</label>
|
||||
<input type="email" name="email" required>
|
||||
<button type="submit">Login</button>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
|
23
authserver/utils/auth.py
Normal file
23
authserver/utils/auth.py
Normal file
@ -0,0 +1,23 @@
|
||||
import jwt
|
||||
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
SECRET_KEY = "your_jwt_secret_key"
|
||||
ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
|
||||
def create_access_token(data: dict):
|
||||
to_encode = data.copy()
|
||||
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
to_encode.update({"exp": expire})
|
||||
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
|
||||
def verify_access_token(token: str):
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
email: str = payload.get("sub")
|
||||
if email is None:
|
||||
raise InvalidTokenError
|
||||
return email
|
||||
except (ExpiredSignatureError, InvalidTokenError):
|
||||
raise InvalidTokenError
|
154
authserver/utils/model.py
Normal file
154
authserver/utils/model.py
Normal file
@ -0,0 +1,154 @@
|
||||
from pydantic import BaseModel, EmailStr
|
||||
from typing import List, Union
|
||||
from enum import Enum
|
||||
import os
|
||||
import toml
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Enum for the ACE rights
|
||||
class RightEnum(str, Enum):
|
||||
read = "read"
|
||||
write = "write"
|
||||
admin = "admin"
|
||||
|
||||
|
||||
# Access Control Entry (ACE)
|
||||
class ACE(BaseModel):
|
||||
group: Optional[str] = None # Group name, optional
|
||||
user: Optional[str] = None # User email, optional
|
||||
right: RightEnum # The access right (read, write, admin)
|
||||
|
||||
|
||||
# Access Control List (ACL)
|
||||
class ACL(BaseModel):
|
||||
name: str # Name of the ACL
|
||||
entries: List[ACE] # List of ACEs (Access Control Entries)
|
||||
|
||||
|
||||
# User model
|
||||
class User(BaseModel):
|
||||
email: EmailStr # User's email (used as a key)
|
||||
description: str # User's description
|
||||
profile: str # User's profile
|
||||
|
||||
|
||||
# Group model
|
||||
class Group(BaseModel):
|
||||
name: str # Group name
|
||||
members: List[Union['User', 'Group']] # Group members can be users or other groups
|
||||
|
||||
|
||||
# Info model
|
||||
class Info(BaseModel):
|
||||
name: str # Name of the information
|
||||
path: str # Path to the resource
|
||||
acl: ACL # Associated ACL
|
||||
description: str # Description of the information
|
||||
expiration: str # Expiration date (as string)
|
||||
|
||||
|
||||
# Update forward references to allow self-referencing models
|
||||
User.update_forward_refs()
|
||||
Group.update_forward_refs()
|
||||
ACE.update_forward_refs()
|
||||
|
||||
class FileSystemDatabase:
|
||||
def __init__(self, base_dir: str):
|
||||
self.base_dir = Path(base_dir)
|
||||
self.users_dir = self.base_dir / "users"
|
||||
self.groups_dir = self.base_dir / "groups"
|
||||
self.info_dir = self.base_dir / "info"
|
||||
|
||||
# Create directories if they don't exist
|
||||
self.users_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.groups_dir.mkdir(parents=True, exist_ok=True)
|
||||
self.info_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# --- USER METHODS ---
|
||||
def save_user(self, user: User):
|
||||
user_path = self.users_dir / f"{user.email}.toml"
|
||||
with open(user_path, 'w') as f:
|
||||
toml.dump(user.dict(), f)
|
||||
|
||||
def load_user(self, email: str) -> User:
|
||||
user_path = self.users_dir / f"{email}.toml"
|
||||
if not user_path.exists():
|
||||
raise FileNotFoundError(f"User with email {email} not found.")
|
||||
with open(user_path, 'r') as f:
|
||||
user_data = toml.load(f)
|
||||
return User(**user_data)
|
||||
|
||||
def list_users(self) -> List[str]:
|
||||
return [f.stem for f in self.users_dir.glob("*.toml")]
|
||||
|
||||
# --- GROUP METHODS ---
|
||||
def save_group(self, group: Group):
|
||||
group_path = self.groups_dir / f"{group.name}.toml"
|
||||
with open(group_path, 'w') as f:
|
||||
toml.dump(group.dict(), f)
|
||||
|
||||
def load_group(self, name: str) -> Group:
|
||||
group_path = self.groups_dir / f"{name}.toml"
|
||||
if not group_path.exists():
|
||||
raise FileNotFoundError(f"Group with name {name} not found.")
|
||||
with open(group_path, 'r') as f:
|
||||
group_data = toml.load(f)
|
||||
return Group(**group_data)
|
||||
|
||||
def list_groups(self) -> List[str]:
|
||||
return [f.stem for f in self.groups_dir.glob("*.toml")]
|
||||
|
||||
# --- INFO METHODS ---
|
||||
def save_info(self, info: Info):
|
||||
info_path = self.info_dir / f"{info.name}.toml"
|
||||
# Save ACL as part of the same TOML file
|
||||
info_data = info.dict()
|
||||
info_data['acl'] = info.acl.dict() # Store ACL as nested dictionary
|
||||
with open(info_path, 'w') as f:
|
||||
toml.dump(info_data, f)
|
||||
|
||||
def load_info(self, name: str) -> Info:
|
||||
info_path = self.info_dir / f"{name}.toml"
|
||||
if not info_path.exists():
|
||||
raise FileNotFoundError(f"Info with name {name} not found.")
|
||||
with open(info_path, 'r') as f:
|
||||
info_data = toml.load(f)
|
||||
return Info(**info_data)
|
||||
|
||||
def list_infos(self) -> List[str]:
|
||||
return [f.stem for f in self.info_dir.glob("*.toml")]
|
||||
|
||||
# --- ACL METHODS ---
|
||||
# ACLs are stored within Info objects, so we don't need separate files for ACLs.
|
||||
def save_acl(self, acl: ACL, info_name: str):
|
||||
info_path = self.info_dir / f"{info_name}.toml"
|
||||
if not info_path.exists():
|
||||
raise FileNotFoundError(f"Info with name {info_name} not found.")
|
||||
|
||||
# Load the info object
|
||||
with open(info_path, 'r') as f:
|
||||
info_data = toml.load(f)
|
||||
|
||||
# Update the ACL
|
||||
info_data['acl'] = acl.dict()
|
||||
|
||||
# Save the updated info with the new ACL
|
||||
with open(info_path, 'w') as f:
|
||||
toml.dump(info_data, f)
|
||||
|
||||
def load_acl(self, info_name: str) -> ACL:
|
||||
info_path = self.info_dir / f"{info_name}.toml"
|
||||
if not info_path.exists():
|
||||
raise FileNotFoundError(f"Info with name {info_name} not found.")
|
||||
|
||||
with open(info_path, 'r') as f:
|
||||
info_data = toml.load(f)
|
||||
|
||||
acl_data = info_data.get('acl')
|
||||
if not acl_data:
|
||||
raise ValueError(f"ACL not found for info {info_name}.")
|
||||
|
||||
return ACL(**acl_data)
|
||||
|
||||
|
5
myenv.sh
5
myenv.sh
@ -1,5 +1,6 @@
|
||||
#!/bin/bash
|
||||
export BASE_DIR="$(cd "$(dirname "$0")" && pwd)"
|
||||
#!/bin/bash
|
||||
SCRIPT_PATH="${BASH_SOURCE[0]:-$0}"
|
||||
export BASE_DIR="$(cd "$(dirname "$SCRIPT_PATH")" && pwd)"
|
||||
#export VENV_DIR="${HOME}/.venv"
|
||||
export VENV_DIR="${BASE_DIR}/.venv"
|
||||
|
||||
|
@ -3,6 +3,7 @@ colorama
|
||||
colorlog
|
||||
email-validator
|
||||
fastapi
|
||||
fastapi_mail
|
||||
fuzzywuzzy
|
||||
GitPython
|
||||
IPython
|
||||
@ -24,3 +25,5 @@ termcolor
|
||||
toml
|
||||
urllib3
|
||||
uvicorn
|
||||
itsdangerous
|
||||
python-multipart
|
||||
|
9
start_authserver.sh
Executable file
9
start_authserver.sh
Executable file
@ -0,0 +1,9 @@
|
||||
#!/bin/bash
|
||||
set -ex
|
||||
BASE_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
|
||||
source ${BASE_DIR}/myenv.sh
|
||||
|
||||
cd $BASE_DIR/authserver
|
||||
|
||||
uvicorn main:app --host 0.0.0.0 --port 8000 --reload
|
||||
|
Loading…
Reference in New Issue
Block a user