diff --git a/agent_repo.py b/agent_repo.py
new file mode 100644
index 0000000..5eeb047
--- /dev/null
+++ b/agent_repo.py
@@ -0,0 +1,4692 @@
+# agent_repo.py
+# =====================================================================
+# Hybrid RAG + LLM edit-plans met: veilige fallback, anti-destructie guard,
+# en EXPLICIETE UITLEG per diff.
+# =====================================================================
+# agent_repo.py (bovenin)
+
+from __future__ import annotations
+from smart_rag import enrich_intent, expand_queries, hybrid_retrieve, assemble_context
+import os, re, time, uuid, difflib, hashlib, logging, json, fnmatch
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import Dict, List, Tuple, Optional, Any
+from urllib.parse import urlparse, urlunparse
+import requests
+import base64
+from windowing_utils import approx_token_count
+from starlette.concurrency import run_in_threadpool
+import asyncio
+from collections import defaultdict
+
+
+# --- Async I/O executors (voorkom event-loop blocking) ---
+from concurrent.futures import ThreadPoolExecutor
+
+_IO_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_IO_WORKERS", "8")))
+_CPU_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_CPU_WORKERS", "2")))
+_CLONE_SEMA = asyncio.Semaphore(int(os.getenv("AGENT_MAX_CONCURRENT_CLONES", "2")))
+
+BACKEND = (os.getenv("VECTOR_BACKEND") or "CHROMA").upper().strip()
+
+# NOTE(review): the PATH_RE regex that was here is garbled/truncated; extract_explicit_paths() below uses PATH_RE — confirm it is defined elsewhere in this module.
+# ---------- Injectie vanuit app.py ----------
+_app = None
+_get_git_repo = None
+_rag_index_repo_internal = None
+_rag_query_internal = None
+_llm_call = None
+_extract_code_block = None
+_read_text_file = None
+_client_ip = None
+_PROFILE_EXCLUDE_DIRS: set[str] = set()
+_get_chroma_collection = None
+_embed_query_fn = None
+_embed_documents = None
+
+
+# === SMART LLM WRAPPER: budget + nette afronding + auto-continue ===
+# Past binnen jouw GPU-cap (typisch 13027 tokens totale context).
+# Non-invasief: behoudt hetzelfde response-shape als _llm_call.
+
+# Harde cap van jouw Mistral-LLM docker (zoals je aangaf)
+_MODEL_BUDGET = int(os.getenv("LLM_TOTAL_TOKEN_BUDGET", "13027"))
+# Veiligheidsmarge voor headers/EOS/afwijkingen in token-raming
+_BUDGET_SAFETY = int(os.getenv("LLM_BUDGET_SAFETY_TOKENS", "512"))
+# Max aantal vervolgstappen als het net afgekapt lijkt
+_MAX_AUTO_CONTINUES = int(os.getenv("LLM_MAX_AUTO_CONTINUES", "2"))
+
+def _est_tokens(text: str) -> int:
+ # Ruwe schatting: ~4 chars/token (conservatief genoeg voor budgettering)
+ if not text: return 0
+ return max(1, len(text) // 4)
+
+def _concat_messages_text(messages: list[dict]) -> str:
+ parts = []
+ for m in messages or []:
+ c = m.get("content")
+ if isinstance(c, str): parts.append(c)
+ return "\n".join(parts)
+
+def _ends_neatly(s: str) -> bool:
+ if not s: return False
+ t = s.rstrip()
+ return t.endswith((".", "!", "?", "…", "”", "’"))
+
+def _append_assistant_and_continue_prompt(base_messages: list[dict], prev_text: str) -> list[dict]:
+ """
+ Bouw een minimale vervolgprompt zonder opnieuw de hele context te sturen.
+ Dit beperkt prompt_tokens en voorkomt dat we opnieuw de cap raken.
+ """
+ tail_words = " ".join(prev_text.split()[-60:]) # laatste ±60 woorden als anker
+ cont_user = (
+ "Ga verder waar je stopte. Herhaal niets. "
+ "Vervolg direct de laatste zin met hetzelfde formaat.\n\n"
+ "Vorige woorden:\n" + tail_words
+ )
+ # We sturen *niet* de volledige history opnieuw; alleen een korte instructie
+ return [
+ {"role": "system", "content": "Vervolg exact en beknopt; geen herhaling van eerder gegenereerde tekst."},
+ {"role": "user", "content": cont_user},
+ ]
+
+def _merge_choice_text(resp_a: dict, resp_b: dict) -> dict:
+ """
+ Plak de content van choices[0] aan elkaar zodat callsites één 'content' blijven lezen.
+ """
+ a = (((resp_a or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
+ b = (((resp_b or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
+ merged = (a or "") + (b or "")
+ out = resp_a.copy()
+ if "choices" in out and out["choices"]:
+ out["choices"] = [{
+ "index": 0,
+ "finish_reason": "length" if (out.get("choices",[{}])[0].get("finish_reason") in (None, "length")) else out.get("choices",[{}])[0].get("finish_reason"),
+ "message": {"role":"assistant","content": merged}
+ }]
+ return out
+
+# Voorbeeld: Chroma client/init – vervang door jouw eigen client
+# from chromadb import Client
+# chroma = Client(...)
+
+def _build_where_filter(repo: Optional[str], path_contains: Optional[str], profile: Optional[str]) -> Dict[str, Any]:
+ """
+ Bouw een simpele metadata-filter voor de vector-DB. Pas aan naar jouw DB.
+ """
+ where: Dict[str, Any] = {}
+ if repo:
+ where["repo"] = repo
+ if profile:
+ where["profile"] = profile
+ if path_contains:
+ # Als je DB geen 'contains' ondersteunt: filter achteraf (post-filter)
+ where["path_contains"] = path_contains
+ return where
+
+def _to_distance_from_similarity(x: Optional[float]) -> float:
+ """
+ Converteer een 'similarity' (1=identiek, 0=ver weg) naar distance (lager = beter).
+ """
+ if x is None:
+ return 1.0
+ try:
+ xv = float(x)
+ except Exception:
+ return 1.0
+ # Veiligheids-net: clamp
+ if xv > 1.0 or xv < 0.0:
+ # Sommige backends geven cosine distance al (0=identiek). Als >1, treat as distance passthrough.
+ return max(0.0, xv)
+ # Standaard: cosine similarity → distance
+ return 1.0 - xv
+
+def _post_filter_path_contains(items: List[Dict[str,Any]], path_contains: Optional[str]) -> List[Dict[str,Any]]:
+ if not path_contains:
+ return items
+ key = (path_contains or "").lower()
+ out = []
+ for it in items:
+ p = ((it.get("metadata") or {}).get("path") or "").lower()
+ if key in p:
+ out.append(it)
+ return out
+
+def _chroma_query(collection_name: str, query: str, n_results: int, where: Dict[str,Any]) -> Dict[str,Any]:
+ global _chroma
+ if _chroma is None:
+ raise RuntimeError("Chroma backend niet beschikbaar (module niet geïnstalleerd).")
+ # Gebruik dezelfde collection-factory als de indexer, zodat versie/suffix consistent is
+ if _get_chroma_collection is None:
+ client = _chroma.Client()
+ coll = client.get_or_create_collection(collection_name)
+ else:
+ coll = _get_chroma_collection(collection_name)
+ # Chroma: use 'where' only for exact fields (repo/profile)
+ where_exact = {k:v for k,v in where.items() if k in ("repo","profile")}
+ qr = coll.query(
+ query_texts=[query],
+ n_results=max(1, n_results),
+ where=where_exact,
+ include=["documents","metadatas","distances"]
+ )
+ docs = qr.get("documents", [[]])[0] or []
+ metas = qr.get("metadatas", [[]])[0] or []
+ dists = qr.get("distances", [[]])[0] or []
+ # Chroma 'distances': lager = beter (ok)
+ items: List[Dict[str,Any]] = []
+ for doc, meta, dist in zip(docs, metas, dists):
+ items.append({
+ "document": doc,
+ "metadata": {
+ "repo": meta.get("repo",""),
+ "path": meta.get("path",""),
+ "chunk_index": meta.get("chunk_index", 0),
+ "symbols": meta.get("symbols", []),
+ "profile": meta.get("profile",""),
+ },
+ "distance": float(dist) if dist is not None else 1.0,
+ })
+ return {"results": items}
+
+def _qdrant_query(collection_name: str, query: str, n_results: int, where: Dict[str,Any]) -> Dict[str,Any]:
+ global _qdrant, _qdrant_models
+ if _qdrant is None or _qdrant_models is None:
+ raise RuntimeError("Qdrant backend niet beschikbaar (module niet geïnstalleerd).")
+ Filter, FieldCondition, MatchValue = _qdrant_models
+ # Let op: je hebt hier *ook* een embedder nodig (client-side). In dit skeleton verwachten we dat
+ # je server-side search by text hebt geconfigureerd. Anders: voeg hier je embedder toe.
+ client = _qdrant(host=os.getenv("QDRANT_HOST","localhost"), port=int(os.getenv("QDRANT_PORT","6333")))
+ # Eenvoudig: text search (als ingeschakeld). Anders: raise en laat de mock fallback pakken.
+ try:
+ must: List[Any] = []
+ if where.get("repo"):
+ must.append(FieldCondition(key="repo", match=MatchValue(value=where["repo"])))
+ if where.get("profile"):
+ must.append(FieldCondition(key="profile", match=MatchValue(value=where["profile"])))
+ flt = Filter(must=must) if must else None
+ # NB: Qdrant 'score' is vaak cosine similarity (hoog=goed). Converteer naar distance.
+ res = client.search(
+ collection_name=collection_name,
+ query=query,
+ limit=max(1, n_results),
+ query_filter=flt,
+ with_payload=True,
+ )
+ except Exception as e:
+ raise RuntimeError(f"Qdrant text search niet geconfigureerd: {e}")
+
+ items: List[Dict[str,Any]] = []
+ for p in res:
+ meta = (p.payload or {})
+ sim = getattr(p, "score", None)
+ items.append({
+ "document": meta.get("document",""),
+ "metadata": {
+ "repo": meta.get("repo",""),
+ "path": meta.get("path",""),
+ "chunk_index": meta.get("chunk_index", 0),
+ "symbols": meta.get("symbols", []),
+ "profile": meta.get("profile",""),
+ },
+ "distance": _to_distance_from_similarity(sim),
+ })
+ return {"results": items}
+
+async def rag_query_internal_fn(
+ *, query: str, n_results: int, collection_name: str,
+ repo: Optional[str], path_contains: Optional[str], profile: Optional[str]
+) -> Dict[str, Any]:
+ """
+ Adapter die zoekt in je vector-DB en *exact* het verwachte formaat teruggeeft:
+ {
+ "results": [
+ {"document": str, "metadata": {...}, "distance": float}
+ ]
+ }
+ """
+ # 1) Haal collectie op (pas aan naar jouw client)
+ # coll = chroma.get_or_create_collection(collection_name)
+
+ # 2) Bouw where/filter (optioneel afhankelijk van jouw DB)
+ where = _build_where_filter(repo, path_contains, profile)
+
+    # 2b) Route the query to the configured vector backend
+ try:
+ if BACKEND == "CHROMA":
+ res = _chroma_query(collection_name, query, n_results, where)
+ elif BACKEND == "QDRANT":
+ res = _qdrant_query(collection_name, query, n_results, where)
+ else:
+ raise RuntimeError(f"Onbekende VECTOR_BACKEND={BACKEND}")
+
+ except Exception as e:
+ # Mock fallback zodat je app bruikbaar blijft
+ qr = {
+ "documents": [["(mock) no DB connected"]],
+ "metadatas": [[{"repo": repo or "", "path": "README.md", "chunk_index": 0, "symbols": []}]],
+ "distances": [[0.99]],
+ }
+ docs = qr.get("documents", [[]])[0] or []
+ metas = qr.get("metadatas", [[]])[0] or []
+ dists = qr.get("distances", [[]])[0] or []
+
+ items: List[Dict[str, Any]] = []
+ for doc, meta, dist in zip(docs, metas, dists):
+ # Post-filter op path_contains als je DB dat niet ondersteunt
+ if path_contains:
+ p = (meta.get("path") or "").lower()
+ if (path_contains or "").lower() not in p:
+ continue
+ items.append({
+ "document": doc,
+ "metadata": {
+ "repo": meta.get("repo",""),
+ "path": meta.get("path",""),
+ "chunk_index": meta.get("chunk_index", 0),
+ "symbols": meta.get("symbols", []),
+ "profile": meta.get("profile",""),
+ },
+ "distance": float(dist) if dist is not None else 1.0,
+ })
+ res = {"results": items[:max(1, n_results)]}
+ # 3) Post-filter path_contains (indien nodig)
+ res["results"] = _post_filter_path_contains(res.get("results", []), path_contains)
+ # 4) Trim
+ res["results"] = res.get("results", [])[:max(1, n_results)]
+ return res
+
+async def _smart_llm_call_base(
+ llm_call_fn,
+ messages: list[dict],
+ *,
+ stop: list[str] | None = None,
+ max_tokens: int | None = None,
+ temperature: float = 0.2,
+ top_p: float = 0.9,
+ stream: bool = False,
+ **kwargs
+):
+ """
+ 1) Dwing max_tokens af binnen totale budget (prompt + output ≤ cap).
+ 2) Voeg milde stop-sequenties toe voor nette afronding.
+ 3) Auto-continue als het lijkt afgekapt en we ruimte willen voor een vervolg.
+ """
+ # 1) Budget berekenen op basis van huidige prompt omvang
+ prompt_text = _concat_messages_text(messages)
+ prompt_tokens = _est_tokens(prompt_text)
+ room = max(128, _MODEL_BUDGET - prompt_tokens - _BUDGET_SAFETY)
+ eff_max_tokens = max(1, min(int(max_tokens or 900), room))
+
+    # 2) Stop sequences. NOTE(review): "\n\n" also stops at the first blank line, which truncates multi-paragraph or code answers — confirm this is intended.
+ default_stops = ["\n\n", "###"]
+ stops = list(dict.fromkeys((stop or []) + default_stops))
+
+ # eerste call
+ try:
+ resp = await llm_call_fn(
+ messages,
+ stream=stream,
+ temperature=temperature,
+ top_p=top_p,
+ max_tokens=eff_max_tokens,
+ stop=stops,
+ **kwargs
+ )
+    except TypeError:
+ # backend accepteert geen 'stop' → probeer opnieuw zonder stop
+ resp = await llm_call_fn(
+ messages,
+ stream=stream,
+ temperature=temperature,
+ top_p=top_p,
+ max_tokens=eff_max_tokens,
+ **kwargs
+ )
+ text = (((resp or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
+ # Heuristiek: bijna vol + niet netjes eindigen → waarschijnlijk afgekapt
+ near_cap = (_est_tokens(text) >= int(0.92 * eff_max_tokens))
+ needs_more = (near_cap and not _ends_neatly(text))
+
+ continues = 0
+ merged = resp
+ while needs_more and continues < _MAX_AUTO_CONTINUES:
+ continues += 1
+ cont_msgs = _append_assistant_and_continue_prompt(messages, text)
+ # Herbereken budget voor vervolg (nieuwe prompt is veel kleiner)
+ cont_prompt_tokens = _est_tokens(_concat_messages_text(cont_msgs))
+ cont_room = max(128, _MODEL_BUDGET - cont_prompt_tokens - _BUDGET_SAFETY)
+ cont_max = max(1, min(int(max_tokens or 900), cont_room))
+ try:
+ cont_resp = await llm_call_fn(
+ cont_msgs,
+ stream=False,
+ temperature=temperature,
+ top_p=top_p,
+ max_tokens=cont_max,
+ stop=stops,
+ **kwargs
+ )
+ except TypeError:
+ cont_resp = await llm_call_fn(
+ cont_msgs,
+ stream=False,
+ temperature=temperature,
+ top_p=top_p,
+ max_tokens=cont_max,
+ **kwargs
+ )
+ merged = _merge_choice_text(merged, cont_resp)
+ text = (((merged or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
+        near_cap = (_est_tokens(" ".join(text.split()[-800:])) >= int(0.9 * cont_max))  # token estimate over the last ~800 words only
+ needs_more = (near_cap and not _ends_neatly(text))
+
+ return merged
+
+def initialize_agent(*, app, get_git_repo_fn, rag_index_repo_internal_fn, rag_query_internal_fn,
+ llm_call_fn, extract_code_block_fn, read_text_file_fn, client_ip_fn,
+ profile_exclude_dirs, chroma_get_collection_fn, embed_query_fn, embed_documents_fn,
+ search_candidates_fn=None, repo_summary_get_fn=None, meili_search_fn=None):
+ global DEF_INJECTS
+ DEF_INJECTS.update({
+ "app": app,
+ "get_git_repo_fn": get_git_repo_fn,
+ "rag_index_repo_internal_fn": rag_index_repo_internal_fn,
+ "rag_query_internal_fn": rag_query_internal_fn,
+ "llm_call_fn": llm_call_fn,
+ "extract_code_block_fn": extract_code_block_fn,
+ "read_text_file_fn": read_text_file_fn,
+ "client_ip_fn": client_ip_fn,
+ "profile_exclude_dirs": profile_exclude_dirs,
+ "chroma_get_collection_fn": chroma_get_collection_fn,
+ "embed_query_fn": embed_query_fn,
+ "embed_documents_fn": embed_documents_fn,
+ })
+ global _search_candidates_fn, _repo_summary_get_fn, _meili_search_fn
+ _search_candidates_fn = search_candidates_fn
+ _repo_summary_get_fn = repo_summary_get_fn
+ _meili_search_fn = meili_search_fn
+ global _get_chroma_collection, _embed_query_fn
+ global _app, _get_git_repo, _rag_index_repo_internal, _rag_query_internal, _llm_call
+ global _extract_code_block, _read_text_file, _client_ip, _PROFILE_EXCLUDE_DIRS
+ _app = app
+ _get_git_repo = get_git_repo_fn
+ _rag_index_repo_internal = rag_index_repo_internal_fn
+ _rag_query_internal = rag_query_internal_fn
+ # Bewaar de originele en wrap met budget + auto-continue
+ _llm_call_original = llm_call_fn
+ async def _wrapped_llm_call(messages, **kwargs):
+ return await _smart_llm_call_base(_llm_call_original, messages, **kwargs)
+ globals()["_llm_call"] = _wrapped_llm_call
+ _extract_code_block = extract_code_block_fn
+ _read_text_file = read_text_file_fn
+ _client_ip = client_ip_fn
+ _PROFILE_EXCLUDE_DIRS = set(profile_exclude_dirs) | INTERNAL_EXCLUDE_DIRS
+ _get_chroma_collection = chroma_get_collection_fn
+ _embed_query_fn = embed_query_fn
+ _embed_documents = embed_documents_fn
+ if not hasattr(_app.state, "AGENT_SESSIONS"):
+ _app.state.AGENT_SESSIONS: Dict[str, AgentState] = {}
+ logger.info("INFO:agent_repo:init GITEA_URL=%s GITEA_API=%s MEILI_URL=%s", GITEA_URL, GITEA_API, MEILI_URL or "-")
+
+# ---------- Helpers ----------
+def extract_explicit_paths(text: str) -> List[str]:
+ """
+ Robuuste extractor:
+ - negeert urls (http/https)
+ - vereist minstens één '/' en een extensie
+ - dedupe, behoud originele volgorde
+ """
+ if not text:
+ return []
+ # normaliseer “slimme” quotes naar gewone quotes (kan later handig zijn)
+ t = (text or "").replace("“","\"").replace("”","\"").replace("’","'").replace("\\","/").strip()
+ cands = PATH_RE.findall(t)
+ seen = set()
+ out: List[str] = []
+ for p in cands:
+ if p not in seen:
+ seen.add(p)
+ out.append(p)
+ logger.info("EXPLICIT PATHS parsed: %s", out) # <— log
+ return out
+
+async def _llm_recovery_plan(user_goal: str, observed_candidates: list[str], last_reason: str = "") -> dict:
+ """
+ Vraag de LLM om gerichte herstel-zoekpatronen en trefwoorden wanneer we 'geen voorstel' kregen.
+ Output JSON: { "patterns":[{"glob"| "regex": str},...], "keywords":[str,...], "note": str }
+ """
+ sys = ("Return ONLY compact JSON. Schema:\n"
+ "{\"patterns\":[{\"glob\":str}|{\"regex\":str},...],\"keywords\":[str,...],\"note\":str}\n"
+ "Prefer Laravel-centric paths (resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
+ "config/*.php, .env, database/migrations/**.php). Max 12 patterns, 8 keywords.")
+ usr = (f"User goal:\n{user_goal}\n\n"
+ f"Candidates we tried (may be irrelevant):\n{json.dumps(observed_candidates[-12:], ensure_ascii=False)}\n\n"
+ f"Failure reason (if any): {last_reason or '(none)'}\n"
+ "Propose minimal extra patterns/keywords to find the exact files.")
+ try:
+ resp = await _llm_call(
+ [{"role":"system","content":sys},{"role":"user","content":usr}],
+ stream=False, temperature=0.0, top_p=1.0, max_tokens=280
+ )
+ raw = (resp.get("choices",[{}])[0].get("message",{}) or {}).get("content","")
+ m = re.search(r"\{[\s\S]*\}", raw or "")
+ obj = json.loads(m.group(0)) if m else {}
+ except Exception:
+ obj = {}
+ # sanitize
+ pats = []
+ for it in (obj.get("patterns") or []):
+ if isinstance(it, dict):
+ if "glob" in it and isinstance(it["glob"], str) and it["glob"].strip():
+ pats.append({"glob": it["glob"].strip()[:200]})
+ elif "regex" in it and isinstance(it["regex"], str) and it["regex"].strip():
+ pats.append({"regex": it["regex"].strip()[:200]})
+ if len(pats) >= 16: break
+ kws = [str(x).strip()[:64] for x in (obj.get("keywords") or []) if str(x).strip()][:8]
+ note = str(obj.get("note",""))[:400]
+ return {"patterns": pats, "keywords": kws, "note": note}
+
+def _extend_candidates_with_keywords(root: Path, all_files: list[str], keywords: list[str], cap: int = 24) -> list[str]:
+ """
+ Deterministische keyword-scan (lichtgewicht). Gebruikt dezelfde text loader.
+ """
+ out: list[str] = []; seen: set[str] = set()
+ kws = [k for k in keywords if k]
+ if not kws: return out
+ for rel in all_files:
+ if len(out) >= cap: break
+ try:
+ txt = _read_text_file(Path(root)/rel)
+ except Exception:
+ txt = ""
+ if not txt: continue
+ low = txt.lower()
+ if any(k.lower() in low for k in kws):
+ if rel not in seen:
+ seen.add(rel); out.append(rel)
+ return out
+
+async def _recovery_expand_candidates(root: Path, all_files: list[str], user_goal: str,
+ current: list[str], *, last_reason: str = "") -> tuple[list[str], dict]:
+ """
+ 1) vraag LLM om recovery plan → patterns + keywords
+ 2) scan deterministisch met _scan_repo_for_patterns
+ 3) keyword-scan als tweede spoor
+ Retourneert (nieuwe_kandidaten_lijst, debug_info)
+ """
+ plan = await _llm_recovery_plan(user_goal, current, last_reason=last_reason)
+ added: list[str] = []
+ # patterns → scan
+ if plan.get("patterns"):
+ hits = _scan_repo_for_patterns(root, all_files, plan["patterns"], max_hits=int(os.getenv("LLM_RECOVERY_MAX_HITS","24")))
+ for h in hits:
+ if h not in current and h not in added:
+ added.append(h)
+ # keywords → scan
+ if len(added) < int(os.getenv("LLM_RECOVERY_MAX_HITS","24")) and plan.get("keywords"):
+ khits = _extend_candidates_with_keywords(root, all_files, plan["keywords"],
+ cap=int(os.getenv("LLM_RECOVERY_MAX_HITS","24")) - len(added))
+ for h in khits:
+ if h not in current and h not in added:
+ added.append(h)
+ new_list = (current + added)[:MAX_FILES_DRYRUN]
+ debug = {"recovery_plan": plan, "added": added[:12]}
+ return new_list, debug
+
+def _scan_repo_for_patterns(root: Path, all_files: list[str], patterns: list[dict], max_hits: int = 40) -> list[str]:
+ """
+ patterns: [{"glob": "resources/views/**.blade.php"}, {"regex": "Truebeam\\s*foutcode"}, ...]
+ Retourneert unieke bestands-paden met 1+ hits. Deterministisch (geen LLM).
+ """
+ hits: list[str] = []
+ seen: set[str] = set()
+ def _match_glob(pat: str) -> list[str]:
+ try:
+ pat = pat.strip().lstrip("./")
+ return [f for f in all_files if fnmatch.fnmatch(f, pat)]
+ except Exception:
+ return []
+ for spec in patterns or []:
+ if len(hits) >= max_hits: break
+ if "glob" in spec and isinstance(spec["glob"], str):
+ for f in _match_glob(spec["glob"]):
+ if f not in seen:
+ seen.add(f); hits.append(f)
+ if len(hits) >= max_hits: break
+ elif "regex" in spec and isinstance(spec["regex"], str):
+ try:
+ rx = re.compile(spec["regex"], re.I|re.M)
+ except Exception:
+ continue
+ for f in all_files:
+ if f in seen: continue
+ try:
+ txt = _read_text_file(Path(root)/f)
+ if rx.search(txt or ""):
+ seen.add(f); hits.append(f)
+ if len(hits) >= max_hits: break
+ except Exception:
+ continue
+ return hits
+
+async def _llm_make_search_specs(user_goal: str, framework: str = "laravel") -> list[dict]:
+ """
+ LLM bedenkt globs/regexen. Output ONLY JSON: {patterns:[{glob|regex: str},...]}
+ We voeren daarna een deterministische scan uit met _scan_repo_for_patterns.
+ """
+ if not (user_goal or "").strip():
+ return []
+ sys = ("Return ONLY JSON matching: {\"patterns\":[{\"glob\":str}|{\"regex\":str}, ...]}\n"
+ "For Laravel, prefer globs like resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
+ "config/*.php, .env, database/migrations/**.php. Keep regexes simple and safe.")
+ usr = f"Framework: {framework}\nUser goal:\n{user_goal}\nReturn ≤ 12 items."
+ try:
+ resp = await _llm_call(
+ [{"role":"system","content":sys},{"role":"user","content":usr}],
+ stream=False, temperature=0.0, top_p=1.0, max_tokens=280
+ )
+ raw = (resp.get('choices',[{}])[0].get('message',{}) or {}).get('content','')
+ m = re.search(r"\{[\s\S]*\}", raw or "")
+ obj = json.loads(m.group(0)) if m else {}
+ arr = obj.get("patterns") or []
+ out = []
+ for it in arr:
+ if isinstance(it, dict):
+ if "glob" in it and isinstance(it["glob"], str) and it["glob"].strip():
+ out.append({"glob": it["glob"].strip()[:200]})
+ elif "regex" in it and isinstance(it["regex"], str) and it["regex"].strip():
+ out.append({"regex": it["regex"].strip()[:200]})
+ if len(out) >= 16: break
+ return out
+ except Exception:
+ return []
+
+def _with_preview(text: str, st: "AgentState", *, limit: int = 1200, header: str = "--- SMART-RAG quick scan (preview) ---") -> str:
+ """Plak een compacte SMART-RAG preview onderaan het antwoord, als die er is."""
+ sp = getattr(st, "smart_preview", "") or ""
+ sp = sp.strip()
+ if not sp:
+ return text
+ if limit > 0 and len(sp) > limit:
+ sp = sp[:limit].rstrip() + "\n…"
+ return text + "\n\n" + header + "\n" + sp
+
+
+def _now() -> int:
+ return int(time.time())
+
+def _gitea_headers():
+ return {"Authorization": f"token {GITEA_TOKEN}"} if GITEA_TOKEN else {}
+
+def add_auth_to_url(url: str, user: str | None = None, token: str | None = None) -> str:
+ if not url or not (user and token):
+ return url
+ u = urlparse(url)
+ if u.scheme not in ("http", "https") or "@" in u.netloc:
+ return url
+    netloc = f"{requests.utils.quote(user, safe='')}:{requests.utils.quote(token, safe='')}@{u.netloc}"  # percent-encode creds; tokens may contain ':' '@' '/'
+ return urlunparse((u.scheme, netloc, u.path, u.params, u.query, u.fragment))
+
+def ensure_git_suffix(url: str) -> str:
+ try:
+ u = urlparse(url)
+ if not u.path.endswith(".git") and "/api/" not in u.path:
+ return urlunparse((u.scheme, u.netloc, u.path.rstrip("/") + ".git", u.params, u.query, u.fragment))
+ return url
+ except Exception:
+ return url
+
+def parse_owner_repo(hint: str) -> tuple[str | None, str | None]:
+ m = re.match(r"^([A-Za-z0-9_.\-]+)/([A-Za-z0-9_.\-]+)$", (hint or "").strip())
+ if not m:
+ return None, None
+ return m.group(1), m.group(2)
+
+def gitea_get_repo(owner: str, repo: str) -> dict | None:
+ try:
+ r = requests.get(f"{GITEA_API}/repos/{owner}/{repo}", headers=_gitea_headers(), timeout=10)
+ if r.status_code == 404:
+ return None
+ r.raise_for_status()
+ return r.json()
+ except Exception as e:
+ logger.warning("WARN:agent_repo:gitea_get_repo %s/%s failed: %s", owner, repo, e)
+ return None
+
+def gitea_search_repos(q: str, limit: int = 5) -> List[dict]:
+ try:
+ r = requests.get(f"{GITEA_API}/repos/search",
+ params={"q": q, "limit": limit},
+ headers=_gitea_headers(), timeout=10)
+ r.raise_for_status()
+ data = r.json() or {}
+ if isinstance(data, dict) and "data" in data: return data["data"]
+ if isinstance(data, list): return data
+ if isinstance(data, dict) and "ok" in data and "data" in data: return data["data"]
+ return []
+ except Exception as e:
+ logger.warning("WARN:agent_repo:/repos/search failed: %s", e)
+ return []
+
+def resolve_repo(hint: str) -> tuple[dict | None, str | None]:
+ hint = (hint or "").strip()
+ logger.info("INFO:agent_repo:resolve_repo hint=%s", hint)
+ if hint.startswith("http://") or hint.startswith("https://"):
+ url = add_auth_to_url(ensure_git_suffix(hint), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
+ owner, repo = owner_repo_from_url(url)
+ rd = {"full_name": f"{owner}/{repo}" if owner and repo else None, "clone_url": url}
+ logger.info("INFO:agent_repo:resolved direct-url %s", rd.get("full_name"))
+ return rd, "direct-url"
+ owner, repo = parse_owner_repo(hint)
+ if owner and repo:
+ meta = gitea_get_repo(owner, repo)
+ if meta:
+ url = meta.get("clone_url") or f"{GITEA_URL}/{owner}/{repo}.git"
+ url = add_auth_to_url(ensure_git_suffix(url), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
+ meta["clone_url"] = url
+ logger.info("INFO:agent_repo:resolved owner-repo %s", meta.get("full_name"))
+ return meta, "owner-repo"
+ url = add_auth_to_url(ensure_git_suffix(f"{GITEA_URL}/{owner}/{repo}.git"), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
+ rd = {"full_name": f"{owner}/{repo}", "clone_url": url}
+ logger.info("INFO:agent_repo:resolved owner-repo-fallback %s", rd.get("full_name"))
+ return rd, "owner-repo-fallback"
+ found = gitea_search_repos(hint, limit=5)
+ if found:
+ found[0]["clone_url"] = add_auth_to_url(ensure_git_suffix(found[0].get("clone_url") or ""), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
+ logger.info("INFO:agent_repo:resolved search %s", found[0].get("full_name"))
+ return found[0], "search"
+ logger.error("ERROR:agent_repo:repo not found for hint=%s", hint)
+ return None, "not-found"
+
+def extract_context_hints_from_prompt(user_goal: str) -> dict:
+ """
+ Haal dynamisch hints uit de prompt:
+ - tag_names: HTML/XML tags die genoemd zijn (
, ,