Files
research_meet/server/lib/livekit.py
2025-09-09 11:09:43 +04:00

36 lines
1.1 KiB
Python

import os
from livekit import api
# LiveKit connection settings, read from the process environment once at
# import time. A variable that is not set yields None.
LIVEKIT_URL = os.environ.get("LIVEKIT_URL")
LIVEKIT_API_KEY = os.environ.get("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.environ.get("LIVEKIT_API_SECRET")

# Module-wide LiveKit server API client, shared by the helpers below.
# NOTE(review): this is constructed while the module is being imported;
# confirm the installed SDK allows creating the client outside a running
# event loop.
lkapi = api.LiveKitAPI(LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
def create_access_token(identity: str, name: str, metadata: str, room_name: str) -> str:
    """Return a LiveKit access token for one participant, encoded as a JWT.

    The token is created with the module's ``LIVEKIT_API_KEY`` and
    ``LIVEKIT_API_SECRET`` and carries video grants with ``room_join``,
    ``can_publish``, ``can_publish_data`` and ``can_subscribe`` all enabled
    for ``room_name``.

    Args:
        identity: Value handed to ``with_identity``.
        name: Value handed to ``with_name``.
        metadata: Value handed to ``with_metadata``.
        room_name: Room set on the video grants.

    Returns:
        The string produced by ``to_jwt()``.
    """
    # Describe the participant first ...
    participant = api.AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET)
    participant = participant.with_identity(identity)
    participant = participant.with_name(name)
    participant = participant.with_metadata(metadata)

    # ... then attach the grants for the requested room.
    room_grants = api.VideoGrants(
        room_join=True,
        room=room_name,
        can_publish=True,
        can_publish_data=True,
        can_subscribe=True,
    )
    return participant.with_grants(room_grants).to_jwt()
async def create_room_if_not_exists(room_name: str) -> None:
    """Create the LiveKit room ``room_name``, tolerating an existing room.

    Sends a ``CreateRoomRequest`` through the module-level ``lkapi`` client.
    An ``api.RoomError`` whose message contains "room already exists" is
    swallowed; any other ``api.RoomError`` propagates to the caller.

    Args:
        room_name: Name placed in the ``CreateRoomRequest``.

    Raises:
        api.RoomError: When room creation fails for a reason other than the
            room already existing.
    """
    try:
        await lkapi.room.create_room(api.CreateRoomRequest(name=room_name))
    except api.RoomError as e:
        # NOTE(review): confirm that the installed livekit SDK exports
        # `api.RoomError` and that its message really contains this exact
        # phrase; the match below is a case-sensitive substring test.
        if "room already exists" not in str(e):
            # Bare `raise` re-raises the active exception unchanged.
            raise
        # Otherwise the room is already there, which is fine.