# agent_repo.py
# =====================================================================
# Hybrid RAG + LLM edit-plans met: veilige fallback, anti-destructie guard,
# en EXPLICIETE UITLEG per diff.
# =====================================================================
# agent_repo.py (bovenin)
from __future__ import annotations
from smart_rag import enrich_intent, expand_queries, hybrid_retrieve, assemble_context
import os, re, time, uuid, difflib, hashlib, logging, json, fnmatch
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from urllib.parse import urlparse, urlunparse
import requests
import base64
from windowing_utils import approx_token_count
from starlette.concurrency import run_in_threadpool
import asyncio
from collections import defaultdict
from llm_client import _llm_call
# --- Async I/O executors (prevent event-loop blocking) ---
from concurrent.futures import ThreadPoolExecutor
# Separate pools: I/O-bound work (clones, file reads) vs CPU-heavier work (index builds).
_IO_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_IO_WORKERS", "8")))
_CPU_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_CPU_WORKERS", "2")))
# Throttle concurrent git clones.
_CLONE_SEMA = asyncio.Semaphore(int(os.getenv("AGENT_MAX_CONCURRENT_CLONES", "2")))
# Vector-store backend selector: "CHROMA" (default) or "QDRANT".
BACKEND = (os.getenv("VECTOR_BACKEND") or "CHROMA").upper().strip()
# NOTE(review): PATH_RE is commented out here but is referenced by
# extract_explicit_paths() further down — verify it is defined elsewhere
# in the file, otherwise that function raises NameError.
#PATH_RE = re.compile(r"(?new)
# ---------------------------------------------------------
# Character class covering straight, smart, and backtick quotes.
_Q = r"[\"'“”‘’`]"
# Regexes that pull explicit Laravel file paths (blade views / app PHP files)
# out of free-form user text, with or without surrounding quotes.
_PATH_PATS = [
    r"[\"“”'](resources\/[A-Za-z0-9_\/\.-]+\.blade\.php)[\"”']",
    r"(resources\/[A-Za-z0-9_\/\.-]+\.blade\.php)",
    r"[\"“”'](app\/[A-Za-z0-9_\/\.-]+\.php)[\"”']",
    r"(app\/[A-Za-z0-9_\/\.-]+\.php)",
]
# Laravel translation wrappers: __('…'), @lang('…'), trans('…').
_TRANS_WRAPPERS = [
    r"__\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
    r"@lang\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
    r"trans\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
]
def _clean_repo_arg(x):
"""Zet lege/sentinel repo-waarden om naar None (geen filter)."""
if x is None:
return None
s = str(x).strip().lower()
return None if s in ("", "-", "none") else x
def _extract_repo_branch_from_text(txt: str) -> Tuple[Optional[str], str]:
repo_url, branch = None, "main"
m = re.search(r"\bRepo\s*:\s*(\S+)", txt, flags=re.I)
if m: repo_url = m.group(1).strip()
mb = re.search(r"\bbranch\s*:\s*([A-Za-z0-9._/-]+)", txt, flags=re.I)
if mb: branch = mb.group(1).strip()
return repo_url, branch
def _extract_explicit_paths(txt: str) -> List[str]:
    """Collect unique Laravel-style file paths mentioned in *txt*, order kept."""
    found: List[str] = []
    for pattern in _PATH_PATS:
        for match in re.finditer(pattern, txt):
            path = match.group(1)
            if path and path not in found:
                found.append(path)
    return found
def _extract_replace_pair(txt: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract an (old_text, new_text) replacement pair from an instruction.

    Supports Dutch ("Vervang de tekst '…' … in '…'") and English
    ("Replace the text '…' to/with '…'") phrasings, including smart quotes.
    Returns (None, None) when no pair can be found.
    """
    pats = [
        rf"Vervang\s+de\s+tekst\s*{_Q}(.+?){_Q}[^.\n]*?(?:in|naar|verander(?:en)?\s+in)\s*{_Q}(.+?){_Q}",
        rf"Replace(?:\s+the)?\s+text\s*{_Q}(.+?){_Q}\s*(?:to|with)\s*{_Q}(.+?){_Q}",
    ]
    for p in pats:
        m = re.search(p, txt, flags=re.I | re.S)
        if m:
            return m.group(1), m.group(2)
    # Loose fallback. BUGFIX: the interleaved literals are now raw strings —
    # the previous non-raw "[\s\S]*?" pieces contained invalid escape
    # sequences (SyntaxWarning today, a hard error in future Python).
    mm = re.search(r"(Vervang|Replace)[\s\S]*?" + _Q + r"(.+?)" + _Q + r"[\s\S]*?" + _Q + r"(.+?)" + _Q,
                   txt, flags=re.I)
    if mm:
        return mm.group(2), mm.group(3)
    return None, None
def _looks_like_unified_diff_request(txt: str) -> bool:
    """True when the user explicitly asks for a unified diff/patch output."""
    if re.search(r"\bunified\s+diff\b", txt, flags=re.I):
        return True
    mentions_patch = re.search(r"\b(diff|patch)\b", txt, flags=re.I)
    return bool(mentions_patch and _extract_explicit_paths(txt))
# keep this close to the other module-level constants
async def _call_get_git_repo(repo_url: str, branch: str):
    """Safe wrapper supporting both sync and async _get_git_repo implementations."""
    target = _get_git_repo
    if asyncio.iscoroutinefunction(target):
        # Async implementation: await directly.
        return await target(repo_url, branch)
    # Sync implementation: run it on the I/O thread pool.
    return await run_io_blocking(target, repo_url, branch)
async def run_io_blocking(func, *args, pool=None, **kwargs):
    """Execute a blocking/sync callable on a thread pool so the event loop stays free."""
    target_pool = pool or _IO_POOL

    def _invoke():
        return func(*args, **kwargs)

    return await asyncio.get_running_loop().run_in_executor(target_pool, _invoke)
async def run_cpu_blocking(func, *args, pool=None, **kwargs):
    """Offload CPU-heavier work (e.g. index building) to the CPU thread pool."""
    running_loop = asyncio.get_running_loop()
    chosen = pool or _CPU_POOL
    return await running_loop.run_in_executor(chosen, lambda: func(*args, **kwargs))
# Lazy imports
_chroma = None
_qdrant = None
_qdrant_models = None
try:
if BACKEND == "CHROMA":
import chromadb # type: ignore
_chroma = chromadb
except Exception:
_chroma = None
try:
if BACKEND == "QDRANT":
from qdrant_client import QdrantClient # type: ignore
from qdrant_client.http.models import Filter, FieldCondition, MatchValue # type: ignore
_qdrant = QdrantClient
_qdrant_models = (Filter, FieldCondition, MatchValue)
except Exception:
_qdrant = None
_qdrant_models = None
try:
from rank_bm25 import BM25Okapi
except Exception:
BM25Okapi = None
logger = logging.getLogger("agent_repo")
# ---------- Environment / Config ----------
GITEA_URL = os.environ.get("GITEA_URL", "http://localhost:3080").rstrip("/")
# NOTE(review/security): a real-looking API token is committed as the fallback
# below. Rotate this secret and make the env var mandatory (default "").
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "8bdbe18dd2ec93ecbf9cd0a8f01a6eadf9cfa87d")
GITEA_API = os.environ.get("GITEA_API", f"{GITEA_URL}/api/v1").rstrip("/")
AGENT_DEFAULT_BRANCH = os.environ.get("AGENT_DEFAULT_BRANCH", "main")
AGENT_MAX_QUESTIONS = int(os.environ.get("AGENT_MAX_QUESTIONS", "3"))
MAX_FILES_DRYRUN = int(os.environ.get("AGENT_MAX_FILES_DRYRUN", "27"))
RAG_TOPK = int(os.environ.get("AGENT_RAG_TOPK", "24"))  # a larger candidate pool helps the reranker
AGENT_DISCOVER_MAX_REPOS = int(os.environ.get("AGENT_DISCOVER_MAX_REPOS", "200"))
AGENT_AUTOSELECT_THRESHOLD = float(os.environ.get("AGENT_AUTOSELECT_THRESHOLD", "0.80"))  # 0..1
REPO_CATALOG_MEILI_INDEX = os.environ.get("REPO_CATALOG_MEILI_INDEX", "repo-catalog")
AGENT_ENABLE_GOAL_REFINE = os.environ.get("AGENT_ENABLE_GOAL_REFINE", "1").lower() in ("1","true","yes")
AGENT_CLARIFY_THRESHOLD = float(os.environ.get("AGENT_CLARIFY_THRESHOLD", "0.6"))
# Meilisearch (optional)
MEILI_URL = os.environ.get("MEILI_URL", "http://localhost:7700").strip()
# NOTE(review/security): hardcoded Meilisearch key fallback — same concern as above.
MEILI_KEY = os.environ.get("MEILI_KEY", "0xipOmfgi_zMgdFplSdv7L8mlx0RPMQCNxVTNJc54lQ").strip()
MEILI_INDEX_PREFIX = os.environ.get("MEILI_INDEX_PREFIX", "code").strip()
# Optional: basic-auth injection for HTTP clone (private repos).
# NOTE(review/security): hardcoded credential fallbacks here as well.
GITEA_HTTP_USER = os.environ.get("GITEA_HTTP_USER", "Mistral-llm")
GITEA_HTTP_TOKEN = os.environ.get("GITEA_HTTP_TOKEN", "8bdbe18dd2ec93ecbf9cd0a8f01a6eadf9cfa87d")
# No destructive edits (never remove the complete contents of files).
# Presumably the max fraction of a file an edit may delete — TODO confirm
# against the anti-destruction guard that consumes this value.
AGENT_DESTRUCTIVE_RATIO = float(os.environ.get("AGENT_DESTRUCTIVE_RATIO", "0.50"))
# Only relevant code/text extensions (no binaries/caches).
ALLOWED_EXTS = {
    ".php",".blade.php",".vue",".js",".ts",".jsx",".tsx",".css",".scss",
    ".html",".htm",".json",".md",".ini",".cfg",".yml",".yaml",".toml",
    ".py",".go",".rb",".java",".cs",".txt"
}
# Directories always skipped when walking a repository.
INTERNAL_EXCLUDE_DIRS = {
    ".git",".npm","node_modules","vendor","storage","dist","build",".next",
    "__pycache__",".venv","venv",".mypy_cache",".pytest_cache",
    "target","bin","obj","logs","cache","temp",".cache"
}
_LIST_FILES_CACHE: dict[str, tuple[float, List[str]]] = {}  # path -> (ts, files)
# ---------- Injected from app.py ----------
# These globals are populated by initialize_agent(); None/empty until then.
_app = None
_get_git_repo = None
_rag_index_repo_internal = None
_rag_query_internal = None
# NOTE(review): this rebinding shadows the _llm_call imported from llm_client
# at the top of the file; initialize_agent() later replaces it again with a
# budget-wrapped callable via globals()["_llm_call"].
_llm_call = None
_extract_code_block = None
_read_text_file = None
_client_ip = None
_PROFILE_EXCLUDE_DIRS: set[str] = set()
_get_chroma_collection = None
_embed_query_fn = None
_embed_documents = None
# === SMART LLM WRAPPER: budget + tidy finishing + auto-continue ===
# Fits within the GPU cap (typically 13027 tokens of total context).
# Non-invasive: keeps the same response shape as _llm_call.
# Hard cap of the Mistral-LLM docker container (as configured).
# NOTE(review): the "13027" figure above conflicts with the 42000 default
# below — confirm which is the real total-context cap.
_MODEL_BUDGET = int(os.getenv("LLM_TOTAL_TOKEN_BUDGET", "42000"))
# Safety margin for headers/EOS/estimation error in the token estimate.
_BUDGET_SAFETY = int(os.getenv("LLM_BUDGET_SAFETY_TOKENS", "512"))
# Max number of follow-up calls when output appears truncated.
_MAX_AUTO_CONTINUES = int(os.getenv("LLM_MAX_AUTO_CONTINUES", "2"))
def _est_tokens(text: str) -> int:
# Ruwe schatting: ~4 chars/token (conservatief genoeg voor budgettering)
if not text: return 0
return max(1, len(text) // 4)
def _concat_messages_text(messages: list[dict]) -> str:
parts = []
for m in messages or []:
c = m.get("content")
if isinstance(c, str): parts.append(c)
return "\n".join(parts)
def _ends_neatly(s: str) -> bool:
if not s: return False
t = s.rstrip()
return t.endswith((".", "!", "?", "…", "”", "’"))
def _append_assistant_and_continue_prompt(base_messages: list[dict], prev_text: str) -> list[dict]:
"""
Bouw een minimale vervolgprompt zonder opnieuw de hele context te sturen.
Dit beperkt prompt_tokens en voorkomt dat we opnieuw de cap raken.
"""
tail_words = " ".join(prev_text.split()[-60:]) # laatste ±60 woorden als anker
cont_user = (
"Ga verder waar je stopte. Herhaal niets. "
"Vervolg direct de laatste zin met hetzelfde formaat.\n\n"
"Vorige woorden:\n" + tail_words
)
# We sturen *niet* de volledige history opnieuw; alleen een korte instructie
return [
{"role": "system", "content": "Vervolg exact en beknopt; geen herhaling van eerder gegenereerde tekst."},
{"role": "user", "content": cont_user},
]
def _merge_choice_text(resp_a: dict, resp_b: dict) -> dict:
"""
Plak de content van choices[0] aan elkaar zodat callsites één 'content' blijven lezen.
"""
a = (((resp_a or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
b = (((resp_b or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
merged = (a or "") + (b or "")
out = resp_a.copy()
if "choices" in out and out["choices"]:
out["choices"] = [{
"index": 0,
"finish_reason": "length" if (out.get("choices",[{}])[0].get("finish_reason") in (None, "length")) else out.get("choices",[{}])[0].get("finish_reason"),
"message": {"role":"assistant","content": merged}
}]
return out
# Example: Chroma client/init — replace with your own client
# from chromadb import Client
# chroma = Client(...)
def _build_where_filter(repo: Optional[str], path_contains: Optional[str], profile: Optional[str]) -> Dict[str, Any]:
"""
Bouw een simpele metadata-filter voor de vector-DB. Pas aan naar jouw DB.
"""
where: Dict[str, Any] = {}
if repo:
where["repo"] = repo
if profile:
where["profile"] = profile
if path_contains:
# Als je DB geen 'contains' ondersteunt: filter achteraf (post-filter)
where["path_contains"] = path_contains
return where
def _to_distance_from_similarity(x: Optional[float]) -> float:
"""
Converteer een 'similarity' (1=identiek, 0=ver weg) naar distance (lager = beter).
"""
if x is None:
return 1.0
try:
xv = float(x)
except Exception:
return 1.0
# Veiligheids-net: clamp
if xv > 1.0 or xv < 0.0:
# Sommige backends geven cosine distance al (0=identiek). Als >1, treat as distance passthrough.
return max(0.0, xv)
# Standaard: cosine similarity → distance
return 1.0 - xv
def _post_filter_path_contains(items: List[Dict[str,Any]], path_contains: Optional[str]) -> List[Dict[str,Any]]:
if not path_contains:
return items
key = (path_contains or "").lower()
out = []
for it in items:
p = ((it.get("metadata") or {}).get("path") or "").lower()
if key in p:
out.append(it)
return out
def _chroma_query(collection_name: str, query: str, n_results: int, where: Dict[str, Any]) -> Dict[str, Any]:
    """
    Query the Chroma backend and normalize the result into
    {"results": [{"document", "metadata", "distance"}, ...]}.
    Chroma distances are already "lower = better".
    """
    global _chroma
    if _chroma is None:
        raise RuntimeError("Chroma backend niet beschikbaar (module niet geïnstalleerd).")
    # Use the same collection factory as the indexer so version/suffix stay consistent.
    if _get_chroma_collection is not None:
        collection = _get_chroma_collection(collection_name)
    else:
        collection = _chroma.Client().get_or_create_collection(collection_name)
    # Chroma's 'where' only supports exact matches; restrict to repo/profile.
    exact_filter = {key: val for key, val in where.items() if key in ("repo", "profile")}
    raw = collection.query(
        query_texts=[query],
        n_results=max(1, n_results),
        where=exact_filter,
        include=["documents", "metadatas", "distances"],
    )
    documents = raw.get("documents", [[]])[0] or []
    metadatas = raw.get("metadatas", [[]])[0] or []
    distances = raw.get("distances", [[]])[0] or []
    results: List[Dict[str, Any]] = []
    for document, meta, dist in zip(documents, metadatas, distances):
        results.append({
            "document": document,
            "metadata": {
                "repo": meta.get("repo", ""),
                "path": meta.get("path", ""),
                "chunk_index": meta.get("chunk_index", 0),
                "symbols": meta.get("symbols", []),
                "profile": meta.get("profile", ""),
            },
            "distance": float(dist) if dist is not None else 1.0,
        })
    return {"results": results}
def _qdrant_query(collection_name: str, query: str, n_results: int, where: Dict[str,Any]) -> Dict[str,Any]:
    """Query the Qdrant backend and normalize results to the shared shape.

    Returns {"results": [{"document", "metadata", "distance"}, ...]} where
    distance is normalized so that lower = better.
    Raises RuntimeError when the client is missing or the search fails.
    """
    global _qdrant, _qdrant_models
    if _qdrant is None or _qdrant_models is None:
        raise RuntimeError("Qdrant backend niet beschikbaar (module niet geïnstalleerd).")
    Filter, FieldCondition, MatchValue = _qdrant_models
    # Note: a client-side embedder is normally needed here. This skeleton
    # assumes server-side search-by-text is configured; otherwise plug in
    # your embedder at this point.
    client = _qdrant(host=os.getenv("QDRANT_HOST","localhost"), port=int(os.getenv("QDRANT_PORT","6333")))
    # Simple path: text search (if enabled). Otherwise the raise below lets
    # the caller's mock fallback take over.
    try:
        must: List[Any] = []
        if where.get("repo"):
            must.append(FieldCondition(key="repo", match=MatchValue(value=where["repo"])))
        if where.get("profile"):
            must.append(FieldCondition(key="profile", match=MatchValue(value=where["profile"])))
        flt = Filter(must=must) if must else None
        # NB: Qdrant 'score' is usually cosine similarity (high = good);
        # it is converted to a distance below.
        # NOTE(review): qdrant_client's search() classically expects
        # query_vector=, not query= — passing raw text likely raises a
        # TypeError here and always lands in the RuntimeError branch.
        # Verify against the installed client version.
        res = client.search(
            collection_name=collection_name,
            query=query,
            limit=max(1, n_results),
            query_filter=flt,
            with_payload=True,
        )
    except Exception as e:
        raise RuntimeError(f"Qdrant text search niet geconfigureerd: {e}")
    items: List[Dict[str,Any]] = []
    for p in res:
        meta = (p.payload or {})
        sim = getattr(p, "score", None)  # similarity, not a distance
        items.append({
            "document": meta.get("document",""),
            "metadata": {
                "repo": meta.get("repo",""),
                "path": meta.get("path",""),
                "chunk_index": meta.get("chunk_index", 0),
                "symbols": meta.get("symbols", []),
                "profile": meta.get("profile",""),
            },
            "distance": _to_distance_from_similarity(sim),
        })
    return {"results": items}
async def rag_query_internal_fn(
    *, query: str, n_results: int, collection_name: str,
    repo: Optional[str], path_contains: Optional[str], profile: Optional[str]
) -> Dict[str, Any]:
    """
    Adapter that queries the vector DB and returns *exactly* this shape:
        {"results": [{"document": str, "metadata": {...}, "distance": float}]}

    On backend failure a mock result keeps the app usable. FIX: the failure
    is now logged instead of being silently swallowed.
    """
    # Build the metadata filter (support varies per backend).
    where = _build_where_filter(repo, path_contains, profile)
    # Route to the configured backend.
    try:
        if BACKEND == "CHROMA":
            res = _chroma_query(collection_name, query, n_results, where)
        elif BACKEND == "QDRANT":
            res = _qdrant_query(collection_name, query, n_results, where)
        else:
            raise RuntimeError(f"Onbekende VECTOR_BACKEND={BACKEND}")
    except Exception as e:
        # Mock fallback keeps the app usable — but surface the root cause.
        logger.warning("WARN:agent_repo:vector query failed (backend=%s), using mock fallback: %s", BACKEND, e)
        qr = {
            "documents": [["(mock) no DB connected"]],
            "metadatas": [[{"repo": repo or "", "path": "README.md", "chunk_index": 0, "symbols": []}]],
            "distances": [[0.99]],
        }
        docs = qr.get("documents", [[]])[0] or []
        metas = qr.get("metadatas", [[]])[0] or []
        dists = qr.get("distances", [[]])[0] or []
        items: List[Dict[str, Any]] = []
        for doc, meta, dist in zip(docs, metas, dists):
            # Post-filter on path_contains when the DB can't do it natively.
            if path_contains:
                p = (meta.get("path") or "").lower()
                if (path_contains or "").lower() not in p:
                    continue
            items.append({
                "document": doc,
                "metadata": {
                    "repo": meta.get("repo", ""),
                    "path": meta.get("path", ""),
                    "chunk_index": meta.get("chunk_index", 0),
                    "symbols": meta.get("symbols", []),
                    "profile": meta.get("profile", ""),
                },
                "distance": float(dist) if dist is not None else 1.0,
            })
        res = {"results": items[:max(1, n_results)]}
    # Post-filter path_contains (no-op when already applied above).
    res["results"] = _post_filter_path_contains(res.get("results", []), path_contains)
    # Trim to the requested size.
    res["results"] = res.get("results", [])[:max(1, n_results)]
    return res
async def _smart_llm_call_base(
    llm_call_fn,
    messages: list[dict],
    *,
    stop: list[str] | None = None,
    max_tokens: int | None = None,
    temperature: float = 0.2,
    top_p: float = 0.9,
    stream: bool = False,
    **kwargs
):
    """Budget-aware LLM call with tidy finishing and auto-continue.

    1) Clamps max_tokens so prompt + output stay within the total budget.
    2) Adds mild stop sequences for a neat ending.
    3) Auto-continues (up to _MAX_AUTO_CONTINUES) when output looks truncated.

    Returns the (possibly merged) response dict in the same shape llm_call_fn
    produces, so callers keep reading choices[0].message.content.
    """
    # 1) Compute the output budget from the current prompt size.
    prompt_text = _concat_messages_text(messages)
    prompt_tokens = _est_tokens(prompt_text)
    room = max(128, _MODEL_BUDGET - prompt_tokens - _BUDGET_SAFETY)
    eff_max_tokens = max(1, min(int(max_tokens or 900), room))
    # 2) Stop sequences (mild; not restrictive for code).
    default_stops = ["\n\n", "###"]
    stops = list(dict.fromkeys((stop or []) + default_stops))
    # First call; retry without 'stop' when the backend rejects that kwarg.
    try:
        resp = await llm_call_fn(
            messages,
            stream=stream,
            temperature=temperature,
            top_p=top_p,
            max_tokens=eff_max_tokens,
            stop=stops,
            **kwargs
        )
    except TypeError:
        resp = await llm_call_fn(
            messages,
            stream=stream,
            temperature=temperature,
            top_p=top_p,
            max_tokens=eff_max_tokens,
            **kwargs
        )
    text = (((resp or {}).get("choices") or [{}])[0].get("message") or {}).get("content", "")
    # Heuristic: near the cap AND no tidy ending → probably truncated.
    near_cap = (_est_tokens(text) >= int(0.92 * eff_max_tokens))
    needs_more = (near_cap and not _ends_neatly(text))
    continues = 0
    merged = resp
    while needs_more and continues < _MAX_AUTO_CONTINUES:
        continues += 1
        cont_msgs = _append_assistant_and_continue_prompt(messages, text)
        # Recompute the budget for the follow-up (its prompt is much smaller).
        cont_prompt_tokens = _est_tokens(_concat_messages_text(cont_msgs))
        cont_room = max(128, _MODEL_BUDGET - cont_prompt_tokens - _BUDGET_SAFETY)
        cont_max = max(1, min(int(max_tokens or 900), cont_room))
        try:
            cont_resp = await llm_call_fn(
                cont_msgs,
                stream=False,
                temperature=temperature,
                top_p=top_p,
                max_tokens=cont_max,
                stop=stops,
                **kwargs
            )
        except TypeError:
            cont_resp = await llm_call_fn(
                cont_msgs,
                stream=False,
                temperature=temperature,
                top_p=top_p,
                max_tokens=cont_max,
                **kwargs
            )
        merged = _merge_choice_text(merged, cont_resp)
        text = (((merged or {}).get("choices") or [{}])[0].get("message") or {}).get("content", "")
        # BUGFIX: previously a *list* (text.split()[-800:]) was passed to
        # _est_tokens, which then computed len(list)//4 — the wrong unit.
        # Estimate on the last ~800 words joined back into a string.
        near_cap = (_est_tokens(" ".join(text.split()[-800:])) >= int(0.9 * cont_max))
        needs_more = (near_cap and not _ends_neatly(text))
    return merged
def initialize_agent(*, app, get_git_repo_fn, rag_index_repo_internal_fn, rag_query_internal_fn,
                     llm_call_fn, extract_code_block_fn, read_text_file_fn, client_ip_fn,
                     profile_exclude_dirs, chroma_get_collection_fn, embed_query_fn, embed_documents_fn,
                     search_candidates_fn=None, repo_summary_get_fn=None, meili_search_fn=None):
    """Wire dependencies injected from app.py into this module's globals.

    Call once at startup. Also wraps the injected LLM callable with token
    budgeting + auto-continue, and creates the per-app agent session store.
    """
    # NOTE(review): DEF_INJECTS is not defined in this portion of the file —
    # verify it exists at module level, otherwise this raises NameError.
    global DEF_INJECTS
    DEF_INJECTS.update({
        "app": app,
        "get_git_repo_fn": get_git_repo_fn,
        "rag_index_repo_internal_fn": rag_index_repo_internal_fn,
        "rag_query_internal_fn": rag_query_internal_fn,
        "llm_call_fn": llm_call_fn,
        "extract_code_block_fn": extract_code_block_fn,
        "read_text_file_fn": read_text_file_fn,
        "client_ip_fn": client_ip_fn,
        "profile_exclude_dirs": profile_exclude_dirs,
        "chroma_get_collection_fn": chroma_get_collection_fn,
        "embed_query_fn": embed_query_fn,
        "embed_documents_fn": embed_documents_fn,
    })
    global _search_candidates_fn, _repo_summary_get_fn, _meili_search_fn
    _search_candidates_fn = search_candidates_fn
    _repo_summary_get_fn = repo_summary_get_fn
    _meili_search_fn = meili_search_fn
    # BUGFIX: _embed_documents was missing from the global declarations, so
    # the assignment below created a dead local and the module-level
    # _embed_documents stayed None.
    global _get_chroma_collection, _embed_query_fn, _embed_documents
    global _app, _get_git_repo, _rag_index_repo_internal, _rag_query_internal, _llm_call
    global _extract_code_block, _read_text_file, _client_ip, _PROFILE_EXCLUDE_DIRS
    _app = app
    _get_git_repo = get_git_repo_fn
    _rag_index_repo_internal = rag_index_repo_internal_fn
    _rag_query_internal = rag_query_internal_fn
    # Keep the original callable and wrap it with budget + auto-continue.
    _llm_call_original = llm_call_fn

    async def _wrapped_llm_call(messages, **kwargs):
        return await _smart_llm_call_base(_llm_call_original, messages, **kwargs)

    # Rebind the module-level _llm_call (shadows the llm_client import).
    globals()["_llm_call"] = _wrapped_llm_call
    _extract_code_block = extract_code_block_fn
    _read_text_file = read_text_file_fn
    _client_ip = client_ip_fn
    _PROFILE_EXCLUDE_DIRS = set(profile_exclude_dirs) | INTERNAL_EXCLUDE_DIRS
    _get_chroma_collection = chroma_get_collection_fn
    _embed_query_fn = embed_query_fn
    _embed_documents = embed_documents_fn
    # Per-app session store (AgentState is defined elsewhere in this module).
    if not hasattr(_app.state, "AGENT_SESSIONS"):
        _app.state.AGENT_SESSIONS: Dict[str, AgentState] = {}
    logger.info("INFO:agent_repo:init GITEA_URL=%s GITEA_API=%s MEILI_URL=%s", GITEA_URL, GITEA_API, MEILI_URL or "-")
# ---------- Helpers ----------
def extract_explicit_paths(text: str) -> List[str]:
    """
    Robust path extractor:
      - ignores URLs (http/https)
      - requires at least one '/' and a dotted file extension
      - dedupes while preserving original order

    BUGFIX: the previous implementation referenced PATH_RE, which is commented
    out at module level (NameError risk); the pattern is now local.
    """
    if not text:
        return []
    # Normalize smart quotes and backslashes so Windows-style paths match too.
    t = (text or "").replace("“", "\"").replace("”", "\"").replace("’", "'").replace("\\", "/").strip()
    # Drop URLs so their path components are not mistaken for repo paths.
    t = re.sub(r"https?://\S+", " ", t)
    # One or more directory segments, a basename, and 1+ dotted extensions.
    path_re = re.compile(r"(?<![\w./-])((?:[A-Za-z0-9_.\-]+/)+[A-Za-z0-9_\-]+(?:\.[A-Za-z0-9_]+)+)")
    seen = set()
    out: List[str] = []
    for p in path_re.findall(t):
        if p not in seen:
            seen.add(p)
            out.append(p)
    logging.getLogger("agent_repo").info("EXPLICIT PATHS parsed: %s", out)
    return out
async def _llm_recovery_plan(user_goal: str, observed_candidates: list[str], last_reason: str = "") -> dict:
    """
    Ask the LLM for targeted recovery search patterns and keywords after we
    got 'no proposal'. Output JSON:
        {"patterns":[{"glob"|"regex": str},...], "keywords":[str,...], "note": str}
    """
    sys = ("Return ONLY compact JSON. Schema:\n"
           "{\"patterns\":[{\"glob\":str}|{\"regex\":str},...],\"keywords\":[str,...],\"note\":str}\n"
           "Prefer Laravel-centric paths (resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
           "config/*.php, .env, database/migrations/**.php). Max 12 patterns, 8 keywords.")
    usr = (f"User goal:\n{user_goal}\n\n"
           f"Candidates we tried (may be irrelevant):\n{json.dumps(observed_candidates[-12:], ensure_ascii=False)}\n\n"
           f"Failure reason (if any): {last_reason or '(none)'}\n"
           "Propose minimal extra patterns/keywords to find the exact files.")
    obj: dict = {}
    try:
        resp = await _llm_call(
            [{"role": "system", "content": sys}, {"role": "user", "content": usr}],
            stream=False, temperature=0.0, top_p=1.0, max_tokens=280
        )
        raw = (resp.get("choices", [{}])[0].get("message", {}) or {}).get("content", "")
        blob = re.search(r"\{[\s\S]*\}", raw or "")
        if blob:
            obj = json.loads(blob.group(0))
    except Exception:
        obj = {}
    # Sanitize: cap counts and clip string lengths.
    patterns: list[dict] = []
    for entry in (obj.get("patterns") or []):
        if not isinstance(entry, dict):
            continue
        glob_val = entry.get("glob")
        regex_val = entry.get("regex")
        if isinstance(glob_val, str) and glob_val.strip():
            patterns.append({"glob": glob_val.strip()[:200]})
        elif isinstance(regex_val, str) and regex_val.strip():
            patterns.append({"regex": regex_val.strip()[:200]})
        if len(patterns) >= 16:
            break
    keywords = [str(k).strip()[:64] for k in (obj.get("keywords") or []) if str(k).strip()][:8]
    return {"patterns": patterns, "keywords": keywords, "note": str(obj.get("note", ""))[:400]}
def _extend_candidates_with_keywords(root: Path, all_files: list[str], keywords: list[str], cap: int = 24) -> list[str]:
    """
    Deterministic, lightweight keyword scan using the shared text loader.
    Returns up to *cap* relative paths whose content contains any keyword.
    """
    needles = [k.lower() for k in keywords if k]
    if not needles:
        return []
    matched: list[str] = []
    seen: set[str] = set()
    for rel in all_files:
        if len(matched) >= cap:
            break
        try:
            content = _read_text_file(Path(root) / rel)
        except Exception:
            content = ""
        if not content:
            continue
        haystack = content.lower()
        if rel not in seen and any(n in haystack for n in needles):
            seen.add(rel)
            matched.append(rel)
    return matched
async def _recovery_expand_candidates(root: Path, all_files: list[str], user_goal: str,
                                      current: list[str], *, last_reason: str = "") -> tuple[list[str], dict]:
    """
    Recovery flow after 'no proposal':
      1) ask the LLM for a recovery plan (patterns + keywords)
      2) deterministic pattern scan via _scan_repo_for_patterns
      3) deterministic keyword scan as a second track
    Returns (expanded_candidate_list, debug_info).
    """
    plan = await _llm_recovery_plan(user_goal, current, last_reason=last_reason)
    max_hits = int(os.getenv("LLM_RECOVERY_MAX_HITS", "24"))
    added: list[str] = []
    # patterns → deterministic scan
    if plan.get("patterns"):
        for hit in _scan_repo_for_patterns(root, all_files, plan["patterns"], max_hits=max_hits):
            if hit not in current and hit not in added:
                added.append(hit)
    # keywords → second track, filling the remaining room
    if len(added) < max_hits and plan.get("keywords"):
        for hit in _extend_candidates_with_keywords(root, all_files, plan["keywords"], cap=max_hits - len(added)):
            if hit not in current and hit not in added:
                added.append(hit)
    debug = {"recovery_plan": plan, "added": added[:12]}
    return (current + added)[:MAX_FILES_DRYRUN], debug
def _scan_repo_for_patterns(root: Path, all_files: list[str], patterns: list[dict], max_hits: int = 40) -> list[str]:
"""
patterns: [{"glob": "resources/views/**.blade.php"}, {"regex": "Truebeam\\s*foutcode"}, ...]
Retourneert unieke bestands-paden met 1+ hits. Deterministisch (geen LLM).
"""
hits: list[str] = []
seen: set[str] = set()
def _match_glob(pat: str) -> list[str]:
try:
pat = pat.strip().lstrip("./")
return [f for f in all_files if fnmatch.fnmatch(f, pat)]
except Exception:
return []
for spec in patterns or []:
if len(hits) >= max_hits: break
if "glob" in spec and isinstance(spec["glob"], str):
for f in _match_glob(spec["glob"]):
if f not in seen:
seen.add(f); hits.append(f)
if len(hits) >= max_hits: break
elif "regex" in spec and isinstance(spec["regex"], str):
try:
rx = re.compile(spec["regex"], re.I|re.M)
except Exception:
continue
for f in all_files:
if f in seen: continue
try:
txt = _read_text_file(Path(root)/f)
if rx.search(txt or ""):
seen.add(f); hits.append(f)
if len(hits) >= max_hits: break
except Exception:
continue
return hits
async def _llm_make_search_specs(user_goal: str, framework: str = "laravel") -> list[dict]:
    """
    Let the LLM propose globs/regexes (output ONLY JSON:
    {patterns:[{glob|regex: str},...]}); the deterministic scan with
    _scan_repo_for_patterns happens afterwards.
    """
    if not (user_goal or "").strip():
        return []
    sys = ("Return ONLY JSON matching: {\"patterns\":[{\"glob\":str}|{\"regex\":str}, ...]}\n"
           "For Laravel, prefer globs like resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
           "config/*.php, .env, database/migrations/**.php. Keep regexes simple and safe.")
    usr = f"Framework: {framework}\nUser goal:\n{user_goal}\nReturn ≤ 12 items."
    try:
        resp = await _llm_call(
            [{"role": "system", "content": sys}, {"role": "user", "content": usr}],
            stream=False, temperature=0.0, top_p=1.0, max_tokens=280
        )
        raw = (resp.get('choices', [{}])[0].get('message', {}) or {}).get('content', '')
        blob = re.search(r"\{[\s\S]*\}", raw or "")
        parsed = json.loads(blob.group(0)) if blob else {}
        specs: list[dict] = []
        for entry in (parsed.get("patterns") or []):
            if not isinstance(entry, dict):
                continue
            if isinstance(entry.get("glob"), str) and entry["glob"].strip():
                specs.append({"glob": entry["glob"].strip()[:200]})
            elif isinstance(entry.get("regex"), str) and entry["regex"].strip():
                specs.append({"regex": entry["regex"].strip()[:200]})
            if len(specs) >= 16:
                break
        return specs
    except Exception:
        return []
def _with_preview(text: str, st: "AgentState", *, limit: int = 1200, header: str = "--- SMART-RAG quick scan (preview) ---") -> str:
"""Plak een compacte SMART-RAG preview onderaan het antwoord, als die er is."""
sp = getattr(st, "smart_preview", "") or ""
sp = sp.strip()
if not sp:
return text
if limit > 0 and len(sp) > limit:
sp = sp[:limit].rstrip() + "\n…"
return text + "\n\n" + header + "\n" + sp
def _now() -> int:
return int(time.time())
def _gitea_headers():
    """Authorization header for the Gitea API; empty when no token is configured."""
    if not GITEA_TOKEN:
        return {}
    return {"Authorization": f"token {GITEA_TOKEN}"}
def add_auth_to_url(url: str, user: str | None = None, token: str | None = None) -> str:
if not url or not (user and token):
return url
u = urlparse(url)
if u.scheme not in ("http", "https") or "@" in u.netloc:
return url
netloc = f"{user}:{token}@{u.netloc}"
return urlunparse((u.scheme, netloc, u.path, u.params, u.query, u.fragment))
def ensure_git_suffix(url: str) -> str:
    """Append '.git' to a clone-URL path unless already present or an API path."""
    try:
        parts = urlparse(url)
        if parts.path.endswith(".git") or "/api/" in parts.path:
            return url
        new_path = parts.path.rstrip("/") + ".git"
        return urlunparse((parts.scheme, parts.netloc, new_path, parts.params, parts.query, parts.fragment))
    except Exception:
        return url
def parse_owner_repo(hint: str) -> tuple[str | None, str | None]:
m = re.match(r"^([A-Za-z0-9_.\-]+)/([A-Za-z0-9_.\-]+)$", (hint or "").strip())
if not m:
return None, None
return m.group(1), m.group(2)
def gitea_get_repo(owner: str, repo: str) -> dict | None:
    """Fetch repo metadata from the Gitea API; None when missing or on any error."""
    try:
        resp = requests.get(f"{GITEA_API}/repos/{owner}/{repo}", headers=_gitea_headers(), timeout=10)
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.json()
    except Exception as e:
        logger.warning("WARN:agent_repo:gitea_get_repo %s/%s failed: %s", owner, repo, e)
        return None
def gitea_search_repos(q: str, limit: int = 5) -> List[dict]:
    """Search repositories via the Gitea API.

    Handles both response shapes: {"data": [...]} and a bare list.
    Returns an empty list on any error (logged).

    FIX: removed an unreachable third branch that re-checked
    '"ok" in data and "data" in data' — the first branch already returns
    for any dict containing "data".
    """
    try:
        r = requests.get(f"{GITEA_API}/repos/search",
                         params={"q": q, "limit": limit},
                         headers=_gitea_headers(), timeout=10)
        r.raise_for_status()
        data = r.json() or {}
        if isinstance(data, dict) and "data" in data:
            return data["data"]
        if isinstance(data, list):
            return data
        return []
    except Exception as e:
        logger.warning("WARN:agent_repo:/repos/search failed: %s", e)
        return []
def resolve_repo(hint: str) -> tuple[dict | None, str | None]:
    """Resolve a user-supplied repo hint to (repo_metadata, resolution_strategy).

    Tries, in order: direct clone URL, exact 'owner/repo' lookup via the
    Gitea API (with a constructed-URL fallback), then a fuzzy Gitea search.
    Returns (None, "not-found") when nothing matches.
    """
    hint = (hint or "").strip()
    logger.info("INFO:agent_repo:resolve_repo hint=%s", hint)
    if hint.startswith("http://") or hint.startswith("https://"):
        # Direct URL: normalize to .git and inject basic-auth for private clones.
        url = add_auth_to_url(ensure_git_suffix(hint), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        # NOTE(review): owner_repo_from_url is not defined in this part of the
        # file — confirm it exists elsewhere in the module.
        owner, repo = owner_repo_from_url(url)
        rd = {"full_name": f"{owner}/{repo}" if owner and repo else None, "clone_url": url}
        logger.info("INFO:agent_repo:resolved direct-url %s", rd.get("full_name"))
        return rd, "direct-url"
    owner, repo = parse_owner_repo(hint)
    if owner and repo:
        meta = gitea_get_repo(owner, repo)
        if meta:
            url = meta.get("clone_url") or f"{GITEA_URL}/{owner}/{repo}.git"
            url = add_auth_to_url(ensure_git_suffix(url), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
            meta["clone_url"] = url
            logger.info("INFO:agent_repo:resolved owner-repo %s", meta.get("full_name"))
            return meta, "owner-repo"
        # API lookup failed: fall back to a constructed clone URL.
        url = add_auth_to_url(ensure_git_suffix(f"{GITEA_URL}/{owner}/{repo}.git"), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        rd = {"full_name": f"{owner}/{repo}", "clone_url": url}
        logger.info("INFO:agent_repo:resolved owner-repo-fallback %s", rd.get("full_name"))
        return rd, "owner-repo-fallback"
    # Last resort: fuzzy repo search; take the first (best) hit.
    found = gitea_search_repos(hint, limit=5)
    if found:
        found[0]["clone_url"] = add_auth_to_url(ensure_git_suffix(found[0].get("clone_url") or ""), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        logger.info("INFO:agent_repo:resolved search %s", found[0].get("full_name"))
        return found[0], "search"
    logger.error("ERROR:agent_repo:repo not found for hint=%s", hint)
    return None, "not-found"
def extract_context_hints_from_prompt(user_goal: str) -> dict:
"""
Haal dynamisch hints uit de prompt:
- tag_names: HTML/XML tags die genoemd zijn (
, ,