# www_projectmycelium_io/poc_threefold/server.py
# 2024-11-15 10:49:55 +03:00
#
# 190 lines
# 6.1 KiB
# Python

from fastapi import FastAPI, Request, HTTPException, WebSocket
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from jinja2 import Environment, FileSystemLoader, select_autoescape, TemplateNotFound
import os
import markdown
import re
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import asyncio
from typing import List
import time
from contextlib import asynccontextmanager
# Project root: taken from the BASE_DIR environment variable, falling back to
# the directory that contains this file. "~" is expanded up front so that the
# existence check below and every derived path refer to the same location
# (previously only sources_dir was expanded, after BASE_DIR had been checked).
BASE_DIR = os.path.expanduser(
    os.environ.get('BASE_DIR', os.path.dirname(os.path.abspath(__file__)))
)

# Fail fast at import time when the configured root is missing.
if not os.path.exists(BASE_DIR):
    raise RuntimeError(f"The BASE_DIR '{BASE_DIR}' does not exist.")

# Directory watched for changes and used as the template root.
sources_dir = os.path.expanduser(f"{BASE_DIR}/poc_threefold")
if not os.path.exists(sources_dir):
    raise RuntimeError(f"The source directory '{sources_dir}' does not exist.")

static_dir = f"{sources_dir}/static"
content_dir = f"{sources_dir}/content"

# Store active WebSocket connections
active_connections: List[WebSocket] = []

# Store the main event loop reference; stays None until lifespan() sets it
# at startup, and FileChangeHandler skips broadcasting while it is None.
main_loop = None
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that triggers a browser reload when a file is modified.

    Bursts of modification events are debounced so that at most one reload
    broadcast is scheduled per 0.5 second window.
    """

    def __init__(self):
        super().__init__()
        # Monotonic timestamp of the last scheduled broadcast; -inf guarantees
        # that the very first event is never debounced away.
        self.last_modified = float("-inf")

    def on_modified(self, event):
        """Schedule ``broadcast_reload`` on the main loop for a file change.

        Directory events are ignored, as are events arriving within 0.5 s of
        the previous accepted one. Nothing is scheduled while ``main_loop``
        has not been set yet.
        """
        if event.is_directory:
            return

        # A monotonic clock keeps the debounce correct even if the wall clock
        # is stepped backwards (which would otherwise suppress reloads).
        current_time = time.monotonic()
        if current_time - self.last_modified <= 0.5:  # Debounce multiple events
            return
        self.last_modified = current_time

        if main_loop is not None:
            # Hand the coroutine to the loop stored in main_loop using the
            # thread-safe API, since this callback does not run as a task on
            # that loop.
            asyncio.run_coroutine_threadsafe(broadcast_reload(), main_loop)
async def broadcast_reload():
    """Send the text message "reload" to every active WebSocket connection.

    A connection whose send fails is removed from ``active_connections`` so
    it is not retried on the next broadcast.
    """
    # Iterate over a snapshot: the list may be mutated while we await.
    for connection in list(active_connections):
        try:
            await connection.send_text("reload")
        except Exception:
            # Only ordinary failures mark the connection as dead. A bare
            # ``except`` would also swallow asyncio.CancelledError and
            # KeyboardInterrupt, hiding task cancellation and shutdown.
            if connection in active_connections:
                active_connections.remove(connection)
def get_content(name: str) -> str:
    """Get content by name from either HTML or markdown files in content directory.

    Leading and trailing slashes are stripped from *name*. ``<name>.html`` is
    looked up first and returned verbatim; otherwise ``<name>.md`` is rendered
    to HTML with ``markdown.markdown``. When neither file exists the
    placeholder ``[[<name> not found]]`` is returned instead of raising.
    """
    # Remove any leading/trailing slashes
    name = name.strip('/')

    # Check for file with .html extension
    html_path = os.path.join(content_dir, f"{name}.html")
    if os.path.exists(html_path):
        # Read as UTF-8 explicitly: the platform default encoding (e.g.
        # cp1252 on Windows) would garble non-ASCII web content.
        with open(html_path, 'r', encoding='utf-8') as f:
            return f.read()

    # Check for file with .md extension
    md_path = os.path.join(content_dir, f"{name}.md")
    if os.path.exists(md_path):
        with open(md_path, 'r', encoding='utf-8') as f:
            return markdown.markdown(f.read())

    return f"[[{name} not found]]"
# Matches [[name]] include markers; the non-greedy group captures the name.
_INCLUDE_PATTERN = re.compile(r'\[\[(.*?)\]\]')

# Live-reload client injected into every page. The WebSocket scheme follows
# the page's own protocol: a hard-coded ws:// URL is blocked as mixed content
# when the page is served over HTTPS.
_RELOAD_SCRIPT = """
<script>
    const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
    const ws = new WebSocket(wsScheme + window.location.host + '/ws');
    ws.onmessage = function(event) {
        if (event.data === 'reload') {
            window.location.reload();
        }
    };
    ws.onclose = function() {
        console.log('WebSocket connection closed. Retrying...');
        setTimeout(() => {
            window.location.reload();
        }, 1000);
    };
</script>
"""


def process_content(content: str) -> str:
    """Process content and replace [[name]] with corresponding HTML content.

    Each ``[[name]]`` marker is replaced by the result of ``get_content(name)``.
    Text produced by a replacement is not scanned again for further markers.
    Afterwards the live-reload script is inserted in front of every
    ``</body>`` tag; content without that tag gets no script.

    Returns the processed HTML string.
    """
    # Replace all [[name]] patterns
    content = _INCLUDE_PATTERN.sub(
        lambda match: get_content(match.group(1)), content
    )
    # Inject live reload script before </body>
    return content.replace('</body>', f'{_RELOAD_SCRIPT}</body>')
# Setup file watcher and store observer reference
observer = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the file watcher for the lifetime of the application.

    Startup: records the running event loop in ``main_loop`` (so the watcher
    callback can schedule coroutines on it) and starts an ``Observer`` that
    watches ``sources_dir`` recursively.

    Shutdown: stops the observer and waits for its thread to finish. The
    teardown lives in ``finally`` so it also runs when the context is left
    because of an exception, not only on a clean exit.
    """
    # Startup
    global main_loop, observer
    main_loop = asyncio.get_running_loop()

    # Setup file watcher
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, sources_dir, recursive=True)
    observer.start()

    try:
        yield
    finally:
        # Shutdown
        if observer:
            observer.stop()
            observer.join()
# Both directories must be present before the application is wired up.
if not os.path.exists(static_dir):
    raise RuntimeError(f"The directory '{static_dir}' does not exist.")
if not os.path.exists(sources_dir):
    raise RuntimeError(f"The templates dir '{sources_dir}' does not exist.")

# Jinja2 environment the route handlers load their templates from;
# autoescaping is enabled for HTML and XML templates.
env = Environment(
    loader=FileSystemLoader(sources_dir),
    autoescape=select_autoescape(['html', 'xml']),
)

# Initialize Jinja2 templates (FastAPI wrapper over the same directory)
templates = Jinja2Templates(directory=sources_dir)

# The application itself; lifespan starts and stops the file watcher.
app = FastAPI(lifespan=lifespan)

# Mount the static files directory
app.mount("/static", StaticFiles(directory=static_dir), name="static")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a live-reload client and keep it registered until it goes away.

    The connection is added to ``active_connections`` so ``broadcast_reload``
    can reach it. Incoming text messages are read and discarded; the loop
    only exists to notice when receiving fails.
    """
    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            await websocket.receive_text()
    except Exception:
        # Any receive failure (e.g. the client disconnecting) ends the
        # session quietly. Unlike a bare ``except``, this lets
        # asyncio.CancelledError propagate so shutdown is not masked.
        pass
    finally:
        # Always deregister, including when the task is cancelled.
        if websocket in active_connections:
            active_connections.remove(websocket)
@app.get("/favicon.ico")
async def favicon():
    """Serve ``favicon.ico`` from the static directory, or respond with 404."""
    icon_file = os.path.join(static_dir, "favicon.ico")

    # Guard clause: nothing to serve when the icon is absent.
    if not os.path.exists(icon_file):
        raise HTTPException(status_code=404, detail="Favicon not found")

    return FileResponse(icon_file)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``index.html`` and return it after content processing.

    The template receives the current ``request``; the rendered HTML is passed
    through ``process_content`` before being returned.
    """
    rendered = env.get_template("index.html").render(request=request)
    return process_content(rendered)
@app.get("/{path:path}", response_class=HTMLResponse)
async def read_template(request: Request, path: str):
    """Render the template named by the URL path and return the processed HTML.

    ``.html`` is appended to *path* when it is not already present. A
    ``TemplateNotFound`` raised while loading or rendering is turned into an
    HTTP 404 response.
    """
    # Normalise the requested name so "/about" and "/about.html" both work.
    template_name = path if path.endswith('.html') else f"{path}.html"

    try:
        page = env.get_template(template_name).render(request=request)
        return process_content(page)
    except TemplateNotFound:
        raise HTTPException(status_code=404, detail=f"Template {template_name} not found")
if __name__ == "__main__":
    # Direct execution: start uvicorn on 127.0.0.1:8001 with its reload
    # option enabled. The app is referenced by its import string "server:app".
    import uvicorn

    uvicorn.run(
        "server:app",
        host="127.0.0.1",
        port=8001,
        reload=True,
    )