Files
herolib_python/herolib/core/logger/search.py
2025-08-20 05:06:05 +02:00

102 lines
3.9 KiB
Python

import os
from typing import Optional, List
from herolib.core.texttools.texttools import name_fix
from herolib.data.ourtime.ourtime import OurTime, new as ourtime_new
from herolib.core.logger.model import Logger, LogItem, LogType
class SearchArgs:
def __init__(self, timestamp_from: Optional[OurTime] = None,
timestamp_to: Optional[OurTime] = None,
cat: str = "", log: str = "", logtype: Optional[LogType] = None,
maxitems: int = 10000):
self.timestamp_from = timestamp_from
self.timestamp_to = timestamp_to
self.cat = cat
self.log = log
self.logtype = logtype
self.maxitems = maxitems
def search(l: Logger, args_: SearchArgs) -> List[LogItem]:
args = args_
args.cat = name_fix(args.cat)
if len(args.cat) > 10:
raise ValueError('category cannot be longer than 10 chars')
from_time = args.timestamp_from.unix() if args.timestamp_from else 0
to_time = args.timestamp_to.unix() + 1 if args.timestamp_to else ourtime_new('2100-01-01').unix()
if from_time > to_time:
raise ValueError(f'from_time cannot be after to_time: {from_time} > {to_time}')
result: List[LogItem] = []
if not os.path.exists(l.path.path):
return []
files = sorted(os.listdir(l.path.path))
for file in files:
if not file.endswith('.log'):
continue
dayhour = file[:-4]
try:
file_time = ourtime_new(dayhour)
except ValueError:
continue
file_hour_start_unix = file_time.unix()
file_hour_end_unix = file_hour_start_unix + 3599
if file_hour_end_unix < from_time or file_hour_start_unix > to_time:
continue
try:
with open(os.path.join(l.path.path, file), 'r') as f:
content = f.read()
except FileNotFoundError:
continue
current_time = None
current_item = None
for line in content.splitlines():
if len(result) >= args.maxitems:
break
# Helper to add current_item to result if it matches criteria
def _add_item_if_matches():
nonlocal current_item
if current_item:
if from_time <= current_item.timestamp.unix() <= to_time:
if (not args.cat or args.cat == current_item.cat) and \
(not args.log or args.log.lower() in current_item.log.lower()) and \
(args.logtype is None or args.logtype == current_item.logtype):
result.append(current_item)
current_item = None # Reset after processing
if not line.strip(): # Empty line, finalize previous item
_add_item_if_matches()
continue
if not line.startswith(' ') and not line.startswith('E'): # New timestamp line
_add_item_if_matches() # Finalize previous item
try:
current_time = ourtime_new(f"{file_time.day()} {line.strip()}")
except ValueError:
current_time = None
current_item = None # Reset for new item
elif current_time:
if len(line) > 14 and line[13] == '-': # New log entry line
_add_item_if_matches() # Finalize previous item
is_error = line.startswith('E')
logtype = LogType.ERROR if is_error else LogType.STDOUT
cat = line[2:12].strip()
log_content = line[15:]
current_item = LogItem(timestamp=current_time, cat=cat, log=log_content.strip(), logtype=logtype)
elif current_item: # Continuation line
current_item.log += "\n" + (line[15:] if len(line) >15 else line)
_add_item_if_matches() # Finalize the last item in the file
return result