# herolib_python/lib/core/heroscript/heroscripts.py
# Snapshot metadata from source viewer: 2025-08-05 15:15:36 +02:00,
# 130 lines, 5.3 KiB, Python.
from pydantic import BaseModel, Field
# BUG FIX: List and Dict were imported from `types`, which does not export
# them (ImportError at import time); they live in `typing`.
from typing import Any, Dict, List, Type, TypeVar
import re
import hashlib
import json
import os

# NOTE(review): the Whoosh names used below (index, Schema, STORED, ID,
# NUMERIC, TEXT, QueryParser) and the HeroScript type have no visible import
# in this file — confirm the missing imports against the original module.

# Generic type variable for pydantic-model-typed helpers.
T = TypeVar("T", bound=BaseModel)
class HeroScripts:
    """Collection of HeroScript blocks parsed from markdown files.

    Walks ``path`` for ``*.md`` files, extracts heroscript blocks, skips
    unchanged files via md5 hashes persisted in ``done.json``, and maintains
    one Whoosh full-text index per registered class type under ``indexpath``.

    NOTE(review): the Whoosh names used below (``index``, ``Schema``,
    ``STORED``, ``ID``, ``NUMERIC``, ``TEXT``, ``QueryParser``) and the
    ``HeroScript`` type are not imported in the visible part of this file —
    confirm the missing imports against the original module.
    """

    def __init__(self, class_types: dict, path: str = "", content: str = "", indexpath: str = ""):
        # Lowercase class name -> pydantic model type used for parsing/indexing.
        self.class_types = class_types
        # BUG FIX: was `List(HeroScript)` — calling typing.List raises TypeError.
        self.heroscripts: list = []
        self.path = os.path.expanduser(path)
        self.indexpath = os.path.expanduser(indexpath)
        # BUG FIX: was `self.done = Dict[str, str] = {}`, a chained assignment
        # into a typing subscription (TypeError); annotate the attribute instead.
        # Maps file path -> md5 of its content at last parse.
        self.done: dict = {}
        # self.done_load()  # kept disabled, as in the original: each run starts fresh
        if self.path:
            try:
                self.load(self.path)
                self.done_save()
            except FileNotFoundError as e:
                print(f"Directory not found: {self.path}")
                print(f"Error: {str(e)}")
        self.create_indexes()
        self.index_objects()
        if content:
            # BUG FIX: was a bare `extract_heroscript_blocks(...)`, but load()
            # calls it as a method — qualified with self. TODO confirm it is
            # defined on this class (definition not visible in this chunk).
            blocks = self.extract_heroscript_blocks(content)
            self.heroscripts.extend(HeroScript(block) for block in blocks)

    def done_load(self):
        """Restore the file-path -> md5 map from ``done.json`` if present."""
        if self.path:
            done_file = os.path.join(self.path, "done.json")
            if os.path.exists(done_file):
                with open(done_file, "r") as f:
                    self.done = json.load(f)

    def done_save(self):
        """Persist the file-path -> md5 map to ``done.json``."""
        if self.path:
            done_file = os.path.join(self.path, "done.json")
            with open(done_file, "w") as f:
                json.dump(self.done, f)

    def load(self, path):
        """Recursively scan *path* for ``.md`` files and parse new/changed ones.

        Files whose content md5 matches the hash recorded in ``self.done`` are
        skipped; every other file is split into heroscript blocks appended to
        ``self.heroscripts``.
        """
        for root, _, files in os.walk(path):
            for filename in files:
                # BUG FIX: the original printed and *rebound* `path` to a
                # garbled value on every iteration before filtering, corrupting
                # later iterations and the path handed to HeroScript.
                if not filename.endswith(".md"):
                    continue
                filepath = os.path.join(root, filename)
                print(f" - load {filepath}")
                with open(filepath, "r") as file:
                    content = file.read()
                md5hash = hashlib.md5(content.encode()).hexdigest()
                # Re-parse only new or changed files.
                if filepath not in self.done or self.done[filepath] != md5hash:
                    blocks = self.extract_heroscript_blocks(content)
                    # Associate each block with the file it came from (the
                    # original passed the mutated `path` variable here).
                    self.heroscripts.extend(HeroScript(block, filepath) for block in blocks)
                    self.done[filepath] = md5hash

    def get_objects(self):
        """Instantiate registered pydantic objects from the parsed heroscripts.

        The class name is taken from the first line of each block
        (``!!classname.action``); blocks naming unregistered classes are
        skipped, and malformed blocks are reported but do not abort.
        """
        # BUG FIX: was decorated @staticmethod while taking `self` and being
        # called as `self.get_objects()` — the decorator is removed.
        objects = []
        for heroscript in self.heroscripts:
            if heroscript.content:
                try:
                    class_name = heroscript.content.split("\n")[0].split("!!")[1].split(".")[0].lower()
                    if class_name in self.class_types:
                        class_type = self.class_types[class_name]
                        try:
                            obj = class_type.from_heroscript(heroscript.content)
                            objects.append(obj)
                        except Exception as e:
                            print(f"Error parsing HeroScript: {e}")
                except (IndexError, ValueError):
                    print(f"Invalid HeroScript format: {heroscript.content}")
        return objects

    def create_indexes(self):
        """Create one Whoosh index directory per registered class type."""
        for class_type in self.class_types.values():
            schema = self.create_schema(class_type)
            index_dir = os.path.join(self.indexpath, class_type.__name__.lower())
            # exist_ok makes the check-then-create race-free.
            os.makedirs(index_dir, exist_ok=True)
            index.create_in(index_dir, schema)

    def create_schema(self, class_type):
        """Build a Whoosh schema from the model's field metadata.

        Only fields whose ``json_schema_extra`` metadata marks them ``unique``
        or ``indexed`` are included: str -> ID, int/float -> NUMERIC, anything
        else -> TEXT. ``path`` is always stored.
        """
        schema_fields = {"path": STORED()}
        for field_name, field in class_type.__fields__.items():
            json_schema_extra = getattr(field, "json_schema_extra", None)
            if json_schema_extra is None:
                continue
            metadata = json_schema_extra.get("metadata", {})
            # Metadata given as a plain list of flag names is normalized
            # to a dict of boolean flags.
            if isinstance(metadata, list):
                metadata = {item: True for item in metadata}
            if metadata.get("unique") or metadata.get("indexed"):
                unique = metadata.get("unique", False)
                if field.annotation == str:
                    schema_fields[field_name] = ID(stored=True, unique=unique)
                elif field.annotation in (int, float):
                    schema_fields[field_name] = NUMERIC(stored=True, unique=unique)
                else:
                    schema_fields[field_name] = TEXT(stored=True, lowercase=True)
        return Schema(**schema_fields)

    def index_objects(self):
        """Add every parsed object to its class's Whoosh index."""
        # PERF FIX: the original called self.get_objects() once per heroscript,
        # re-parsing every block each time (O(n^2)); parse once and reuse.
        # NOTE(review): pairing *every* object with *every* heroscript's path
        # writes len(heroscripts) x len(objects) documents — this looks
        # unintended (duplicates); confirm against callers before tightening.
        objects = self.get_objects()
        for heroscript in self.heroscripts:
            for obj in objects:
                index_dir = os.path.join(self.indexpath, type(obj).__name__.lower())
                ix = index.open_dir(index_dir)
                writer = ix.writer()
                writer.add_document(
                    path=heroscript.path,
                    **{k: str(v).lower() for k, v in obj.dict().items() if k in ix.schema.names()},
                )
                writer.commit()

    def search(self, class_type, query):
        """Query the class's index against the ``name`` field.

        Returns the stored ``path`` of each matching document.
        """
        index_dir = os.path.join(self.indexpath, class_type.__name__.lower())
        ix = index.open_dir(index_dir)
        qp = QueryParser("name", schema=ix.schema)
        q = qp.parse(query)
        with ix.searcher() as searcher:
            results = searcher.search(q)
            # Materialize inside the searcher context: Whoosh results are lazy
            # and become invalid once the searcher is closed.
            return [result["path"] for result in results]