This commit is contained in:
kristof de spiegeleer 2024-09-05 08:27:47 +02:00
parent 404b55207f
commit da627a9c48
9 changed files with 328 additions and 68 deletions

9
authdb/acls/mainacl.toml Normal file
View File

@ -0,0 +1,9 @@
name = "mainacl"
[[entries]]
user = "user@example.com"
right = [ "w", "r", "i", "t", "e",]
[[entries]]
group = "admins"
right = [ "a", "d", "m", "i", "n",]

View File

@ -0,0 +1,3 @@
name = "admins"
users = [ "user@example.com", "user2@example.com",]
groups = []

View File

@ -0,0 +1,3 @@
name = "moreusers"
users = [ "user@example.com",]
groups = [ "admins",]

View File

@ -0,0 +1,5 @@
name = "boardinfo"
path = "/tmp"
acl = [ "mainacl",]
description = "Important file"
expiration = "2025-01-01"

View File

@ -0,0 +1,4 @@
email = "user2@example.com"
description = "Test user 2"
profile = "Profile data 2"
admin = false

View File

@ -0,0 +1,4 @@
email = "user@example.com"
description = "Test user"
profile = "Profile data"
admin = false

View File

@ -1,25 +1,57 @@
from utils.model import FileSystemDatabase,User, Group, ACE, ACL, Info, RightEnum
from utils.model import FileSystemDatabase, User, Group, ACE, ACL, Info, RightEnum
db = FileSystemDatabase("/tmp/userdb")
db = FileSystemDatabase("authdb",reset=True)
# Create a user and save it
user = User(email="user@example.com", description="Test user", profile="Profile data")
db.save_user(user)
user1 = db.user_set(User(email="user@example.com", description="Test user", profile="Profile data"))
user2 = db.user_set(User(email="user2@example.com", description="Test user 2", profile="Profile data 2"))
# Load a user
loaded_user = db.load_user("user@example.com")
loaded_user = db.user_get("user@example.com")
print(loaded_user)
# List all users
all_users = db.user_list()
print(all_users)
# Create a group and save it
group = Group(name="Admins", members=[user])
db.save_group(group)
group = Group(name="admins", users=[user1.email, user2.email])
db.group_set(group)
group = Group(name="moreusers", users=[user1.email], groups=["admins"])
db.group_set(group)
# Load a group
loaded_group = db.group_get("admins")
print(loaded_group)
loaded_group = db.group_get("moreusers")
print(loaded_group)
# List all groups
all_groups = db.group_list()
print(all_groups)
# Create an ACL and Info object and save it
ace = ACE(entity="Admins", right=RightEnum.admin)
acl = ACL(name="MainACL", entries=[ace])
info = Info(name="File1", path="/path/to/file", acl=acl, description="Important file", expiration="2025-01-01")
db.save_info(info)
acl = db.acl_set(ACL(name="MainACL", entries=[
ACE(user=user1.email, right=RightEnum.write),
ACE(group="admins", right=RightEnum.admin)]
))
info = Info(name="BoardInfo", path="/tmp", acl=[acl.name], description="Important file", expiration="2025-01-01")
db.info_set(info)
# Load an info object
loaded_info = db.load_info("File1")
loaded_info = db.info_get("BoardInfo")
print(loaded_info)
# List all info objects
all_info = db.info_list()
print(all_info)
# List all ACLs
all_acls = db.acl_list()
print(all_acls)

View File

@ -1,4 +1,4 @@
from pydantic import BaseModel, EmailStr
from pydantic import BaseModel,validator
from typing import List, Union
from enum import Enum
import os
@ -6,6 +6,13 @@ import toml
from pathlib import Path
from typing import Optional
class ObjNotFound(Exception):
"""Exception raised when an object is not found in the database."""
pass
def name_fix(name: str) -> str:
return name.strip().replace(" ", "_").lower()
# Enum for the ACE rights
class RightEnum(str, Enum):
read = "read"
@ -19,6 +26,12 @@ class ACE(BaseModel):
user: Optional[str] = None # User email, optional
right: RightEnum # The access right (read, write, admin)
@validator('right', pre=True)
def validate_right(cls, value):
if isinstance(value, list):
value = ''.join(value)
return RightEnum(value)
# Access Control List (ACL)
class ACL(BaseModel):
@ -26,27 +39,36 @@ class ACL(BaseModel):
entries: List[ACE] # List of ACEs (Access Control Entries)
# User model
class User(BaseModel):
email: EmailStr # User's email (used as a key)
description: str # User's description
profile: str # User's profile
email: str # User's email (used as a key)
description: str = "" # User's description
profile: str = "" # User's profile
admin: bool = False # Admin flag, default to False
# Group model
class Group(BaseModel):
name: str # Group name
members: List[Union['User', 'Group']] # Group members can be users or other groups
users: List[str] = [] # List of user emails
groups: List[str] = [] # List of group names
# Info model
class Info(BaseModel):
name: str # Name of the information
path: str # Path to the resource
acl: ACL # Associated ACL
acl: List[str] # List of associated ACLs by means of their names
description: str # Description of the information
expiration: str # Expiration date (as string)
class InfoMem(BaseModel):
name: str # Name of the information
path: str # Path to the resource
read: List[str] # List of user emails with read access (do lower case)
write: List[str] # List of user emails with write access (do lower case)
admin: List[str] # List of user emails with admin access (do lower case)
# Update forward references to allow self-referencing models
User.update_forward_refs()
@ -54,101 +76,218 @@ Group.update_forward_refs()
ACE.update_forward_refs()
class FileSystemDatabase:
def __init__(self, base_dir: str):
def __init__(self, base_dir: str, reset: bool = False):
self.base_dir = Path(base_dir)
self.users_dir = self.base_dir / "users"
self.groups_dir = self.base_dir / "groups"
self.info_dir = self.base_dir / "info"
self.acl_dir = self.base_dir / "acls"
if reset:
self.reset_database()
# Create directories if they don't exist
self.users_dir.mkdir(parents=True, exist_ok=True)
self.groups_dir.mkdir(parents=True, exist_ok=True)
self.info_dir.mkdir(parents=True, exist_ok=True)
self.acl_dir.mkdir(parents=True, exist_ok=True)
self.load_all_data()
def reset_database(self):
if self.base_dir.exists():
for item in self.base_dir.iterdir():
if item.is_dir():
for subitem in item.iterdir():
subitem.unlink()
item.rmdir()
else:
item.unlink()
self.base_dir.rmdir()
def load_all_data(self):
acls = {acl_name: self.acl_get(acl_name) for acl_name in self.acl_list()}
users = {user_email: self.user_get(user_email) for user_email in self.user_list()}
infos = {info_name: self.info_get(info_name) for info_name in self.info_list()}
self.info_mem_dict = {}
for info_name, info in infos.items():
read_emails = set()
write_emails = set()
admin_emails = set()
for acl_name in info.acl:
acl = acls.get(acl_name)
if acl:
for entry in acl.entries:
if entry.user:
if entry.right == RightEnum.read:
read_emails.add(entry.user)
elif entry.right == RightEnum.write:
write_emails.add(entry.user)
elif entry.right == RightEnum.admin:
admin_emails.add(entry.user)
if entry.group:
group = self.group_get(entry.group)
for member in group.members:
if member in users:
if entry.right == RightEnum.read:
read_emails.add(member)
elif entry.right == RightEnum.write:
write_emails.add(member)
elif entry.right == RightEnum.admin:
admin_emails.add(member)
self.info_mem_dict[info_name] = InfoMem(
name=info.name,
path=info.path,
read=list(read_emails),
write=list(write_emails),
admin=list(admin_emails)
)
# --- USER METHODS ---
def save_user(self, user: User):
def admin_check(self,current_user: User):
if not current_user.admin:
raise PermissionError("Only admin users can perform this action.")
def user_exists(self, email: str) -> bool:
email = name_fix(email)
user_path = self.users_dir / f"{email}.toml"
return user_path.exists()
def user_set(self, user: User) -> User:
user.email = name_fix(user.email)
user_path = self.users_dir / f"{user.email}.toml"
group_path = self.groups_dir / f"{user.email}.toml"
if group_path.exists():
raise ValueError(f"A group with the name {user.email} already exists, cannot create a user with the same name.")
with open(user_path, 'w') as f:
toml.dump(user.dict(), f)
return user
def load_user(self, email: str) -> User:
def user_get(self, email: str) -> User:
email = name_fix(email)
user_path = self.users_dir / f"{email}.toml"
if not user_path.exists():
raise FileNotFoundError(f"User with email {email} not found.")
raise ObjNotFound(f"User with email {email} not found.")
with open(user_path, 'r') as f:
user_data = toml.load(f)
return User(**user_data)
def list_users(self) -> List[str]:
def user_list(self) -> List[str]:
return [f.stem for f in self.users_dir.glob("*.toml")]
# --- GROUP METHODS ---
def save_group(self, group: Group):
def group_exists(self, name: str) -> bool:
name = name_fix(name)
group_path = self.groups_dir / f"{name}.toml"
return group_path.exists()
def group_set(self, group: Group) -> Group:
group.name = name_fix(group.name)
group_path = self.groups_dir / f"{group.name}.toml"
user_path = self.users_dir / f"{group.name}.toml"
if user_path.exists():
raise ValueError(f"A user with the name {group.name} already exists. Cannot create a group with the same name.")
# Check that all mentioned users and groups exist
for user_email in group.users:
if not self.user_exists(user_email):
raise ObjNotFound(f"User with email {user_email} not found.")
for group_name in group.groups:
if not self.group_exists(group_name):
raise ObjNotFound(f"Group with name {group_name} not found.")
with open(group_path, 'w') as f:
toml.dump(group.dict(), f)
return group
def load_group(self, name: str) -> Group:
def group_get(self, name: str) -> Group:
name = name_fix(name)
group_path = self.groups_dir / f"{name}.toml"
if not group_path.exists():
raise FileNotFoundError(f"Group with name {name} not found.")
raise ObjNotFound(f"Group with name {name} not found.")
with open(group_path, 'r') as f:
group_data = toml.load(f)
return Group(**group_data)
def list_groups(self) -> List[str]:
def group_list(self) -> List[str]:
return [f.stem for f in self.groups_dir.glob("*.toml")]
# --- INFO METHODS ---
def save_info(self, info: Info):
info_path = self.info_dir / f"{info.name}.toml"
# Save ACL as part of the same TOML file
info_data = info.dict()
info_data['acl'] = info.acl.dict() # Store ACL as nested dictionary
with open(info_path, 'w') as f:
toml.dump(info_data, f)
def info_exists(self, name: str) -> bool:
name = name_fix(name)
info_path = self.info_dir / f"{name}.toml"
return info_path.exists()
def load_info(self, name: str) -> Info:
def info_set(self, info: Info) -> Info:
info.name = name_fix(info.name)
info.acl = [name_fix(acl_name) for acl_name in info.acl]
# Check that all ACLs mentioned exist
for acl_name in info.acl:
if not self.acl_exists(acl_name):
raise ObjNotFound(f"ACL with name {acl_name} not found.")
info_path = self.info_dir / f"{info.name}.toml"
with open(info_path, 'w') as f:
toml.dump(info.dict(), f)
return info
def info_get(self, name: str) -> Info:
name = name_fix(name)
info_path = self.info_dir / f"{name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {name} not found.")
raise ObjNotFound(f"Info with name {name} not found.")
with open(info_path, 'r') as f:
info_data = toml.load(f)
return Info(**info_data)
def list_infos(self) -> List[str]:
def info_list(self) -> List[str]:
return [f.stem for f in self.info_dir.glob("*.toml")]
# --- ACL METHODS ---
# ACLs are stored within Info objects, so we don't need separate files for ACLs.
def save_acl(self, acl: ACL, info_name: str):
info_path = self.info_dir / f"{info_name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {info_name} not found.")
# Load the info object
with open(info_path, 'r') as f:
info_data = toml.load(f)
# Update the ACL
info_data['acl'] = acl.dict()
# Save the updated info with the new ACL
with open(info_path, 'w') as f:
toml.dump(info_data, f)
def acl_exists(self, name: str) -> bool:
name = name_fix(name)
acl_path = self.acl_dir / f"{name}.toml"
return acl_path.exists()
def load_acl(self, info_name: str) -> ACL:
info_path = self.info_dir / f"{info_name}.toml"
if not info_path.exists():
raise FileNotFoundError(f"Info with name {info_name} not found.")
with open(info_path, 'r') as f:
info_data = toml.load(f)
acl_data = info_data.get('acl')
if not acl_data:
raise ValueError(f"ACL not found for info {info_name}.")
def acl_set(self, acl: ACL) -> ACL:
acl.name = name_fix(acl.name)
acl.entries = [
ACE(
group=name_fix(entry.group) if entry.group else None,
user=name_fix(entry.user) if entry.user else None,
right=entry.right
) for entry in acl.entries
]
# Check that all mentioned users and groups exist
for entry in acl.entries:
if entry.user and not self.user_exists(entry.user):
raise ObjNotFound(f"User with email {entry.user} not found.")
if entry.group and not self.group_exists(entry.group):
raise ObjNotFound(f"Group with name {entry.group} not found.")
acl_path = self.acl_dir / f"{acl.name}.toml"
with open(acl_path, 'w') as f:
toml.dump(acl.dict(), f)
return acl
def acl_get(self, acl_name: str) -> ACL:
acl_name = name_fix(acl_name)
acl_path = self.acl_dir / f"{acl_name}.toml"
if not acl_path.exists():
raise ObjNotFound(f"ACL with name {acl_name} not found.")
with open(acl_path, 'r') as f:
acl_data = toml.load(f)
return ACL(**acl_data)
def acl_list(self) -> List[str]:
return [f.stem for f in self.acl_dir.glob("*.toml")]

View File

@ -0,0 +1,61 @@
from authserver.utils.model import FileSystemDatabase, User, Group, ACE, ACL, Info, RightEnum
from typing import List
class FileSystemDatabaseAuthenticated:
def __init__(self, parent: FileSystemDatabase, current_user_email: str):
self.parent = parent
self.current_user = self.parent.user_get(current_user_email)
def acl_check(self, info:Info) -> RightEnum:
info_mem = self.parent.info_mem_dict.get(info.name)
if not info_mem:
raise FileNotFoundError(f"Info with name {info.name} not found in memory.")
if self.current_user.email in info_mem.admin:
return RightEnum.admin
if self.current_user.email in info_mem.write:
return RightEnum.write
if self.current_user.email in info_mem.read:
return RightEnum.read
raise PermissionError(f"User {self.current_user.email} does not have access to info {info.name}.")
# --- USER METHODS ---
def user_set(self, user: User):
return self.parent.user_set(user)
def user_get(self, email: str) -> User:
return self.parent.user_get(email)
def user_list(self) -> List[str]:
return self.parent.user_list()
# --- GROUP METHODS ---
def group_set(self, group: Group):
return self.parent.group_set(group)
def group_get(self, name: str) -> Group:
return self.parent.group_get(name)
def group_list(self) -> List[str]:
return self.parent.group_list()
# --- INFO METHODS ---
def info_set(self, info: Info):
return self.parent.info_set(info)
def info_get(self, name: str) -> Info:
return self.parent.info_get(name)
def info_list(self) -> List[str]:
return self.parent.info_list()
# --- ACL METHODS ---
def acl_set(self, acl: ACL):
return self.parent.acl_set(acl)
def acl_get(self, info_name: str) -> ACL:
return self.parent.acl_get(info_name)
def acl_list(self) -> List[str]:
return self.parent.acl_list()