190 lines
6.1 KiB
Python
190 lines
6.1 KiB
Python
from fastapi import FastAPI, Request, HTTPException, WebSocket
|
|
from fastapi.responses import HTMLResponse, FileResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.staticfiles import StaticFiles
|
|
from jinja2 import Environment, FileSystemLoader, select_autoescape, TemplateNotFound
|
|
import os
|
|
import markdown
|
|
import re
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
import asyncio
|
|
from typing import List
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
|
|
# Resolve BASE_DIR from the environment, falling back to this file's directory.
# "~" is expanded *before* the existence check so that BASE_DIR=~/sites works
# instead of being rejected as a non-existent literal path.
BASE_DIR = os.path.expanduser(
    os.environ.get('BASE_DIR', os.path.dirname(os.path.abspath(__file__)))
)

# Fail fast at import time if the configured root is missing.
if not os.path.exists(BASE_DIR):
    raise RuntimeError(f"The BASE_DIR '{BASE_DIR}' does not exist.")

# All site sources are expected under <BASE_DIR>/poc_threefold.
sources_dir = f"{BASE_DIR}/poc_threefold"
if not os.path.exists(sources_dir):
    raise RuntimeError(f"The source directory '{sources_dir}' does not exist.")

# Sub-directories derived from the sources directory.
static_dir = f"{sources_dir}/static"
content_dir = f"{sources_dir}/content"
|
|
|
|
# WebSocket clients that are currently connected. Every entry is sent a
# "reload" message when a watched file changes.
active_connections: List[WebSocket] = []

# Event loop the server runs on. It stays None until application startup
# assigns it; the file watcher needs it to schedule coroutines.
main_loop = None
|
|
|
|
class FileChangeHandler(FileSystemEventHandler):
    """Watchdog handler that asks connected browsers to reload on file changes.

    Bursts of modification events are collapsed: at most one reload is
    broadcast per ``DEBOUNCE_SECONDS`` window.
    """

    # Minimum spacing, in seconds, between two reload broadcasts.
    DEBOUNCE_SECONDS = 0.5

    def __init__(self):
        super().__init__()
        # Monotonic timestamp of the last accepted event; -inf guarantees the
        # very first event is never debounced away.
        self.last_modified = float("-inf")

    def on_modified(self, event):
        """Schedule a debounced reload broadcast for a modified file.

        Directory events are ignored. Nothing is scheduled while ``main_loop``
        is still ``None`` (i.e. before application startup has set it).
        """
        if event.is_directory:
            return

        # A monotonic clock cannot jump backwards when the wall clock is
        # adjusted, so the debounce window cannot get stuck.
        now = time.monotonic()
        if now - self.last_modified <= self.DEBOUNCE_SECONDS:
            return
        self.last_modified = now

        if main_loop is not None:
            # Thread-safe hand-off: this callback does not run on the
            # server's event loop, so the coroutine is submitted to it.
            asyncio.run_coroutine_threadsafe(broadcast_reload(), main_loop)
|
|
|
|
async def broadcast_reload():
    """Send the text message ``"reload"`` to every connected WebSocket client.

    A client whose send fails is removed from ``active_connections``; the
    broadcast continues with the remaining clients.
    """
    # Iterate over a snapshot because the list is mutated inside the loop.
    for connection in list(active_connections):
        try:
            await connection.send_text("reload")
        except Exception:
            # Best-effort delivery: one broken socket must not abort the
            # broadcast. Catching Exception (rather than a bare except) lets
            # task cancellation and interpreter-exit signals propagate.
            if connection in active_connections:
                active_connections.remove(connection)
|
|
|
|
def get_content(name: str) -> str:
    """Return the HTML for content fragment *name* from the content directory.

    ``<name>.html`` is returned verbatim when it can be read; otherwise
    ``<name>.md`` is read and converted from Markdown to HTML. If neither
    file is available, or *name* resolves outside the content directory, the
    placeholder ``[[<name> not found]]`` is returned.
    """
    # Remove any leading/trailing slashes
    name = name.strip('/')

    root = os.path.abspath(content_dir)

    def _read(extension):
        """Return the text of ``<name><extension>``, or None if unavailable."""
        path = os.path.abspath(os.path.join(root, f"{name}{extension}"))
        try:
            # Refuse names such as "../secret" that escape the content dir.
            if os.path.commonpath([root, path]) != root:
                return None
        except ValueError:
            # commonpath raises when the paths cannot be compared
            # (e.g. different drives on Windows): treat as outside.
            return None
        try:
            # Open directly instead of exists()+open() to avoid a race, and
            # decode explicitly so results do not depend on the host locale.
            with open(path, 'r', encoding='utf-8') as f:
                return f.read()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            return None

    # HTML fragments take precedence over Markdown ones.
    html = _read(".html")
    if html is not None:
        return html

    md_source = _read(".md")
    if md_source is not None:
        return markdown.markdown(md_source)

    return f"[[{name} not found]]"
|
|
|
|
def process_content(content: str) -> str:
    """Expand ``[[name]]`` placeholders and inject the live-reload script.

    Each ``[[name]]`` occurrence is replaced by ``get_content(name)``. The
    inserted text is not scanned again for further placeholders. If the page
    contains ``</body>``, the live-reload ``<script>`` is inserted once,
    immediately before the last occurrence; pages without ``</body>`` get no
    script.
    """
    def replace_content(match):
        return get_content(match.group(1))

    # Replace all [[name]] patterns
    content = re.sub(r'\[\[(.*?)\]\]', replace_content, content)

    # Live-reload client. The scheme follows the page's own protocol because
    # browsers refuse insecure ws:// connections from pages served over HTTPS.
    reload_script = """
<script>
    const wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
    const ws = new WebSocket(wsScheme + window.location.host + '/ws');
    ws.onmessage = function(event) {
        if (event.data === 'reload') {
            window.location.reload();
        }
    };
    ws.onclose = function() {
        console.log('WebSocket connection closed. Retrying...');
        setTimeout(() => {
            window.location.reload();
        }, 1000);
    };
</script>
"""

    # Inject exactly once, before the final </body>: injecting at every
    # occurrence would re-declare `const ws` and raise a script error.
    head, closing_tag, tail = content.rpartition('</body>')
    if not closing_tag:
        return content
    return f"{head}{reload_script}{closing_tag}{tail}"
|
|
|
|
# Watchdog observer watching the sources directory; None until startup.
observer = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: run the file watcher while the app is up.

    Startup stores the running event loop in ``main_loop`` and starts a
    recursive watchdog observer on ``sources_dir`` with a
    ``FileChangeHandler``. Shutdown stops the observer and waits for its
    thread to finish.
    """
    global main_loop, observer

    # Startup
    main_loop = asyncio.get_running_loop()

    # Setup file watcher
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, sources_dir, recursive=True)
    observer.start()

    try:
        yield
    finally:
        # Shutdown. Done in ``finally`` so the watcher thread is stopped even
        # when the lifespan context is exited because of an error.
        if observer:
            observer.stop()
            observer.join()
|
|
|
|
app = FastAPI(lifespan=lifespan)

# static_dir is a sub-directory of sources_dir, so its existence also implies
# that sources_dir exists; no separate check is needed here.
if not os.path.exists(static_dir):
    raise RuntimeError(f"The directory '{static_dir}' does not exist.")

# Mount the static files directory
app.mount("/static", StaticFiles(directory=static_dir), name="static")

# Jinja2 environment used to load and render page templates from sources_dir.
# Auto-escaping is enabled for .html and .xml templates.
env = Environment(
    loader=FileSystemLoader(sources_dir),
    autoescape=select_autoescape(['html', 'xml']),
)

# Initialize Jinja2 templates
# NOTE(review): not referenced by the code visible in this file; kept in case
# other modules import it.
templates = Jinja2Templates(directory=sources_dir)
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a live-reload client and keep its connection open.

    The socket is accepted and added to ``active_connections``. It is removed
    again when the receive loop ends for any reason.
    """
    await websocket.accept()
    active_connections.append(websocket)
    try:
        # Incoming messages are discarded; the loop only keeps the handler
        # alive until receiving fails (presumably on client disconnect).
        while True:
            await websocket.receive_text()
    except Exception:
        # A failed receive simply ends the session. Catching Exception rather
        # than using a bare except lets task cancellation propagate.
        pass
    finally:
        # Always deregister, including when the handler task is cancelled.
        if websocket in active_connections:
            active_connections.remove(websocket)
|
|
|
|
@app.get("/favicon.ico")
async def favicon():
    """Serve ``favicon.ico`` from the static directory, or respond with 404."""
    icon_file = os.path.join(static_dir, "favicon.ico")

    # Guard clause: nothing to serve when the file is absent.
    if not os.path.exists(icon_file):
        raise HTTPException(status_code=404, detail="Favicon not found")

    return FileResponse(icon_file)
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render ``index.html`` and return it after content processing.

    The template is rendered with ``request`` in its context, then passed
    through ``process_content``.

    Raises:
        HTTPException: 404 when Jinja2 reports a missing template, matching
            the behaviour of the catch-all template route.
    """
    try:
        template = env.get_template("index.html")
        content = template.render(request=request)
        return process_content(content)
    except TemplateNotFound:
        raise HTTPException(status_code=404, detail="Template index.html not found")
|
|
|
|
@app.get("/{path:path}", response_class=HTMLResponse)
async def read_template(request: Request, path: str):
    """Render the template named by the URL path and return the processed HTML.

    ``.html`` is appended to *path* unless it already ends with it. The
    template is rendered with ``request`` in its context and then passed
    through ``process_content``.

    Raises:
        HTTPException: 404 when Jinja2 raises ``TemplateNotFound``.
    """
    # Normalise the requested name so it always carries the .html suffix.
    path = path if path.endswith('.html') else f"{path}.html"

    try:
        rendered = env.get_template(path).render(request=request)
        return process_content(rendered)
    except TemplateNotFound:
        raise HTTPException(status_code=404, detail=f"Template {path} not found")
|
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Development server on localhost:8001 with auto-reload enabled.
    # NOTE(review): "server:app" presumably means this file is server.py; confirm.
    uvicorn.run(
        "server:app",
        host="127.0.0.1",
        port=8001,
        reload=True,
    )
|