From da627a9c483649b26062fa2dc71a8305efc3710d Mon Sep 17 00:00:00 2001 From: kristof de spiegeleer Date: Thu, 5 Sep 2024 08:27:47 +0200 Subject: [PATCH] web --- authdb/acls/mainacl.toml | 9 + authdb/groups/admins.toml | 3 + authdb/groups/moreusers.toml | 3 + authdb/info/boardinfo.toml | 5 + authdb/users/user2@example.com.toml | 4 + authdb/users/user@example.com.toml | 4 + authserver/group_test.py | 56 ++++-- authserver/utils/model.py | 251 ++++++++++++++++++------ authserver/utils/model_authenticated.py | 61 ++++++ 9 files changed, 328 insertions(+), 68 deletions(-) create mode 100644 authdb/acls/mainacl.toml create mode 100644 authdb/groups/admins.toml create mode 100644 authdb/groups/moreusers.toml create mode 100644 authdb/info/boardinfo.toml create mode 100644 authdb/users/user2@example.com.toml create mode 100644 authdb/users/user@example.com.toml create mode 100644 authserver/utils/model_authenticated.py diff --git a/authdb/acls/mainacl.toml b/authdb/acls/mainacl.toml new file mode 100644 index 0000000..45ee0e8 --- /dev/null +++ b/authdb/acls/mainacl.toml @@ -0,0 +1,9 @@ +name = "mainacl" +[[entries]] +user = "user@example.com" +right = [ "w", "r", "i", "t", "e",] + +[[entries]] +group = "admins" +right = [ "a", "d", "m", "i", "n",] + diff --git a/authdb/groups/admins.toml b/authdb/groups/admins.toml new file mode 100644 index 0000000..3b26a87 --- /dev/null +++ b/authdb/groups/admins.toml @@ -0,0 +1,3 @@ +name = "admins" +users = [ "user@example.com", "user2@example.com",] +groups = [] diff --git a/authdb/groups/moreusers.toml b/authdb/groups/moreusers.toml new file mode 100644 index 0000000..78a514a --- /dev/null +++ b/authdb/groups/moreusers.toml @@ -0,0 +1,3 @@ +name = "moreusers" +users = [ "user@example.com",] +groups = [ "admins",] diff --git a/authdb/info/boardinfo.toml b/authdb/info/boardinfo.toml new file mode 100644 index 0000000..ffdc4f6 --- /dev/null +++ b/authdb/info/boardinfo.toml @@ -0,0 +1,5 @@ +name = "boardinfo" +path = "/tmp" +acl = [ "mainacl",] +description = "Important file" +expiration = "2025-01-01" diff --git a/authdb/users/user2@example.com.toml b/authdb/users/user2@example.com.toml new file mode 100644 index 0000000..25f9c6a --- /dev/null +++ b/authdb/users/user2@example.com.toml @@ -0,0 +1,4 @@ +email = "user2@example.com" +description = "Test user 2" +profile = "Profile data 2" +admin = false diff --git a/authdb/users/user@example.com.toml b/authdb/users/user@example.com.toml new file mode 100644 index 0000000..d8ab504 --- /dev/null +++ b/authdb/users/user@example.com.toml @@ -0,0 +1,4 @@ +email = "user@example.com" +description = "Test user" +profile = "Profile data" +admin = false diff --git a/authserver/group_test.py b/authserver/group_test.py index acf9cff..9532de8 100644 --- a/authserver/group_test.py +++ b/authserver/group_test.py @@ -1,25 +1,57 @@ -from utils.model import FileSystemDatabase,User, Group, ACE, ACL, Info, RightEnum +from utils.model import FileSystemDatabase, User, Group, ACE, ACL, Info, RightEnum -db = FileSystemDatabase("/tmp/userdb") +db = FileSystemDatabase("authdb",reset=True) # Create a user and save it -user = User(email="user@example.com", description="Test user", profile="Profile data") -db.save_user(user) + +user1 = db.user_set(User(email="user@example.com", description="Test user", profile="Profile data")) +user2 = db.user_set(User(email="user2@example.com", description="Test user 2", profile="Profile data 2")) # Load a user -loaded_user = db.load_user("user@example.com") +loaded_user = db.user_get("user@example.com") print(loaded_user) +# List all users +all_users = db.user_list() +print(all_users) + # Create a group and save it -group = Group(name="Admins", members=[user]) -db.save_group(group) +group = Group(name="admins", users=[user1.email, user2.email]) +db.group_set(group) + +group = Group(name="moreusers", users=[user1.email], groups=["admins"]) +db.group_set(group) + + +# Load a group +loaded_group = db.group_get("admins") +print(loaded_group) + +loaded_group = db.group_get("moreusers") +print(loaded_group) + +# List all groups +all_groups = db.group_list() +print(all_groups) # Create an ACL and Info object and save it -ace = ACE(entity="Admins", right=RightEnum.admin) -acl = ACL(name="MainACL", entries=[ace]) -info = Info(name="File1", path="/path/to/file", acl=acl, description="Important file", expiration="2025-01-01") -db.save_info(info) +acl = db.acl_set(ACL(name="MainACL", entries=[ + ACE(user=user1.email, right=RightEnum.write), + ACE(group="admins", right=RightEnum.admin)] +)) +info = Info(name="BoardInfo", path="/tmp", acl=[acl.name], description="Important file", expiration="2025-01-01") +db.info_set(info) + + # Load an info object -loaded_info = db.load_info("File1") +loaded_info = db.info_get("BoardInfo") print(loaded_info) + +# List all info objects +all_info = db.info_list() +print(all_info) + +# List all ACLs +all_acls = db.acl_list() +print(all_acls) diff --git a/authserver/utils/model.py b/authserver/utils/model.py index cebaf54..270e392 100644 --- a/authserver/utils/model.py +++ b/authserver/utils/model.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel, EmailStr +from pydantic import BaseModel,validator from typing import List, Union from enum import Enum import os @@ -6,6 +6,13 @@ import toml from pathlib import Path from typing import Optional +class ObjNotFound(Exception): + """Exception raised when an object is not found in the database.""" + pass + +def name_fix(name: str) -> str: + return name.strip().replace(" ", "_").lower() + # Enum for the ACE rights class RightEnum(str, Enum): read = "read" @@ -19,6 +26,12 @@ class ACE(BaseModel): user: Optional[str] = None # User email, optional right: RightEnum # The access right (read, write, admin) + @validator('right', pre=True) + def validate_right(cls, value): + if isinstance(value, list): + value = ''.join(value) + return RightEnum(value) + # Access Control List (ACL) class ACL(BaseModel): @@ -26,27 +39,36 @@ class ACL(BaseModel): entries: List[ACE] # List of ACEs (Access Control Entries) + # User model class User(BaseModel): - email: EmailStr # User's email (used as a key) - description: str # User's description - profile: str # User's profile + email: str # User's email (used as a key) + description: str = "" # User's description + profile: str = "" # User's profile + admin: bool = False # Admin flag, default to False # Group model class Group(BaseModel): name: str # Group name - members: List[Union['User', 'Group']] # Group members can be users or other groups - + users: List[str] = [] # List of user emails + groups: List[str] = [] # List of group names # Info model class Info(BaseModel): name: str # Name of the information path: str # Path to the resource - acl: ACL # Associated ACL + acl: List[str] # List of associated ACLs by means of their names description: str # Description of the information expiration: str # Expiration date (as string) +class InfoMem(BaseModel): + name: str # Name of the information + path: str # Path to the resource + read: List[str] # List of user emails with read access (do lower case) + write: List[str] # List of user emails with write access (do lower case) + admin: List[str] # List of user emails with admin access (do lower case) + # Update forward references to allow self-referencing models User.update_forward_refs() @@ -54,101 +76,218 @@ Group.update_forward_refs() ACE.update_forward_refs() class FileSystemDatabase: - def __init__(self, base_dir: str): + def __init__(self, base_dir: str, reset: bool = False): self.base_dir = Path(base_dir) self.users_dir = self.base_dir / "users" self.groups_dir = self.base_dir / "groups" self.info_dir = self.base_dir / "info" - + self.acl_dir = self.base_dir / "acls" + + if reset: + self.reset_database() + # Create directories if they don't exist self.users_dir.mkdir(parents=True, exist_ok=True) self.groups_dir.mkdir(parents=True, exist_ok=True) self.info_dir.mkdir(parents=True, exist_ok=True) + self.acl_dir.mkdir(parents=True, exist_ok=True) + + self.load_all_data() + + def reset_database(self): + if self.base_dir.exists(): + for item in self.base_dir.iterdir(): + if item.is_dir(): + for subitem in item.iterdir(): + subitem.unlink() + item.rmdir() + else: + item.unlink() + self.base_dir.rmdir() + + def load_all_data(self): + acls = {acl_name: self.acl_get(acl_name) for acl_name in self.acl_list()} + users = {user_email: self.user_get(user_email) for user_email in self.user_list()} + infos = {info_name: self.info_get(info_name) for info_name in self.info_list()} + + self.info_mem_dict = {} + for info_name, info in infos.items(): + read_emails = set() + write_emails = set() + admin_emails = set() + + for acl_name in info.acl: + acl = acls.get(acl_name) + if acl: + for entry in acl.entries: + if entry.user: + if entry.right == RightEnum.read: + read_emails.add(entry.user) + elif entry.right == RightEnum.write: + write_emails.add(entry.user) + elif entry.right == RightEnum.admin: + admin_emails.add(entry.user) + if entry.group: + group = self.group_get(entry.group) + for member in group.members: + if member in users: + if entry.right == RightEnum.read: + read_emails.add(member) + elif entry.right == RightEnum.write: + write_emails.add(member) + elif entry.right == RightEnum.admin: + admin_emails.add(member) + + self.info_mem_dict[info_name] = InfoMem( + name=info.name, + path=info.path, + read=list(read_emails), + write=list(write_emails), + admin=list(admin_emails) + ) # --- USER METHODS --- - def save_user(self, user: User): + def admin_check(self,current_user: User): + if not current_user.admin: + raise PermissionError("Only admin users can perform this action.") + + def user_exists(self, email: str) -> bool: + email = name_fix(email) + user_path = self.users_dir / f"{email}.toml" + return user_path.exists() + + def user_set(self, user: User) -> User: + user.email = name_fix(user.email) user_path = self.users_dir / f"{user.email}.toml" + group_path = self.groups_dir / f"{user.email}.toml" + + if group_path.exists(): + raise ValueError(f"A group with the name {user.email} already exists, cannot create a user with the same name.") + with open(user_path, 'w') as f: toml.dump(user.dict(), f) + return user - def load_user(self, email: str) -> User: + def user_get(self, email: str) -> User: + email = name_fix(email) user_path = self.users_dir / f"{email}.toml" + if not user_path.exists(): - raise FileNotFoundError(f"User with email {email} not found.") + raise ObjNotFound(f"User with email {email} not found.") with open(user_path, 'r') as f: user_data = toml.load(f) return User(**user_data) - def list_users(self) -> List[str]: + def user_list(self) -> List[str]: return [f.stem for f in self.users_dir.glob("*.toml")] # --- GROUP METHODS --- - def save_group(self, group: Group): + def group_exists(self, name: str) -> bool: + name = name_fix(name) + group_path = self.groups_dir / f"{name}.toml" + return group_path.exists() + + def group_set(self, group: Group) -> Group: + group.name = name_fix(group.name) group_path = self.groups_dir / f"{group.name}.toml" + user_path = self.users_dir / f"{group.name}.toml" + + if user_path.exists(): + raise ValueError(f"A user with the name {group.name} already exists. Cannot create a group with the same name.") + + # Check that all mentioned users and groups exist + for user_email in group.users: + if not self.user_exists(user_email): + raise ObjNotFound(f"User with email {user_email} not found.") + + for group_name in group.groups: + if not self.group_exists(group_name): + raise ObjNotFound(f"Group with name {group_name} not found.") + with open(group_path, 'w') as f: toml.dump(group.dict(), f) + return group - def load_group(self, name: str) -> Group: + def group_get(self, name: str) -> Group: + name = name_fix(name) group_path = self.groups_dir / f"{name}.toml" if not group_path.exists(): - raise FileNotFoundError(f"Group with name {name} not found.") + raise ObjNotFound(f"Group with name {name} not found.") with open(group_path, 'r') as f: group_data = toml.load(f) return Group(**group_data) - def list_groups(self) -> List[str]: + def group_list(self) -> List[str]: return [f.stem for f in self.groups_dir.glob("*.toml")] # --- INFO METHODS --- - def save_info(self, info: Info): - info_path = self.info_dir / f"{info.name}.toml" - # Save ACL as part of the same TOML file - info_data = info.dict() - info_data['acl'] = info.acl.dict() # Store ACL as nested dictionary - with open(info_path, 'w') as f: - toml.dump(info_data, f) + def info_exists(self, name: str) -> bool: + name = name_fix(name) + info_path = self.info_dir / f"{name}.toml" + return info_path.exists() - def load_info(self, name: str) -> Info: + def info_set(self, info: Info) -> Info: + info.name = name_fix(info.name) + info.acl = [name_fix(acl_name) for acl_name in info.acl] + + # Check that all ACLs mentioned exist + for acl_name in info.acl: + if not self.acl_exists(acl_name): + raise ObjNotFound(f"ACL with name {acl_name} not found.") + + info_path = self.info_dir / f"{info.name}.toml" + with open(info_path, 'w') as f: + toml.dump(info.dict(), f) + return info + + def info_get(self, name: str) -> Info: + name = name_fix(name) info_path = self.info_dir / f"{name}.toml" if not info_path.exists(): - raise FileNotFoundError(f"Info with name {name} not found.") + raise ObjNotFound(f"Info with name {name} not found.") with open(info_path, 'r') as f: info_data = toml.load(f) return Info(**info_data) - def list_infos(self) -> List[str]: + def info_list(self) -> List[str]: return [f.stem for f in self.info_dir.glob("*.toml")] # --- ACL METHODS --- - # ACLs are stored within Info objects, so we don't need separate files for ACLs. - def save_acl(self, acl: ACL, info_name: str): - info_path = self.info_dir / f"{info_name}.toml" - if not info_path.exists(): - raise FileNotFoundError(f"Info with name {info_name} not found.") - - # Load the info object - with open(info_path, 'r') as f: - info_data = toml.load(f) - - # Update the ACL - info_data['acl'] = acl.dict() - - # Save the updated info with the new ACL - with open(info_path, 'w') as f: - toml.dump(info_data, f) + def acl_exists(self, name: str) -> bool: + name = name_fix(name) + acl_path = self.acl_dir / f"{name}.toml" + return acl_path.exists() - def load_acl(self, info_name: str) -> ACL: - info_path = self.info_dir / f"{info_name}.toml" - if not info_path.exists(): - raise FileNotFoundError(f"Info with name {info_name} not found.") - - with open(info_path, 'r') as f: - info_data = toml.load(f) - - acl_data = info_data.get('acl') - if not acl_data: - raise ValueError(f"ACL not found for info {info_name}.") - + def acl_set(self, acl: ACL) -> ACL: + acl.name = name_fix(acl.name) + acl.entries = [ + ACE( + group=name_fix(entry.group) if entry.group else None, + user=name_fix(entry.user) if entry.user else None, + right=entry.right + ) for entry in acl.entries + ] + + # Check that all mentioned users and groups exist + for entry in acl.entries: + if entry.user and not self.user_exists(entry.user): + raise ObjNotFound(f"User with email {entry.user} not found.") + if entry.group and not self.group_exists(entry.group): + raise ObjNotFound(f"Group with name {entry.group} not found.") + + acl_path = self.acl_dir / f"{acl.name}.toml" + with open(acl_path, 'w') as f: + toml.dump(acl.dict(), f) + return acl + + def acl_get(self, acl_name: str) -> ACL: + acl_name = name_fix(acl_name) + acl_path = self.acl_dir / f"{acl_name}.toml" + if not acl_path.exists(): + raise ObjNotFound(f"ACL with name {acl_name} not found.") + with open(acl_path, 'r') as f: + acl_data = toml.load(f) return ACL(**acl_data) - + def acl_list(self) -> List[str]: + return [f.stem for f in self.acl_dir.glob("*.toml")] diff --git a/authserver/utils/model_authenticated.py b/authserver/utils/model_authenticated.py new file mode 100644 index 0000000..b9b3aed --- /dev/null +++ b/authserver/utils/model_authenticated.py @@ -0,0 +1,61 @@ +from authserver.utils.model import FileSystemDatabase, User, Group, ACE, ACL, Info, RightEnum +from typing import List + +class FileSystemDatabaseAuthenticated: + def __init__(self, parent: FileSystemDatabase, current_user_email: str): + self.parent = parent + self.current_user = self.parent.user_get(current_user_email) + + def acl_check(self, info:Info) -> RightEnum: + info_mem = self.parent.info_mem_dict.get(info.name) + if not info_mem: + raise FileNotFoundError(f"Info with name {info.name} not found in memory.") + + if self.current_user.email in info_mem.admin: + return RightEnum.admin + if self.current_user.email in info_mem.write: + return RightEnum.write + if self.current_user.email in info_mem.read: + return RightEnum.read + + raise PermissionError(f"User {self.current_user.email} does not have access to info {info.name}.") + + # --- USER METHODS --- + def user_set(self, user: User): + return self.parent.user_set(user) + + def user_get(self, email: str) -> User: + return self.parent.user_get(email) + + def user_list(self) -> List[str]: + return self.parent.user_list() + + # --- GROUP METHODS --- + def group_set(self, group: Group): + return self.parent.group_set(group) + + def group_get(self, name: str) -> Group: + return self.parent.group_get(name) + + def group_list(self) -> List[str]: + return self.parent.group_list() + + # --- INFO METHODS --- + def info_set(self, info: Info): + return self.parent.info_set(info) + + def info_get(self, name: str) -> Info: + return self.parent.info_get(name) + + def info_list(self) -> List[str]: + return self.parent.info_list() + + # --- ACL METHODS --- + def acl_set(self, acl: ACL): + return self.parent.acl_set(acl) + + def acl_get(self, info_name: str) -> ACL: + return self.parent.acl_get(info_name) + + def acl_list(self) -> List[str]: + return self.parent.acl_list()