Files
herolib/research/openrpc/dense_md_to_openrpc.py
despiegk cb125e8114 ...
Co-authored-by: Omdanii <mahmmoud.hassanein@gmail.com>
2025-09-07 15:15:41 +04:00

164 lines
5.8 KiB
Python

import re
import yaml
import json
import requests
from collections import defaultdict
from jsonschema import validate, Draft7Validator
# --- Load OpenRPC meta-schema ---
def load_openrpc_schema():
url = "https://open-rpc.github.io/meta-schema.json"
resp = requests.get(url)
resp.raise_for_status()
return resp.json()
OPENRPC_SCHEMA = load_openrpc_schema()
def parse_dense_markdown(md_text: str):
methods = []
schemas = {}
# Split into sections
sections = re.split(r"^# ", md_text, flags=re.M)
secmap = {}
for sec in sections:
if not sec.strip():
continue
title, _, body = sec.partition("\n")
secmap[title.strip()] = body.strip()
# --- Methods ---
if "Methods" in secmap:
method_blocks = re.split(r"^## ", secmap["Methods"], flags=re.M)
for block in method_blocks:
if not block.strip():
continue
name, _, body = block.partition("\n")
method = {"name": name.strip(), "params": [], "result": {}}
lines = [l.rstrip() for l in body.splitlines() if l.strip()]
# description before **Params**
desc_lines = []
i = 0
while i < len(lines) and not lines[i].startswith("**Params**"):
desc_lines.append(lines[i])
i += 1
method["description"] = " ".join(desc_lines).strip()
# Params
if "**Params**" in lines:
pi = lines.index("**Params**") + 1
while pi < len(lines) and not lines[pi].startswith("**Result**"):
line = lines[pi].lstrip("-").strip()
if line and not line.lower().startswith("none"):
pname, _, pdesc = line.partition("(")
pname = pname.strip()
ptype = pdesc.split(",")[0].replace(")", "").strip() if pdesc else "object"
required = "required" in line
param = {
"name": pname,
"required": required,
"schema": {"type": map_type(ptype)}
}
method["params"].append(param)
pi += 1
# Result
if "**Result**" in lines:
ri = lines.index("**Result**") + 1
if ri < len(lines):
rline = lines[ri].lstrip("-").strip()
rname, _, rtype = rline.partition(":")
rname = rname.strip()
rtype = rtype.strip()
schema = parse_result_type(rtype)
method["result"] = {"name": rname, "schema": schema}
methods.append(method)
# --- Schemas ---
if "Schemas" in secmap:
schema_blocks = re.split(r"^## ", secmap["Schemas"], flags=re.M)
for block in schema_blocks:
if not block.strip():
continue
name, _, body = block.partition("\n")
lines = body.splitlines()
code_block = []
req_fields = []
in_yaml = False
for l in lines:
if l.strip().startswith("```yaml"):
in_yaml = True
continue
elif l.strip().startswith("```"):
in_yaml = False
continue
if in_yaml:
code_block.append(l)
if l.strip().startswith("*Required:"):
req_fields = [x.strip() for x in l.split(":")[1].split(",")]
if code_block:
schema_def = yaml.safe_load("\n".join(code_block))
props = {}
for fname, ftype in schema_def.items():
if isinstance(ftype, str):
base = ftype.split("#")[0].strip()
desc = ftype.split("#")[1].strip() if "#" in ftype else None
props[fname] = {"type": map_type(base)}
if desc:
props[fname]["description"] = desc
schemas[name.strip()] = {
"type": "object",
"properties": props,
"required": req_fields
}
spec = {"openrpc": "1.0.0", "methods": methods, "components": {"schemas": schemas}}
# --- Validate against OpenRPC schema ---
validator = Draft7Validator(OPENRPC_SCHEMA)
errors = sorted(validator.iter_errors(spec), key=lambda e: e.path)
if errors:
for err in errors:
print(f"❌ Validation error at {list(err.path)}: {err.message}")
raise ValueError("Generated spec is not valid OpenRPC")
else:
print("✅ Spec is valid OpenRPC")
return spec
# --- Helpers ---
def map_type(t):
mapping = {"int": "integer", "str": "string", "bool": "boolean", "object": "object"}
return mapping.get(t, t)
def parse_result_type(rtype):
if "|" in rtype:
oneofs = []
for variant in rtype.split("|"):
v = variant.strip()
if v.startswith("[") and v.endswith("]"):
oneofs.append({"type": "array", "items": {"$ref": f"#/components/schemas/{v[1:-1]}"}})
else:
oneofs.append({"$ref": f"#/components/schemas/{v}"})
return {"oneOf": oneofs}
elif rtype.startswith("[") and rtype.endswith("]"):
return {"type": "array", "items": {"type": map_type(rtype[1:-1])}}
elif rtype in ["int", "str", "bool", "object"]:
return {"type": map_type(rtype)}
else:
return {"$ref": f"#/components/schemas/{rtype}"}
# --- Example usage ---
if __name__ == "__main__":
with open("dense.md") as f:
md = f.read()
spec = parse_dense_markdown(md)
with open("openrpc.json", "w") as f:
json.dump(spec, f, indent=2)