# agent_repo.py
# NOTE: the web-UI scrape prepended metadata here (filename, line count,
# file size, "Raw Permalink Normal View History", blame timestamp);
# converted to this comment so the file parses as Python.
# agent_repo.py
# =====================================================================
# Hybrid RAG + LLM edit-plans met: veilige fallback, anti-destructie guard,
# en EXPLICIETE UITLEG per diff.
# =====================================================================
# agent_repo.py (bovenin)
from __future__ import annotations
from smart_rag import enrich_intent, expand_queries, hybrid_retrieve, assemble_context
import os, re, time, uuid, difflib, hashlib, logging, json, fnmatch
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from urllib.parse import urlparse, urlunparse
import requests
import base64
from windowing_utils import approx_token_count
from starlette.concurrency import run_in_threadpool
import asyncio
from collections import defaultdict
# (scrape artifact removed: blame timestamp)
from llm_client import _llm_call
# (scrape artifact removed: blame timestamp)
# --- Async I/O executors (voorkom event-loop blocking) ---
from concurrent.futures import ThreadPoolExecutor
_IO_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_IO_WORKERS", "8")))
_CPU_POOL = ThreadPoolExecutor(max_workers=int(os.getenv("AGENT_CPU_WORKERS", "2")))
_CLONE_SEMA = asyncio.Semaphore(int(os.getenv("AGENT_MAX_CONCURRENT_CLONES", "2")))
BACKEND = (os.getenv("VECTOR_BACKEND") or "CHROMA").upper().strip()
#PATH_RE = re.compile(r"(?<!https?:\/\/)([A-Za-z0-9._\-\/]+\/[A-Za-z0-9._\-]+(?:\.[A-Za-z0-9._\-]+))")
#PATH_RE = re.compile(r"(?<!http:\/\/)(?<!https:\/\/)(/[A-Za-z0-9._\-\/]+\/[A-Za-z0-9._\-]+(?:\.[A-Za-z0-9._\-]+))")
#PATH_RE = re.compile(r'(?<!https?://)(?:^|(?<=\s)|(?<=["\'(]))'r'((?:\.{0,2}/)?(?:[A-Za-z0-9._-]+/)+[A-Za-z0-9._-]+\.[A-Za-z0-9._-]+)')
PATH_RE = re.compile(
r'''
(?<!http://)(?<!https://) # niet voorafgegaan door http:// of https://
(?:^|(?<=\s)|(?<=[\'"\[])) # begin van string, whitespace of na ", ', [
( # ---------- capture group ----------
(?:\.{1,2}/)? # optioneel ./ of ../
(?:[\w.-]+/)* # 0 of meer mapsegmenten
[\w.-]+\.[\w.-]+ # bestandsnaam + extensie
)
''',
re.VERBOSE | re.IGNORECASE,
)
# Debounce: onthoud laatst-geïndexeerde HEAD per (repo_url|branch) in-memory
_INDEX_HEAD_MEMO: dict[str, str] = {}
_MEILI_HEAD_MEMO: dict[str, str] = {}
_BM25_HEAD_MEMO: dict[str, str] = {}
DEF_INJECTS = {}
_search_candidates_fn = None
_repo_summary_get_fn = None
_meili_search_fn = None
# --- caches voor graph en tree summaries (per HEAD) ---
_GRAPH_CACHE: dict[str, dict[str, set[str]]] = {}
_TREE_SUM_CACHE: dict[str, dict[str, str]] = {}
# (scrape artifact removed: blame timestamp)
# ---------------------------------------------------------
# Fast-path helpers: explicit paths + replace pair (old->new)
# ---------------------------------------------------------
# Character class matching straight quotes, Unicode smart quotes and backtick.
_Q = r"[\"'“”‘’`]"
# Regexes that capture explicit Laravel file paths (quoted or bare) in a prompt.
_PATH_PATS = [
    r"[\"“”'](resources\/[A-Za-z0-9_\/\.-]+\.blade\.php)[\"']",
    r"(resources\/[A-Za-z0-9_\/\.-]+\.blade\.php)",
    r"[\"“”'](app\/[A-Za-z0-9_\/\.-]+\.php)[\"']",
    r"(app\/[A-Za-z0-9_\/\.-]+\.php)",
]
# Laravel translation-call wrappers: __('...'), @lang('...'), trans('...').
_TRANS_WRAPPERS = [
    r"__\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
    r"@lang\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
    r"trans\(\s*{q}(.+?){q}\s*\)".format(q=_Q),
]
# (scrape artifact removed: blame timestamp)
def _clean_repo_arg(x):
"""Zet lege/sentinel repo-waarden om naar None (geen filter)."""
if x is None:
return None
s = str(x).strip().lower()
return None if s in ("", "-", "none") else x
# (scrape artifact removed: blame timestamp)
def _extract_repo_branch_from_text(txt: str) -> Tuple[Optional[str], str]:
repo_url, branch = None, "main"
m = re.search(r"\bRepo\s*:\s*(\S+)", txt, flags=re.I)
if m: repo_url = m.group(1).strip()
mb = re.search(r"\bbranch\s*:\s*([A-Za-z0-9._/-]+)", txt, flags=re.I)
if mb: branch = mb.group(1).strip()
return repo_url, branch
def _extract_explicit_paths(txt: str) -> List[str]:
    """Collect unique explicit file paths from txt (via _PATH_PATS), in first-seen order."""
    found: List[str] = []
    for pattern in _PATH_PATS:
        for match in re.finditer(pattern, txt):
            path = match.group(1)
            if path and path not in found:
                found.append(path)
    return found
# (scrape artifact removed: blame timestamps)
def _extract_replace_pair(txt: str) -> Tuple[Optional[str], Optional[str]]:
    """Extract an (old, new) replacement pair from a NL/EN instruction.

    Handles Dutch ("Vervang de tekst '..' in '..'") and English
    ("Replace the text '..' with '..'") phrasings, including smart quotes,
    then falls back to a loose "first two quoted strings after
    Vervang/Replace" heuristic. Returns (None, None) when nothing matches.

    FIX: the fallback regex fragments "[\\s\\S]*?" were plain (non-raw)
    string literals; \\s is not a valid string escape and raises
    SyntaxWarning on modern Python. They are raw strings now.
    """
    pats = [
        rf"Vervang\s+de\s+tekst\s*{_Q}(.+?){_Q}[^.\n]*?(?:in|naar|verander(?:en)?\s+in)\s*{_Q}(.+?){_Q}",
        rf"Replace(?:\s+the)?\s+text\s*{_Q}(.+?){_Q}\s*(?:to|with)\s*{_Q}(.+?){_Q}",
    ]
    for p in pats:
        m = re.search(p, txt, flags=re.I | re.S)
        if m:
            return m.group(1), m.group(2)
    # Loose fallback: "(Vervang|Replace) ... '<old>' ... '<new>'"
    mm = re.search(
        r"(Vervang|Replace)[\s\S]*?" + _Q + r"(.+?)" + _Q + r"[\s\S]*?" + _Q + r"(.+?)" + _Q,
        txt, flags=re.I,
    )
    if mm:
        return mm.group(2), mm.group(3)
    return None, None
def _looks_like_unified_diff_request(txt: str) -> bool:
    """True when the prompt explicitly asks for a unified diff, or mentions
    diff/patch together with at least one explicit file path."""
    if re.search(r"\bunified\s+diff\b", txt, flags=re.I):
        return True
    mentions_patch = bool(re.search(r"\b(diff|patch)\b", txt, flags=re.I))
    return mentions_patch and bool(_extract_explicit_paths(txt))
# (scrape artifact removed: blame timestamp)
# keep this close to the other module consts
async def _call_get_git_repo(repo_url: str, branch: str):
    """
    Safe wrapper: supports both sync and async implementations of the
    injected _get_git_repo (set by initialize_agent).
    """
    if asyncio.iscoroutinefunction(_get_git_repo):
        return await _get_git_repo(repo_url, branch)
    # sync implementation: run in the I/O pool so the event loop stays free
    return await run_io_blocking(_get_git_repo, repo_url, branch)
async def run_io_blocking(func, *args, pool=None, **kwargs):
    """Run a sync/blocking I/O callable in a thread pool so the event loop stays free."""
    executor = pool or _IO_POOL
    call = lambda: func(*args, **kwargs)
    return await asyncio.get_running_loop().run_in_executor(executor, call)
async def run_cpu_blocking(func, *args, pool=None, **kwargs):
    """Run a CPU-heavier callable (e.g. index building) in the CPU thread pool."""
    executor = pool or _CPU_POOL
    call = lambda: func(*args, **kwargs)
    return await asyncio.get_running_loop().run_in_executor(executor, call)
# Lazy imports — every backend is optional: a missing package simply leaves
# its module handle as None and callers fall back gracefully.
_chroma = None
_qdrant = None
_qdrant_models = None
try:
    if BACKEND == "CHROMA":
        import chromadb  # type: ignore
        _chroma = chromadb
except Exception:
    _chroma = None
try:
    if BACKEND == "QDRANT":
        from qdrant_client import QdrantClient  # type: ignore
        from qdrant_client.http.models import Filter, FieldCondition, MatchValue  # type: ignore
        _qdrant = QdrantClient
        _qdrant_models = (Filter, FieldCondition, MatchValue)
except Exception:
    _qdrant = None
    _qdrant_models = None
try:
    from rank_bm25 import BM25Okapi
except Exception:
    # rank_bm25 is optional; BM25 features are disabled when it is unavailable
    BM25Okapi = None
logger = logging.getLogger("agent_repo")

# ---------- Environment / Config ----------
# FIX: stray VCS-blame timestamp lines pasted between these assignments
# (scrape artifacts, not valid Python) were removed.
# SECURITY NOTE(review): the fallback defaults below embed what look like real
# credentials (GITEA_TOKEN, GITEA_HTTP_TOKEN, MEILI_KEY). They should be
# rotated and supplied exclusively via environment variables.
GITEA_URL = os.environ.get("GITEA_URL", "http://10.25.138.40:30085").rstrip("/")
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "8bdbe18dd2ec93ecbf9cd0a8f01a6eadf9cfa87d")
GITEA_API = os.environ.get("GITEA_API", f"{GITEA_URL}/api/v1").rstrip("/")
AGENT_DEFAULT_BRANCH = os.environ.get("AGENT_DEFAULT_BRANCH", "main")
AGENT_MAX_QUESTIONS = int(os.environ.get("AGENT_MAX_QUESTIONS", "3"))
MAX_FILES_DRYRUN = int(os.environ.get("AGENT_MAX_FILES_DRYRUN", "27"))
RAG_TOPK = int(os.environ.get("AGENT_RAG_TOPK", "24"))  # larger candidate pool helps the reranker
AGENT_DISCOVER_MAX_REPOS = int(os.environ.get("AGENT_DISCOVER_MAX_REPOS", "200"))
AGENT_AUTOSELECT_THRESHOLD = float(os.environ.get("AGENT_AUTOSELECT_THRESHOLD", "0.80"))  # 0..1
REPO_CATALOG_MEILI_INDEX = os.environ.get("REPO_CATALOG_MEILI_INDEX", "repo-catalog")
AGENT_ENABLE_GOAL_REFINE = os.environ.get("AGENT_ENABLE_GOAL_REFINE", "1").lower() in ("1", "true", "yes")
AGENT_CLARIFY_THRESHOLD = float(os.environ.get("AGENT_CLARIFY_THRESHOLD", "0.6"))
# Meilisearch (optional)
MEILI_URL = os.environ.get("MEILI_URL", "http://192.168.100.1:7700").strip()
MEILI_KEY = os.environ.get("MEILI_KEY", "0xipOmfgi_zMgdFplSdv7L8mlx0RPMQCNxVTNJc54lQ").strip()
MEILI_INDEX_PREFIX = os.environ.get("MEILI_INDEX_PREFIX", "code").strip()
# Optional: basic-auth injection for HTTP clone (private repos)
GITEA_HTTP_USER = os.environ.get("GITEA_HTTP_USER", "Mistral-llm")
GITEA_HTTP_TOKEN = os.environ.get("GITEA_HTTP_TOKEN", "8bdbe18dd2ec93ecbf9cd0a8f01a6eadf9cfa87d")
# No destructive edits (never delete the complete contents of files).
AGENT_DESTRUCTIVE_RATIO = float(os.environ.get("AGENT_DESTRUCTIVE_RATIO", "0.50"))
# Only relevant code/text extensions (no binaries/caches)
ALLOWED_EXTS = {
    ".php", ".blade.php", ".vue", ".js", ".ts", ".jsx", ".tsx", ".css", ".scss",
    ".html", ".htm", ".json", ".md", ".ini", ".cfg", ".yml", ".yaml", ".toml",
    ".py", ".go", ".rb", ".java", ".cs", ".txt"
}
INTERNAL_EXCLUDE_DIRS = {
    ".git", ".npm", "node_modules", "vendor", "storage", "dist", "build", ".next",
    "__pycache__", ".venv", "venv", ".mypy_cache", ".pytest_cache",
    "target", "bin", "obj", "logs", "cache", "temp", ".cache"
}
_LIST_FILES_CACHE: dict[str, tuple[float, List[str]]] = {}  # path -> (ts, files)
# ---------- Injection from app.py ----------
# These slots are filled by initialize_agent().
# NOTE(review): _llm_call = None below shadows the name imported from
# llm_client at the top of the file; initialize_agent() later rebinds it.
_app = None
_get_git_repo = None
_rag_index_repo_internal = None
_rag_query_internal = None
_llm_call = None
_extract_code_block = None
_read_text_file = None
_client_ip = None
_PROFILE_EXCLUDE_DIRS: set[str] = set()
_get_chroma_collection = None
_embed_query_fn = None
_embed_documents = None
# === SMART LLM WRAPPER config: token budget + graceful finish + auto-continue.
# Non-invasive: the wrapper keeps the same response shape as _llm_call.
# Hard cap of the Mistral-LLM docker backend.
_MODEL_BUDGET = int(os.getenv("LLM_TOTAL_TOKEN_BUDGET", "42000"))
# Safety margin for headers/EOS/estimation drift
_BUDGET_SAFETY = int(os.getenv("LLM_BUDGET_SAFETY_TOKENS", "512"))
# Max number of follow-up calls when the output looks truncated
_MAX_AUTO_CONTINUES = int(os.getenv("LLM_MAX_AUTO_CONTINUES", "2"))
def _est_tokens(text: str) -> int:
# Ruwe schatting: ~4 chars/token (conservatief genoeg voor budgettering)
if not text: return 0
return max(1, len(text) // 4)
def _concat_messages_text(messages: list[dict]) -> str:
parts = []
for m in messages or []:
c = m.get("content")
if isinstance(c, str): parts.append(c)
return "\n".join(parts)
def _ends_neatly(s: str) -> bool:
if not s: return False
t = s.rstrip()
return t.endswith((".", "!", "?", "", "", ""))
def _append_assistant_and_continue_prompt(base_messages: list[dict], prev_text: str) -> list[dict]:
"""
Bouw een minimale vervolgprompt zonder opnieuw de hele context te sturen.
Dit beperkt prompt_tokens en voorkomt dat we opnieuw de cap raken.
"""
tail_words = " ".join(prev_text.split()[-60:]) # laatste ±60 woorden als anker
cont_user = (
"Ga verder waar je stopte. Herhaal niets. "
"Vervolg direct de laatste zin met hetzelfde formaat.\n\n"
"Vorige woorden:\n" + tail_words
)
# We sturen *niet* de volledige history opnieuw; alleen een korte instructie
return [
{"role": "system", "content": "Vervolg exact en beknopt; geen herhaling van eerder gegenereerde tekst."},
{"role": "user", "content": cont_user},
]
def _merge_choice_text(resp_a: dict, resp_b: dict) -> dict:
"""
Plak de content van choices[0] aan elkaar zodat callsites één 'content' blijven lezen.
"""
a = (((resp_a or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
b = (((resp_b or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
merged = (a or "") + (b or "")
out = resp_a.copy()
if "choices" in out and out["choices"]:
out["choices"] = [{
"index": 0,
"finish_reason": "length" if (out.get("choices",[{}])[0].get("finish_reason") in (None, "length")) else out.get("choices",[{}])[0].get("finish_reason"),
"message": {"role":"assistant","content": merged}
}]
return out
# Voorbeeld: Chroma client/init vervang door jouw eigen client
# from chromadb import Client
# chroma = Client(...)
def _build_where_filter(repo: Optional[str], path_contains: Optional[str], profile: Optional[str]) -> Dict[str, Any]:
"""
Bouw een simpele metadata-filter voor de vector-DB. Pas aan naar jouw DB.
"""
where: Dict[str, Any] = {}
if repo:
where["repo"] = repo
if profile:
where["profile"] = profile
if path_contains:
# Als je DB geen 'contains' ondersteunt: filter achteraf (post-filter)
where["path_contains"] = path_contains
return where
def _to_distance_from_similarity(x: Optional[float]) -> float:
"""
Converteer een 'similarity' (1=identiek, 0=ver weg) naar distance (lager = beter).
"""
if x is None:
return 1.0
try:
xv = float(x)
except Exception:
return 1.0
# Veiligheids-net: clamp
if xv > 1.0 or xv < 0.0:
# Sommige backends geven cosine distance al (0=identiek). Als >1, treat as distance passthrough.
return max(0.0, xv)
# Standaard: cosine similarity → distance
return 1.0 - xv
def _post_filter_path_contains(items: List[Dict[str,Any]], path_contains: Optional[str]) -> List[Dict[str,Any]]:
if not path_contains:
return items
key = (path_contains or "").lower()
out = []
for it in items:
p = ((it.get("metadata") or {}).get("path") or "").lower()
if key in p:
out.append(it)
return out
def _chroma_query(collection_name: str, query: str, n_results: int, where: Dict[str,Any]) -> Dict[str,Any]:
    """Run a text query against a Chroma collection.

    Uses the injected collection factory when available so the collection
    version/suffix stays consistent with the indexer. Only exact-match fields
    (repo/profile) are passed as a Chroma 'where'; substring filters are
    applied elsewhere. Returns {"results": [{"document", "metadata",
    "distance"}, ...]}; Chroma distances are lower-is-better.

    Raises RuntimeError when the chromadb module is not installed.
    """
    global _chroma
    if _chroma is None:
        raise RuntimeError("Chroma backend niet beschikbaar (module niet geïnstalleerd).")
    # Use the same collection factory as the indexer for a consistent version/suffix
    if _get_chroma_collection is None:
        client = _chroma.Client()
        coll = client.get_or_create_collection(collection_name)
    else:
        coll = _get_chroma_collection(collection_name)
    # Chroma: use 'where' only for exact fields (repo/profile)
    where_exact = {k:v for k,v in where.items() if k in ("repo","profile")}
    qr = coll.query(
        query_texts=[query],
        n_results=max(1, n_results),
        where=where_exact,
        include=["documents","metadatas","distances"]
    )
    docs = qr.get("documents", [[]])[0] or []
    metas = qr.get("metadatas", [[]])[0] or []
    dists = qr.get("distances", [[]])[0] or []
    # Chroma 'distances': lower = better, so no conversion needed
    items: List[Dict[str,Any]] = []
    for doc, meta, dist in zip(docs, metas, dists):
        items.append({
            "document": doc,
            "metadata": {
                "repo": meta.get("repo",""),
                "path": meta.get("path",""),
                "chunk_index": meta.get("chunk_index", 0),
                "symbols": meta.get("symbols", []),
                "profile": meta.get("profile",""),
            },
            # missing distance defaults to the worst value (1.0)
            "distance": float(dist) if dist is not None else 1.0,
        })
    return {"results": items}
def _qdrant_query(collection_name: str, query: str, n_results: int, where: Dict[str,Any]) -> Dict[str,Any]:
    """Run a text query against a Qdrant collection.

    FIX: stray VCS-blame timestamp lines that had been pasted into the
    function body (scrape artifacts, not valid Python) were removed, and the
    re-raised RuntimeError now chains the original exception.

    NOTE(review): client.search() is called with the raw query text; this
    requires server-side text search to be configured on the Qdrant instance.
    Without it the call raises and the caller falls back to the mock path —
    add a client-side embedder here if needed.

    Qdrant 'score' is typically cosine similarity (higher = better) and is
    converted to a lower-is-better distance.
    Raises RuntimeError when the qdrant_client module is not installed.
    """
    global _qdrant, _qdrant_models
    if _qdrant is None or _qdrant_models is None:
        raise RuntimeError("Qdrant backend niet beschikbaar (module niet geïnstalleerd).")
    Filter, FieldCondition, MatchValue = _qdrant_models
    client = _qdrant(host=os.getenv("QDRANT_HOST","192.168.100.1"), port=int(os.getenv("QDRANT_PORT","6333")))
    try:
        # Exact-match filters on repo/profile payload fields
        must: List[Any] = []
        if where.get("repo"):
            must.append(FieldCondition(key="repo", match=MatchValue(value=where["repo"])))
        if where.get("profile"):
            must.append(FieldCondition(key="profile", match=MatchValue(value=where["profile"])))
        flt = Filter(must=must) if must else None
        res = client.search(
            collection_name=collection_name,
            query=query,
            limit=max(1, n_results),
            query_filter=flt,
            with_payload=True,
        )
    except Exception as e:
        raise RuntimeError(f"Qdrant text search niet geconfigureerd: {e}") from e
    items: List[Dict[str,Any]] = []
    for p in res:
        meta = (p.payload or {})
        sim = getattr(p, "score", None)
        items.append({
            "document": meta.get("document",""),
            "metadata": {
                "repo": meta.get("repo",""),
                "path": meta.get("path",""),
                "chunk_index": meta.get("chunk_index", 0),
                "symbols": meta.get("symbols", []),
                "profile": meta.get("profile",""),
            },
            "distance": _to_distance_from_similarity(sim),
        })
    return {"results": items}
async def rag_query_internal_fn(
    *, query: str, n_results: int, collection_name: str,
    repo: Optional[str], path_contains: Optional[str], profile: Optional[str]
) -> Dict[str, Any]:
    """
    Adapter that searches the vector DB and returns *exactly* this shape:
    {
      "results": [
        {"document": str, "metadata": {...}, "distance": float}
      ]
    }
    Routes to the configured backend (CHROMA/QDRANT); on any failure a mock
    single-result response is returned so the app remains usable.
    """
    # Build the metadata filter (backend support for each field varies)
    where = _build_where_filter(repo, path_contains, profile)
    # Route to the configured backend
    try:
        if BACKEND == "CHROMA":
            res = _chroma_query(collection_name, query, n_results, where)
        elif BACKEND == "QDRANT":
            res = _qdrant_query(collection_name, query, n_results, where)
        else:
            raise RuntimeError(f"Onbekende VECTOR_BACKEND={BACKEND}")
    except Exception as e:
        # Mock fallback so the app keeps working without a DB connection
        qr = {
            "documents": [["(mock) no DB connected"]],
            "metadatas": [[{"repo": repo or "", "path": "README.md", "chunk_index": 0, "symbols": []}]],
            "distances": [[0.99]],
        }
        docs = qr.get("documents", [[]])[0] or []
        metas = qr.get("metadatas", [[]])[0] or []
        dists = qr.get("distances", [[]])[0] or []
        items: List[Dict[str, Any]] = []
        for doc, meta, dist in zip(docs, metas, dists):
            # Post-filter on path_contains (the mock has no native support)
            if path_contains:
                p = (meta.get("path") or "").lower()
                if (path_contains or "").lower() not in p:
                    continue
            items.append({
                "document": doc,
                "metadata": {
                    "repo": meta.get("repo",""),
                    "path": meta.get("path",""),
                    "chunk_index": meta.get("chunk_index", 0),
                    "symbols": meta.get("symbols", []),
                    "profile": meta.get("profile",""),
                },
                "distance": float(dist) if dist is not None else 1.0,
            })
        res = {"results": items[:max(1, n_results)]}
    # Post-filter path_contains (no-op when the needle is falsy)
    res["results"] = _post_filter_path_contains(res.get("results", []), path_contains)
    # Trim to the requested number of results
    res["results"] = res.get("results", [])[:max(1, n_results)]
    return res
async def _smart_llm_call_base(
    llm_call_fn,
    messages: list[dict],
    *,
    stop: list[str] | None = None,
    max_tokens: int | None = None,
    temperature: float = 0.2,
    top_p: float = 0.9,
    stream: bool = False,
    **kwargs
):
    """
    Budget-aware LLM call wrapper:
    1) Clamp max_tokens so prompt + output fit inside the total model budget.
    2) Add mild stop sequences for a cleaner finish.
    3) Auto-continue (up to _MAX_AUTO_CONTINUES times) when the reply looks
       truncated: near the token cap and not ending on a sentence boundary.
    Returns the (possibly merged) response dict in the same shape as
    llm_call_fn; a TypeError on the 'stop' kwarg triggers a retry without it.

    BUG FIX: the truncation re-check after a continuation passed a *list* of
    words to _est_tokens (which measures string length), so the estimate was
    the word count rather than ~chars/4. The word tail is now re-joined into
    a string before estimating.
    """
    # 1) Compute output room from the current prompt size
    prompt_tokens = _est_tokens(_concat_messages_text(messages))
    room = max(128, _MODEL_BUDGET - prompt_tokens - _BUDGET_SAFETY)
    eff_max_tokens = max(1, min(int(max_tokens or 900), room))
    # 2) Mild stop sequences (not restrictive for code output)
    default_stops = ["\n\n", "###"]
    stops = list(dict.fromkeys((stop or []) + default_stops))
    # First call; the backend may not accept 'stop' → retry without it
    try:
        resp = await llm_call_fn(
            messages,
            stream=stream,
            temperature=temperature,
            top_p=top_p,
            max_tokens=eff_max_tokens,
            stop=stops,
            **kwargs
        )
    except TypeError:
        resp = await llm_call_fn(
            messages,
            stream=stream,
            temperature=temperature,
            top_p=top_p,
            max_tokens=eff_max_tokens,
            **kwargs
        )
    text = (((resp or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
    # Heuristic: almost at the cap and not ending neatly → probably truncated
    near_cap = (_est_tokens(text) >= int(0.92 * eff_max_tokens))
    needs_more = (near_cap and not _ends_neatly(text))
    continues = 0
    merged = resp
    while needs_more and continues < _MAX_AUTO_CONTINUES:
        continues += 1
        cont_msgs = _append_assistant_and_continue_prompt(messages, text)
        # Recompute the budget for the (much smaller) continuation prompt
        cont_prompt_tokens = _est_tokens(_concat_messages_text(cont_msgs))
        cont_room = max(128, _MODEL_BUDGET - cont_prompt_tokens - _BUDGET_SAFETY)
        cont_max = max(1, min(int(max_tokens or 900), cont_room))
        try:
            cont_resp = await llm_call_fn(
                cont_msgs,
                stream=False,
                temperature=temperature,
                top_p=top_p,
                max_tokens=cont_max,
                stop=stops,
                **kwargs
            )
        except TypeError:
            cont_resp = await llm_call_fn(
                cont_msgs,
                stream=False,
                temperature=temperature,
                top_p=top_p,
                max_tokens=cont_max,
                **kwargs
            )
        merged = _merge_choice_text(merged, cont_resp)
        text = (((merged or {}).get("choices") or [{}])[0].get("message") or {}).get("content","")
        # FIX: join the last ~800 words back into a string before estimating
        tail = " ".join(text.split()[-800:])
        near_cap = (_est_tokens(tail) >= int(0.9 * cont_max))
        needs_more = (near_cap and not _ends_neatly(text))
    return merged
def initialize_agent(*, app, get_git_repo_fn, rag_index_repo_internal_fn, rag_query_internal_fn,
                     llm_call_fn, extract_code_block_fn, read_text_file_fn, client_ip_fn,
                     profile_exclude_dirs, chroma_get_collection_fn, embed_query_fn, embed_documents_fn,
                     search_candidates_fn=None, repo_summary_get_fn=None, meili_search_fn=None):
    """Wire app.py dependencies into this module (dependency injection).

    All injected callables are stored both in DEF_INJECTS and in the
    module-level underscore globals; the raw LLM callable is wrapped with
    the budget/auto-continue wrapper (_smart_llm_call_base); profile exclude
    dirs are merged with INTERNAL_EXCLUDE_DIRS; app.state.AGENT_SESSIONS is
    created when missing.

    BUG FIX: _embed_documents was assigned without a `global` declaration,
    so the assignment created a dead function-local and the module-level
    _embed_documents stayed None. It is now declared global.
    """
    global DEF_INJECTS
    DEF_INJECTS.update({
        "app": app,
        "get_git_repo_fn": get_git_repo_fn,
        "rag_index_repo_internal_fn": rag_index_repo_internal_fn,
        "rag_query_internal_fn": rag_query_internal_fn,
        "llm_call_fn": llm_call_fn,
        "extract_code_block_fn": extract_code_block_fn,
        "read_text_file_fn": read_text_file_fn,
        "client_ip_fn": client_ip_fn,
        "profile_exclude_dirs": profile_exclude_dirs,
        "chroma_get_collection_fn": chroma_get_collection_fn,
        "embed_query_fn": embed_query_fn,
        "embed_documents_fn": embed_documents_fn,
    })
    global _search_candidates_fn, _repo_summary_get_fn, _meili_search_fn
    _search_candidates_fn = search_candidates_fn
    _repo_summary_get_fn = repo_summary_get_fn
    _meili_search_fn = meili_search_fn
    global _get_chroma_collection, _embed_query_fn, _embed_documents
    global _app, _get_git_repo, _rag_index_repo_internal, _rag_query_internal, _llm_call
    global _extract_code_block, _read_text_file, _client_ip, _PROFILE_EXCLUDE_DIRS
    _app = app
    _get_git_repo = get_git_repo_fn
    _rag_index_repo_internal = rag_index_repo_internal_fn
    _rag_query_internal = rag_query_internal_fn
    # Keep the original callable and wrap it with budget + auto-continue
    _llm_call_original = llm_call_fn
    async def _wrapped_llm_call(messages, **kwargs):
        return await _smart_llm_call_base(_llm_call_original, messages, **kwargs)
    # NOTE: rebinds the module-level _llm_call (which also shadows the name
    # imported from llm_client at the top of the file).
    globals()["_llm_call"] = _wrapped_llm_call
    _extract_code_block = extract_code_block_fn
    _read_text_file = read_text_file_fn
    _client_ip = client_ip_fn
    _PROFILE_EXCLUDE_DIRS = set(profile_exclude_dirs) | INTERNAL_EXCLUDE_DIRS
    _get_chroma_collection = chroma_get_collection_fn
    _embed_query_fn = embed_query_fn
    _embed_documents = embed_documents_fn
    if not hasattr(_app.state, "AGENT_SESSIONS"):
        _app.state.AGENT_SESSIONS: Dict[str, AgentState] = {}
    logger.info("INFO:agent_repo:init GITEA_URL=%s GITEA_API=%s MEILI_URL=%s", GITEA_URL, GITEA_API, MEILI_URL or "-")
# ---------- Helpers ----------
def extract_explicit_paths(text: str) -> List[str]:
    """
    Robust extractor for explicit file paths mentioned in free text.

    - Normalizes Unicode smart quotes to straight quotes and backslashes to '/'
    - Ignores http(s) URLs (handled by PATH_RE's lookbehinds)
    - Deduplicates while preserving first-seen order

    BUG FIX: the original called str.replace("", '"') / str.replace("", "'")
    — the smart-quote source characters had been lost in an encoding mishap,
    and replacing the *empty* string inserts the replacement between every
    character, corrupting the text before matching. The proper Unicode
    quote characters are restored here.
    """
    if not text:
        return []
    t = (
        text.replace("\u201c", '"').replace("\u201d", '"')   # “ ” -> "
            .replace("\u2018", "'").replace("\u2019", "'")   # ‘ ’ -> '
            .replace("\\", "/")
            .strip()
    )
    cands = PATH_RE.findall(t)
    seen: set[str] = set()
    out: List[str] = []
    for p in cands:
        if p not in seen:
            seen.add(p)
            out.append(p)
    logger.info("EXPLICIT PATHS parsed: %s", out)
    return out
async def _llm_recovery_plan(user_goal: str, observed_candidates: list[str], last_reason: str = "") -> dict:
    """
    Ask the LLM for targeted recovery search patterns and keywords after a
    'no proposal' outcome.
    Output JSON: { "patterns":[{"glob"| "regex": str},...], "keywords":[str,...], "note": str }
    Always returns a sanitized dict (empty lists on any LLM/JSON failure).
    """
    sys = ("Return ONLY compact JSON. Schema:\n"
           "{\"patterns\":[{\"glob\":str}|{\"regex\":str},...],\"keywords\":[str,...],\"note\":str}\n"
           "Prefer Laravel-centric paths (resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
           "config/*.php, .env, database/migrations/**.php). Max 12 patterns, 8 keywords.")
    usr = (f"User goal:\n{user_goal}\n\n"
           f"Candidates we tried (may be irrelevant):\n{json.dumps(observed_candidates[-12:], ensure_ascii=False)}\n\n"
           f"Failure reason (if any): {last_reason or '(none)'}\n"
           "Propose minimal extra patterns/keywords to find the exact files.")
    try:
        resp = await _llm_call(
            [{"role":"system","content":sys},{"role":"user","content":usr}],
            stream=False, temperature=0.0, top_p=1.0, max_tokens=280
        )
        raw = (resp.get("choices",[{}])[0].get("message",{}) or {}).get("content","")
        # Grab the first {...} object from the raw reply (model may add prose)
        m = re.search(r"\{[\s\S]*\}", raw or "")
        obj = json.loads(m.group(0)) if m else {}
    except Exception:
        obj = {}
    # sanitize: cap pattern count (16) and string lengths
    pats = []
    for it in (obj.get("patterns") or []):
        if isinstance(it, dict):
            if "glob" in it and isinstance(it["glob"], str) and it["glob"].strip():
                pats.append({"glob": it["glob"].strip()[:200]})
            elif "regex" in it and isinstance(it["regex"], str) and it["regex"].strip():
                pats.append({"regex": it["regex"].strip()[:200]})
        if len(pats) >= 16: break
    # keywords: max 8, each trimmed to 64 chars; note trimmed to 400
    kws = [str(x).strip()[:64] for x in (obj.get("keywords") or []) if str(x).strip()][:8]
    note = str(obj.get("note",""))[:400]
    return {"patterns": pats, "keywords": kws, "note": note}
def _extend_candidates_with_keywords(root: Path, all_files: list[str], keywords: list[str], cap: int = 24) -> list[str]:
    """
    Deterministic, lightweight keyword scan over repo files (uses the same
    injected text loader). Returns up to `cap` relative paths whose content
    contains any keyword (case-insensitive).
    """
    matches: list[str] = []
    seen: set[str] = set()
    needles = [k.lower() for k in keywords if k]
    if not needles:
        return matches
    for rel in all_files:
        if len(matches) >= cap:
            break
        try:
            content = _read_text_file(Path(root) / rel)
        except Exception:
            content = ""
        if not content:
            continue
        haystack = content.lower()
        if rel not in seen and any(n in haystack for n in needles):
            seen.add(rel)
            matches.append(rel)
    return matches
async def _recovery_expand_candidates(root: Path, all_files: list[str], user_goal: str,
                                      current: list[str], *, last_reason: str = "") -> tuple[list[str], dict]:
    """
    1) ask the LLM for a recovery plan (patterns + keywords)
    2) scan deterministically with _scan_repo_for_patterns
    3) keyword scan as a second track (fills the remaining budget)
    Returns (new_candidate_list, debug_info); the new list is capped at
    MAX_FILES_DRYRUN and preserves the order of `current` first.
    """
    plan = await _llm_recovery_plan(user_goal, current, last_reason=last_reason)
    added: list[str] = []
    # patterns -> deterministic glob/regex scan
    if plan.get("patterns"):
        hits = _scan_repo_for_patterns(root, all_files, plan["patterns"], max_hits=int(os.getenv("LLM_RECOVERY_MAX_HITS","24")))
        for h in hits:
            if h not in current and h not in added:
                added.append(h)
    # keywords -> content scan, only up to the remaining hit budget
    if len(added) < int(os.getenv("LLM_RECOVERY_MAX_HITS","24")) and plan.get("keywords"):
        khits = _extend_candidates_with_keywords(root, all_files, plan["keywords"],
                                                 cap=int(os.getenv("LLM_RECOVERY_MAX_HITS","24")) - len(added))
        for h in khits:
            if h not in current and h not in added:
                added.append(h)
    new_list = (current + added)[:MAX_FILES_DRYRUN]
    debug = {"recovery_plan": plan, "added": added[:12]}
    return new_list, debug
def _scan_repo_for_patterns(root: Path, all_files: list[str], patterns: list[dict], max_hits: int = 40) -> list[str]:
    """
    patterns: [{"glob": "resources/views/**.blade.php"}, {"regex": "Truebeam\\s*foutcode"}, ...]
    Returns unique file paths with 1+ hits, capped at max_hits.
    Deterministic (no LLM involved).
    """
    hits: list[str] = []
    seen: set[str] = set()
    def _match_glob(pat: str) -> list[str]:
        # fnmatch against repo-relative paths; leading './' is stripped
        try:
            pat = pat.strip().lstrip("./")
            return [f for f in all_files if fnmatch.fnmatch(f, pat)]
        except Exception:
            return []
    for spec in patterns or []:
        if len(hits) >= max_hits: break
        if "glob" in spec and isinstance(spec["glob"], str):
            for f in _match_glob(spec["glob"]):
                if f not in seen:
                    seen.add(f); hits.append(f)
                if len(hits) >= max_hits: break
        elif "regex" in spec and isinstance(spec["regex"], str):
            try:
                rx = re.compile(spec["regex"], re.I|re.M)
            except Exception:
                # invalid regex from the LLM: skip this spec entirely
                continue
            for f in all_files:
                if f in seen: continue
                try:
                    txt = _read_text_file(Path(root)/f)
                    if rx.search(txt or ""):
                        seen.add(f); hits.append(f)
                        if len(hits) >= max_hits: break
                except Exception:
                    # unreadable file: skip and keep scanning
                    continue
    return hits
async def _llm_make_search_specs(user_goal: str, framework: str = "laravel") -> list[dict]:
    """
    The LLM proposes globs/regexes. Output ONLY JSON: {patterns:[{glob|regex: str},...]}
    A deterministic scan with _scan_repo_for_patterns is run afterwards.
    Returns a sanitized list (max 16 items, strings trimmed to 200 chars);
    [] on empty goal or any LLM/JSON failure.
    """
    if not (user_goal or "").strip():
        return []
    sys = ("Return ONLY JSON matching: {\"patterns\":[{\"glob\":str}|{\"regex\":str}, ...]}\n"
           "For Laravel, prefer globs like resources/views/**.blade.php, routes/*.php, app/Http/Controllers/**.php, "
           "config/*.php, .env, database/migrations/**.php. Keep regexes simple and safe.")
    usr = f"Framework: {framework}\nUser goal:\n{user_goal}\nReturn ≤ 12 items."
    try:
        resp = await _llm_call(
            [{"role":"system","content":sys},{"role":"user","content":usr}],
            stream=False, temperature=0.0, top_p=1.0, max_tokens=280
        )
        raw = (resp.get('choices',[{}])[0].get('message',{}) or {}).get('content','')
        # Grab the first {...} object from the raw reply
        m = re.search(r"\{[\s\S]*\}", raw or "")
        obj = json.loads(m.group(0)) if m else {}
        arr = obj.get("patterns") or []
        out = []
        for it in arr:
            if isinstance(it, dict):
                if "glob" in it and isinstance(it["glob"], str) and it["glob"].strip():
                    out.append({"glob": it["glob"].strip()[:200]})
                elif "regex" in it and isinstance(it["regex"], str) and it["regex"].strip():
                    out.append({"regex": it["regex"].strip()[:200]})
            if len(out) >= 16: break
        return out
    except Exception:
        return []
def _with_preview(text: str, st: "AgentState", *, limit: int = 1200, header: str = "--- SMART-RAG quick scan (preview) ---") -> str:
"""Plak een compacte SMART-RAG preview onderaan het antwoord, als die er is."""
sp = getattr(st, "smart_preview", "") or ""
sp = sp.strip()
if not sp:
return text
if limit > 0 and len(sp) > limit:
sp = sp[:limit].rstrip() + "\n"
return text + "\n\n" + header + "\n" + sp
def _now() -> int:
return int(time.time())
def _gitea_headers():
    """Authorization header for the Gitea API; empty dict when no token is configured."""
    if not GITEA_TOKEN:
        return {}
    return {"Authorization": f"token {GITEA_TOKEN}"}
def add_auth_to_url(url: str, user: str | None = None, token: str | None = None) -> str:
if not url or not (user and token):
return url
u = urlparse(url)
if u.scheme not in ("http", "https") or "@" in u.netloc:
return url
netloc = f"{user}:{token}@{u.netloc}"
return urlunparse((u.scheme, netloc, u.path, u.params, u.query, u.fragment))
def ensure_git_suffix(url: str) -> str:
    """Append '.git' to a repo URL's path unless it is already present or the
    URL points at an API endpoint; any parse failure returns the URL as-is."""
    try:
        parts = urlparse(url)
        if parts.path.endswith(".git") or "/api/" in parts.path:
            return url
        new_path = parts.path.rstrip("/") + ".git"
        return urlunparse((parts.scheme, parts.netloc, new_path, parts.params, parts.query, parts.fragment))
    except Exception:
        return url
def parse_owner_repo(hint: str) -> tuple[str | None, str | None]:
m = re.match(r"^([A-Za-z0-9_.\-]+)/([A-Za-z0-9_.\-]+)$", (hint or "").strip())
if not m:
return None, None
return m.group(1), m.group(2)
def gitea_get_repo(owner: str, repo: str) -> dict | None:
    """Fetch repo metadata from the Gitea API.

    Returns the parsed JSON dict, or None on 404 / network error / bad status
    (failures are logged as warnings, never raised).
    """
    try:
        r = requests.get(f"{GITEA_API}/repos/{owner}/{repo}", headers=_gitea_headers(), timeout=10)
        if r.status_code == 404:
            return None
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logger.warning("WARN:agent_repo:gitea_get_repo %s/%s failed: %s", owner, repo, e)
        return None
def gitea_search_repos(q: str, limit: int = 5) -> List[dict]:
    """Search Gitea repos via /repos/search.

    Gitea wraps results as {"ok": ..., "data": [...]}; some responses may be
    a bare list. Returns [] on any failure (logged as a warning).

    FIX: removed an unreachable third branch that checked
    isinstance(dict) and "ok" in data and "data" in data — the first
    dict-with-"data" check already returned for every such response.
    """
    try:
        r = requests.get(f"{GITEA_API}/repos/search",
                         params={"q": q, "limit": limit},
                         headers=_gitea_headers(), timeout=10)
        r.raise_for_status()
        data = r.json() or {}
        if isinstance(data, dict) and "data" in data:
            return data["data"]
        if isinstance(data, list):
            return data
        return []
    except Exception as e:
        logger.warning("WARN:agent_repo:/repos/search failed: %s", e)
        return []
def resolve_repo(hint: str) -> tuple[dict | None, str | None]:
    """Resolve a repo hint (direct URL, 'owner/repo', or free search text).

    Returns (repo_metadata or None, method), where method is one of
    'direct-url', 'owner-repo', 'owner-repo-fallback', 'search', 'not-found'.
    Clone URLs are normalized with a '.git' suffix and basic-auth credentials.
    """
    hint = (hint or "").strip()
    logger.info("INFO:agent_repo:resolve_repo hint=%s", hint)
    # Case 1: direct clone URL
    if hint.startswith("http://") or hint.startswith("https://"):
        url = add_auth_to_url(ensure_git_suffix(hint), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        owner, repo = owner_repo_from_url(url)
        rd = {"full_name": f"{owner}/{repo}" if owner and repo else None, "clone_url": url}
        logger.info("INFO:agent_repo:resolved direct-url %s", rd.get("full_name"))
        return rd, "direct-url"
    # Case 2: 'owner/repo' shorthand
    owner, repo = parse_owner_repo(hint)
    if owner and repo:
        meta = gitea_get_repo(owner, repo)
        if meta:
            url = meta.get("clone_url") or f"{GITEA_URL}/{owner}/{repo}.git"
            url = add_auth_to_url(ensure_git_suffix(url), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
            meta["clone_url"] = url
            logger.info("INFO:agent_repo:resolved owner-repo %s", meta.get("full_name"))
            return meta, "owner-repo"
        # Repo not visible via the API: construct the clone URL directly
        url = add_auth_to_url(ensure_git_suffix(f"{GITEA_URL}/{owner}/{repo}.git"), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        rd = {"full_name": f"{owner}/{repo}", "clone_url": url}
        logger.info("INFO:agent_repo:resolved owner-repo-fallback %s", rd.get("full_name"))
        return rd, "owner-repo-fallback"
    # Case 3: free-text search; take the top hit
    found = gitea_search_repos(hint, limit=5)
    if found:
        found[0]["clone_url"] = add_auth_to_url(ensure_git_suffix(found[0].get("clone_url") or ""), GITEA_HTTP_USER, GITEA_HTTP_TOKEN)
        logger.info("INFO:agent_repo:resolved search %s", found[0].get("full_name"))
        return found[0], "search"
    logger.error("ERROR:agent_repo:repo not found for hint=%s", hint)
    return None, "not-found"
def extract_context_hints_from_prompt(user_goal: str) -> dict:
    """Derive dynamic hints from the prompt text.

    Returns a dict with:
      - tag_names: HTML/XML tag names mentioned as <tag> (lowercased)
      - attr_names: mentioned HTML attribute names (value, placeholder, ...)
    """
    tags = {m.group(1).lower()
            for m in re.finditer(r"<\s*([A-Za-z][A-Za-z0-9:_-]*)\s*>", user_goal)}
    attrs = {m.group(1).lower()
             for m in re.finditer(r"\b(value|placeholder|title|aria-[a-z-]+|alt|label)\b",
                                  user_goal, flags=re.IGNORECASE)}
    return {"tag_names": tags, "attr_names": attrs}
def gitea_list_all_repos(limit: int = AGENT_DISCOVER_MAX_REPOS) -> List[dict]:
    """
    Fetch as many repos as the token can see via paginated /repos/search.

    Returns normalized dicts (full_name, name, owner, description, language,
    topics, default_branch, authenticated clone_url). Best effort: on any
    request failure we return whatever pages were collected so far.
    """
    out = []
    page = 1
    per_page = 50
    try:
        while len(out) < limit:
            r = requests.get(
                f"{GITEA_API}/repos/search",
                params={"q": "", "limit": per_page, "page": page},
                headers=_gitea_headers(), timeout=10
            )
            r.raise_for_status()
            data = r.json()
            items = data.get("data") if isinstance(data, dict) else (data if isinstance(data, list) else [])
            if not items:
                break
            out.extend(items)
            if len(items) < per_page:
                break  # short page => last page
            page += 1
    except Exception as e:
        logger.warning("WARN:agent_repo:gitea_list_all_repos failed: %s", e)
    # Normalize fields. Guard against 'owner' being present but None (API quirk):
    # the previous f-string form raised AttributeError in that case.
    norm = []
    for it in out[:limit]:
        owner_login = (it.get("owner") or {}).get("login", "")
        full = it.get("full_name") or f"{owner_login}/{it.get('name','')}".strip("/")
        clone = it.get("clone_url") or (f"{GITEA_URL}/{full}.git" if full else None)
        default_branch = it.get("default_branch") or "main"
        norm.append({
            "full_name": full,
            "name": it.get("name"),
            "owner": (it.get("owner") or {}).get("login"),
            "description": it.get("description") or "",
            "language": it.get("language") or "",
            "topics": it.get("topics") or [],
            "default_branch": default_branch,
            "clone_url": add_auth_to_url(ensure_git_suffix(clone), GITEA_HTTP_USER, GITEA_HTTP_TOKEN) if clone else None,
        })
    return [n for n in norm if n.get("full_name")]
def gitea_fetch_readme(owner: str, repo: str, ref: str = "main") -> str:
    """Try to fetch a README via the API; covers several path variants; decodes base64 when present.

    Returns "" when no variant yields content.
    """
    candidates = [
        f"{GITEA_API}/repos/{owner}/{repo}/readme",
        f"{GITEA_API}/repos/{owner}/{repo}/contents/README.md",
        f"{GITEA_API}/repos/{owner}/{repo}/contents/README",
        f"{GITEA_API}/repos/{owner}/{repo}/contents/readme.md",
    ]
    for url in candidates:
        try:
            r = requests.get(url, params={"ref": ref}, headers=_gitea_headers(), timeout=10)
            if r.status_code == 404:
                continue  # try the next path variant
            r.raise_for_status()
            js = r.json()
            # Content delivered inline as base64?
            if isinstance(js, dict) and "content" in js:
                try:
                    return base64.b64decode(js["content"]).decode("utf-8", errors="ignore")
                except Exception:
                    pass
            # Some Gitea versions expose a 'download_url' instead of inline content.
            dl = js.get("download_url") if isinstance(js, dict) else None
            if dl:
                rr = requests.get(dl, timeout=10, headers=_gitea_headers())
                rr.raise_for_status()
                return rr.text
        except Exception:
            continue
    return ""
def gitea_repo_exists(owner: str, name: str) -> bool:
    """Return True when owner/name exists in Gitea (and the token can see it)."""
    try:
        resp = requests.get(
            f"{GITEA_API}/repos/{owner}/{name}",
            headers=_gitea_headers(),
            timeout=5,
        )
    except Exception:
        return False
    return resp.status_code == 200
def owner_repo_from_url(url: str) -> tuple[str|None, str|None]:
"""
Probeer owner/repo uit een http(s) .git URL te halen.
Voorbeeld: http://host:3080/owner/repo.git -> ('owner', 'repo')
"""
try:
from urllib.parse import urlparse
p = urlparse(url)
parts = [x for x in (p.path or "").split("/") if x]
if len(parts) >= 2:
repo = parts[-1]
if repo.endswith(".git"):
repo = repo[:-4]
owner = parts[-2]
return owner, repo
except Exception:
pass
return None, None
# === Repo-catalogus indexeren in Meili (optioneel) en Chroma ===
def meili_get_index(name: str):
    """Return a Meilisearch index handle, creating the index on first use.

    Returns None when no client is configured or both lookup and creation fail.
    """
    cli = get_meili()
    if not cli: return None
    try:
        return cli.index(name)
    except Exception:
        try:
            return cli.create_index(uid=name, options={"primaryKey":"id"})
        except Exception:
            return None
def meili_catalog_upsert(docs: List[dict]):
    """Add/refresh repo-catalog documents in Meilisearch (best effort; no-op without client or docs)."""
    idx = meili_get_index(REPO_CATALOG_MEILI_INDEX)
    if not idx or not docs: return
    try:
        idx.add_documents(docs)
        try:
            # Configure searchable/filterable fields; older servers may reject this.
            idx.update_searchable_attributes(["full_name","name","description","readme","topics","language"])
            idx.update_filterable_attributes(["full_name","owner","language","topics"])
        except Exception:
            pass
    except Exception as e:
        logger.warning("WARN:agent_repo:meili_catalog_upsert: %s", e)
def meili_catalog_search(q: str, limit: int = 10) -> List[dict]:
    """Search the repo catalog index in Meilisearch; returns hit dicts ([] on any failure).

    Fix: removed stray commit-timestamp residue lines (scrape artifacts) that
    had been pasted into the body and broke the syntax, plus dead commented-out
    code for an injected search function.
    """
    idx = meili_get_index(REPO_CATALOG_MEILI_INDEX)
    if not idx:
        return []
    try:
        res = idx.search(q, {"limit": limit})
        return res.get("hits", [])
    except Exception as e:
        logger.warning("WARN:agent_repo:meili_catalog_search: %s", e)
        return []
def chroma_catalog_upsert(docs: List[dict]):
    """Upsert the repo catalog into Chroma using the INJECTED embedding function.

    (In HTTP mode the embeddings are computed client-side and sent along.)
    """
    try:
        if not docs or _get_chroma_collection is None:
            return
        col = _get_chroma_collection("repo_catalog")  # app.py suffixes the name with __<slug>__v<ver>
        ids = [d["id"] for d in docs]
        texts = [d["doc"] for d in docs]
        metas = [d["meta"] for d in docs]
        # Best-effort delete of any stale copies first.
        try:
            col.delete(ids=ids)
        except Exception:
            pass
        if _embed_documents:
            embs = _embed_documents(texts)
            col.add(ids=ids, documents=texts, embeddings=embs, metadatas=metas)
        else:
            col.add(ids=ids, documents=texts, metadatas=metas)
    except Exception as e:
        logger.warning("WARN:agent_repo:chroma_catalog_upsert: %s", e)
def chroma_catalog_search(q: str, n: int = 8) -> List[dict]:
    """Vector-search the repo catalog -> [{full_name, score, preview}] ([] on any failure)."""
    try:
        if _get_chroma_collection is None or _embed_query_fn is None:
            return []
        col = _get_chroma_collection("repo_catalog")
        q_emb = _embed_query_fn(q)
        res = col.query(query_embeddings=[q_emb], n_results=n, include=["documents","metadatas","distances"])
        docs = (res.get("documents") or [[]])[0]
        metas = (res.get("metadatas") or [[]])[0]
        dists = (res.get("distances") or [[]])[0]
        out = []
        for doc, meta, dist in zip(docs, metas, dists):
            if isinstance(meta, dict):
                sim = 1.0 / (1.0 + float(dist or 0.0))  # simple distance -> similarity
                out.append({"full_name": meta.get("full_name"), "score": float(sim), "preview": doc})
        return out
    except Exception as e:
        logger.warning("WARN:agent_repo:chroma_catalog_search: %s", e)
        return []
# === Documenten maken voor catalogus ===
def build_repo_catalog_doc(meta: dict, readme: str) -> dict:
    """Build one catalog document (id/doc/meta) for a repo, embedding a README preview."""
    full_name = meta.get("full_name", "")
    name = meta.get("name", "")
    desc = meta.get("description", "")
    lang = meta.get("language", "")
    topics = " ".join(meta.get("topics") or [])
    preview = (readme or "")[:2000]
    body = "\n".join([
        full_name,
        name,
        desc,
        f"language: {lang}",
        f"topics: {topics}",
        f"README:\n{preview}",
    ])
    return {
        "id": f"repo:{full_name}",
        "doc": body,
        "meta": {
            "full_name": full_name,
            "name": name,
            "description": desc,
            "language": lang,
            "topics": topics,
        },
    }
# === Heuristische (lexicale) score als fallback ===
def lexical_repo_score(q: str, meta: dict, readme: str) -> float:
    """Cheap lexical relevance: count query-token occurrences in repo metadata plus README head."""
    qtokens = set(re.findall(r"[A-Za-z0-9_]{2,}", q.lower()))
    haystack = " ".join([
        meta.get("full_name", ""),
        meta.get("name", ""),
        meta.get("description", ""),
        " ".join(meta.get("topics") or []),
        (readme or "")[:4000],
    ]).lower()
    if not qtokens or not haystack:
        return 0.0
    total = sum(haystack.count(tok) for tok in qtokens)
    # Small bonus for tokens that also appear in the repo name itself.
    name_lo = (meta.get("name") or "").lower()
    total += sum(2 for tok in qtokens if tok in name_lo)
    return float(total)
# === LLM-rerank voor repo's (hergebruik van je bestaande reranker) ===
async def llm_rerank_repos(user_goal: str, candidates: List[dict], topk: int = 5) -> List[dict]:
    """LLM-rerank candidate repos for the goal; falls back to input order on any failure.

    The model returns scores 0-100; they are normalized to 0..1 in the output dicts.
    """
    if not candidates:
        return []
    # Pack at most 12 candidates with a short README preview each.
    pack = []
    for i, c in enumerate(candidates[:12], 1):
        pv = c.get("preview","")[:700]
        pack.append(f"{i}. REPO: {c['full_name']}\nDESC: {c.get('description','')}\nPREVIEW:\n{pv}")
    prompt = (
        "Rangschik onderstaande repositories op geschiktheid voor het doel. "
        "Geef een geldige JSON-array met objecten: {\"full_name\":\"...\",\"score\":0-100}.\n\n"
        "DOEL:\n" + user_goal + "\n\nCANDIDATES:\n" + "\n\n".join(pack)
    )
    try:
        resp = await _llm_call(
            [{"role":"system","content":"Alleen geldige JSON."},
             {"role":"user","content":prompt}],
            stream=False, temperature=0.0, top_p=0.9, max_tokens=600
        )
        raw = resp.get("choices",[{}])[0].get("message",{}).get("content","")
        arr = safe_json_loads(raw)
        if not isinstance(arr, list):
            return candidates[:topk]
        # full_name -> raw score; skip malformed entries instead of failing whole rerank.
        smap = {}
        for d in (arr or []):
            if not isinstance(d, dict):
                continue
            fn = d.get("full_name"); sc = d.get("score")
            try:
                if isinstance(fn, str):
                    smap[fn] = float(sc)
            except Exception:
                continue
        #smap = {d.get("full_name"): float(d.get("score",0)) for d in arr if isinstance(d, dict) and "full_name" in d}
        resc = []
        for c in candidates:
            resc.append({**c, "score": smap.get(c["full_name"], 0.0)/100.0})
        resc.sort(key=lambda x: x.get("score",0.0), reverse=True)
        return resc[:topk]
    except Exception as e:
        logger.warning("WARN:agent_repo:llm_rerank_repos failed: %s", e)
        return candidates[:topk]
# --- Intent/goal refine ---
async def llm_refine_goal(raw_goal: str) -> tuple[str, List[str], float]:
    """
    Have the LLM produce a compact, concrete 'refined_goal' plus at most two
    clarifying questions.
    Returns (refined_goal, clarifying_questions, confidence in 0..1); falls
    back to (raw_goal, [], 0.0) on any failure.
    """
    SYSTEM = "Geef uitsluitend geldige JSON; geen uitleg."
    USER = (
        "Vat de bedoeling van deze opdracht ultra-kort en concreet samen als 'refined_goal'. "
        "Als er kritieke onduidelijkheden zijn: geef max 2 korte 'clarifying_questions'. "
        "Geef ook 'confidence' (0..1). JSON:\n"
        "{ \"refined_goal\": \"...\", \"clarifying_questions\": [\"...\"], \"confidence\": 0.0 }\n\n"
        f"RAW_GOAL:\n{raw_goal}"
    )
    try:
        resp = await _llm_call(
            [{"role":"system","content":SYSTEM},{"role":"user","content":USER}],
            stream=False, temperature=0.0, top_p=0.9, max_tokens=300
        )
        raw = resp.get("choices",[{}])[0].get("message",{}).get("content","")
        js = safe_json_loads(raw) or {}
        rg = (js.get("refined_goal") or "").strip() or raw_goal
        qs = [q.strip() for q in (js.get("clarifying_questions") or []) if isinstance(q, str) and q.strip()][:2]
        cf = float(js.get("confidence", 0.0) or 0.0)
        cf = max(0.0, min(1.0, cf))  # clamp to [0, 1]
        return rg, qs, cf
    except Exception as e:
        logger.warning("WARN:agent_repo:llm_refine_goal failed: %s", e)
        return raw_goal, [], 0.0
# === Discovery pipeline ===
async def discover_candidate_repos(user_goal: str) -> List[dict]:
    """Find suitable repos purely from the question (no explicit repo hint).

    Pipeline: list all visible repos -> fetch READMEs concurrently -> index a
    repo catalog in Meili/Chroma (best effort) -> score candidates with
    lexical + Meili + Chroma signals over LLM-expanded queries -> LLM-rerank
    the top 8. Returns up to 5 candidate dicts (full_name/clone_url/score...).
    """
    #repos = gitea_list_all_repos(limit=AGENT_DISCOVER_MAX_REPOS)
    repos = await run_io_blocking(gitea_list_all_repos, limit=AGENT_DISCOVER_MAX_REPOS)
    if not repos:
        return []
    # Concurrent README fetches; semaphore keeps parallelism modest for stability.
    sem = asyncio.Semaphore(int(os.getenv("AGENT_DISCOVER_README_CONCURRENCY", "8")))
    async def _fetch_readme(m):
        async with sem:
            return await run_io_blocking(
                gitea_fetch_readme,
                m.get("owner",""), m.get("name",""), m.get("default_branch","main")
            )
    readmes = await asyncio.gather(*[_fetch_readme(m) for m in repos], return_exceptions=True)
    # Collect (short) READMEs and build catalog docs for both indexes.
    docs_meili = []
    docs_chroma = []
    cands = []
    for i, m in enumerate(repos):
        #readme = gitea_fetch_readme(m.get("owner",""), m.get("name",""), m.get("default_branch","main"))
        readme = "" if isinstance(readmes[i], Exception) else (readmes[i] or "")
        doc = build_repo_catalog_doc(m, readme)
        docs_chroma.append(doc)
        docs_meili.append({
            "id": m["full_name"],
            "full_name": m["full_name"],
            "name": m.get("name",""),
            "owner": m.get("owner",""),
            "description": m.get("description",""),
            "language": m.get("language",""),
            "topics": " ".join(m.get("topics") or []),
            "readme": (readme or "")[:5000],
        })
        cands.append({
            "full_name": m["full_name"],
            "description": m.get("description",""),
            "clone_url": m.get("clone_url"),
            "preview": (readme or "")[:1200],
            "base_score": 0.0,  # filled in below
        })
    # Index the catalog (best effort).
    if MEILI_URL:
        meili_catalog_upsert(docs_meili)
    chroma_catalog_upsert(docs_chroma)
    # Multi-query expansion of the user goal.
    queries = await llm_expand_queries(user_goal, extract_quotes(user_goal), extract_word_hints(user_goal), k=5)
    # Heuristic lexical score + Meili/Chroma boosts, summed over all query variants.
    score_map: Dict[str, float] = {c["full_name"]: 0.0 for c in cands}
    for q in queries:
        # lexical score
        for i, m in enumerate(repos):
            score_map[m["full_name"]] += 0.2 * lexical_repo_score(q, m, (docs_meili[i].get("readme") if i < len(docs_meili) else ""))
        # Meili boost
        if MEILI_URL:
            hits = meili_catalog_search(q, limit=10)
            for h in hits:
                fn = h.get("full_name")
                if fn in score_map:
                    score_map[fn] += 2.0
        # Chroma boost
        chroma_hits = chroma_catalog_search(q, n=6)
        for h in chroma_hits:
            fn = h.get("full_name")
            if fn in score_map:
                score_map[fn] += 1.2
    # Fold scores back into the candidates.
    for c in cands:
        c["score"] = score_map.get(c["full_name"], 0.0)
    # Quick preselection.
    cands.sort(key=lambda x: x["score"], reverse=True)
    pre = cands[:8]
    # LLM rerank with an explanation score.
    top = await llm_rerank_repos(user_goal, pre, topk=5)
    return top
# ---------- Chroma collection naam ----------
def sanitize_collection_name(s: str) -> str:
    """Normalize to a Chroma-safe collection name; never returns an empty string."""
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "-", s).strip("-")[:128]
    return cleaned if cleaned else "code_docs"
def repo_collection_name(owner_repo: str | None, branch: str) -> str:
    """Deterministic, sanitized Chroma collection name for a repo/branch pair."""
    return sanitize_collection_name(f"code_docs-{owner_repo or 'repo'}-{branch}")
def _get_session_id(messages: List[dict], request) -> str:
    """Session id: an explicit 'session:<id>' system message wins; otherwise a
    short SHA-256 hash of the first message content plus the client IP."""
    for m in messages:
        if m.get("role") == "system" and str(m.get("content","")).startswith("session:"):
            return str(m["content"]).split("session:",1)[1].strip()
    key = (messages[0].get("content","") + "|" + _client_ip(request)).encode("utf-8", errors="ignore")
    return hashlib.sha256(key).hexdigest()[:16]
# ---------- Files & filters ----------
def allowed_file(p: Path) -> bool:
    """True when the file's (lowercased) name ends in one of ALLOWED_EXTS."""
    name_lo = p.name.lower()
    for ext in ALLOWED_EXTS:
        if name_lo.endswith(ext):
            return True
    return False
def list_repo_files(repo_root: Path) -> List[str]:
    """Relative paths of indexable files under repo_root (filtered and size-capped).

    Results are memoized per resolved root with a small TTL cache to limit
    repeated rglob/stat IO when several queries hit the same repo quickly.
    """
    ttl = float(os.getenv("AGENT_LIST_CACHE_TTL", "20"))
    key = str(repo_root.resolve())
    now = time.time()
    if key in _LIST_FILES_CACHE:
        ts, cached = _LIST_FILES_CACHE[key]
        if now - ts <= ttl:
            return list(cached)  # defensive copy so callers can't mutate the cache
    files: List[str] = []
    for p in repo_root.rglob("*"):
        if p.is_dir(): continue
        if any(part in _PROFILE_EXCLUDE_DIRS for part in p.parts): continue
        try:
            if p.stat().st_size > 2_000_000: continue  # skip very large files
        except Exception:
            continue
        if not allowed_file(p): continue
        files.append(str(p.relative_to(repo_root)))
    _LIST_FILES_CACHE[key] = (now, files)
    return files
# ---------- Query parsing ----------
def extract_quotes(text: str) -> List[str]:
    """Return substrings the user wrapped in quotes.

    Typographic (curly) quotes are normalized to ASCII first so the regex
    only needs to handle ' and ". Fix: the previous replace() calls had lost
    their curly-quote arguments to mojibake and degenerated into
    replace("", '"'), which inserts a quote between every character.
    """
    if not text:
        return []
    t = (text.replace("\u201c", "\"").replace("\u201d", "\"")
             .replace("\u2018", "'").replace("\u2019", "'").strip())
    return re.findall(r"['\"]([^'\"]{2,})['\"]", t)
def extract_word_hints(text: str) -> List[str]:
    """Identifier-like words from the text, minus common NL/EN stopwords (order unspecified)."""
    if not text:
        return []
    stop = {"de","het","een","and","the","voor","naar","op","in","of","to","is","are","van","met","die","dat"}
    unique_words = set(re.findall(r"[A-Za-z_][A-Za-z0-9_]{1,}", text))
    return [w for w in unique_words if w.lower() not in stop]
# ---------- SAFE JSON loader ----------
def safe_json_loads(s: str):
    """Lenient JSON parse: strips markdown ``` fences; returns None instead of raising."""
    if not s:
        return None
    t = s.strip()
    if t.startswith("```"):
        # Drop an opening ``` / ```json fence, then a trailing ``` if present.
        t = re.sub(r"^```(?:json)?", "", t, count=1).strip()
        if t.endswith("```"):
            t = t[:-3].strip()
    try:
        return json.loads(t)
    except Exception:
        return None
# ---------- Meilisearch (optioneel) ----------
# Lazily-created singleton Meilisearch client; see get_meili().
_meili_client = None
def get_meili():
    """Return the shared Meilisearch client, creating it lazily.

    Returns None when MEILI_URL is unset or the client library is unavailable.
    """
    global _meili_client
    if _meili_client is not None:
        return _meili_client
    if not MEILI_URL:
        return None
    try:
        from meilisearch import Client
        _meili_client = Client(MEILI_URL, MEILI_KEY or None)
        return _meili_client
    except Exception as e:
        logger.warning("WARN:agent_repo:Meilisearch not available: %s", e)
        return None
def meili_index_name(owner_repo: Optional[str], branch: str) -> str:
    """Sanitized Meilisearch index name for a repo/branch, under MEILI_INDEX_PREFIX."""
    base = sanitize_collection_name((owner_repo or "repo") + "-" + branch)
    return sanitize_collection_name(f"{MEILI_INDEX_PREFIX}-{base}")
# --- Smarter, language-aware chunker ---
# File-extension suffix -> coarse language tag, used to pick chunk breakpoints.
_LANG_BY_EXT = {
    ".php": "php", ".blade.php": "blade", ".js": "js", ".ts": "ts",
    ".jsx": "js", ".tsx": "ts", ".py": "py", ".go": "go",
    ".rb": "rb", ".java": "java", ".cs": "cs",
    ".css": "css", ".scss": "css",
    ".html": "html", ".htm": "html", ".md": "md",
    ".yml": "yaml", ".yaml": "yaml", ".toml": "toml", ".ini": "ini",
    ".json": "json",
}
def _detect_lang_from_path(path: str) -> str:
    """Map a file path to a coarse language tag via its extension.

    Checks longer suffixes first so compound extensions like '.blade.php'
    win over their tail ('.php'). The previous insertion-order scan made
    the '.blade.php' entry unreachable because '.php' always matched first.
    Returns 'txt' when nothing matches.
    """
    lo = path.lower()
    for ext, lang in sorted(_LANG_BY_EXT.items(), key=lambda kv: len(kv[0]), reverse=True):
        if lo.endswith(ext):
            return lang
    return "txt"
def _find_breakpoints(text: str, lang: str) -> list[int]:
    """
    Return a sorted list of 'nice' break positions (char indices) for chunking.

    Deliberately conservative: false positives are fine, since the chunker
    only picks the candidate closest to its target size.
    """
    bps = set()
    # Always: blank-line blocks / paragraph boundaries
    for m in re.finditer(r"\n\s*\n\s*", text):
        bps.add(m.end())
    if lang in ("php", "js", "ts", "java", "cs", "go", "rb", "py"):
        # Function/class boundaries
        pats = [
            r"\n\s*(class|interface|trait)\s+[A-Za-z_][A-Za-z0-9_]*\b",
            r"\n\s*(public|private|protected|static|\s)*\s*function\b",
            r"\n\s*def\s+[A-Za-z_][A-Za-z0-9_]*\s*\(",  # py
            r"\n\s*func\s+[A-Za-z_][A-Za-z0-9_]*\s*\(",  # go
            r"\n\s*[A-Za-z0-9_<>\[\]]+\s+[A-Za-z_][A-Za-z0-9_]*\s*\(",  # java/cs method-ish
            r"\n\}",  # closing brace at column 0 -> good end point
        ]
        for p in pats:
            for m in re.finditer(p, text):
                bps.add(m.start())
    if lang == "blade":
        for p in [r"\n\s*@section\b", r"\n\s*@endsection\b", r"\n\s*@if\b", r"\n\s*@endif\b", r"\n\s*<\w"]:
            for m in re.finditer(p, text, flags=re.I):
                bps.add(m.start())
    if lang in ("html", "css"):
        for p in [r"\n\s*<\w", r"\n\s*</\w", r"\n\s*}\s*\n"]:
            for m in re.finditer(p, text):
                bps.add(m.start())
    if lang in ("md",):
        for p in [r"\n#+\s", r"\n\-{3,}\n", r"\n\*\s", r"\n\d+\.\s"]:
            for m in re.finditer(p, text):
                bps.add(m.start())
    if lang in ("yaml", "toml", "ini"):
        # sections/keys at column 0
        for m in re.finditer(r"\n[A-Za-z0-9_\-]+\s*[:=]", text):
            bps.add(m.start())
    # JSON: split on object/array boundaries (conservative: { or [ on its own line)
    if lang == "json":
        for m in re.finditer(r"\n\s*[\{\[]\s*\n", text):
            bps.add(m.start())
    # Always: plain line boundaries
    for m in re.finditer(r"\n", text):
        bps.add(m.start()+1)
    # sort & keep only in-range positions
    out = sorted([bp for bp in bps if 0 < bp < len(text)])
    return out
def smart_chunk_text(text: str, path_hint: str, target_chars: int = 1800,
                     hard_max: int = 2600, min_chunk: int = 800) -> List[str]:
    """
    Chunk at ~target_chars, breaking at the nearest semantic breakpoint.

    - Without a good breakpoint in range: break roughly at the target size.
    - Adaptive overlap: 200 chars after a clean break, 350 after a rough one.
    Returns a list of non-empty chunks (whitespace-only tails dropped).
    """
    if not text:
        return []
    lang = _detect_lang_from_path(path_hint or "")
    bps = _find_breakpoints(text, lang)
    if not bps:
        # fallback: fixed-size steps with overlap
        chunks = []
        i, n = 0, len(text)
        step = max(min_chunk, target_chars - 300)
        while i < n:
            j = min(n, i + target_chars)
            chunks.append(text[i:j])
            i = min(n, i + step)
        return chunks
    chunks = []
    i, n = 0, len(text)
    while i < n:
        # aim for i+target_chars, but look for 'nice' breakpoints in [i+min_chunk, i+hard_max]
        ideal = i + target_chars
        lo = i + min_chunk
        hi = min(n, i + hard_max)
        # candidates = breakpoints within range
        candidates = [bp for bp in bps if lo <= bp <= hi]
        if not candidates:
            # none available: cut roughly at the ideal size (or end of text)
            j = min(n, ideal)
            chunk = text[i:j]
            chunks.append(chunk)
            # larger overlap for a rough cut (guard keeps i moving forward)
            i = j - 350 if j - 350 > i else j
            continue
        # pick the candidate closest to the ideal size
        j = min(candidates, key=lambda bp: abs(bp - ideal))
        chunk = text[i:j]
        chunks.append(chunk)
        # clean break -> small overlap
        i = j - 200 if j - 200 > i else j
    # drop empty/whitespace-only tails
    out = [c for c in chunks if c and c.strip()]
    return out
def meili_index_repo(repo_root: Path, owner_repo: Optional[str], branch: str):
    """Chunk every allowed repo file and index the chunks into Meilisearch.

    Also builds the local BM25 cache from the same chunks as a search fallback.
    Best effort: unreadable files are skipped; no-op when Meili is unavailable.
    """
    cli = get_meili()
    if not cli: return
    idx_name = meili_index_name(owner_repo, branch)
    try:
        idx = cli.index(idx_name)
    except Exception:
        idx = cli.create_index(uid=idx_name, options={"primaryKey":"id"})
    docs = []
    bm25_docs = []  # collect chunks here for the BM25 fallback index
    count = 0
    for rel in list_repo_files(repo_root):
        p = repo_root / rel
        try:
            txt = _read_text_file(p) or ""
        except Exception:
            continue
        for ci, chunk in enumerate(smart_chunk_text(txt, rel, target_chars=int(os.getenv("CHUNK_TARGET_CHARS","1800")),hard_max=int(os.getenv("CHUNK_HARD_MAX","2600")),min_chunk=int(os.getenv("CHUNK_MIN_CHARS","800")))):
            doc_id = f"{owner_repo}:{branch}:{rel}:{ci}"
            item = {"id": doc_id, "path": rel, "repo": owner_repo, "branch": branch, "content": chunk}
            docs.append(item)
            bm25_docs.append(item)  # also keep for BM25
            count += 1
            if len(docs) >= 1000:
                # flush in batches to keep request payloads bounded
                idx.add_documents(docs); docs.clear()
    if docs:
        idx.add_documents(docs)
    try:
        idx.update_searchable_attributes(["content","path","repo","branch"])
        idx.update_filterable_attributes(["repo","branch","path"])
    except Exception:
        pass
    logger.info("INFO:agent_repo:meili indexed ~%d chunks into %s", count, idx_name)
    # Build the local BM25 cache from bm25_docs (not from docs, which may have been cleared).
    try:
        if BM25Okapi and bm25_docs:
            toks = [re.findall(r"[A-Za-z0-9_]+", d["content"].lower()) for d in bm25_docs]
            bm = BM25Okapi(toks) if toks else None
            if bm:
                _BM25_CACHE[idx_name] = {"bm25": bm, "docs": bm25_docs}
    except Exception as e:
        logger.warning("WARN:agent_repo:bm25 build failed: %s", e)
def meili_search(owner_repo: Optional[str], branch: str, q: str, limit: int = 10) -> List[dict]:
    """Full-text search in the per-repo/branch Meilisearch index; [] on any failure.

    Fix: removed stray commit-timestamp residue lines (scrape artifacts) that
    had been pasted into the body and broke the syntax, plus dead commented-out
    code for an injected search function.
    """
    cli = get_meili()
    if not cli:
        return []
    try:
        idx = cli.index(meili_index_name(owner_repo, branch))
        res = idx.search(q, {"limit": limit})
        return res.get("hits", [])
    except Exception as e:
        logger.warning("WARN:agent_repo:meili_search failed: %s", e)
        return []
# ---------- BM25 fallback ----------
# index name -> {"bm25": BM25Okapi, "docs": [chunk dicts]}; see bm25_build_index / meili_index_repo.
_BM25_CACHE: Dict[str, dict] = {}
# module-scope
# repo key -> (BM25 index, docs); reserved second cache, not populated in this chunk.
_BM25_BY_REPO: dict[str, tuple[BM25Okapi, list[dict]]] = {}
def _tok(s: str) -> list[str]:
return re.findall(r"[A-Za-z0-9_]+", s.lower())
# --- Lightweight symbol index (in-memory, per repo collection) ---
_SYMBOL_INDEX: dict[str, dict[str, dict[str, int]]] = {}
# structure: { collection_name: { symbol_lower: { path: count } } }
def bm25_index_name(owner_repo: Optional[str], branch: str) -> str:
    """Key for the BM25 cache; reuses the Meili index name (same name, separate cache)."""
    return meili_index_name(owner_repo, branch)  # same name, different cache
def bm25_build_index(repo_root: Path, owner_repo: Optional[str], branch: str):
    """Build an in-memory BM25 index over the repo's chunks (no-op without rank_bm25).

    Kept local and fast so BM25 search works even when Meilisearch is unavailable.
    """
    # NOTE(review): duplicates the chunking IO that meili_index_repo already does.
    if not BM25Okapi:
        return
    idx_name = bm25_index_name(owner_repo, branch)
    docs = []
    for rel in list_repo_files(repo_root):
        p = repo_root / rel
        try:
            txt = _read_text_file(p) or ""
        except Exception:
            continue
        for ci, chunk in enumerate(smart_chunk_text(txt, rel,
                                      target_chars=int(os.getenv("CHUNK_TARGET_CHARS","1800")),
                                      hard_max=int(os.getenv("CHUNK_HARD_MAX","2600")),
                                      min_chunk=int(os.getenv("CHUNK_MIN_CHARS","800")))):
            docs.append({"id": f"{owner_repo}:{branch}:{rel}:{ci}", "path": rel, "repo": owner_repo, "branch": branch, "content": chunk})
    toks = [re.findall(r"[A-Za-z0-9_]+", d["content"].lower()) for d in docs]
    if toks:
        _BM25_CACHE[idx_name] = {"bm25": BM25Okapi(toks), "docs": docs}
def bm25_search(owner_repo: Optional[str], branch: str, q: str, limit: int = 10) -> List[dict]:
    """Rank cached chunks for this repo/branch with BM25; [] when no index or empty query."""
    entry = _BM25_CACHE.get(bm25_index_name(owner_repo, branch))
    if not entry:
        return []
    bm = entry.get("bm25")
    docs = entry.get("docs") or []
    if not bm:
        return []
    q_toks = re.findall(r"[A-Za-z0-9_]+", (q or "").lower())
    if not q_toks:
        return []
    scores = bm.get_scores(q_toks)
    ranked = sorted(range(len(scores)), key=scores.__getitem__, reverse=True)
    return [docs[i] for i in ranked[:limit]]
def _extract_symbols_generic(path: str, text: str) -> list[str]:
"""
Ultra-simpele symbol scraper (taal-agnostisch):
- class/interface/trait namen
- function foo(...), Foo::bar, "Controller@method"
- Laravel: ->name('route.name')
- React-ish: function Foo(...) { return ( ... ) }, export default function Foo(...)
- Blade-ish: @section('...'), @component('...'), <x-foo-bar>
- Basename van file als pseudo-symbool
"""
if not text:
return []
syms = set()
for m in re.finditer(r"\b(class|interface|trait)\s+([A-Za-z_][A-Za-z0-9_\\]*)", text):
syms.add(m.group(2))
for m in re.finditer(r"\bfunction\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", text):
syms.add(m.group(1))
for m in re.finditer(r"([A-Za-z_][A-Za-z0-9_\\]*)::([A-Za-z_][A-Za-z0-9_]*)", text):
syms.add(m.group(1) + "::" + m.group(2))
for m in re.finditer(r"['\"]([A-Za-z0-9_\\]+)@([A-Za-z0-9_]+)['\"]", text):
syms.add(m.group(1) + "@" + m.group(2))
for m in re.finditer(r"->\s*name\s*\(\s*['\"]([^'\"]+)['\"]\s*\)", text):
syms.add(m.group(1))
for m in re.finditer(r"\bfunction\s+([A-Z][A-Za-z0-9_]*)\s*\(", text):
syms.add(m.group(1))
for m in re.finditer(r"export\s+default\s+function\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(", text):
syms.add(m.group(1))
for m in re.finditer(r"@\s*(section|component|slot)\s*\(\s*['\"]([^'\"]+)['\"]\s*\)", text):
syms.add(m.group(2))
for m in re.finditer(r"<\s*x-([a-z0-9\-:]+)", text, flags=re.IGNORECASE):
syms.add("x-" + m.group(1).lower())
base = os.path.basename(path)
if base:
syms.add(base)
return list(syms)
def _symbol_index_name(owner_repo: Optional[str], branch: str) -> str:
    """Key for the in-memory symbol index; reuses the Chroma collection naming."""
    return repo_collection_name(owner_repo, branch)
def symbol_index_repo(repo_root: Path, owner_repo: Optional[str], branch: str):
    """Best-effort: build/refresh the in-memory symbol index for this repo/branch."""
    try:
        coll = _symbol_index_name(owner_repo, branch)
        store: dict[str, dict[str, int]] = {}
        for rel in list_repo_files(repo_root):
            p = repo_root / rel
            try:
                if p.stat().st_size > 500_000:
                    continue  # skip very large files
                txt = _read_text_file(p) or ""
            except Exception:
                continue
            for s in _extract_symbols_generic(rel, txt):
                k = s.strip().lower()
                if not k:
                    continue
                # symbol_lower -> {path: occurrence count}
                bucket = store.setdefault(k, {})
                bucket[rel] = bucket.get(rel, 0) + 1
        _SYMBOL_INDEX[coll] = store
    except Exception as e:
        logger.warning("WARN:agent_repo:symbol_index_repo: %s", e)
def symbol_search(owner_repo: Optional[str], branch: str, q: str, limit: int = 10) -> list[tuple[str, int]]:
    """Simple symbol lookup against the in-memory index -> [(path, score)].

    Exact symbol hits add 3x their occurrence count; soft substring hits add 1.
    """
    coll = _symbol_index_name(owner_repo, branch)
    idx = _SYMBOL_INDEX.get(coll) or {}
    if not idx or not q:
        return []
    # Tokens: quoted phrases first, then identifier-ish words; deduped, order kept.
    quoted = re.findall(r"['\"]([^'\"]{2,})['\"]", q)
    words = re.findall(r"[A-Za-z0-9_:\\.\-]{2,}", q)
    seen = set(); tokens = []
    for t in quoted + words:
        tl = t.lower()
        if tl not in seen:
            seen.add(tl); tokens.append(tl)
    scores: dict[str, int] = {}
    # exact match
    for t in tokens[:12]:
        if t in idx:
            for path, c in idx[t].items():
                scores[path] = scores.get(path, 0) + 3 * c
        # soft substring match (scans all symbols once per token)
        for sym, paths in idx.items():
            if t in sym:
                for path, c in paths.items():
                    scores[path] = scores.get(path, 0) + 1
    return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]
# ---------- Signal-first scan ----------
def glob_match(rel: str, patterns: List[str]) -> bool:
    """True when *rel* matches any of the fnmatch-style *patterns* (empty/None -> False)."""
    return any(fnmatch.fnmatch(rel, pat) for pat in (patterns or []))
def scan_with_signals(repo_root: Path, files: List[str], sig: dict, phrase_boosts: List[str], hint_boosts: List[str], limit: int = 20) -> List[Tuple[str,int,dict]]:
    """Score files against a signal spec -> [(rel_path, score, hit_meta)], best first.

    *sig* may contain: file_globs, must_substrings (ALL required, else the file
    is dropped), maybe_substrings, regexes, path_hints, exclude_dirs.
    phrase_boosts/hint_boosts extend the maybe/path-hint lists.
    Weights: must hit = 3, regex hit = 2, phrase hit = 2, maybe hit = 1, path hit = 1.
    """
    file_globs = sig.get("file_globs") or []
    must = [s.lower() for s in (sig.get("must_substrings") or [])]
    maybe = [s.lower() for s in (sig.get("maybe_substrings") or [])]
    regexes = sig.get("regexes") or []
    path_hints = [s.lower() for s in (sig.get("path_hints") or [])]
    exclude_dirs = set(sig.get("exclude_dirs") or [])
    # Merge caller-provided boosts into the soft signals (deduped, capped at 20).
    maybe = list(set(maybe + [p.lower() for p in phrase_boosts]))[:20]
    path_hints = list(set(path_hints + [h.lower() for h in hint_boosts]))[:20]
    scored: List[Tuple[str,int,dict]] = []
    for rel in files:
        if any(part in exclude_dirs for part in Path(rel).parts): continue
        if file_globs and not glob_match(rel, file_globs): continue
        score = 0
        meta = {"must_hits":0,"maybe_hits":0,"regex_hits":0,"path_hits":0,"phrase_hits":0}
        rel_lo = rel.lower()
        for h in path_hints:
            if h and h in rel_lo: meta["path_hits"] += 1; score += 1
        try:
            txt = _read_text_file(repo_root / rel) or ""
        except Exception:
            continue
        txt_lo = txt.lower()
        # Every non-empty must-substring has to be present, otherwise drop the file.
        if any(m and (m not in txt_lo) for m in must):
            continue
        meta["must_hits"] = len([m for m in must if m and m in txt_lo]); score += 3*meta["must_hits"]
        meta["maybe_hits"] = len([m for m in maybe if m and m in txt_lo]); score += meta["maybe_hits"]
        for rp in regexes:
            try:
                if re.search(rp, txt, flags=re.IGNORECASE|re.DOTALL):
                    meta["regex_hits"] += 1; score += 2
            except re.error:
                # ignore invalid caller-supplied regexes
                pass
        phrase_hits = 0
        for ph in phrase_boosts:
            if ph and ph.lower() in txt_lo:
                phrase_hits += 1
        if phrase_hits:
            meta["phrase_hits"] = phrase_hits
            score += 2*phrase_hits
        if score > 0:
            scored.append((rel, score, meta))
    scored.sort(key=lambda x: x[1], reverse=True)
    return scored[:limit]
# ---------- Simple keyword fallback ----------
def simple_keyword_search(repo_root: Path, files: List[str], query: str, limit: int = 8) -> List[Tuple[str,int]]:
    """Score files by query tokens: path-name hits first, content counts as a fallback."""
    tokens = set(re.findall(r"[A-Za-z0-9_]{2,}", (query or "").lower()))
    ranked: List[Tuple[str,int]] = []
    for rel in files:
        rel_lo = rel.lower()
        hit = sum(1 for tok in tokens if tok in rel_lo)
        if hit == 0:
            # Path gave nothing: count occurrences inside the file instead.
            try:
                body_lo = (_read_text_file(Path(repo_root) / rel) or "").lower()
                hit += sum(body_lo.count(tok) for tok in tokens)
            except Exception:
                pass
        if hit > 0:
            ranked.append((rel, hit))
    ranked.sort(key=lambda item: item[1], reverse=True)
    return ranked[:limit]
# ---------- Expliciete paden ----------
def best_path_by_basename(all_files: List[str], hint: str) -> str | None:
base = os.path.basename(hint)
if not base: return None
hint_tokens = set(re.findall(r"[A-Za-z0-9_]+", hint.lower()))
scored = []
for rel in all_files:
if os.path.basename(rel).lower() == base.lower():
score = 1
lo = rel.lower()
for t in hint_tokens:
if t in lo: score += 1
scored.append((rel, score))
if not scored: return None
scored.sort(key=lambda x: x[1], reverse=True)
return scored[0][0]
# ---------- Hybrid RAG ----------
def _append_ctx_preview(answer: str, chunks: list[dict], limit: int = 12) -> str:
paths = []
for h in chunks:
meta = h.get("metadata") or {}
p = meta.get("path");
if p and p not in paths: paths.append(p)
if not paths: return answer
head = paths[:limit]
return answer + "\n\n--- context (paths) ---\n" + "\n".join(f"- {p}" for p in head)
async def smart_rag_answer(messages: list[dict], *, n_ctx: int = 8,
                           owner_repo: Optional[str] = None,
                           branch: Optional[str] = None,
                           collection_name: Optional[str] = None,
                           add_preview: bool = True) -> str:
    """Answer a question over the indexed repo via hybrid RAG.

    Steps: enrich intent -> expand query variants -> hybrid retrieve per
    variant -> dedup top chunks -> assemble context -> one LLM answer,
    optionally with a trailing list of the context paths used.
    """
    # 1) intent
    spec = await enrich_intent(_llm_call, messages)
    task = (spec.get("task") or "").strip()
    if not task:
        return "Geen vraag gedetecteerd."
    # 2) query variants
    variants = await expand_queries(_llm_call, task, k=3)
    # 3) hybrid retrieve (must use the same collection as indexing; 'code_docs' is versioned in app.py)
    # resolve collection: explicit > (owner_repo, branch) > default
    coll = collection_name or (repo_collection_name(owner_repo, branch or AGENT_DEFAULT_BRANCH) if owner_repo else "code_docs")
    all_hits = []
    for q in variants:
        hits = await hybrid_retrieve(
            _rag_query_internal,
            q,
            n_results=n_ctx,
            per_query_k=max(30, n_ctx * 6),
            alpha=0.6,
            # pass explicitly to be sure we hit the (version-suffixed) collection:
            collection_name=coll,
        )
        all_hits.extend(hits)
    # dedup on (path, chunk_index), keeping the highest-scored hits
    seen = set()
    uniq = []
    for h in sorted(all_hits, key=lambda x: x.get("score", 0), reverse=True):
        meta = h.get("metadata") or {}
        key = (meta.get("path"), meta.get("chunk_index"))
        if key in seen:
            continue
        seen.add(key)
        uniq.append(h)
        if len(uniq) >= n_ctx:
            break
    # 4) assemble the context window
    ctx, top = assemble_context(uniq, max_chars=int(os.getenv("REPO_AGENT_CONTEXT_CHARS","640000")))
    if not ctx:
        return "Geen context gevonden."
    # 5) let the LLM answer
    sys = "Beantwoord concreet en kort. Citeer relevante paths. Als iets onzeker is: zeg dat."
    usr = f"Vraag: {task}\n\n--- CONTEXT ---\n{ctx}"
    resp = await _llm_call(
        [{"role":"system","content":sys},{"role":"user","content":usr}],
        stream=False, temperature=0.2, top_p=0.9, max_tokens=700
    )
    ans = (resp.get("choices",[{}])[0].get("message",{}) or {}).get("content","")
    return _append_ctx_preview(ans, uniq) if (add_preview and os.getenv("REPO_AGENT_PREVIEW","1") not in ("0","false")) else ans
async def llm_expand_queries(user_goal: str, quotes: List[str], hints: List[str], k: int = 5, extra_seeds: Optional[List[str]] = None) -> List[str]:
    # NOTE: intentionally redefines an earlier function of the same name;
    # Python keeps the latest definition.
    """Expand a user goal into up to ``k`` alternative search queries via the LLM.

    The goal itself is always the first query; on any failure it is the only one.
    """
    # Collect seed terms: all quotes, then capped hint/extra lists, deduped, max 8.
    seed_terms: List[str] = [*(quotes or [])]
    seed_terms += (hints or [])[:6]
    seed_terms += (extra_seeds or [])[:6]
    seed_terms = list(dict.fromkeys(seed_terms))[:8]
    prompt = (
        f"Maak {k} alternatieve zoekqueries (kort, divers). Mix NL/EN, synoniemen, veldnamen."
        " Alleen geldige JSON-array met strings.\n"
        f"Doel:\n{user_goal}\n\nHints:\n" + ", ".join(seed_terms)
    )
    try:
        resp = await _llm_call(
            [{"role": "system", "content": "Alleen geldige JSON, geen uitleg."},
             {"role": "user", "content": prompt}],
            stream=False, temperature=0.3, top_p=0.9, max_tokens=400
        )
        raw = resp.get("choices", [{}])[0].get("message", {}).get("content", "")
        parsed = safe_json_loads(raw)
        candidates = [user_goal]
        if isinstance(parsed, list):
            candidates.extend(s for s in parsed if isinstance(s, str) and s.strip())
        # normalize whitespace and dedupe while keeping first-seen order
        deduped: List[str] = []
        for cand in candidates:
            norm = re.sub(r"\s+", " ", cand.strip())
            if norm and norm not in deduped:
                deduped.append(norm)
        return deduped[:1 + k]
    except Exception as e:
        logger.warning("WARN:agent_repo:llm_expand_queries failed: %s", e)
        return [user_goal]
def get_file_preview(repo_root: Path, rel: str, terms: List[str], window: int = 180) -> str:
    """Short preview of ``rel``: a window around the first matching term,
    or the head of the file when no term matches / none are given."""
    try:
        content = _read_text_file(repo_root / rel) or ""
    except Exception:
        return ""
    if not content:
        return ""
    if not terms:
        return content[:window * 2]
    haystack = content.lower()
    for term in terms:
        pos = haystack.find(term.lower())
        if pos < 0:
            continue
        start = max(0, pos - window)
        end = min(len(content), pos + len(term) + window)
        return content[start:end]
    return content[:window * 2]
async def llm_rerank_candidates(user_goal: str, candidates: List[dict], topk: int = 8) -> List[dict]:
    """Rerank candidate files by LLM-judged relevance (0-100 per path).

    Falls back to the input order (truncated to ``topk``) when the LLM call
    fails or returns something that is not a JSON list.
    """
    if not candidates:
        return []
    fragments: List[str] = []
    for idx, cand in enumerate(candidates[:20], 1):
        preview = cand.get("preview", "")[:600]
        path = cand["path"]
        fragments.append(
            f"{idx}. PATH: {path}\nDIR: {os.path.dirname(path)}\nBASENAME: {os.path.basename(path)}\nPREVIEW:\n{preview}"
        )
    prompt = (
        "Rangschik de onderstaande codefragmenten op relevantie om het doel te behalen. "
        "Geef een JSON-array met objecten: {\"path\":\"...\",\"score\":0-100}."
        "\n\nDOEL:\n" + user_goal + "\n\nFRAGMENTEN:\n" + "\n\n".join(fragments)
    )
    try:
        resp = await _llm_call(
            [{"role": "system", "content": "Alleen geldige JSON zonder uitleg."},
             {"role": "user", "content": prompt}],
            stream=False, temperature=0.0, top_p=0.9, max_tokens=600
        )
        raw = resp.get("choices", [{}])[0].get("message", {}).get("content", "")
        parsed = safe_json_loads(raw)
        if not isinstance(parsed, list):
            return candidates[:topk]
        scores = {
            d.get("path"): float(d.get("score", 0))
            for d in parsed if isinstance(d, dict) and "path" in d
        }
        # unknown paths score 0.0, just like the original behavior
        reranked = [{**c, "score": scores.get(c["path"], 0.0)} for c in candidates]
        reranked.sort(key=lambda c: c.get("score", 0.0), reverse=True)
        return reranked[:topk]
    except Exception as e:
        logger.warning("WARN:agent_repo:llm_rerank_candidates failed: %s", e)
        return candidates[:topk]
def _rrf_fuse_paths(*ordered_lists: List[str], k: int = int(os.getenv("RRF_K","60"))) -> List[str]:
"""
Neem meerdere geordende padlijsten (beste eerst) en geef een RRF-fusie.
"""
acc = defaultdict(float)
for lst in ordered_lists:
for i, p in enumerate(lst):
acc[p] += 1.0 / (k + i + 1)
# path prior
def _prior(p: str) -> float:
return (
(0.35 if p.lower().startswith("routes/") else 0.0) +
(0.30 if p.lower().startswith("app/http/controllers/") else 0.0) +
(0.25 if p.lower().startswith("resources/views/") or p.lower().endswith(".blade.php") else 0.0) +
(0.12 if p.lower().startswith(("src/","app/","lib/","pages/","components/")) else 0.0) +
(0.05 if p.lower().endswith((".php",".ts",".tsx",".js",".jsx",".py",".go",".rb",".java",".cs",".vue",".html",".md")) else 0.0) -
(0.10 if ("/tests/" in p.lower() or p.lower().startswith(("tests/","test/"))) else 0.0) -
(0.10 if p.lower().endswith((".lock",".map",".min.js",".min.css")) else 0.0)
)
for p in list(acc.keys()):
acc[p] += float(os.getenv("RRF_PATH_PRIOR_WEIGHT","0.25")) * _prior(p)
return [p for p,_ in sorted(acc.items(), key=lambda t: t[1], reverse=True)]
async def hybrid_rag_select_paths(repo_root: Path,
                                  owner_repo: Optional[str],
                                  branch: str,
                                  user_goal: str,
                                  all_files: List[str],
                                  max_out: int = 8) -> List[str]:
    """Pick up to ``max_out`` repo-relative paths relevant to ``user_goal``.

    Channels: LLM signal scan (lenient + strict), explicit paths from the
    prompt, Chroma vector search, Meilisearch (or a BM25 fallback), Laravel
    heuristics and the symbol index. Channels are fused (optionally with
    weighted RRF), LLM-reranked and finally given a light symbol boost.
    """
    quotes = extract_quotes(user_goal)
    hints = extract_word_hints(user_goal)
    # signals: ask the LLM for a compact search strategy (globs/must/maybe/regex/path_hints/excludes)
    sig_messages = [
        {"role":"system","content":"Produceer alleen geldige JSON zonder uitleg."},
        {"role":"user","content":(
            "Bedenk een compacte zoekstrategie als JSON om relevante bestanden te vinden (globs/must/maybe/regex/path_hints/excludes). Wijziging:\n"
            + user_goal
        )}
    ]
    try:
        resp = await _llm_call(sig_messages, stream=False, temperature=0.1, top_p=0.9, max_tokens=384)
        raw = resp.get("choices",[{}])[0].get("message",{}).get("content","").strip()
        sig = safe_json_loads(raw) or {}
    except Exception as e:
        logger.warning("WARN:agent_repo:signals LLM failed: %s", e)
        sig = {}
    # Two-pass scan: first lenient (recall, must/regex constraints dropped),
    # then strict (precision, full signal set).
    sig_lenient = dict(sig or {})
    sig_lenient["must_substrings"] = []
    sig_lenient["regexes"] = []
    scan_hits_lenient = scan_with_signals(
        repo_root, all_files, sig_lenient,
        phrase_boosts=quotes, hint_boosts=hints, limit=24
    )
    scan_hits_strict = scan_with_signals(
        repo_root, all_files, sig,
        phrase_boosts=quotes, hint_boosts=hints, limit=20
    )
    # combine, preferring strict hits (they come first in the concatenation)
    seen_paths_local = set()
    prepicked = []
    for rel, _sc, _m in scan_hits_strict + scan_hits_lenient:
        if rel not in seen_paths_local:
            seen_paths_local.add(rel); prepicked.append(rel)
    # explicit path hints from the user prompt take precedence over everything
    try:
        explicit = extract_explicit_paths(user_goal)
    except Exception:
        explicit = []
    explicit_resolved: List[str] = []
    for ep in explicit:
        if ep in all_files:
            explicit_resolved.append(ep)
        else:
            bp = best_path_by_basename(all_files, ep)
            if bp: explicit_resolved.append(bp)
    # prepend explicit paths (deduped); reversed() keeps their original order
    for ep in reversed(explicit_resolved):
        if ep not in seen_paths_local:
            prepicked.insert(0, ep); seen_paths_local.add(ep)
    # light stack-specific seed terms for query expansion
    seeds = []
    if (repo_root / "artisan").exists() or (repo_root / "composer.json").exists():
        seeds += ["Route::get", "Controller", "blade", "resources/views", "routes/web.php", "app/Http/Controllers"]
    if (repo_root / "package.json").exists():
        seeds += ["component", "pages", "src/components", "useState", "useEffect"]
    queries = await llm_expand_queries(user_goal, quotes, hints, k=5, extra_seeds=seeds)
    chroma_paths: List[str] = []
    for q in queries:
        try:
            rag_res = await _rag_query_internal(
                query=q, n_results=RAG_TOPK,
                # search the version-consistent collection:
                collection_name=repo_collection_name(owner_repo, branch),
                repo=None, path_contains=None, profile=None
            )
            for item in rag_res.get("results", []):
                meta = item.get("metadata") or {}
                pth = meta.get("path")
                if pth and pth in all_files:
                    chroma_paths.append(pth)
        except Exception as e:
            logger.warning("WARN:agent_repo:Chroma query failed: %s", e)
    meili_paths: List[str] = []
    if MEILI_URL:
        for q in queries:
            hits = meili_search(owner_repo, branch, q, limit=RAG_TOPK)
            for h in hits:
                p = h.get("path")
                if p and p in all_files:
                    meili_paths.append(p)
    else:
        # BM25 fallback when Meili is disabled;
        # make sure a (one-off) index exists first
        try:
            if bm25_index_name(owner_repo, branch) not in _BM25_CACHE:
                bm25_build_index(repo_root, owner_repo, branch)
        except Exception:
            pass
        for q in queries:
            hits = bm25_search(owner_repo, branch, q, limit=RAG_TOPK)
            for h in hits:
                p = h.get("path")
                if p and p in all_files:
                    meili_paths.append(p)
    try:
        laravel_picks = laravel_signal_candidates(repo_root, user_goal, all_files, max_out=6)
    except Exception:
        laravel_picks = []
    # symbol-driven candidates
    sym_hits = symbol_search(owner_repo, branch, user_goal, limit=12)
    sym_paths = [p for p, _sc in sym_hits if p in all_files]
    # Optional weighted RRF fusion of the channels (enabled by default via RRF_ENABLE)
    use_rrf = str(os.getenv("RRF_ENABLE", "1")).lower() in ("1","true","yes")
    if use_rrf:
        k = int(os.getenv("RRF_K", "30"))
        # simple per-channel weights (tune via env)
        w_signals = float(os.getenv("RRF_W_SIGNALS", "1.0"))
        w_chroma = float(os.getenv("RRF_W_CHROMA", "1.0"))
        w_meili = float(os.getenv("RRF_W_MEILI", "0.8"))
        w_sym = float(os.getenv("RRF_W_SYMBOLS", "1.3"))
        w_lara = float(os.getenv("RRF_W_LARAVEL", "1.2"))
        sources = [
            ("signals", prepicked, w_signals),
            ("chroma", chroma_paths, w_chroma),
            ("meili", meili_paths, w_meili),
            ("symbols", sym_paths, w_sym),
            ("laravel", laravel_picks,w_lara),
        ]
        rrf_scores: dict[str, float] = {}
        seen_any = set()
        for _name, paths, w in sources:
            for rank, p in enumerate(paths, start=1):
                if p not in all_files:
                    continue
                seen_any.add(p)
                rrf_scores[p] = rrf_scores.get(p, 0.0) + (w * (1.0 / (k + rank)))
        # take the RRF top; fall back to the plain union when nothing scored
        fused_paths = [p for p, _ in sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)]
        base_pool = fused_paths[: max_out*3] if fused_paths else []
        # build the pool (deduped) and pad with the legacy channel order if needed
        pool, seen = [], set()
        def add(p):
            if p not in seen and p in all_files:
                seen.add(p); pool.append(p)
        for p in base_pool: add(p)
        if len(pool) < max_out:
            for lst in (prepicked, chroma_paths, meili_paths, sym_paths, laravel_picks):
                for p in lst:
                    add(p)
    else:
        # legacy behavior without RRF: plain channel-order union
        pool, seen = [], set()
        def add(p):
            if p not in seen and p in all_files:
                seen.add(p); pool.append(p)
        for lst in (prepicked, chroma_paths, meili_paths, sym_paths, laravel_picks):
            for p in lst:
                add(p)
    # LLM rerank of the pooled candidates (unchanged)
    cands = [{"path": p, "preview": get_file_preview(repo_root, p, quotes+hints)} for p in pool[:20]]
    ranked = await llm_rerank_candidates(user_goal, cands, topk=max_out)
    # light symbol boost applied after the LLM rerank (unchanged)
    sym_map = {p: sc for p, sc in sym_hits}
    boost = float(os.getenv("SYMBOL_LIGHT_BOOST", "0.15"))
    rescored = []
    for c in ranked:
        base = float(c.get("score", 0.0))
        s = sym_map.get(c["path"], 0)
        adj = base + (boost if s > 0 else 0.0)
        rescored.append({**c, "score": adj})
    rescored.sort(key=lambda x: x["score"], reverse=True)
    return [c["path"] for c in rescored[:max_out]]
# ---------- Focus snippets ----------
def extract_focus_snippets(text: str, needles: List[str], window: int = 240, max_snippets: int = 3) -> str:
    """Cut up to ``max_snippets`` windows around needle matches out of ``text``.

    At most 4 occurrences per needle are considered; overlapping/embedded
    snippets are suppressed via mutual-containment checks. Falls back to the
    head of the text when nothing matches (or text/needles are empty).
    """
    if not text or not needles:
        return text[:window * 2] if text else ""
    haystack = text.lower()
    raw_hits: List[str] = []
    for needle in needles:
        probe = (needle or "").lower()
        if not probe:
            continue
        cursor = 0
        for _ in range(4):  # cap occurrences per needle
            found = haystack.find(probe, cursor)
            if found < 0:
                break
            lo = max(0, found - window)
            hi = min(len(text), found + len(probe) + window)
            raw_hits.append(text[lo:hi])
            cursor = found + len(probe)
    kept: List[str] = []
    for snippet in raw_hits:
        if all(snippet not in other and other not in snippet for other in kept):
            kept.append(snippet)
        if len(kept) >= max_snippets:
            break
    if not kept:
        return text[:window * 2]
    return "\n----- CONTEXT SPLIT -----\n".join(kept)
# ---------- LLM edit-plan ----------
async def llm_plan_edits_for_file(user_goal: str, rel: str, focus_snippet: str) -> dict | None:
    """Ask the LLM for a minimal JSON edit plan for one file.

    Returns the parsed plan dict ({"allow_destructive": bool, "edits": [...]})
    or None when the call fails or the reply is not a valid plan.
    Fix: removed stray VCS-blame timestamp lines that had leaked into the
    body (a SyntaxError) and dropped the pointless try/except around the
    tree-hint default.
    """
    SYSTEM = "Produceer uitsluitend geldige JSON; geen verdere uitleg. Minimaliseer edits; raak zo min mogelijk regels."
    # Tree hint is on by default (AGENT_TREE_PROMPT=0/false disables it); callers
    # may pre-populate the module-level _LLM_EDIT_TREE_HINT with a short map overview.
    tree_block = globals().get("_LLM_EDIT_TREE_HINT", "")
    tree_hint = os.getenv("AGENT_TREE_PROMPT","1").lower() not in ("0","false")
    if tree_hint and not tree_block:
        tree_block = "\n(Tree-overzicht niet beschikbaar in deze context)\n"
    USER = (
        "Doel:\n" + user_goal + "\n\n" +
        f"Bestand: {rel}\n" +
        "Relevante contextfragmenten:\n----- BEGIN SNIPPETS -----\n" +
        focus_snippet + "\n----- EIND SNIPPETS -----\n\n" +
        ("Korte tree-hint:\n" + tree_block + "\n") +
        "JSON schema:\n" +
        "{ \"allow_destructive\": false, \"edits\": [\n" +
        " {\"type\":\"regex_replace\",\"pattern\":\"...\",\"replacement\":\"...\",\"flags\":\"ims\",\"count\":1,\"explain\":\"...\"},\n" +
        " {\"type\":\"string_replace\",\"find\":\"...\",\"replace\":\"...\",\"count\":1,\"explain\":\"...\"},\n" +
        " {\"type\":\"insert_after\",\"anchor_regex\":\"...\",\"text\":\"...\",\"occur\":\"first|last\",\"flags\":\"ims\",\"explain\":\"...\"},\n" +
        " {\"type\":\"insert_before\",\"anchor_regex\":\"...\",\"text\":\"...\",\"occur\":\"first|last\",\"flags\":\"ims\",\"explain\":\"...\"},\n" +
        " {\"type\":\"replace_between_anchors\",\"start_regex\":\"...\",\"end_regex\":\"...\",\"replacement\":\"...\",\"flags\":\"ims\",\"explain\":\"...\"},\n" +
        " {\"type\":\"delete_between_anchors\",\"start_regex\":\"...\",\"end_regex\":\"...\",\"keep_anchors\":false,\"flags\":\"ims\",\"explain\":\"...\"},\n" +
        " {\"type\":\"conditional_insert\",\"absent_regex\":\"...\",\"anchor_regex\":\"...\",\"text\":\"...\",\"occur\":\"first|last\",\"flags\":\"ims\",\"explain\":\"...\"},\n" +
        " {\"type\":\"insert_at_top\",\"text\":\"...\",\"explain\":\"...\"},\n" +
        " {\"type\":\"insert_at_bottom\",\"text\":\"...\",\"explain\":\"...\"}\n" +
        "]}\n" +
        "Maximaal 4 edits. Geef bij elke edit een korte 'explain'."
    )
    try:
        resp = await _llm_call(
            [{"role":"system","content":SYSTEM},{"role":"user","content":USER}],
            stream=False, temperature=0.1, top_p=0.9, max_tokens=800
        )
        raw = resp.get("choices",[{}])[0].get("message",{}).get("content","").strip()
        plan = safe_json_loads(raw)
        if isinstance(plan, dict) and isinstance(plan.get("edits"), list):
            return plan
        return None
    except Exception as e:
        logger.warning("WARN:agent_repo:llm_plan_edits_for_file failed for %s: %s", rel, e)
        return None
# ---------- Apply helpers ----------
def _regex_flags(flag_str: str) -> int:
flags = 0
if not flag_str: return flags
for ch in flag_str.lower():
if ch == 'i': flags |= re.IGNORECASE
if ch == 'm': flags |= re.MULTILINE
if ch == 's': flags |= re.DOTALL
return flags
def apply_edit_plan(original: str, plan: dict) -> tuple[str, int, List[str], bool]:
    """
    Apply an LLM-produced edit plan to ``original`` text.

    Supported edit types: string_replace, regex_replace, insert_after,
    insert_before, replace_between_anchors, delete_between_anchors,
    conditional_insert, insert_at_top, insert_at_bottom. Steps run in order
    (each sees the text produced by earlier steps); a failing step is logged
    and skipped. Insertions near anchors are idempotent via a local window
    check so re-running the same plan does not duplicate text.

    Returns: (modified, changes_count, explains[], allow_destructive)
    """
    if not original or not plan or not isinstance(plan.get("edits"), list):
        return original, 0, [], False
    txt = original
    changes = 0
    explains: List[str] = []
    for ed in plan["edits"]:
        try:
            et = (ed.get("type") or "").lower()
            ex = ed.get("explain") or et
            if et == "string_replace":
                # plain literal replace; count defaults to 1
                find = ed.get("find") or ""; rep = ed.get("replace") or ""
                cnt = int(ed.get("count") or 0) or 1
                if find:
                    new = txt.replace(find, rep, cnt)
                    if new != txt: changes += 1; txt = new; explains.append(f"string_replace: {ex}")
            elif et == "regex_replace":
                pat = ed.get("pattern") or ""; rep = ed.get("replacement") or ""
                flags = _regex_flags(ed.get("flags") or ""); cnt = int(ed.get("count") or 0) or 1
                if pat:
                    new, n = re.subn(pat, rep, txt, count=cnt, flags=flags)
                    if n > 0: changes += 1; txt = new; explains.append(f"regex_replace: {ex}")
            elif et in ("insert_after","insert_before"):
                anchor = ed.get("anchor_regex") or ""; ins = ed.get("text") or ""
                occur = (ed.get("occur") or "first").lower(); flags = _regex_flags(ed.get("flags") or "")
                if not anchor or not ins: continue
                matches = list(re.finditer(anchor, txt, flags))
                if not matches: continue
                m = matches[0] if occur != "last" else matches[-1]
                pos = m.end() if et == "insert_after" else m.start()
                # idempotence: do not insert again when the text already sits near the anchor
                win_a, win_b = max(0, pos-200), min(len(txt), pos+200)
                if ins in txt[win_a:win_b]:
                    continue
                txt = txt[:pos] + ins + txt[pos:]; changes += 1; explains.append(f"{et}: {ex}")
            elif et in ("replace_between_anchors","delete_between_anchors"):
                srx = ed.get("start_regex") or ""; erx = ed.get("end_regex") or ""
                # keep_anchors only applies to delete; replace always keeps the anchors
                flags = _regex_flags(ed.get("flags") or ""); keep_anchors = bool(ed.get("keep_anchors")) if et == "delete_between_anchors" else True
                repl = ed.get("replacement") or ""
                if not srx or not erx: continue
                s_matches = list(re.finditer(srx, txt, flags))
                e_matches = list(re.finditer(erx, txt, flags))
                if not s_matches or not e_matches: continue
                s0 = s_matches[0]
                # pick the first end anchor located after the start anchor
                e0 = next((em for em in e_matches if em.start() >= s0.end()), None)
                if not e0: continue
                a = s0.end(); b = e0.start()
                if et == "replace_between_anchors":
                    txt = txt[:a] + repl + txt[b:]; changes += 1; explains.append(f"replace_between_anchors: {ex}")
                else:
                    if keep_anchors: txt = txt[:a] + txt[b:]
                    else: txt = txt[:s0.start()] + txt[e0.end():]
                    changes += 1; explains.append(f"delete_between_anchors: {ex}")
            elif et == "conditional_insert":
                # insert after the anchor only when absent_regex does not match anywhere
                absent = ed.get("absent_regex") or ""; anchor = ed.get("anchor_regex") or ""
                occur = (ed.get("occur") or "first").lower(); ins = ed.get("text") or ""
                flags = _regex_flags(ed.get("flags") or "")
                if not anchor or not ins: continue
                if absent and re.search(absent, txt, flags): continue
                matches = list(re.finditer(anchor, txt, flags))
                if not matches: continue
                m = matches[0] if occur != "last" else matches[-1]
                pos = m.end()
                # idempotence: local window check around the insertion point
                win_a, win_b = max(0, pos-200), min(len(txt), pos+200)
                if ins in txt[win_a:win_b]:
                    continue
                txt = txt[:pos] + ins + txt[pos:]; changes += 1; explains.append(f"conditional_insert: {ex}")
            elif et == "insert_at_top":
                ins = ed.get("text") or ""
                if ins: txt = ins + txt; changes += 1; explains.append(f"insert_at_top: {ex}")
            elif et == "insert_at_bottom":
                ins = ed.get("text") or ""
                if ins: txt = txt + ins; changes += 1; explains.append(f"insert_at_bottom: {ex}")
        except Exception as e:
            logger.warning("WARN:agent_repo:apply_edit_plan step failed: %s", e)
            continue
    allow_destructive = bool(plan.get("allow_destructive"))
    return txt, changes, explains, allow_destructive
# ==== BEGIN PATCH A: destructiveness judged on the diff + threshold via env ====
# Safe default for AGENT_DESTRUCTIVE_RATIO (avoids a NameError when it was not
# already defined earlier in the module or injected by configuration).
try:
    AGENT_DESTRUCTIVE_RATIO
except NameError:
    AGENT_DESTRUCTIVE_RATIO = float(os.getenv("AGENT_DESTRUCTIVE_RATIO", "0.45"))
def _deletion_ratio(original: str, modified: str) -> float:
"""Schat welk deel van de originele regels als deletions wegvalt."""
ol = original.splitlines()
ml = modified.splitlines()
if not ol:
return 0.0
# ndiff: regels met prefix '- ' tellen we als deletions
dels = 0
for line in difflib.ndiff(ol, ml):
if line.startswith("- "):
dels += 1
return dels / max(1, len(ol))
def is_destructive(original: str, modified: str, allow_destructive: bool) -> bool:
    """Gate edits on deletion volume: block only when demonstrably many lines vanish."""
    if allow_destructive:
        return False
    # very small files pass unconditionally — being strict here hurts more than it helps
    if len(original.splitlines()) < 6:
        return False
    return _deletion_ratio(original, modified) > AGENT_DESTRUCTIVE_RATIO
# ==== END PATCH A ====
def list_sibling_files(repo_root: Path, rel: str, limit: int = 12) -> List[str]:
    """Names of small, allowed files in (the nearest existing parent of) rel's dir."""
    directory = (repo_root / rel).parent
    if not directory.exists():
        # the target directory may not exist yet; walk up to the closest existing parent
        directory = repo_root / os.path.dirname(rel)
        while not directory.exists() and directory != repo_root:
            directory = directory.parent
    names: List[str] = []
    if directory.exists():
        names = [
            entry.name
            for entry in directory.iterdir()
            if entry.is_file() and allowed_file(entry) and entry.stat().st_size < 500_000
        ]
    names.sort(key=str.lower)  # stable output instead of FS order
    return names[:limit]
def read_snippet(p: Path, max_chars: int = 2000) -> str:
    """First ``max_chars`` characters of the file, or '' on any read error."""
    try:
        return (_read_text_file(p) or "")[:max_chars]
    except Exception:
        return ""
async def propose_new_file(repo_root: Path, rel: str, user_goal: str) -> tuple[Optional[str], str]:
    """
    Ask the LLM to generate a *completely new file* at path `rel` with minimal
    assumptions, using up to three sibling files as nearby references.
    Returns (content, reason); content is None on failure.
    """
    # NOTE(review): ext is computed but never used below — TODO confirm before removing
    ext = os.path.splitext(rel)[1].lower()
    siblings = list_sibling_files(repo_root, rel)
    sibling_snippets = []
    for name in siblings[:3]:
        snippet = read_snippet(repo_root / os.path.join(os.path.dirname(rel), name), max_chars=1600)
        if snippet:
            sibling_snippets.append({"name": name, "snippet": snippet[:1600]})
    SYSTEM = "Je bent een zorgvuldige codegenerator. Lever exact één compleet bestand. Geen extra refactors."
    USER = (
        f"Doel (nieuwe file aanmaken):\n{user_goal}\n\n"
        f"Bestandspad: {rel}\n"
        f"Directory siblings: {', '.join(siblings) if siblings else '(geen)'}\n\n"
        "Enkele nabije referenties (indien aanwezig):\n" +
        "\n".join([f"--- {s['name']} ---\n{s['snippet']}" for s in sibling_snippets]) +
        "\n\nEisen:\n"
        "- Maak een minimal-werkende versie van dit bestand die past bij de context hierboven.\n"
        "- Raak geen andere paden aan; geen includes naar niet-bestaande bestanden.\n"
        "- Gebruik hetzelfde framework/stack als de referenties suggereren (indien duidelijk).\n"
        "- Output: alleen de VOLLEDIGE bestandinformatie in één codeblok, niets anders."
    )
    try:
        resp = await _llm_call(
            [{"role":"system","content":SYSTEM},{"role":"user","content":USER}],
            stream=False, temperature=0.2, top_p=0.9, max_tokens=2048
        )
        content = _extract_code_block(
            resp.get("choices",[{}])[0].get("message",{}).get("content","")
        ) or ""
        content = content.strip()
        if not content:
            return None, "LLM gaf geen inhoud terug."
        # simple sanity limit on generated size
        if len(content) > 200_000:
            content = content[:200_000]
        return content, "Nieuw bestand voorgesteld op basis van directory-context en doel."
    except Exception as e:
        logger.warning("WARN:agent_repo:propose_new_file failed for %s: %s", rel, e)
        return None, f"Kon geen nieuwe file genereren: {e}"
# ---------- Diff helper ----------
def make_diffs(original: str, modified: str, filename: str, max_lines: int = 200) -> str:
    """Unified diff between ``original`` and ``modified``, labeled
    a/<filename> and b/<filename>, truncated to ``max_lines`` diff lines.

    Fixes: the ``filename`` parameter was ignored (headers were hard-coded),
    and ``lineterm=""`` combined with ``"".join`` ran the header and hunk
    lines together; the default lineterm keeps each diff line terminated.
    """
    diff_lines = list(difflib.unified_diff(
        original.splitlines(keepends=True),
        modified.splitlines(keepends=True),
        fromfile=f"a/{filename}",
        tofile=f"b/{filename}",
    ))
    if len(diff_lines) > max_lines:
        return "".join(diff_lines[:max_lines]) + "\n... (diff ingekort)"
    return "".join(diff_lines)
def make_new_file_diff(filename: str, content: str, max_lines: int = 400) -> str:
    """New-file unified diff (/dev/null -> b/<filename>), truncated to
    ``max_lines`` diff lines.

    Fixes: ``filename`` was ignored in the b/ header, and ``lineterm=""``
    made the header lines run together under ``"".join``.
    """
    new_lines = content.splitlines(keepends=True)
    diff_lines = list(difflib.unified_diff(
        [], new_lines,
        fromfile="/dev/null",
        tofile=f"b/{filename}",
    ))
    if len(diff_lines) > max_lines:
        return "".join(diff_lines[:max_lines]) + "\n... (diff ingekort)"
    return "".join(diff_lines)
# ---------- Lightweight Laravel Graph helpers ----------
def _view_name_to_path(repo_root: Path, view_name: str) -> Optional[str]:
"""
'users.index' -> resources/views/users/index.blade.php (als bestaand)
'users/index' -> idem. Return relatieve path of None als niet gevonden.
"""
if not view_name:
return None
cand = view_name.replace(".", "/").strip("/ ")
for ext in [".blade.php", ".php"]:
rel = f"resources/views/{cand}{ext}"
if (repo_root / rel).exists():
return rel
return None
def _controller_extract_views(text: str, repo_root: Path) -> list[str]:
    """Collect blade template paths referenced from a controller source.

    Recognizes ``view('x.y')``, ``View::make('x.y')`` and — best effort,
    mapped to a blade path — ``Inertia::render('X/Y')``. Result is deduped
    in first-seen order.
    """
    patterns = (
        r"(?:return\s+)?view\s*\(\s*['\"]([^'\"]+)['\"]",
        r"View::make\s*\(\s*['\"]([^'\"]+)['\"]",
        r"Inertia::render\s*\(\s*['\"]([^'\"]+)['\"]",
    )
    found: list[str] = []
    for pattern in patterns:
        for match in re.finditer(pattern, text, flags=re.I):
            rel = _view_name_to_path(repo_root, match.group(1))
            if rel and rel not in found:
                found.append(rel)
    return found
def _blade_extract_lang_keys(text: str) -> list[str]:
"""
Haal vertaalkeys uit Blade/PHP: __('x.y'), @lang('x.y'), trans('x.y')
"""
keys = []
for rx in [
r"__\(\s*['\"]([^'\"]+)['\"]\s*\)",
r"@lang\(\s*['\"]([^'\"]+)['\"]\s*\)",
r"trans\(\s*['\"]([^'\"]+)['\"]\s*\)"
]:
for m in re.finditer(rx, text):
keys.append(m.group(1))
# dedupe
seen=set(); out=[]
for k in keys:
if k not in seen:
out.append(k); seen.add(k)
return out
def _grep_lang_files_for_key(repo_root: Path, key: str, limit: int = 6) -> list[str]:
"""
Zoek in resources/lang/**/*.(json|php) naar KEY. Best-effort, klein limiet.
"""
base = repo_root / "resources/lang"
if not base.exists():
return []
hits=[]
try:
for p in base.rglob("*"):
if p.is_dir():
continue
if not (str(p).endswith(".json") or str(p).endswith(".php")):
continue
if p.stat().st_size > 300_000:
continue
txt = p.read_text(encoding="utf-8", errors="ignore")
if key in txt:
hits.append(str(p.relative_to(repo_root)))
if len(hits) >= limit:
break
except Exception:
pass
return hits
def _build_laravel_graph(repo_root: Path) -> dict[str, set[str]]:
    """
    Build a lightweight undirected graph of a Laravel project:
      - routes/web.php|api.php <-> controller files
      - controllers <-> views (via ``return view(...)`` and friends)
      - views <-> lang files (for translation keys used in the view)
    Node labels are repo-relative paths; edges are undirected (neighbours).
    """
    g: dict[str, set[str]] = {}
    def _add(a: str, b: str):
        # register the edge in both directions
        g.setdefault(a, set()).add(b)
        g.setdefault(b, set()).add(a)
    # 1) routes -> controllers (reuse the existing route scanner)
    routes = laravel_scan_routes(repo_root)
    for r in routes:
        rp = r.get("file") or ""
        ctrl = r.get("controller") or ""
        if not ctrl:
            continue
        for cpath in _candidate_paths_for_controller(repo_root, ctrl):
            _add(rp, cpath)
            # 2) controllers -> views (parse the controller source)
            try:
                txt = _read_text_file(repo_root / cpath) or ""
            except Exception:
                txt = ""
            for vrel in _controller_extract_views(txt, repo_root):
                _add(cpath, vrel)
                # 3) views -> lang files (based on the translation keys used)
                try:
                    vtxt = _read_text_file(repo_root / vrel) or ""
                except Exception:
                    vtxt = ""
                for key in _blade_extract_lang_keys(vtxt):
                    for lrel in _grep_lang_files_for_key(repo_root, key, limit=4):
                        _add(vrel, lrel)
    return g
def _graph_bfs_boosts(graph: dict[str, set[str]], seeds: list[str], max_depth: int = 3) -> dict[str, tuple[int, str]]:
"""
BFS vanaf seed-nodes. Return: {node: (distance, via)} met via=eerste buur of route.
"""
from collections import deque
dist: dict[str, int] = {}
via: dict[str, str] = {}
q = deque()
for s in seeds:
if s in graph:
dist[s] = 0
via[s] = s
q.append(s)
while q:
cur = q.popleft()
if dist[cur] >= max_depth:
continue
for nb in graph.get(cur, ()):
if nb not in dist:
dist[nb] = dist[cur] + 1
via[nb] = cur if via.get(cur) == cur else via.get(cur, cur)
q.append(nb)
return {n: (d, via.get(n, "")) for n, d in dist.items()}
def _get_graph_cached(repo_root: Path, memo_key: str) -> dict[str, set[str]]:
    """Memoized Laravel graph per ``memo_key``; empty when disabled via
    AGENT_GRAPH_ENABLE=0/false or when building fails."""
    if os.getenv("AGENT_GRAPH_ENABLE", "1").lower() in ("0", "false"):
        return {}
    cached = _GRAPH_CACHE.get(memo_key)
    if cached is None:
        try:
            cached = _build_laravel_graph(repo_root)
        except Exception:
            cached = {}
        _GRAPH_CACHE[memo_key] = cached
    return cached
# ---------- Tree summaries (korte per-file beschrijving) ----------
def _summarize_file_for_tree(path: Path) -> str:
"""
Heuristische mini-samenvatting (<=160 chars):
- eerste docblock / commentregel / heading
- anders eerste niet-lege regel
"""
try:
txt = path.read_text(encoding="utf-8", errors="ignore")
except Exception:
return ""
head = txt[:1200]
# PHP docblock
m = re.search(r"/\*\*([\s\S]{0,400}?)\*/", head)
if m:
s = re.sub(r"[*\s]+", " ", m.group(1)).strip()
return (s[:160])
# single-line comments / headings
for rx in [r"^\s*//\s*(.+)$", r"^\s*#\s*(.+)$", r"^\s*<!--\s*(.+?)\s*-->", r"^\s*<h1[^>]*>([^<]+)</h1>", r"^\s*<title[^>]*>([^<]+)</title>"]:
mm = re.search(rx, head, flags=re.M|re.I)
if mm:
return mm.group(1).strip()[:160]
# first non-empty line
for line in head.splitlines():
ln = line.strip()
if ln:
return ln[:160]
return ""
def _build_tree_summaries(repo_root: Path, all_files: list[str], max_files: int = 2000) -> dict[str, str]:
    """One-line summary per relative path, capped at ``max_files`` entries;
    files over 200 KB (or that cannot be stat'ed) are skipped."""
    summaries: dict[str, str] = {}
    for rel in all_files:
        if len(summaries) >= max_files:
            break
        target = repo_root / rel
        try:
            if target.stat().st_size > 200_000:
                continue
        except Exception:
            continue
        summary = _summarize_file_for_tree(target)
        if summary:
            summaries[rel] = summary
    return summaries
def _get_tree_cached(repo_root: Path, memo_key: str, all_files: list[str]) -> dict[str, str]:
    """Memoized tree summaries per ``memo_key``; empty when disabled via
    AGENT_TREE_ENABLE=0/false or when building fails."""
    if os.getenv("AGENT_TREE_ENABLE", "1").lower() in ("0", "false"):
        return {}
    cached = _TREE_SUM_CACHE.get(memo_key)
    if cached is None:
        try:
            cached = _build_tree_summaries(repo_root, all_files)
        except Exception:
            cached = {}
        _TREE_SUM_CACHE[memo_key] = cached
    return cached
# ---------- Mini tree-hint for LLM edit plans ----------
def _make_local_tree_hint(repo_root: Path, rel: str, max_siblings: int = 14) -> str:
    """
    Compact single-folder overview for the LLM: a 'Map: <dir>' header plus up
    to ``max_siblings`` sibling files, each with a short summary when available.
    Kept short and predictable; returns '' when the folder cannot be resolved.
    """
    try:
        base_dir = (repo_root / rel).parent
    except Exception:
        return ""
    try:
        folder = str(base_dir.relative_to(repo_root))
    except Exception:
        folder = base_dir.name
    lines = [f"Map: {folder or '.'}"]
    entries: list[str] = []
    try:
        for sibling in sorted(base_dir.iterdir(), key=lambda x: x.name.lower()):
            if not sibling.is_file():
                continue
            try:
                if not allowed_file(sibling) or sibling.stat().st_size > 200_000:
                    continue
            except Exception:
                continue
            summary = _summarize_file_for_tree(sibling)
            if summary:
                entries.append(f"- {sibling.name}: {summary[:120]}")
            else:
                entries.append(f"- {sibling.name}")
            if len(entries) >= max_siblings:
                break
    except Exception:
        pass
    lines.extend(entries)
    return "\n".join(lines)
# ---------- Basic syntax guards ----------
def _write_tmp(content: str, suffix: str) -> Path:
import tempfile
fd, path = tempfile.mkstemp(suffix=suffix)
os.close(fd)
p = Path(path)
p.write_text(content, encoding="utf-8")
return p
def _php_lint_ok(tmp_path: Path) -> bool:
# disable via AGENT_SYNTAX_GUARD=0
if os.getenv("AGENT_SYNTAX_GUARD","1").lower() in ("0","false"):
return True
try:
import subprocess
res = subprocess.run(["php","-l",str(tmp_path)], capture_output=True, text=True, timeout=8)
return res.returncode == 0
except Exception:
return True
def _blade_balance_ok(text: str) -> bool:
# Zeer conservatieve balans-check voor veelvoorkomende Blade directives
tl = (text or "").lower()
pairs = [("section","endsection"),("if","endif"),("foreach","endforeach"),("isset","endisset"),("php","endphp")]
for a,b in pairs:
if tl.count("@"+a) != tl.count("@"+b):
return False
return True
# ---------- Targeted, safe literal fallback ----------
# === PATCH: generic HTML-scoped replacement ===
def html_scoped_literal_replace(html: str, old: str, new: str, tag_names: set[str]) -> tuple[str, bool, str]:
    """
    Replace ``old`` -> ``new``, but ONLY inside the named tags and at most
    once per tag block. Works without external libs via a conservative
    DOTALL regex. Returns (modified, changed, rationale).
    """
    if not html or not old or not tag_names:
        return html, False, ""
    notes: list[str] = []
    result = html
    for tag in sorted(tag_names):
        # <tag ...> ... </tag> (greedy enough per block, constrained via DOTALL)
        block_re = re.compile(
            rf"(<\s*{re.escape(tag)}\b[^>]*>)(.*?)(</\s*{re.escape(tag)}\s*>)",
            flags=re.IGNORECASE | re.DOTALL,
        )
        def _swap(match, _tag=tag):
            opener, body, closer = match.group(1), match.group(2), match.group(3)
            if old not in body:
                return match.group(0)
            replaced = body.replace(old, new, 1)  # max 1 replacement per tag block
            if replaced == body:
                return match.group(0)
            notes.append(f"'{old}' vervangen binnen <{_tag}> (1x)")
            return opener + replaced + closer
        result = block_re.sub(_swap, result)
    if notes:
        return result, True, "; ".join(notes)
    return result, False, ""
# === PATCH: safe, generic string-literal replacement ===
def quoted_literal_replace(original: str, old: str, new: str, max_occurrences: int = 2) -> tuple[str, bool, str]:
    """
    Replace 'old' or "old" as a quoted string literal, at most
    ``max_occurrences`` times. Language-agnostic: it only touches string
    values, never identifiers. Returns (modified, changed, rationale).
    """
    if not original or not old:
        return original, False, ""
    literal_re = re.compile(rf"(?P<q>['\"])({re.escape(old)})(?P=q)")
    done = 0
    def _swap(match):
        nonlocal done
        if done >= max_occurrences:
            return match.group(0)
        done += 1
        quote = match.group("q")
        return quote + new + quote
    rewritten = literal_re.sub(_swap, original)
    if rewritten != original and done > 0:
        return rewritten, True, f"'{old}''{new}' als string-literal ({done}x, limiet {max_occurrences})"
    return original, False, ""
# ==== BEGIN PATCH B: per-bestand oud/nieuw bepalen + generieke fallback ====
def _literal_matches_with_context(src: str, needle: str, window: int = 160):
"""Vind alle posities waar 'needle' als literal voorkomt en geef de operator-context terug."""
escaped = re.escape(needle)
pat = re.compile(r"(?P<q>['\"])(" + escaped + r")(?P=q)")
for m in pat.finditer(src):
a, b = m.span()
before = src[max(0, a - window):a]
op = None
if re.search(r"\?\?\s*$", before):
op = "??"
elif re.search(r"\?\s*[^:\n]{0,120}:\s*$", before):
op = "?:"
elif re.search(r"\|\|\s*$", before):
op = "||"
elif re.search(r"\bor\b\s*$", before, flags=re.IGNORECASE):
op = "or"
yield (a, b, op)
def deduce_old_new_literals(user_goal: str, original: str) -> tuple[Optional[str], Optional[str], str]:
    """Derive an (old, new) literal pair for a replacement from the prompt.

    'old' is the quoted prompt string that also occurs in the file and scores
    highest on fallback-context occurrences (??, ?:, ||, or). 'new' is another
    quoted prompt string, preferably one that does NOT occur in the file.

    Returns:
        (old, new, rationale) — old and/or new may be None.
    """
    quotes = extract_quotes(user_goal)
    if not quotes:
        return None, None, "Geen quoted strings in prompt gevonden."
    # Score each quoted string as an OLD candidate.
    candidates = []
    for quoted in quotes:
        occurrences = list(_literal_matches_with_context(original, quoted))
        if not occurrences:
            continue
        # weight: occurrence count plus a bonus per operator-context hit
        in_ctx = sum(1 for _start, _end, op in occurrences if op)
        candidates.append((quoted, 2 * in_ctx + len(occurrences), in_ctx))
    if not candidates:
        # none of the prompt quotes appears in the file: no targeted fallback
        return None, None, "Geen van de quotes uit prompt kwam in de file voor."
    candidates.sort(key=lambda item: (item[1], item[2]), reverse=True)
    old = candidates[0][0]
    # Pick NEW from the remaining quotes, preferring ones absent from the file.
    others = [q for q in quotes if q != old]
    if not others:
        return old, None, f"OLD='{old}' gekozen; geen 'new' gevonden."
    absent = [q for q in others if q not in original]
    new = absent[0] if absent else others[0]
    return old, new, f"OLD='{old}' (meeste fallback-contexthits), NEW='{new}'."
def targeted_fallback_replace(original: str, old: str, new: str) -> tuple[str, bool, str]:
    """Replace only the literal OLD when it is clearly a fallback value.

    A quoted occurrence counts as fallback when the preceding text ends in
    one of the operators ??, ?:, || or (word) "or". Only the first such
    occurrence is replaced.

    Returns:
        (modified, changed, rationale)
    """
    if not original or not old:
        return original, False, ""
    lookbehind = 160
    pattern = re.compile(r"(?P<q>['\"])(" + re.escape(old) + r")(?P=q)")
    for match in pattern.finditer(original):
        start, end = match.span()
        prefix = original[max(0, start - lookbehind):start]
        if re.search(r"\?\?\s*$", prefix):
            operator = "??"
        elif re.search(r"\?\s*[^:\n]{0,120}:\s*$", prefix):
            operator = "?:"
        elif re.search(r"\|\|\s*$", prefix):
            operator = "||"
        elif re.search(r"\bor\b\s*$", prefix, flags=re.IGNORECASE):
            operator = "or"
        else:
            continue
        quote = match.group("q")
        patched = f"{original[:start]}{quote}{new}{quote}{original[end:]}"
        return patched, True, f"Gerichte vervanging van fallback-literal nabij operator '{operator}'"
    return original, False, ""
# ==== END PATCH B ====
# === Repo-QA: question answering over one specific repository ===
# Verb/noun hints (Dutch + English) used to recognise "create something new"
# intents in a user goal; consumed by laravel_signal_candidates().
_LARAVEL_CREATE_HINTS = {
    "verbs": ["create", "store", "new", "aanmaken", "aanmaak", "nieuw", "toevoegen", "add"],
    "nouns": ["melding", "incident", "ticket", "aanvraag", "report", "issue", "storingen", "storing"]
}
def _read_file_safe(p: Path) -> str:
    """Best-effort read: the text of *p*, or "" when unreadable/empty."""
    try:
        text = _read_text_file(p)
    except Exception:
        return ""
    return text or ""
def laravel_scan_routes(repo_root: Path) -> list[dict]:
    """Scan routes/web.php and routes/api.php for Laravel route definitions.

    Recognises ``Route::get/post/put/patch/delete/match`` with either a
    ``Controller@method`` string target or an ``[Controller::class, 'method']``
    array target, plus ``Route::resource`` registrations.

    Fix: removed two stray git-blame timestamp lines that had leaked into the
    function body around the first ``re.finditer`` call (syntax errors).

    Returns:
        List of dicts: {"file", "verb", "uri", "target", "controller",
        "method", "name"} — controller/method/name may be None.
    """
    out = []
    for rp in ["routes/web.php", "routes/api.php"]:
        p = repo_root / rp
        if not p.exists():
            continue
        txt = _read_file_safe(p)
        for m in re.finditer(r"Route::(get|post|put|patch|delete|match)\s*\(\s*['\"]([^'\"]+)['\"]\s*,\s*([^)]+)\)", txt, flags=re.I):
            verb, uri, target = m.group(1).lower(), m.group(2), m.group(3)
            ctrl = None; method = None; name = None
            # controller@method
            m2 = re.search(r"['\"]([A-Za-z0-9_\\]+)@([A-Za-z0-9_]+)['\"]", target)
            if m2:
                ctrl, method = m2.group(1), m2.group(2)
            else:
                # ['Foo\\BarController::class', 'index'] or [Foo\\BarController::class, 'index']
                m2b = re.search(r"\[\s*([A-Za-z0-9_\\]+)::class\s*,\s*['\"]([A-Za-z0-9_]+)['\"]\s*\]", target)
                if m2b:
                    ctrl, method = m2b.group(1), m2b.group(2)
            # ->name('...') chained shortly after the route definition
            tail = txt[m.end(): m.end()+140]
            m3 = re.search(r"->\s*name\s*\(\s*['\"]([^'\"]+)['\"]\s*\)", tail)
            if m3: name = m3.group(1)
            out.append({"file": rp, "verb": verb, "uri": uri, "target": target, "controller": ctrl, "method": method, "name": name})
        # Route::resource
        for m in re.finditer(r"Route::resource\s*\(\s*['\"]([^'\"]+)['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)", txt, flags=re.I):
            res, ctrl = m.group(1), m.group(2)
            out.append({"file": rp, "verb": "resource", "uri": res, "target": ctrl, "controller": ctrl, "method": None, "name": None})
    return out
def _candidate_paths_for_controller(repo_root: Path, controller_fqcn: str) -> list[str]:
"""
Probeer Controller-bestand + views te vinden vanuit FQCN zoals App\\Http\\Controllers\\Foo\\BarController.
"""
rels = []
# controller pad
base = controller_fqcn.replace("\\\\","/").replace("\\","/")
name = base.split("/")[-1]
ctrl_guess = [
f"app/Http/Controllers/{base}.php",
f"app/Http/Controllers/{name}.php"
]
for g in ctrl_guess:
if (repo_root / g).exists():
rels.append(g)
# view dir guesses (resource-achtig)
view_roots = ["resources/views", "resources/views/livewire", "resources/views/components"]
stem = re.sub(r"Controller$", "", name, flags=re.I)
for vr in view_roots:
for hint in [stem, stem.lower()]:
dp = repo_root / f"{vr}/{hint}"
if dp.exists() and dp.is_dir():
for bp in dp.rglob("*.blade.php"):
if bp.stat().st_size < 500000:
rels.append(str(bp.relative_to(repo_root)))
return list(dict.fromkeys(rels))[:8]
def laravel_signal_candidates(repo_root: Path, user_goal: str, all_files: list[str], max_out: int = 6) -> list[str]:
    """Heuristic file preselection for Laravel "create/new" use-cases.

    Strategy:
    - scan the route files for create/store-ish routes and score them
      against verb/noun hints from the user goal,
    - project the best routes onto controller files and blade views,
    - additionally pick views whose directory matches a noun hint and
      whose filename looks like a create/form template.

    Args:
        repo_root: local checkout of the repository.
        user_goal: free-form user prompt (Dutch/English hints supported).
        all_files: repo-relative file list used to validate candidates.
        max_out: hard cap on the number of returned paths.

    Returns:
        Up to ``max_out`` repo-relative paths; empty when the repo does not
        look like Laravel or nothing scores above zero.
    """
    # Fast exit when there are no Laravel markers at all.
    if not (repo_root / "artisan").exists() and not (repo_root / "composer.json").exists():
        return []
    goal = (user_goal or "").lower()
    # NOTE(review): `goal` is computed but not referenced below — presumably a
    # leftover; scoring uses the route metadata instead. Confirm before removing.
    verbs = _LARAVEL_CREATE_HINTS["verbs"]
    nouns = _LARAVEL_CREATE_HINTS["nouns"]
    def _goal_hits(s: str) -> int:
        # Verbs weigh twice as heavy as nouns.
        lo = s.lower()
        v = sum(1 for w in verbs if w in lo)
        n = sum(1 for w in nouns if w in lo)
        return v*2 + n
    routes = laravel_scan_routes(repo_root)
    scored = []
    for r in routes:
        base_s = f"{r.get('uri','')} {r.get('name','')} {r.get('controller','') or ''} {r.get('method','') or ''}"
        score = _goal_hits(base_s)
        # Bonus when the route method is explicitly create/store.
        if (r.get("method") or "").lower() in ("create","store"):
            score += 3
        if r.get("verb") == "resource":
            # resource routes implicitly include create/store endpoints
            score += 2
        if score > 0:
            scored.append((score, r))
    if not scored:
        return []
    scored.sort(key=lambda x: x[0], reverse=True)
    picks: list[str] = []
    for _score, r in scored[:8]:
        # controller file + its likely views
        if r.get("controller"):
            for rel in _candidate_paths_for_controller(repo_root, r["controller"]):
                if rel in all_files and rel not in picks:
                    picks.append(rel)
    # View guess when the path looks like "melding*create.blade.php".
    for rel in all_files:
        name = os.path.basename(rel).lower()
        dirname = os.path.dirname(rel).lower()
        if any(n in dirname for n in nouns) and ("create" in name or "form" in name):
            if rel not in picks:
                picks.append(rel)
        if len(picks) >= max_out:
            break
    return picks[:max_out]
def _detect_stack_summary(repo_root: Path) -> dict:
    """Heuristics: language counts, suspected framework, routes/migration/DB hints.

    Returns:
        dict with keys "languages" (ext -> count, top 8), "framework",
        "entrypoints", "routes" (sampled), "db" (file + leading snippet)
        and "notable_dirs". All values are best-effort and may be empty.
    """
    summary = {
        "languages": {},
        "framework": [],
        "entrypoints": [],
        "routes": [],
        "db": [],
        "notable_dirs": [],
    }
    # Count file extensions repo-wide; keep only the 8 most frequent.
    ext_map = {}
    for rel in list_repo_files(repo_root):
        ext = os.path.splitext(rel)[1].lower()
        ext_map[ext] = ext_map.get(ext, 0) + 1
    summary["languages"] = dict(sorted(ext_map.items(), key=lambda x: x[1], reverse=True)[:8])
    # PHP/Laravel hints via composer.json requirements.
    comp = repo_root / "composer.json"
    if comp.exists():
        try:
            import json as _json
            js = _json.loads(comp.read_text(encoding="utf-8", errors="ignore"))
            req = (js.get("require") or {}) | (js.get("require-dev") or {})
            if any("laravel/framework" in k for k in req.keys()):
                summary["framework"].append("Laravel")
        except Exception:
            pass
    if (repo_root / "artisan").exists():
        summary["entrypoints"].append("artisan (Laravel CLI)")
    # Node/frontend hints via package.json dependencies.
    pkg = repo_root / "package.json"
    if pkg.exists():
        try:
            import json as _json
            js = _json.loads(pkg.read_text(encoding="utf-8", errors="ignore"))
            deps = list((js.get("dependencies") or {}).keys()) + list((js.get("devDependencies") or {}).keys())
            if any(x in deps for x in ["next", "nuxt", "react", "vue", "vite"]):
                summary["framework"].append("Node/Frontend")
        except Exception:
            pass
    # Routes (Laravel)
    for rp in ["routes/web.php", "routes/api.php"]:
        p = repo_root / rp
        if p.exists():
            txt = _read_text_file(p) or ""
            for m in re.finditer(r"Route::(get|post|put|patch|delete)\s*\(\s*['\"]([^'\"]+)['\"]", txt):
                summary["routes"].append(f"{rp}: {m.group(1).upper()} {m.group(2)}")
    # DB hints (Laravel/vanilla PHP).
    for rp in ["config/database.php", ".env", ".env.example", "app/config/database.php"]:
        p = repo_root / rp
        if p.exists():
            txt = _read_text_file(p) or ""
            if "DB_" in txt or "mysql" in txt or "sqlite" in txt or "pgsql" in txt:
                # NOTE(review): the stored snippet may contain secrets from .env;
                # _format_stack_summary_text only prints the path part, but
                # confirm no other consumer leaks the snippet.
                snippet = txt[:800].replace("\r"," ")
                summary["db"].append(f"{rp}: {snippet}")
    # Notable directories that exist in this checkout.
    for d in ["app", "app/admin", "app/public", "public", "resources", "storage", "config", "routes", "src", "docs", "tests"]:
        if (repo_root / d).exists():
            summary["notable_dirs"].append(d)
    return summary
def _format_stack_summary_text(s: dict) -> str:
lines = []
if s.get("framework"):
lines.append("Frameworks (heuristiek): " + ", ".join(sorted(set(s["framework"]))))
if s.get("languages"):
langs = ", ".join([f"{k or ''}×{v}" for k,v in s["languages"].items()])
lines.append("Talen (bestandext): " + langs)
if s.get("notable_dirs"):
lines.append("Mappen: " + ", ".join(s["notable_dirs"]))
if s.get("entrypoints"):
lines.append("Entrypoints: " + ", ".join(s["entrypoints"]))
if s.get("routes"):
sample = "; ".join(s["routes"][:8])
lines.append("Routes (sample): " + sample)
if s.get("db"):
# toon alleen paden, geen volledige secrets
lines.append("DB-config aanwezig in: " + ", ".join([d.split(":")[0] for d in s["db"]]))
return "\n".join(lines)
def _collect_repo_context(repo_root: Path, owner_repo: Optional[str], branch: str, question: str, n_ctx: int = 8) -> list[dict]:
"""Kies relevante paden + snippets via hybrid RAG/keywords, voor QA."""
# Deze sync helper is bewust niet geïmplementeerd om misbruik te voorkomen.
# Gebruik altijd de async-variant: _collect_repo_context_async(...)
raise NotImplementedError("_collect_repo_context is niet beschikbaar; gebruik _collect_repo_context_async")
all_files = list_repo_files(repo_root)
# explicit paths uit vraag
picked: List[str] = []
for pth in extract_explicit_paths(question):
if pth in all_files and pth not in picked:
picked.append(pth)
else:
best = best_path_by_basename(all_files, pth)
if best and best not in picked: picked.append(best)
# hybrid rag
loop = asyncio.get_event_loop()
# NB: call hybrag via run_until_complete buiten async? we zitten al in async in hoofdhandler; hier helper sync → laat caller het async deel doen
return [] # placeholder; deze helper niet direct gebruiken buiten async
async def _collect_repo_context_async(repo_root: Path, owner_repo: Optional[str], branch: str, question: str, n_ctx: int = 8) -> list[dict]:
    """Select relevant paths + focused snippets for repo QA.

    Selection order (first sources win):
    1. paths the user named explicitly in the question,
    2. DB seed paths when the question looks database-related,
    3. hybrid RAG hits,
    4. plain keyword search as a fallback.

    Returns:
        Up to ``n_ctx`` dicts of {"path": rel_path, "snippet": focused text}.
    """
    all_files = list_repo_files(repo_root)
    picked: List[str] = []
    # 1) Explicit paths (exact match, else best basename match).
    for pth in extract_explicit_paths(question):
        if pth in all_files and pth not in picked:
            picked.append(pth)
        else:
            best = best_path_by_basename(all_files, pth)
            if best and best not in picked: picked.append(best)
    # DB questions: seed with known DB artefacts so recall is good right away.
    def _db_seed_paths() -> list[str]:
        prefer: list[str] = []
        # 1) direct, well-known locations
        for rel in [
            ".env", ".env.example", "config/database.php", "config/database.yml",
            "database/database.sqlite"
        ]:
            if (repo_root / rel).exists() and rel in all_files:
                prefer.append(rel)
        # 2) migrations / seeders / models
        for rel in all_files:
            lo = rel.lower()
            if lo.startswith("database/migrations/") or lo.startswith("database/seeders/"):
                prefer.append(rel)
            elif lo.startswith(("app/models/", "app/model/", "app/Models/")) and lo.endswith(".php"):
                # NOTE(review): `lo` is lowercased, so the "app/Models/" tuple
                # entry can never match here — confirm and drop it.
                prefer.append(rel)
            elif lo.endswith(".sql"):
                prefer.append(rel)
        # 3) rough heuristic: files containing Schema::/DB::/raw SQL keywords
        hits = []
        for rel in all_files:
            try:
                txt = _read_text_file(repo_root / rel) or ""
            except Exception:
                continue
            tlo = txt.lower()
            if any(x in tlo for x in ["schema::create(", "schema::table(", "db::table(", "db::select(", "select ", "insert into ", "create table "]):
                hits.append(rel)
        # dedupe (order-preserving) and cap at n_ctx
        seen = set(); out = []
        for rel in prefer + hits:
            if rel not in seen:
                seen.add(rel); out.append(rel)
            if len(out) >= n_ctx:
                break
        return out
    if _db_intent(question):
        for p in _db_seed_paths():
            if p in all_files and p not in picked:
                picked.append(p)
    # 2) Hybrid RAG selection.
    hybrid = await hybrid_rag_select_paths(repo_root, owner_repo, branch, question, all_files, max_out=n_ctx)
    for p in hybrid:
        if p not in picked: picked.append(p)
    # 3) Keyword fallback when still short of candidates.
    if len(picked) < n_ctx:
        for rel, _s in simple_keyword_search(repo_root, all_files, question, limit=n_ctx):
            if rel not in picked: picked.append(rel)
    # Build focused snippets around quotes/word hints from the question.
    quotes = extract_quotes(question)
    hints = extract_word_hints(question)
    out = []
    for rel in picked[:n_ctx]:
        txt = _read_text_file(repo_root / rel) or ""
        snippet = extract_focus_snippets(txt, (quotes + hints)[:6], window=320, max_snippets=2)
        out.append({"path": rel, "snippet": snippet})
    return out
def _trim_text_to_tokens(text: str, max_tokens: int, tok_len=approx_token_count) -> str:
    """Clamp *text* to roughly *max_tokens*, using a ~4 chars/token heuristic."""
    if tok_len(text) <= max_tokens:
        return text
    # rough character budget: 4 chars per token, never below 200 chars
    budget = max(200, max_tokens * 4)
    return text[:budget]
def _jaccard_tokens(a: str, b: str) -> float:
ta = set(re.findall(r"[A-Za-z0-9_]+", (a or "").lower()))
tb = set(re.findall(r"[A-Za-z0-9_]+", (b or "").lower()))
if not ta or not tb:
return 0.0
return len(ta & tb) / max(1, len(ta | tb))
def _db_intent(text: str) -> bool:
"""Detecteer of de vraag over DB-verbindingen/schema/queries gaat."""
t = (text or "").lower()
keys = [
"database", "sql", "microsoft sql", "ms sql", "mssql", "sql server",
"schema", "tabel", "tabellen", "migratie", "migrations",
"query", "queries", "select", "insert", "update", "delete",
"db_", "connection string", "dsn", "driver", "host", "poort", "poortnummer",
"database.php", ".env"
]
return any(k in t for k in keys)
def _prepare_contexts_under_budget(
    contexts: List[dict],
    question: str,
    stack_summary_text: str,
    *,
    budget_tokens: int = int(os.getenv("AGENT_QA_CTX_BUDGET_TOKENS", "7000")),
    tok_len=approx_token_count
) -> List[dict]:
    """
    Smart token-budget allocator for QA context snippets:
    - dedupes exact (same path) and near-duplicate (Jaccard) snippets,
    - weighs each snippet by relevance to the question and by novelty
      relative to snippets already selected,
    - distributes the remaining token budget adaptively with a hard
      per-snippet min/max.

    Fix: removed a stray git-blame timestamp line that had leaked into the
    parameter list (syntax error).

    Args:
        contexts: pre-ranked list of {"path", "snippet"} dicts.
        question: user question, used as a relevance proxy.
        stack_summary_text: repo summary counted as prompt overhead.
        budget_tokens: total token budget for the assembled prompt.
        tok_len: token estimator (defaults to approx_token_count).

    Returns:
        Trimmed list of {"path", "snippet"} dicts fitting the budget.
    """
    if not contexts:
        return contexts
    # Tunables (defaults deliberately a bit conservative):
    MIN_PER = int(os.getenv("QA_MIN_PER_SNIPPET", "180"))          # hard minimum per snippet
    MAX_PER = int(os.getenv("QA_MAX_PER_SNIPPET", "900"))          # hard maximum per snippet
    KEEP_TOP = int(os.getenv("QA_KEEP_TOP_K", "8"))                # cap on #snippets
    NOVELTY_THRESH = float(os.getenv("QA_NOVELTY_DROP", "0.25"))   # below this novelty: demote
    DEDUP_THRESH = float(os.getenv("QA_DEDUP_JACCARD", "0.85"))    # above this overlap: drop
    # 0) cap the number of snippets up front (caller already ranked them)
    contexts = contexts[:KEEP_TOP]
    # 1) exact dedup on path + near-dup on text (Jaccard)
    unique: List[dict] = []
    seen_paths = set()
    for c in contexts:
        p = c.get("path","")
        s = str(c.get("snippet",""))
        if p in seen_paths:
            continue
        # near-duplicate check against snippets already kept
        is_dup = False
        for u in unique:
            if _jaccard_tokens(u["snippet"], s) >= DEDUP_THRESH:
                is_dup = True
                break
        if not is_dup:
            unique.append({"path": p, "snippet": s})
            seen_paths.add(p)
    contexts = unique
    if not contexts:
        return contexts
    # Overhead estimate, mirroring the prompt built in _llm_qa_answer
    # (headers + question + stack summary).
    header = (
        "Beantwoord de vraag over deze codebase. Wees concreet, kort, en noem bronnen (padnamen).\n"
        "Als zekerheid laag is, stel max 2 verduidelijkingsvragen.\n\n"
        f"VRAAG:\n{question}\n\n"
        f"REPO SAMENVATTING:\n{stack_summary_text or '(geen)'}\n\n"
        "RELEVANTE FRAGMENTEN:\n"
    )
    frag_headers = "\n\n".join([f"{i+1}) PATH: {c['path']}\nFRAGMENT:\n" for i, c in enumerate(contexts)])
    overhead_tokens = tok_len(header) + tok_len(frag_headers) + 200
    # Budget left for actual snippet content.
    remain = max(300, budget_tokens - overhead_tokens)
    n = len(contexts)
    # 2) relevance proxy = token overlap between question and snippet
    def rel(sn: str) -> float:
        return _jaccard_tokens(question, sn)
    # 3) greedy novelty: reward information not yet covered by chosen snippets
    chosen_text = ""  # cumulative "coverage" text
    scores = []
    for i, c in enumerate(contexts):
        s = c["snippet"]
        r = rel(s)
        # novelty = 1 - overlap with the already-chosen text
        nov = 1.0 - _jaccard_tokens(chosen_text, s) if chosen_text else 1.0
        if nov < NOVELTY_THRESH and i > 0:
            # extremely low novelty: mark as weak (may be dropped below)
            scores.append((i, r * 0.05, nov))
        else:
            # after 3 snippets, novelty weighs heavier
            if i >= 3:
                scores.append((i, r * (0.35 + 0.65 * nov), nov))
            else:
                scores.append((i, r * (0.5 + 0.5 * nov), nov))
        # update coverage roughly: append capped text to avoid drift
        if tok_len(chosen_text) < 4000:
            chosen_text += "\n" + s[:1200]
    # 4) if the per-snippet minima already exceed the budget: cut the tail
    total_min = n * MIN_PER
    if total_min > remain:
        # sort by score descending and keep as many as fit at MIN_PER each
        ranked_idx = sorted(range(n), key=lambda i: scores[i][1], reverse=True)
        keep_idx = ranked_idx[: max(1, remain // MIN_PER)]
        contexts = [contexts[i] for i in keep_idx]
        scores = [scores[i] for i in keep_idx]
        n = len(keep_idx)
    # 5) allocate: everyone gets MIN_PER, the rest proportional to score; cap at MAX_PER
    base = n * MIN_PER
    extra = max(0, remain - base)
    # normalise the score weights
    raw = [max(0.0, sc) for (_i, sc, _nov) in scores]
    ssum = sum(raw) or 1.0
    weights = [x / ssum for x in raw]
    alloc = [MIN_PER + int(extra * w) for w in weights]
    # enforce MAX_PER; redistribute the overshoot coarsely
    overshoot = 0
    for i in range(n):
        if alloc[i] > MAX_PER:
            overshoot += alloc[i] - MAX_PER
            alloc[i] = MAX_PER
    if overshoot > 0:
        # hand the overshoot to snippets still below MAX_PER
        holes = [i for i in range(n) if alloc[i] < MAX_PER]
        if holes:
            plus = overshoot // len(holes)
            for i in holes:
                alloc[i] = min(MAX_PER, alloc[i] + plus)
    # 6) trim each snippet to its allocated token budget
    trimmed = []
    for i, c in enumerate(contexts):
        sn = str(c.get("snippet",""))
        sn = _trim_text_to_tokens(sn, alloc[i], tok_len)
        trimmed.append({"path": c["path"], "snippet": sn})
    return trimmed
async def _llm_qa_answer(question: str, stack_summary_text: str, contexts: List[dict]) -> str:
    """
    Have the LLM formulate a concise answer with source references.
    - answers in Dutch
    - cites file paths as sources
    - asks at most 2 clarification questions when information is missing

    Fix: removed a stray git-blame timestamp line that had leaked into the
    _prepare_contexts_under_budget(...) call (syntax error).

    Args:
        question: the user question.
        stack_summary_text: short repo/stack summary (may be empty).
        contexts: pre-ranked list of {"path", "snippet"} dicts.

    Returns:
        The LLM answer text, stripped (may be empty on malformed responses).
    """
    # Trim the contexts to the token budget before building the prompt.
    contexts = _prepare_contexts_under_budget(
        contexts, question, stack_summary_text,
        budget_tokens=int(os.getenv("AGENT_QA_CTX_BUDGET_TOKENS", "7000")),
        tok_len=approx_token_count
    )
    ctx_blocks = []
    for i, c in enumerate(contexts, 1):
        # keep the 1200-char cap as a safety net; _prepare_contexts_under_budget already trimmed
        ctx_blocks.append(f"{i}) PATH: {c['path']}\nFRAGMENT:\n{c['snippet'][:1200]}")
    USER = (
        "Beantwoord de vraag over deze codebase. Wees concreet, kort, en noem bronnen (padnamen).\n"
        "Als zekerheid laag is, stel max 2 verduidelijkingsvragen.\n\n"
        f"VRAAG:\n{question}\n\n"
        "REPO SAMENVATTING:\n" + (stack_summary_text or "(geen)") + "\n\n"
        "RELEVANTE FRAGMENTEN:\n" + ("\n\n".join(ctx_blocks) if ctx_blocks else "(geen)") + "\n\n"
        "FORMAT:\n"
        "- Antwoord (kort en feitelijk)\n"
        "- Bronnen: lijst van paden die je gebruikt hebt\n"
        "- (optioneel) Vervolgvragen als iets onduidelijk is\n"
    )
    resp = await _llm_call(
        [{"role":"system","content":"Je bent een zeer precieze, nuchtere code-assistent. Antwoord in het Nederlands."},
         {"role":"user","content": USER}],
        stream=False, temperature=0.2, top_p=0.9, max_tokens=900
    )
    return resp.get("choices",[{}])[0].get("message",{}).get("content","").strip()
# heuristics: iets kleinere chunks voor Laravel/Blade/Routes, anders iets groter
def _chunk_params_for_repo(root: Path) -> tuple[int,int]:
# simpele stack detectie:
is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
if is_laravel:
return int(os.getenv("CHUNK_CHARS_LARAVEL","1800")), int(os.getenv("CHUNK_OVERLAP_LARAVEL","300"))
return int(os.getenv("CHUNK_CHARS_DEFAULT","2600")), int(os.getenv("CHUNK_OVERLAP_DEFAULT","350"))
# ---------- QA repo agent ----------
async def repo_qa_answer(repo_hint: str, question: str, branch: str = "main", n_ctx: int = 8) -> str:
    """
    High-level QA over one specific repo:
    - resolve + clone/update
    - (re)index the RAG collection
    - build a stack summary
    - collect relevant context
    - LLM answer with sources

    Args:
        repo_hint: owner/repo, URL, or fuzzy repo name.
        question: the user's question about the codebase.
        branch: branch to check out; falls back to "master" on clone failure.
        n_ctx: max number of context snippets fed to the LLM.

    Returns:
        The answer text, or a human-readable error message when the repo
        cannot be resolved or cloned.
    """
    meta, _reason = resolve_repo(repo_hint)
    if not meta:
        # When the hint looks like owner/repo: check existence right away.
        if re.match(r"^[A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+$", repo_hint):
            owner, name = repo_hint.split("/", 1)
            if not gitea_repo_exists(owner, name):
                return f"Repo `{repo_hint}` niet gevonden of geen rechten. Controleer naam/URL/token."
        return f"Kon repo niet vinden voor hint: {repo_hint}"
    repo_url = meta.get("clone_url") or repo_hint
    owner_repo = meta.get("full_name")
    # clone/checkout (bounded by the global clone semaphore)
    try:
        async with _CLONE_SEMA:
            repo_path = await _call_get_git_repo(repo_url, branch)
    except Exception as e:
        # fall back to the legacy default branch name
        branch = "master"
        try:
            async with _CLONE_SEMA:
                repo_path = await _call_get_git_repo(repo_url, branch)
        except Exception as e:
            return (f"Clonen mislukte voor `{owner_repo or repo_hint}`: {e}. "
                    "Controleer repo-naam/URL of je toegangsrechten.")
    root = Path(repo_path)
    # (re)index the per-repo collection; on failure fall back to 'code_docs'
    collection = repo_collection_name(owner_repo, branch)
    chunk_chars, overlap = _chunk_params_for_repo(Path(repo_path))
    try:
        await _rag_index_repo_internal(
            repo_url=repo_url, branch=branch, profile="auto",
            include="", exclude_dirs="", chunk_chars=chunk_chars, overlap=overlap,
            collection_name=collection
        )
    except Exception as e:
        logger.warning("WARN:agent_repo:rag_index for QA failed (%s), fallback 'code_docs': %s", collection, e)
        collection = "code_docs"
        await _rag_index_repo_internal(
            repo_url=repo_url, branch=branch, profile="auto",
            include="", exclude_dirs="", chunk_chars=chunk_chars, overlap=overlap,
            collection_name=collection
        )
    # stack summary (best-effort heuristics)
    stack = _detect_stack_summary(root)
    stack_txt = _format_stack_summary_text(stack)
    try:
        symbol_index_repo(root, owner_repo, branch)
    except Exception as e:
        logger.warning("WARN:agent_repo:symbol index build (QA) failed: %s", e)
    # collect context snippets
    contexts = await _collect_repo_context_async(root, owner_repo, branch, question, n_ctx=n_ctx)
    # produce the answer
    answer = await _llm_qa_answer(question, stack_txt, contexts)
    return answer
# ---------- Dry-run voorstel ----------
async def propose_patches_without_apply(repo_path: str, candidates: List[str], user_goal: str) -> Tuple[Dict[str,str], Dict[str,str], Dict[str,str]]:
    """
    Dry-run patch proposal: compute new file contents without applying them.

    For each candidate a cascade of increasingly invasive strategies is tried
    (first hit wins):
      0. targeted fallback-literal replace (old/new deduced from the prompt)
      1. HTML-scoped literal replace when the prompt names specific tags
      3. generic quoted-literal replace (language-agnostic, max 2x)
      4. focused LLM edit-plan on relevant snippets
      5. guarded full-file LLM rewrite (blocked for non-view/lang files during
         UI-label tasks and by the anti-destruction guard)
    Finally PHP/Blade syntax guards drop proposals that fail validation.

    Fixes: removed a stray git-blame timestamp line inside the rewrite retry
    loop (syntax error); removed the unused locals ``token_steps`` and
    ``old_new``; the syntax guard now checks ".blade.php" BEFORE ".php" —
    every Blade file also ends in ".php", so the Blade branch was unreachable.

    Returns:
        (proposed, diffs, reasons): per-path new content, rendered diff and a
        short human-readable explanation of the chosen strategy.
    """
    proposed, diffs, reasons = {}, {}, {}
    root = Path(repo_path)
    quotes = extract_quotes(user_goal)
    hints = extract_word_hints(user_goal)
    # Determine the task type locally (lightweight, 1 LLM call; framework heuristic).
    is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
    try:
        _route = await _llm_task_route(user_goal, framework=("laravel" if is_laravel else "generic"))
        _task_type = (_route.get("task_type") or "").lower()
    except Exception:
        _task_type = ""
    def _is_view_or_lang(path: str) -> bool:
        # Blade views and translation files are fair game for UI-label edits.
        return path.endswith(".blade.php") or path.startswith("resources/lang/")
    for rel in candidates:
        p = root / rel
        # Path does not exist yet: try a create-proposal instead.
        if not p.exists():
            content, because = await propose_new_file(root, rel, user_goal)
            if content:
                proposed[rel] = content
                diffs[rel] = make_new_file_diff(rel, content, max_lines=300)
                reasons[rel] = because
            else:
                logger.info("INFO:agent_repo:no create-proposal for missing file %s", rel)
            continue
        try:
            original = _read_text_file(p)
        except Exception:
            original = ""
        if not original:
            logger.info("INFO:agent_repo:skip unreadable/empty %s", rel)
            continue
        # 0) Targeted, safe fallback-literal replace (only with an old->new pair)
        old, new, why_pair = deduce_old_new_literals(user_goal, original)
        if old and new:
            tmp, ok, because = targeted_fallback_replace(original, old, new)
            if ok and tmp != original:
                # no anti-destruction check needed: minimal single replacement
                proposed[rel] = tmp
                diffs[rel] = make_diffs(original, tmp, rel, max_lines=200)
                reasons[rel] = f"{because}. ({why_pair})"
                continue
        # 1) HTML scope when the prompt names tags
        ctx = extract_context_hints_from_prompt(user_goal)
        if old and new and ctx["tag_names"]:
            scoped, ok, because = html_scoped_literal_replace(original, old, new, ctx["tag_names"])
            if ok and scoped != original and not is_destructive(original, scoped, allow_destructive=False):
                proposed[rel] = scoped
                diffs[rel] = make_diffs(original, scoped, rel, max_lines=200)
                reasons[rel] = (because + (f" ({why_pair})" if why_pair else ""))
                continue
        # 2) fallback-literal replace (??, ?:, ||, or) is already covered by step 0.
        # 3) Generic quoted-literal replace (language-agnostic, stays minimal)
        if old and new:
            qrep, ok, because = quoted_literal_replace(original, old, new, max_occurrences=2)
            if ok and qrep != original and not is_destructive(original, qrep, allow_destructive=False):
                proposed[rel] = qrep
                diffs[rel] = make_diffs(original, qrep, rel, max_lines=200)
                reasons[rel] = (because + (f" ({why_pair})" if why_pair else ""))
                continue
        # 4) Focus snippets + LLM edit plan
        needles = []
        if quotes: needles += quotes
        if hints: needles += hints[:6]
        focus = extract_focus_snippets(original, needles, window=240, max_snippets=3)
        # Tree hint on by default: compact dir tree exposed via a global for the prompt.
        try:
            globals()["_LLM_EDIT_TREE_HINT"] = _make_local_tree_hint(root, rel, max_siblings=14)
        except Exception:
            globals()["_LLM_EDIT_TREE_HINT"] = ""
        plan = await llm_plan_edits_for_file(user_goal, rel, focus)
        if plan:
            patched, change_count, explains, allow_destructive = apply_edit_plan(original, plan)
            if change_count > 0 and patched.strip() != original.strip():
                if is_destructive(original, patched, allow_destructive):
                    logger.warning("WARN:agent_repo:destructive patch blocked for %s", rel)
                else:
                    proposed[rel] = patched
                    diffs[rel] = make_diffs(original, patched, rel, max_lines=200)
                    reasons[rel] = "LLM edit-plan: " + "; ".join(explains[:4])
                    continue
        # 5) Full-rewrite fallback (guarded).
        # For UI-label tasks, full rewrites of non-view/lang files are forbidden.
        if _task_type == "ui_label_change" and not _is_view_or_lang(rel):
            logger.info("INFO:agent_repo:skip full rewrite for non-view/lang during ui_label_change: %s", rel)
            continue
        last_err = None
        for mx in [2048]:
            try:
                messages = [
                    {"role":"system","content":"Voer exact de gevraagde wijziging uit. GEEN extra refactors/best practices. Lever de volledige, werkende bestandinformatie als 1 codeblok."},
                    {"role":"user","content": f"Doel:\n{user_goal}\n\nBestand ({rel}) huidige inhoud:\n```\n{original}\n```"}
                ]
                resp = await _llm_call(messages, stream=False, temperature=0.2, top_p=0.9, max_tokens=mx)
                newc = _extract_code_block(resp.get("choices",[{}])[0].get("message",{}).get("content","")) or original
                if newc.strip() != original.strip():
                    if is_destructive(original, newc, allow_destructive=False):
                        logger.warning("WARN:agent_repo:destructive rewrite blocked for %s (ratio>%.2f)", rel, AGENT_DESTRUCTIVE_RATIO)
                        break  # early exit: no further attempts
                    proposed[rel] = newc
                    diffs[rel] = make_diffs(original, newc, rel, max_lines=200)
                    reasons[rel] = "Full rewrite (guarded): minimale aanpassing om het doel te halen."
                    break
            except Exception as e:
                last_err = e
                logger.warning("WARN:agent_repo:LLM rewrite fail %s mx=%d: %s", rel, mx, repr(e))
        if rel not in proposed and last_err:
            logger.error("ERROR:agent_repo:give up on %s after retries: %s", rel, repr(last_err))
    # --- Syntax guard filtering (final step) ---
    # BUGFIX: check ".blade.php" before ".php" so the Blade balance guard is
    # actually reachable for Blade templates.
    drop: List[str] = []
    for rel, content in proposed.items():
        try:
            if rel.endswith(".blade.php"):
                if not _blade_balance_ok(content):
                    reasons[rel] = (reasons.get(rel,"") + " [Blade balance failed]").strip()
                    drop.append(rel)
            elif rel.endswith(".php"):
                tmp = _write_tmp(content, ".php")
                ok = _php_lint_ok(tmp)
                try:
                    tmp.unlink(missing_ok=True)
                except Exception:
                    pass
                if not ok:
                    reasons[rel] = (reasons.get(rel,"") + " [PHP lint failed]").strip()
                    drop.append(rel)
        except Exception:
            # when in doubt: let the patch through (fail-open); logged upstream
            pass
    for rel in drop:
        proposed.pop(rel, None)
        diffs.pop(rel, None)
    return proposed, diffs, reasons
# ---------- Agent state ----------
@dataclass
class AgentState:
    """Mutable per-session state for the repo-edit agent pipeline."""
    stage: str = "TRIAGE"                    # current pipeline stage
    questions_asked: int = 0                 # clarification questions asked so far
    user_goal: str = ""                      # the user's change request
    repo_hint: str = ""                      # raw repo hint from the user
    selected_repo: dict | None = None        # resolved repo metadata, if any
    repo_url: str = ""                       # clone URL of the selected repo
    branch_base: str = AGENT_DEFAULT_BRANCH  # branch the edits are based on
    repo_path: str = ""                      # local checkout path
    owner_repo: str | None = None            # "owner/name" of the repo
    collection_name: str = ""                # RAG collection used for this repo
    candidate_paths: List[str] = field(default_factory=list)        # files considered for edits
    proposed_patches: Dict[str, str] = field(default_factory=dict)  # path -> proposed new content
    reasons: Dict[str, str] = field(default_factory=dict)           # path -> why this change
    new_branch: str = ""                     # feature branch for the patchset
    dry_run: bool = True                     # True: propose only, never apply/push
    repo_candidates: List[dict] = field(default_factory=list)       # alternative repo matches
    smart_preview: str = ""                  # cached human-readable preview
    recovery_attempted: bool = False         # guard against repeated auto-recovery
# --- bootstrap op echte repo-inhoud ------------------------------------------------
async def _detect_repo_url(text: str) -> str | None:
m = re.search(r"(https?://\S+?\.git)\b", text or "")
return m.group(1) if m else None
async def _ensure_indexed(repo_url: str, *, branch: str = "main", profile: str = "auto",
rag_index_repo_internal_fn=None, get_git_repo_fn=None):
# clone/update (best-effort) om failures vroeg te vangen
if get_git_repo_fn:
try:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, get_git_repo_fn, repo_url, branch)
except Exception:
pass
if rag_index_repo_internal_fn:
await rag_index_repo_internal_fn(
repo_url=repo_url, branch=branch, profile=profile,
include="", exclude_dirs="",
chunk_chars=int(os.getenv("RAG_CHUNK_CHARS","3000")),
overlap=int(os.getenv("RAG_CHUNK_OVERLAP","400")),
collection_name=os.getenv("RAG_COLLECTION","code_docs"),
)
async def _bootstrap_overview(repo_url: str, rag_query_internal_fn, *, collection="code_docs") -> str:
"""Haalt echte passages op en maakt een compacte context."""
# Bij per-repo collections is een extra repo-filter contraproductief.
# Gebruik daarom repo=None zodra we een collection doorgeven.
owner, name = owner_repo_from_url(repo_url)
repo_full = f"{owner}/{name}" if (owner and name) else None
wants = [
{"q": "project overview readme", "path_contains": "README"},
{"q": "install setup configuration", "path_contains": "README"},
{"q": "composer dependencies autoload", "path_contains": "composer.json"},
{"q": "npm dependencies scripts", "path_contains": "package.json"},
{"q": "routes definitions", "path_contains": "routes"},
{"q": "controllers overview", "path_contains": "app/Http/Controllers"},
{"q": "views templates blade", "path_contains": "resources/views"},
{"q": "env example", "path_contains": ".env"},
]
chunks = []
for w in wants:
res = await rag_query_internal_fn(
query=w["q"], n_results=3,
collection_name=collection, # per-repo collectie al gebruikt
repo=None, # voorkom dubbele/te strikte scoping
path_contains=w["path_contains"], profile=None
)
chunks.extend((res or {}).get("results", []))
seen = set(); buf = []
for r in chunks[:18]:
meta = r.get("metadata") or {}
key = (meta.get("path",""), meta.get("chunk_index"))
if key in seen:
continue
seen.add(key)
body = (r.get("document") or "").strip()[:1200]
buf.append(f"### {meta.get('path','')}\n{body}")
return "\n\n".join(buf[:8]).strip()
def _extract_explicit_paths_robust(text: str) -> list[str]:
"""
Haalt bestands-paden uit vrije tekst robuust op.
Herkent tokens met minimaal één '/' en één '.' (extensie),
negeert trailing leestekens.
"""
if not text:
return []
pats = re.findall(r"[A-Za-z0-9_./\\-]+\\.[A-Za-z0-9_.-]+", text)
out = []
for p in pats:
# normaliseer Windows backslashes → unix
p = p.replace("\\", "/")
# strip algemene trailing chars
p = p.strip().strip(",.;:)]}>'\"")
if "/" in p and "." in p:
out.append(p)
# de-dup behoud volgorde
seen = set(); uniq = []
for p in out:
if p not in seen:
uniq.append(p); seen.add(p)
return uniq
2025-11-27 07:54:07 +00:00
def _sanitize_path_hints(hints: list[str], all_files: list[str]) -> list[str]:
"""
Filter pseudo-paden zoals 'tool.list' weg. Sta alleen echte projectpaden of
bekende extensies toe en vereis een '/' om pure tokens te weren.
"""
if not hints:
return []
ALLOWED_SUFFIXES = (
".blade.php",".php",".js",".ts",".json",".yml",".yaml",".py",".md",".env",
".sql",".css",".vue",".jsx",".tsx"
)
BAD_BASENAMES = {"tool","tools","list","search","update","create","store","index"}
out, seen = [], set()
for h in hints:
if not h:
continue
h = h.strip().lstrip("./").replace("\\","/")
if "/" not in h:
continue
base = os.path.basename(h)
stem = base.split(".",1)[0].lower()
if h not in all_files and not any(h.endswith(suf) for suf in ALLOWED_SUFFIXES):
continue
if stem in BAD_BASENAMES and h not in all_files:
continue
if h not in seen:
seen.add(h); out.append(h)
return out
2025-11-06 13:42:26 +00:00
def _grep_repo_for_literal(root: Path, needle: str, limit: int = 12) -> list[str]:
"""
Heel snelle, ruwe literal-zoeker over tekstbestanden in de repo.
Retourneert lijst met relatieve paden waar 'needle' voorkomt (top 'limit').
"""
if not needle or len(needle) < 2:
return []
hits = []
try:
for p in root.rglob("*"):
if p.is_dir():
continue
# respecteer uitgesloten directories en grootte-limiet
if any(part in _PROFILE_EXCLUDE_DIRS for part in p.parts):
continue
try:
if p.stat().st_size > 500_000:
continue
except Exception:
continue
# alleen tekst-achtige extensies volgens allowed_file()
if not allowed_file(p):
continue
# lees als tekst (met best-effort fallback)
try:
txt = p.read_text(encoding="utf-8", errors="ignore")
except Exception:
try:
txt = p.read_text(encoding="latin-1", errors="ignore")
except Exception:
continue
if needle in txt:
try:
rel = str(p.relative_to(root))
except Exception:
rel = str(p)
hits.append(rel)
if len(hits) >= limit:
break
except Exception:
pass
return hits
def _laravel_priors_from_prompt(user_goal: str, root: Path, all_files: list[str], max_k: int = 8) -> list[str]:
"""
Geef een lijst met waarschijnlijke Laravel-bestanden op basis van conventies + prompt-keywords.
Neem ALLEEN paden op die daadwerkelijk bestaan in de repo (all_files).
"""
text = (user_goal or "").lower()
exists = set(all_files)
priors: list[str] = []
def add_if_present(paths: list[str]):
for p in paths:
if p in exists and p not in priors:
priors.append(p)
# Altijd nuttige ankerpunten in Laravel repos
add_if_present([
"routes/web.php",
"routes/api.php",
"config/app.php",
"config/database.php",
".env",
".env.example",
"resources/lang/en.json",
"resources/lang/nl.json",
])
# Prompt-gestuurde hints
if any(k in text for k in ("api ", "endpoint", "jwt", "sanctum", "api-route")):
add_if_present(["routes/api.php"])
if any(k in text for k in ("route", "router", "web", "pagina", "page", "url ")):
add_if_present(["routes/web.php"])
if any(k in text for k in ("controller", "actie", "action", "handler", "store(", "update(", "create(", "edit(")):
# neem de meest voorkomende controllers-map mee
# (geen directory listing; we kiezen alleen de indexerende anchor-files)
for p in exists:
if p.startswith("app/Http/Controllers/") and p.endswith(".php"):
priors.append(p)
if len(priors) >= max_k:
break
if any(k in text for k in ("view", "blade", "template", "pagina", "page", "formulier", "form")):
# bekende view-locaties
add_if_present([
"resources/views/layouts/app.blade.php",
"resources/views/welcome.blade.php",
"resources/views/dashboard.blade.php",
])
# heuristisch: als prompt een padfragment noemt (b.v. 'log/create'), pak views daaronder
m = re.search(r"resources/views/([A-Za-z0-9_/\-]+)/", user_goal)
if m:
base = f"resources/views/{m.group(1).strip('/')}/"
for p in exists:
if p.startswith(base) and p.endswith(".blade.php") and p not in priors:
priors.append(p)
if len(priors) >= max_k:
break
if any(k in text for k in ("validatie", "validation", "formrequest", "request class", "rules(")):
# vaak custom FormRequest classes
for p in exists:
if p.startswith("app/Http/Requests/") and p.endswith(".php"):
priors.append(p)
if len(priors) >= max_k:
break
if any(k in text for k in ("database", "db", "sql", "sqlserver", "mssql", "mysql", "pgsql", "connection", "migratie", "migration", "schema")):
add_if_present(["config/database.php", ".env", ".env.example"])
# migrations en models zijn vaak relevant
for p in exists:
if (p.startswith("database/migrations/") and p.endswith(".php")) or \
(p.startswith("app/Models/") and p.endswith(".php")):
priors.append(p)
if len(priors) >= max_k:
break
if any(k in text for k in ("taal", "language", "vertaling", "translation", "lang", "i18n")):
# neem json én php lang packs mee
for p in exists:
if p.startswith("resources/lang/") and (p.endswith(".json") or p.endswith(".php")):
priors.append(p)
if len(priors) >= max_k:
break
# dedupe + cap
uniq: list[str] = []
seen = set()
for p in priors:
if p not in seen:
uniq.append(p); seen.add(p)
if len(uniq) >= max_k:
break
return uniq
async def _llm_framework_priors(user_goal: str, all_files: list[str], framework: str = "laravel", max_k: int = 10) -> list[str]:
"""
Laat de LLM kansrijke BESTAANDE bestanden/globs voorstellen op basis van framework-conventies.
- Output MOET JSON zijn: {"files":[...]} met relatieve paden of simpele globs.
- We filteren op echt-bestaande paden (match tegen all_files), globs toegestaan.
- Geen netwerk I/O; 1 kleine LLM-call.
"""
text = (user_goal or "").strip()
if not text:
return []
# Bescheiden token budget
sys = ("You are a precise code navigator. Output ONLY compact JSON with likely file paths for the task.\n"
"Rules:\n- Return: {\"files\":[\"relative/path/or/glob\", ...]}\n"
"- Use framework conventions (e.g., Laravel routes/controllers/views, config, .env, migrations, lang).\n"
"- Do NOT invent files that cannot exist; prefer generic globs (e.g., resources/views/**/create*.blade.php).\n"
"- No explanations, no prose.")
usr = (f"Framework: {framework}\n"
f"Task/prompt:\n{text}\n"
"Return at most 15 items.\n"
"Examples for Laravel (if applicable): routes/web.php, app/Http/Controllers/**.php, "
"resources/views/**.blade.php, config/database.php, .env, database/migrations/**.php, resources/lang/**")
try:
resp = await _llm_call(
[{"role":"system","content":sys},{"role":"user","content":usr}],
stream=False, temperature=0.0, top_p=1.0, max_tokens=300
)
raw = (resp.get("choices",[{}])[0].get("message",{}) or {}).get("content","").strip()
except Exception:
return []
# Haal eventuele ```json fences weg
m = re.search(r"\{[\s\S]*\}", raw)
if not m:
return []
try:
obj = json.loads(m.group(0))
except Exception:
return []
items = obj.get("files") or []
if not isinstance(items, list):
return []
# Glob -> concrete bestanden; filter op bestaande paden
exists = set(all_files)
out: list[str] = []
def _match(pat: str) -> list[str]:
# simpele glob: **, *, ?. We matchen tegen all_files.
try:
pat_norm = pat.strip().lstrip("./")
return [f for f in all_files if fnmatch.fnmatch(f, pat_norm)]
except Exception:
return []
for it in items:
if not isinstance(it, str) or not it.strip():
continue
it = it.strip().lstrip("./")
if it in exists:
if it not in out:
out.append(it)
else:
for hit in _match(it):
if hit not in out:
out.append(hit)
if len(out) >= max_k:
break
return out[:max_k]
async def _llm_task_route(user_goal: str, framework: str = "laravel") -> dict:
"""
Laat de LLM expliciet kiezen: {task_type, categories[], hints[]}
Voorbeelden task_type:
- "ui_label_change", "db_credentials", "db_queries", "routes_to_views", "config_env", "generic_code_change"
categories: welke mappen/artefacten zijn relevant (bv. ["views","controllers","routes","migrations","config",".env"])
hints: korte trefwoorden of view/controller namen.
"""
if not (user_goal or "").strip():
return {}
sys = ("You are a precise task router. Return ONLY compact JSON.\n"
"Schema: {\"task_type\":str, \"categories\":[str,...], \"hints\":[str,...]}\n"
"Use framework conventions (e.g., Laravel). No explanations.")
usr = f"Framework: {framework}\nUser goal:\n{user_goal}\nReturn at most 6 categories and 8 hints."
try:
resp = await _llm_call(
[{"role":"system","content":sys},{"role":"user","content":usr}],
stream=False, temperature=0.0, top_p=1.0, max_tokens=250
)
raw = (resp.get('choices',[{}])[0].get('message',{}) or {}).get('content','')
m = re.search(r"\{[\s\S]*\}", raw or "")
obj = json.loads(m.group(0)) if m else {}
# sanitize
obj["task_type"] = (obj.get("task_type") or "generic_code_change")[:64]
obj["categories"] = [str(x)[:32] for x in (obj.get("categories") or [])][:8]
obj["hints"] = [str(x)[:64] for x in (obj.get("hints") or [])][:8]
return obj
except Exception:
return {"task_type":"generic_code_change","categories":[],"hints":[]}
# ---------- Hoofd-handler ----------
async def handle_repo_agent(messages: List[dict], request) -> str:
2025-11-20 15:16:00 +00:00
"""
Uitbreiding: fast-path voor unified diffs op expliciete bestanden met tekstvervanging.
Als niet van toepassing, valt automatisch terug op de bestaande flow.
"""
# 1) Combineer user/system content om opdracht te parsen
try:
full_txt = "\n".join([m.get("content","") for m in messages if m.get("role") in ("system","user")])
except Exception:
full_txt = ""
# 2) Herken fast-path
try_fast = _looks_like_unified_diff_request(full_txt)
paths_fp = _extract_explicit_paths(full_txt) if try_fast else []
old_txt, new_txt = _extract_replace_pair(full_txt) if try_fast else (None, None)
# NB: we gebruiken de injecties die via initialize_agent zijn gezet:
# - get_git_repo_fn (async)
# - read_text_file_fn (sync)
# Deze symbolen worden onderin initialize_agent aan globals() gehangen.
get_git_repo_fn = globals().get("get_git_repo_fn")
read_text_file_fn = globals().get("read_text_file_fn")
if try_fast and paths_fp and old_txt and new_txt and callable(get_git_repo_fn) and callable(read_text_file_fn):
# 3) repo + branch bepalen
repo_url, branch = _extract_repo_branch_from_text(full_txt)
if not repo_url:
# fallback: probeer repo uit eerdere agent-state (optioneel), anders stop fast-path
repo_url = globals().get("_last_repo_url")
branch = globals().get("_last_branch", "main")
if repo_url:
try:
repo_root = await get_git_repo_fn(repo_url, branch or "main")
root = Path(repo_root)
lang_path = root / "resources" / "lang" / "nl.json"
lang_before = lang_path.read_text(encoding="utf-8", errors="ignore") if lang_path.exists() else "{}"
lang_data = {}
try:
lang_data = json.loads(lang_before or "{}")
except Exception:
lang_data = {}
diffs_out = []
lang_changed = False
def _make_udiff(a: str, b: str, rel: str) -> str:
return "".join(difflib.unified_diff(
a.splitlines(keepends=True),
b.splitlines(keepends=True),
fromfile=f"a/{rel}", tofile=f"b/{rel}", n=3
))
# 4) per bestand: ofwel inline replace, ofwel vertaling bijwerken
for rel in paths_fp:
p = root / rel
if not p.exists():
continue
before = read_text_file_fn(p)
if not before:
continue
# Als de 'oude' tekst voorkomt BINNEN een vertaalwrapper, dan géén blade-edit
found_in_wrapper = False
for pat in _TRANS_WRAPPERS:
for m in re.finditer(pat, before):
inner = m.group(1)
if inner == old_txt:
found_in_wrapper = True
break
if found_in_wrapper:
break
if found_in_wrapper:
# update nl.json: {"oude": "nieuwe"}
if lang_data.get(old_txt) != new_txt:
lang_data[old_txt] = new_txt
lang_changed = True
continue
# anders: directe, exacte vervanging (conservatief)
after = before.replace(old_txt, new_txt)
if after != before:
diff = _make_udiff(before, after, rel)
if diff.strip():
diffs_out.append(("blade", rel, diff))
# 5) indien vertaling gewijzigd: diff voor nl.json toevoegen
if lang_changed:
new_lang = json.dumps(lang_data, ensure_ascii=False, indent=2, sort_keys=True) + "\n"
diff_lang = _make_udiff(lang_before if isinstance(lang_before, str) else "", new_lang, "resources/lang/nl.json")
if diff_lang.strip():
diffs_out.append(("lang", "resources/lang/nl.json", diff_lang))
if diffs_out:
parts = ["### Unified diffs"]
for kind, rel, d in diffs_out:
parts.append(f"**{rel}**")
parts.append("```diff\n" + d + "```")
return "\n\n".join(parts)
else:
return "Dry-run: geen wijzigbare treffers gevonden in opgegeven bestanden (of reeds actueel)."
except Exception as e:
# mislukt → val terug op bestaande discover/agent flow
pass
# === GEEN fast-path → ga door met de bestaande flow hieronder ===
2025-11-06 13:42:26 +00:00
sid = _get_session_id(messages, request)
st = _app.state.AGENT_SESSIONS.get(sid) or AgentState()
_app.state.AGENT_SESSIONS[sid] = st
user_last = next((m["content"] for m in reversed(messages) if m.get("role")=="user"), "").strip()
user_last_lower = user_last.lower()
logger.info("INFO:agent_repo:[%s] stage=%s", sid, st.stage)
2025-11-20 15:16:00 +00:00
from smart_rag import (
enrich_intent,
expand_queries,
hybrid_retrieve,
_laravel_pairs_from_route_text,
_laravel_guess_view_paths_from_text,
)
2025-11-06 13:42:26 +00:00
# Als user een .git URL meegeeft: zet state en ga via de state-machine verder
user_txt = next((m.get("content","") for m in reversed(messages) if m.get("role")=="user"), "")
repo_url = await _detect_repo_url(user_txt)
if repo_url:
st.repo_hint = repo_url
st.stage = "SELECT_REPO"
logger.info("INFO:agent_repo:[%s] direct SELECT_REPO via .git url: %s", sid, repo_url)
# LET OP: geen vroegtijdige return hier; de SELECT_REPO tak hieronder handelt DISCOVER/INDEX etc. af.
# === SMART-RAG: opt-in pad (alleen als er nog GEEN repo is) ===
smart_enabled = str(os.getenv("REPO_AGENT_SMART","1")).lower() not in ("0","false")
if smart_enabled and not st.repo_hint and st.stage in ("TRIAGE","ASK"):
# 1) intent → plan
spec = await enrich_intent(_llm_call, messages)
task = spec.get("task","").strip()
file_hints = spec.get("file_hints") or []
keywords = spec.get("keywords") or []
constraints= spec.get("constraints") or []
acceptance = spec.get("acceptance") or []
ask = spec.get("ask")
# 2) query expansion (kort) en hybride retrieval
variants = await expand_queries(_llm_call, task, k=int(os.getenv("RAG_EXPAND_K","3")))
merged: list[dict] = []
for i, qv in enumerate(variants):
partial = await hybrid_retrieve(
_rag_query_internal,
qv,
repo= None,
profile= None,
path_contains=(file_hints[0] if file_hints else None),
per_query_k=int(os.getenv("RAG_PER_QUERY_K","30")),
n_results=int(os.getenv("RAG_N_RESULTS","18")),
alpha=float(os.getenv("RAG_EMB_WEIGHT","0.6")),
)
merged.extend(partial)
# dedupe op path+chunk
seen = set(); uniq = []
for r in sorted(merged, key=lambda x: x["score"], reverse=True):
meta = r.get("metadata") or {}
key = (meta.get("path",""), meta.get("chunk_index",""))
if key in seen: continue
seen.add(key); uniq.append(r)
# 3) context + confidence
ctx_text, top_score = assemble_context(uniq, max_chars=int(os.getenv("REPO_AGENT_CONTEXT_CHARS","640000")))
# heel simpele confidence: als top_score erg laag is en vragen toegestaan → stel 1 verhelderingsvraag
if ask and float(os.getenv("REPO_AGENT_ASK_CLARIFY","1")) and top_score < float(os.getenv("REPO_AGENT_ASK_THRESHOLD","0.35")):
return f"Snelle check: {ask}"
# 4) finale prompt samenstellen
sys = (
"Je bent een senior code-assistent. "
"Lees de contextfragmenten (met padheaders). "
"Beantwoord taakgericht, concreet en veilig. "
"Als je verbeteringen doet, geef dan eerst een kort plan en daarna exacte, toepasbare wijzigingen."
)
user = (
f"TAKEN:\n{task}\n\n"
f"CONSTRAINTS: {', '.join(constraints) or '-'}\n"
f"ACCEPTANCE: {', '.join(acceptance) or '-'}\n"
f"KEYWORDS: {', '.join(keywords) or '-'}\n"
f"FILE HINTS: {', '.join(file_hints) or '-'}\n\n"
f"--- CONTEXT (gedeeltelijk) ---\n{ctx_text}\n--- EINDE CONTEXT ---\n\n"
"Geef eerst een kort, puntsgewijs plan (max 6 bullets). "
"Daarna de concrete wijzigingen per bestand met codeblokken. "
"Geen herhaling van hele bestanden als dat niet nodig is."
)
llm_resp = await _llm_call(
[{"role":"system","content":sys},{"role":"user","content":user}],
stream=False, temperature=0.2, top_p=0.9, max_tokens=2048
2025-11-06 13:42:26 +00:00
)
out = (llm_resp.get("choices",[{}])[0].get("message",{}) or {}).get("content","")
if out.strip():
# niet returnen — maar bijvoorbeeld loggen of meesturen als “quick analysis”
st.smart_preview = out
logger.info("SMART-RAG preview gemaakt (geen vroegtijdige exit)")
# === /SMART-RAG ===
if any(k in user_last_lower for k in ["dry-run","dryrun","preview"]): st.dry_run = True
if "apply" in user_last_lower and ("akkoord" in user_last_lower or "ga door" in user_last_lower): st.dry_run = False
if st.stage == "TRIAGE":
logger.info("Stage TRIAGE")
st.user_goal = user_last
# Optioneel: intent refine + verduidelijkingsvragen
if AGENT_ENABLE_GOAL_REFINE and st.user_goal:
try:
refined, questions, conf = await llm_refine_goal(st.user_goal)
if refined and refined != st.user_goal:
st.user_goal = refined
if questions and conf < AGENT_CLARIFY_THRESHOLD:
st.stage = "ASK"
qtxt = "\n".join([f"- {q}" for q in questions])
return ("Om zeker de juiste bestanden te kiezen, beantwoord kort:\n" + qtxt)
except Exception:
pass
st.stage = "ASK"
base = ("Ik verken de code en doe een voorstel. Geef de repo (bv. `admin/image-viewing-website` of "
2026-02-02 09:28:41 +00:00
"`http://10.25.138.40:30085/admin/image-viewing-website.git`). "
2025-11-06 13:42:26 +00:00
"Of zeg: **'zoek repo'** als ik zelf moet zoeken.")
return _with_preview(base, st)
if st.stage == "ASK":
logger.info("Stage ASK ")
# 1) check of er een repo-hint in de zin zit
hint = None
m = re.search(r"(https?://\S+)", user_last)
if m: hint = m.group(1)
elif "/" in user_last:
for p in user_last.split():
if re.match(r"^[A-Za-z0-9_.\-]+/[A-Za-z0-9_.\-]+$", p): hint = p; break
# 2) Als expliciete vraag om repo te zoeken óf geen hint → auto-discovery
if (not hint) and ("zoek repo" in user_last_lower):
# Probeer auto-discovery
st.repo_candidates = await discover_candidate_repos(st.user_goal)
if not st.repo_candidates:
st.questions_asked += 1
return _with_preview("Ik kon geen repos vinden. Geef de Gitea repo (owner/repo) of volledige .git-URL.", st)
# Normalize scores naar 0..1
maxs = max((c.get("score",0.0) for c in st.repo_candidates), default=0.0) or 1.0
for c in st.repo_candidates:
c["score"] = min(1.0, c["score"]/maxs) if maxs else 0.0
best = st.repo_candidates[0]
# Als hoogste score duidelijk is, auto-select
if best.get("score",0.0) >= AGENT_AUTOSELECT_THRESHOLD and best.get("clone_url"):
st.repo_hint = best["clone_url"]
st.stage = "SELECT_REPO"
return _with_preview(f"Repo automatisch gekozen: **{best['full_name']}** (score {best['score']:.2f}).", st)
# Anders: laat top-3 zien en vraag keuze
st.stage = "CONFIRM_REPO"
lines = []
for i, c in enumerate(st.repo_candidates[:3], 1):
lines.append(f"{i}. {c['full_name']} — score {c.get('score',0.0):.2f}")
base = "Ik vond deze passende repos:\n" + "\n".join(lines) + "\nKies een nummer, of typ de naam/URL."
return _with_preview(base, st)
# 3) Er is wel een hint - ga door
if hint:
st.repo_hint = hint
st.stage = "SELECT_REPO"
else:
st.questions_asked += 1
if st.questions_asked <= AGENT_MAX_QUESTIONS:
return _with_preview("Graag de Gitea repo (owner/repo) of volledige .git-URL.", st)
return _with_preview("Ik heb de repo-naam of URL nodig om verder te gaan.", st)
if st.stage == "CONFIRM_REPO":
logger.info("Stage CONFIRM_REPO")
# parse keuze
pick = None
m = re.match(r"^\s*([1-5])\s*$", user_last)
if m:
idx = int(m.group(1)) - 1
if 0 <= idx < len(st.repo_candidates):
pick = st.repo_candidates[idx]
if not pick:
# probeer naam match
for c in st.repo_candidates:
if c["full_name"].lower() in user_last_lower or (c.get("clone_url","") and c["clone_url"] in user_last):
pick = c; break
if not pick:
return _with_preview("Typ een nummer (1..3) of de naam/URL van de repo.", st)
st.repo_hint = pick.get("clone_url") or (f"{GITEA_URL}/{pick['full_name']}.git")
st.stage = "SELECT_REPO"
return _with_preview(f"Repo gekozen: **{pick['full_name']}**.", st)
if st.stage == "SELECT_REPO":
logger.info("Stage SELECT_REPO")
repo_meta, reason = resolve_repo(st.repo_hint)
if not repo_meta:
return (f"Geen repo gevonden voor “{st.repo_hint}”. Probeer volledige URL: {GITEA_URL}/<owner>/<repo>.git")
st.selected_repo = repo_meta
st.repo_url = repo_meta.get("clone_url") or ""
st.owner_repo = repo_meta.get("full_name")
if not st.repo_url:
return f"Geen clone URL voor “{st.repo_hint}”."
progress = [f"Repo ({reason}): {st.owner_repo or st.repo_url}"]
# DISCOVER
logger.info("DISCOVER")
try:
try:
st.repo_path = await _call_get_git_repo(st.repo_url, st.branch_base)
except Exception as e_main:
logger.warning("WARN:agent_repo:get_git_repo %s failed: %s; fallback master", st.branch_base, e_main)
st.branch_base = "master"
st.repo_path = await _call_get_git_repo(st.repo_url, st.branch_base)
st.collection_name = repo_collection_name(st.owner_repo, st.branch_base)
chunk_chars, overlap = _chunk_params_for_repo(Path(st.repo_path))
# ── Fast-path: check HEAD en sla index over als ongewijzigd ──
try:
import git
head_sha = await run_in_threadpool(lambda: git.Repo(st.repo_path).head.commit.hexsha)
except Exception:
head_sha = ""
#memo_key = f"{st.repo_url}|{st.branch_base}|{st.collection_name}"
# Brede key (repo+branch) voorkomt dubbele index runs bij dezelfde HEAD,
# ook als collection_name varieert.
memo_key = f"{st.repo_url}|{st.branch_base}"
if _INDEX_HEAD_MEMO.get(memo_key) == head_sha and head_sha:
progress.append(f"Index overslaan: HEAD ongewijzigd ({head_sha[:7]}).")
else:
try:
res = await _rag_index_repo_internal(
repo_url=st.repo_url, branch=st.branch_base,
profile="auto", include="", exclude_dirs="",
chunk_chars=chunk_chars, overlap=overlap, collection_name=st.collection_name
)
# alleen updaten als index call succesvol was
_INDEX_HEAD_MEMO[memo_key] = head_sha or _INDEX_HEAD_MEMO.get(memo_key, "")
if isinstance(res, dict) and res.get("status") == "skipped":
progress.append(f"Index: skip (cache) — HEAD {head_sha[:7]}.")
else:
progress.append("Index: bijgewerkt.")
except Exception as e_idx:
logger.warning("WARN:agent_repo:rag index failed '%s': %s; fallback 'code_docs'", st.collection_name, e_idx)
st.collection_name = "code_docs"
res = await _rag_index_repo_internal(
repo_url=st.repo_url, branch=st.branch_base,
profile="auto", include="", exclude_dirs="",
chunk_chars=chunk_chars, overlap=overlap, collection_name=st.collection_name
)
_INDEX_HEAD_MEMO[memo_key] = head_sha or _INDEX_HEAD_MEMO.get(memo_key, "")
# na succesvolle _rag_index_repo_internal(...) en meili/bm25:
logger.info("Symbol index repo")
try:
symbol_index_repo(Path(st.repo_path), st.owner_repo, st.branch_base)
except Exception as e:
logger.warning("WARN:agent_repo:symbol index build failed: %s", e)
logger.info("Meili part")
if MEILI_URL:
try:
# Skip Meili herindex als HEAD ongewijzigd
if _MEILI_HEAD_MEMO.get(memo_key) == head_sha and head_sha:
progress.append("Meili: overslaan (HEAD ongewijzigd).")
else:
await run_cpu_blocking(meili_index_repo, Path(st.repo_path), st.owner_repo, st.branch_base)
_MEILI_HEAD_MEMO[memo_key] = head_sha or _MEILI_HEAD_MEMO.get(memo_key, "")
except Exception as e:
logger.warning("WARN:agent_repo:meili_index_repo failed: %s", e)
else:
try:
if _BM25_HEAD_MEMO.get(memo_key) == head_sha and head_sha:
progress.append("BM25: overslaan (HEAD ongewijzigd).")
else:
await run_cpu_blocking(bm25_build_index, Path(st.repo_path), st.owner_repo, st.branch_base)
_BM25_HEAD_MEMO[memo_key] = head_sha or _BM25_HEAD_MEMO.get(memo_key, "")
except Exception as e:
logger.warning("WARN:agent_repo:bm25_build_index failed: %s", e)
progress.append("DISCOVER klaar.")
logger.info("DISCOVER klaar.")
except Exception as e:
logger.exception("ERROR:agent_repo:DISCOVER failed")
st.stage = "ASK"
return _with_preview("\n".join(progress + [f"DISCOVER mislukte: {e}"]), st)
# RANK via hybrid RAG
logger.info("RANK via hybrid RAG")
root = Path(st.repo_path)
all_files = list_repo_files(root)
# Precompute graph + tree (per HEAD) voor ranking-boost en explain
graph = _get_graph_cached(root, memo_key=(f"{st.repo_url}|{st.branch_base}|{head_sha or ''}"))
tree_summ = _get_tree_cached(root, memo_key=(f"{st.repo_url}|{st.branch_base}|{head_sha or ''}"), all_files=all_files)
picked: List[str] = []
# 1) expliciete paden uit de prompt (bestaande extractor)
2025-11-27 07:54:07 +00:00
explicit = _sanitize_path_hints(list(extract_explicit_paths(st.user_goal) or []), all_files)
2025-11-06 13:42:26 +00:00
# 2) robuuste fallback extractor
2025-11-27 07:54:07 +00:00
robust = _sanitize_path_hints(_extract_explicit_paths_robust(st.user_goal), all_files)
2025-11-06 13:42:26 +00:00
for pth in explicit + [p for p in robust if p not in explicit]:
norm = pth.replace("\\", "/").strip()
if norm in all_files and norm not in picked:
picked.append(norm)
continue
best = best_path_by_basename(all_files, norm)
if best and best not in picked:
picked.append(best)
continue
# Als het niet bestaat: toch opnemen (voor create-flow)
if norm not in picked:
picked.append(norm)
# Laravel priors (alleen bestaande paden), vóór RAG
try:
is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
except Exception:
is_laravel = False
if is_laravel:
priors = _laravel_priors_from_prompt(st.user_goal, root, all_files, max_k=int(os.getenv("LARAVEL_PRIORS_K","8")))
for p in priors:
if p not in picked:
picked.append(p)
# ---- LLM-PRIORS (optioneel via env, standaard aan) ----
use_llm_priors = os.getenv("LLM_PRIORS_ENABLE", "1").lower() not in ("0","false","no")
if use_llm_priors:
try:
# Hint framework adhv repo
is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
except Exception:
is_laravel = False
fw = "laravel" if is_laravel else "generic"
llm_hits = await _llm_framework_priors(st.user_goal, all_files, framework=fw, max_k=int(os.getenv("LLM_PRIORS_K","12")))
for p in llm_hits:
if p not in picked:
picked.append(p)
# ---- Rules fallback (alleen als nog mager) ----
try:
is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
except Exception:
is_laravel = False
if is_laravel and len(picked) < max(4, int(os.getenv("LLM_PRIORS_MIN_BEFORE_RAG","4"))):
priors = _laravel_priors_from_prompt(st.user_goal, root, all_files, max_k=int(os.getenv("LARAVEL_PRIORS_K","8")))
for p in priors:
if p not in picked:
picked.append(p)
# --- LLM Task Router ---
is_laravel = (root / "artisan").exists() or (root / "composer.json").exists()
route = await _llm_task_route(st.user_goal, framework=("laravel" if is_laravel else "generic"))
st.reasons["task_route"] = json.dumps(route, ensure_ascii=False)
task_type = (route.get("task_type") or "").lower()
# --- LLM zoekpatronen → deterministische scan ---
if os.getenv("LLM_PATTERN_SCAN","1").lower() not in ("0","false","no"):
specs = await _llm_make_search_specs(st.user_goal, framework=("laravel" if is_laravel else "generic"))
scan_hits = _scan_repo_for_patterns(root, all_files, specs, max_hits=int(os.getenv("LLM_PATTERN_MAX_HITS","24")))
for f in scan_hits:
if f not in picked:
picked.append(f)
# --- VIEW/LANG bias voor UI-label wijzigingen ---
2025-11-20 15:16:00 +00:00
# Pak de eerste quote uit de prompt als "oude" literal
qs = extract_quotes(st.user_goal) or []
old_lit = qs[0] if qs else None
def _contains_old(rel: str) -> bool:
if not old_lit:
return True # fallback: geen filtering
2025-11-06 13:42:26 +00:00
try:
2025-11-20 15:16:00 +00:00
txt = _read_text_file(Path(st.repo_path) / rel) or ""
return old_lit in txt
2025-11-06 13:42:26 +00:00
except Exception:
2025-11-20 15:16:00 +00:00
return False
2025-11-06 13:42:26 +00:00
2025-11-27 07:54:07 +00:00
view_files = [f for f in all_files
if f.startswith("resources/views/") and f.endswith(".blade.php")]
lang_files = [f for f in all_files
if f.startswith("resources/lang/") and (f.endswith(".json") or f.endswith(".php"))]
2025-11-06 13:42:26 +00:00
2025-11-27 07:54:07 +00:00
# Als we de oude literal kennen: eerst de files waar die echt in staat
if old_lit:
view_hits = [f for f in view_files if _contains_old(f)]
lang_hits = [f for f in lang_files if _contains_old(f)]
else:
view_hits = view_files
lang_hits = lang_files
2025-11-06 13:42:26 +00:00
2025-11-27 07:54:07 +00:00
# Zet de meest waarschijnlijke kandidaten vóóraan, behoud verder huidige volgorde
front = []
for lst in (view_hits, lang_hits):
for f in lst:
if f in all_files and f not in front:
front.append(f)
picked = list(dict.fromkeys(front + picked))[:MAX_FILES_DRYRUN]
2025-11-06 13:42:26 +00:00
# --- (optioneel) priors op basis van framework (je eerdere patch A/B) ---
# LLM priors + rule-based priors kun je hier behouden zoals je eerder hebt toegevoegd.
# --- NIEUW: Smart-RAG path selectie op repo-collectie ---
# 1) intent (voor file_hints) + query-expansion
logger.info("Smart RAG path select. 1) intent")
spec = await enrich_intent(_llm_call, [{"role":"user","content": st.user_goal}])
file_hints = (spec.get("file_hints") or [])
variants = await expand_queries(_llm_call, spec.get("task") or st.user_goal, k=2)
# 2) retrieval per variant met repo-filter & collectie van deze repo
logger.info("Smart RAG path select. 2) retrieval")
merged = []
for qv in variants:
2025-11-27 07:54:07 +00:00
use_collection = bool(st.collection_name)
2025-11-06 13:42:26 +00:00
part = await hybrid_retrieve(
_rag_query_internal,
qv,
2025-11-27 07:54:07 +00:00
repo=_clean_repo_arg(st.owner_repo) if not use_collection else None,
2025-11-06 13:42:26 +00:00
profile=None,
path_contains=(file_hints[0] if file_hints else None),
per_query_k=int(os.getenv("RAG_PER_QUERY_K","30")),
n_results=int(os.getenv("RAG_N_RESULTS","18")),
alpha=float(os.getenv("RAG_EMB_WEIGHT","0.6")),
2025-11-27 07:54:07 +00:00
collection_name=(st.collection_name if use_collection else None)
2025-11-06 13:42:26 +00:00
)
merged.extend(part)
# 3) naar unieke paden + sort op score
logger.info("Smart RAG path select. 3) unieke paden sort op score")
seen=set()
for r in sorted(merged, key=lambda x: x.get("score",0.0), reverse=True):
meta = r.get("metadata") or {}
rel = meta.get("path","")
if not rel or rel in seen:
continue
seen.add(rel)
if rel not in picked:
picked.append(rel)
# 4) Laravel neighbors (klein zetje, opt-in via env)
logger.info("Smart RAG path select. 4) Laravel neighbors")
if os.getenv("RAG_NEIGHBORS", "1").lower() not in ("0","false"):
add = []
for rel in picked[:8]:
# routes -> controllers
if rel in ("routes/web.php","routes/api.php"):
txt = (Path(st.repo_path)/rel).read_text(encoding="utf-8", errors="ignore")
2025-11-20 15:16:00 +00:00
for ctrl_path, _m in _laravel_pairs_from_route_text(txt):
if ctrl_path and ctrl_path not in picked and ctrl_path not in add:
add.append(ctrl_path)
2025-11-06 13:42:26 +00:00
# controllers -> views
if rel.startswith("app/Http/Controllers/") and rel.endswith(".php"):
txt = (Path(st.repo_path)/rel).read_text(encoding="utf-8", errors="ignore")
2025-11-20 15:16:00 +00:00
for v in _laravel_guess_view_paths_from_text(txt):
if v and v not in picked and v not in add:
add.append(v)
2025-11-06 13:42:26 +00:00
# Extra: neem kleine nabije partials/layouts mee (zelfde dir, ≤40KB)
more = []
for rel in (picked + add)[:8]:
if rel.endswith(".blade.php"):
d = (Path(st.repo_path) / rel).parent
try:
for bp in d.glob("*.blade.php"):
if bp.name == os.path.basename(rel):
continue
if bp.stat().st_size <= 40_000:
cand = str(bp.relative_to(Path(st.repo_path)))
if cand not in picked and cand not in add and cand not in more:
more.append(cand)
except Exception:
pass
picked = (picked + add + more)[:MAX_FILES_DRYRUN]
# 5) Literal-grep fallback: als de user een oud->nieuw wijziging impliceert, zoek de 'old' literal repo-breed
2025-11-20 15:16:00 +00:00
qs = extract_quotes(st.user_goal) or []
old = qs[0].strip() if qs and qs[0].strip() else None
if old:
grep_hits = _grep_repo_for_literal(Path(st.repo_path), old, limit=16)
2025-11-06 13:42:26 +00:00
for rel in grep_hits:
if rel in all_files and rel not in picked:
picked.append(rel)
# Keyword fallback alleen als we nog te weinig zeker zijn
top_conf = 0.0
try:
top_conf = max([r.get("score",0.0) for r in merged]) if merged else 0.0
except Exception:
pass
if len(picked) < MAX_FILES_DRYRUN and top_conf < float(os.getenv("RAG_FALLBACK_THRESHOLD","0.42")):
for rel, _s in simple_keyword_search(root, all_files, st.user_goal, limit=MAX_FILES_DRYRUN):
if rel not in picked: picked.append(rel)
# --- Gewogen her-ranking (Meili/embeddings/heuristiek/explicit) ---
explicit_all = extract_explicit_paths(st.user_goal) + _extract_explicit_paths_robust(st.user_goal)
explicit_all = [p.replace("\\","/").strip() for p in explicit_all]
# 1) verzamel meili/embeddings scores vanuit 'merged'
meili_scores = {}
for r in merged:
meta = (r or {}).get("metadata") or {}
rel = meta.get("path","")
if rel:
try:
sc = float(r.get("score", 0.0))
except Exception:
sc = 0.0
meili_scores[rel] = max(meili_scores.get(rel, 0.0), sc)
# 2) weeg en motiveer
cand_scores = {}
cand_why = {}
def _boost(rel: str, amt: float, why: str):
cand_scores[rel] = cand_scores.get(rel, 0.0) + float(amt)
if amt > 0:
cand_why[rel] = (cand_why.get(rel, "") + f"{why}; ").strip()
for rel in picked:
# Meili/embeddings top-hit
if rel in meili_scores:
_boost(rel, 0.55 * meili_scores[rel], "meili")
# pad-heuristiek
lo = rel.lower()
if lo.startswith("routes/"): _boost(rel, 0.08, "routes")
if lo.startswith("app/http/controllers/"): _boost(rel, 0.06, "controller")
if lo.startswith("resources/views/"): _boost(rel, 0.06, "view")
if lo.startswith("resources/lang/"): _boost(rel, 0.05, "lang")
# expliciet genoemd door user
if rel in explicit_all: _boost(rel, 0.20, "explicit")
# 2b) Graph-boost: BFS vanaf expliciete seeds (en evt. route-bestanden)
try:
seeds = [p for p in picked if p in explicit_all]
# heuristisch: als gebruiker over "route" praat, neem routes/web.php als seed
if any(k in st.user_goal.lower() for k in [" route", "routes", "/"]):
for rp in ["routes/web.php","routes/api.php"]:
if rp in picked and rp not in seeds:
seeds.append(rp)
if graph and seeds:
bfs = _graph_bfs_boosts(graph, seeds, max_depth=int(os.getenv("AGENT_GRAPH_MAX_DEPTH","3")))
for rel in picked:
if rel in bfs:
d, via = bfs[rel]
# afstand → boost: 0:0.08, 1:0.06, 2:0.03, 3:0.01
boost_map = {0:0.08, 1:0.06, 2:0.03, 3:0.01}
b = boost_map.get(min(d,3), 0.0)
if b > 0:
_boost(rel, b, f"graph:d={d} via {via}")
st.reasons[f"graph::{rel}"] = f"d={d}, via {via}"
except Exception:
pass
# 2c) Tree-summary boost: hits van prompt-keywords in samenvatting
try:
hints = extract_word_hints(st.user_goal) or []
if hints and tree_summ:
lo_hints = [h.lower() for h in hints[:8]]
for rel in picked:
s = (tree_summ.get(rel) or "").lower()
if not s:
continue
hits = sum(1 for h in lo_hints if h in s)
if hits:
_boost(rel, min(0.04, 0.01 * hits), f"tree:{hits}hit")
if hits >= 2:
st.reasons[f"tree::{rel}"] = tree_summ.get(rel, "")[:200]
except Exception:
pass
# 3) sorteer op totale score (desc)
picked.sort(key=lambda p: cand_scores.get(p, 0.0), reverse=True)
# 4) leg motivatie vast voor UI/preview
for rel in picked[:MAX_FILES_DRYRUN]:
if cand_scores.get(rel, 0.0) > 0:
st.reasons[f"rank::{rel}"] = f"{cand_scores[rel]:.2f} via {cand_why.get(rel,'')}"
st.candidate_paths = picked[:MAX_FILES_DRYRUN]
logger.info("CANDIDATES (explicit first, capped=%d): %s", MAX_FILES_DRYRUN, st.candidate_paths)
if not len(st.candidate_paths)>0:
st.stage = "ASK"
return _with_preview("\n".join(progress + ["Geen duidelijke kandidaten. Noem een pagina/onderdeel of (optioneel) bestandsnaam."]), st)
progress.append("Kandidaten:\n" + "\n".join([f"- {rel}" for rel in st.candidate_paths]))
logger.info("Kandidaten gevonden!")
# DRY-RUN
logger.info("dry-run")
try:
proposed, diffs, reasons = await propose_patches_without_apply(st.repo_path, st.candidate_paths, st.user_goal)
if not proposed:
# ---- T3: automatische recovery (éénmalig) ----
if not st.recovery_attempted:
st.recovery_attempted = True
try:
new_list, dbg = await _recovery_expand_candidates(
Path(st.repo_path), list_repo_files(Path(st.repo_path)),
st.user_goal, st.candidate_paths, last_reason="no_proposal_after_dryrun"
)
st.candidate_paths = new_list
st.reasons["recovery_note"] = dbg.get("recovery_plan",{}).get("note","")
# opnieuw proberen
proposed2, diffs2, reasons2 = await propose_patches_without_apply(st.repo_path, st.candidate_paths, st.user_goal)
if proposed2:
st.proposed_patches = proposed2
st.reasons.update(reasons2 or {})
st.stage = "APPLY"
preview = []
for rel in list(diffs2.keys())[:3]:
why = st.reasons.get(rel, "")
preview.append(f"### {rel}\n```\n{diffs2[rel]}\n```\n**Waarom**: {why}")
more = "" if len(diffs2) <= 3 else f"\n(Plus {len(diffs2)-3} extra diff(s).)"
base = "\n".join(progress + [
"**Dry-run voorstel (na recovery):**",
"\n\n".join(preview) + more,
"\nTyp **'Akkoord apply'** om te schrijven & pushen, of geef feedback."
])
return _with_preview(base, st, header="--- SMART-RAG + recovery notities ---")
except Exception as e:
logger.warning("WARN:agent_repo:recovery attempt failed: %s", e)
# geen succes → val terug op bestaande melding
st.stage = "PROPOSE_DIFF_DRYRUN"
return "\n".join(progress + ["Dry-run: geen bruikbaar voorstel met deze kandidaten. Geef extra hint (pagina/ term)."])
st.proposed_patches = proposed
st.reasons = reasons
st.stage = "APPLY"
preview = []
for rel in list(diffs.keys())[:3]:
why = reasons.get(rel, "")
preview.append(f"### {rel}\n```\n{diffs[rel]}\n```\n**Waarom**: {why}")
more = "" if len(diffs) <= 3 else f"\n(Plus {len(diffs)-3} extra diff(s).)"
base= "\n".join(progress + [
"**Dry-run voorstel (geen writes):**",
"\n\n".join(preview) + more,
"\nTyp **'Akkoord apply'** om te schrijven & pushen, of geef feedback."
])
return _with_preview(base, st, header="--- SMART-RAG contextnotities ---")
except Exception as e:
logger.exception("ERROR:agent_repo:PROPOSE_DIFF_DRYRUN failed")
st.stage = "PROPOSE_DIFF_DRYRUN"
return "\n".join(progress + [f"Dry-run mislukte: {e}"])
if st.stage == "PROPOSE_DIFF_DRYRUN":
logger.info("Stage PROPOSE_DIFF_DRYRUN")
root = Path(st.repo_path)
all_files = list_repo_files(root)
added = []
for pth in extract_explicit_paths(user_last):
if pth in all_files and pth not in st.candidate_paths:
added.append(pth)
else:
best = best_path_by_basename(all_files, pth)
if best and best not in st.candidate_paths: added.append(best)
st.candidate_paths = (added + st.candidate_paths)[:MAX_FILES_DRYRUN]
# extra: grep op 'old' literal uit user_goal om kandidaten te verrijken
qs = extract_quotes(st.user_goal) or []
old = qs[0].strip() if qs and qs[0].strip() else None
if old:
for rel in _grep_repo_for_literal(root, old, limit=16):
if rel in all_files and rel not in st.candidate_paths:
st.candidate_paths.append(rel)
try:
proposed, diffs, reasons = await propose_patches_without_apply(st.repo_path, st.candidate_paths, st.user_goal)
if not proposed:
if not st.recovery_attempted:
st.recovery_attempted = True
try:
new_list, dbg = await _recovery_expand_candidates(
Path(st.repo_path), list_repo_files(Path(st.repo_path)),
st.user_goal, st.candidate_paths, last_reason="no_proposal_in_propose_diff"
)
st.candidate_paths = new_list
st.reasons["recovery_note"] = dbg.get("recovery_plan",{}).get("note","")
# direct nog een poging
proposed2, diffs2, reasons2 = await propose_patches_without_apply(st.repo_path, st.candidate_paths, st.user_goal)
if proposed2:
st.proposed_patches = proposed2
st.reasons.update(reasons2 or {})
st.stage = "APPLY"
preview = []
for rel in list(diffs2.keys())[:3]:
why = st.reasons.get(rel, "")
preview.append(f"### {rel}\n```\n{diffs2[rel]}\n```\n**Waarom**: {why}")
more = "" if len(diffs2) <= 3 else f"\n(Plus {len(diffs2)-3} extra diff(s).)"
base = ("**Dry-run voorstel (na recovery):**\n" +
"\n\n".join(preview) + more +
"\n\nTyp **'Akkoord apply'** om te schrijven & pushen, of geef feedback.")
return _with_preview(base, st, header="--- SMART-RAG + recovery notities ---")
except Exception as e:
logger.warning("WARN:agent_repo:recovery in PROPOSE_DIFF failed: %s", e)
return _with_preview("Nog geen bruikbaar voorstel. Noem exact bestand/pagina of plak relevante code.", st)
st.proposed_patches = proposed
st.reasons = reasons
st.stage = "APPLY"
preview = []
for rel in list(diffs.keys())[:3]:
why = reasons.get(rel, "")
preview.append(f"### {rel}\n```\n{diffs[rel]}\n```\n**Waarom**: {why}")
more = "" if len(diffs) <= 3 else f"\n(Plus {len(diffs)-3} extra diff(s).)"
base = ("**Dry-run voorstel (geen writes):**\n" +
"\n\n".join(preview) + more +
"\n\nTyp **'Akkoord apply'** om te schrijven & pushen, of geef feedback.")
return _with_preview(base, st, header="--- SMART-RAG contextnotities ---")
except Exception as e:
logger.exception("ERROR:agent_repo:PROPOSE_DIFF_DRYRUN retry failed")
return _with_preview(f"Dry-run mislukte: {e}", st)
def _apply():
if not (("akkoord" in user_last_lower) and ("apply" in user_last_lower)):
return "Typ **'Akkoord apply'** om de dry-run wijzigingen te schrijven & pushen."
try:
repo_path = _get_git_repo(st.repo_url, st.branch_base)
import git
repo = git.Repo(repo_path)
short = re.sub(r'[^a-z0-9\-]+','-', st.user_goal.lower()).strip("-")
st.new_branch = f"task/{short[:40]}-{time.strftime('%Y%m%d-%H%M%S')}"
repo.git.checkout("-b", st.new_branch)
changed = []
for rel, content in st.proposed_patches.items():
f = Path(repo_path) / rel
f.parent.mkdir(parents=True, exist_ok=True)
f.write_text(content, encoding="utf-8")
changed.append(str(f))
if not changed:
return "Er waren geen wijzigingen om te commiten."
repo.index.add(changed)
msg = (f"feat: {st.user_goal}\n\nScope:\n" +
"\n".join([f"- {Path(c).relative_to(repo_path)}" for c in changed]) +
"\n\nRationale (samengevat):\n" +
"\n".join([f"- {k}: {v}" for k,v in st.reasons.items()]) +
"\n\nCo-authored-by: repo-agent\n")
repo.index.commit(msg)
repo.remotes.origin.push(refspec=f"{st.new_branch}:{st.new_branch}")
st.stage = "DONE"
return f"✅ Branch aangemaakt en gepusht: `{st.new_branch}`. Maak nu je PR in Gitea."
except Exception as e:
logger.exception("ERROR:agent_repo:APPLY failed")
st.stage = "PROPOSE_DIFF_DRYRUN"
return f"Apply/push mislukte: {e}"
if st.stage == "APPLY":
logger.info("Stage APPLY")
return await run_in_threadpool(_apply)
if st.stage == "DONE":
logger.info("Stage DONE")
st.smart_preview = ""
return f"Klaar. Branch: `{st.new_branch}`."
return "Interne status onduidelijk; begin opnieuw of herformuleer je doel."