109 lines
3.3 KiB
Python
109 lines
3.3 KiB
Python
from fastapi import FastAPI, HTTPException, UploadFile, File, Query, Body, status
|
|
from fastapi.responses import JSONResponse, FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import List, Optional
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
app = FastAPI()
|
|
|
|
class FileUpdate(BaseModel):
|
|
operation: str
|
|
name: Optional[str] = None
|
|
target: Optional[str] = None
|
|
ids: Optional[List[str]] = None
|
|
|
|
class NewFile(BaseModel):
|
|
name: str
|
|
type: str
|
|
|
|
class ResponseModel(BaseModel):
|
|
error: Optional[str] = None
|
|
result: Optional[dict] = None
|
|
|
|
class ResponseMultiModel(BaseModel):
|
|
error: Optional[str] = None
|
|
result: Optional[List[dict]] = None
|
|
|
|
# Configuration and global variables
|
|
UPLOAD_LIMIT = 10_000_000
|
|
ROOT_DIR = tempfile.mkdtemp()
|
|
|
|
# CORS middleware
|
|
origins = ["*"]
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=origins,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
def get_full_path(relative_path: str) -> str:
|
|
return os.path.join(ROOT_DIR, relative_path)
|
|
|
|
@app.get("/files")
|
|
async def list_files():
|
|
try:
|
|
files = os.listdir(ROOT_DIR)
|
|
return {"files": files}
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get("/files/{path:path}")
|
|
async def get_file(path: str):
|
|
full_path = get_full_path(path)
|
|
if not os.path.exists(full_path):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
return FileResponse(full_path)
|
|
|
|
@app.post("/files")
|
|
async def create_file(data: NewFile):
|
|
full_path = get_full_path(data.name)
|
|
if os.path.exists(full_path):
|
|
raise HTTPException(status_code=400, detail="File already exists")
|
|
if data.type == "folder":
|
|
os.makedirs(full_path, exist_ok=True)
|
|
else:
|
|
open(full_path, 'a').close()
|
|
return {"result": {"name": data.name}}
|
|
|
|
@app.put("/files/{id:path}")
|
|
async def update_file(id: str, data: FileUpdate):
|
|
full_path = get_full_path(id)
|
|
if not os.path.exists(full_path):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
if data.operation == "rename":
|
|
if not data.name:
|
|
raise HTTPException(status_code=400, detail="'name' parameter must be provided")
|
|
new_path = get_full_path(data.name)
|
|
os.rename(full_path, new_path)
|
|
return {"result": {"id": new_path, "name": data.name}}
|
|
else:
|
|
raise HTTPException(status_code=400, detail="Operation is not supported")
|
|
|
|
@app.delete("/files/{id:path}")
|
|
async def delete_file(id: str):
|
|
full_path = get_full_path(id)
|
|
if not os.path.exists(full_path):
|
|
raise HTTPException(status_code=404, detail="File not found")
|
|
if os.path.isdir(full_path):
|
|
shutil.rmtree(full_path)
|
|
else:
|
|
os.remove(full_path)
|
|
return JSONResponse(status_code=status.HTTP_204_NO_CONTENT)
|
|
|
|
@app.post("/upload")
|
|
async def upload_file(file: UploadFile = File(...), id: Optional[str] = Query("")):
|
|
directory = get_full_path(id)
|
|
if not os.path.exists(directory):
|
|
raise HTTPException(status_code=404, detail="Directory not found")
|
|
file_path = os.path.join(directory, file.filename)
|
|
with open(file_path, "wb") as buffer:
|
|
shutil.copyfileobj(file.file, buffer)
|
|
return {"result": {"name": file.filename}}
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(app, host="0.0.0.0", port=3200) |