rag normal text upsert added

This commit is contained in:
admin 2026-02-05 16:56:01 +01:00
parent 1aaf0d013a
commit 1592173cf7

422
app.py
View File

@ -478,6 +478,20 @@ MEILI_API_KEY = os.getenv("MEILI_API_KEY", "0xipOmfgi_zMgdFplSdv7L8mlx0RPMQCNxV
MEILI_INDEX = os.getenv("MEILI_INDEX", "code_chunks")
MEILI_ENABLED = bool(MEILI_URL)
# --- Chat memory (hybride RAG: Meili + Chroma) ---
MEILI_MEMORY_INDEX = os.getenv("MEILI_MEMORY_INDEX", "chat_memory")
MEMORY_COLLECTION = os.getenv("MEMORY_COLLECTION", "chat_memory")
MEMORY_CHUNK_CHARS = int(os.getenv("MEMORY_CHUNK_CHARS", "3500"))
MEMORY_OVERLAP_CHARS = int(os.getenv("MEMORY_OVERLAP_CHARS", "300"))
MEMORY_EMB_WEIGHT = float(os.getenv("MEMORY_EMB_WEIGHT", os.getenv("RAG_EMB_WEIGHT", "0.6")))
MEMORY_RECENCY_BIAS = float(os.getenv("MEMORY_RECENCY_BIAS", "0.10"))
MEMORY_HALF_LIFE_SEC = int(os.getenv("MEMORY_HALF_LIFE_SEC", "259200")) # 3 dagen
MEMORY_MAX_TOTAL_CHARS = int(os.getenv("MEMORY_MAX_TOTAL_CHARS", "12000"))
MEMORY_CLIP_CHARS = int(os.getenv("MEMORY_CLIP_CHARS", "1200"))
# Repo summaries (cache on demand)
_SUMMARY_DIR = os.path.join("/rag_db", "repo_summaries")
os.makedirs(_SUMMARY_DIR, exist_ok=True)
@ -769,6 +783,25 @@ def _collection_add(collection, documents: list[str], metadatas: list[dict], ids
break
return out
def _sanitize_meta(meta: dict) -> dict:
out = {}
for k, v in (meta or {}).items():
if v is None:
continue # DROP None: Rust Chroma accepteert dit niet
if isinstance(v, (str, int, float, bool)):
out[k] = v
elif isinstance(v, (list, tuple, set)):
if not v:
continue
if all(isinstance(x, str) for x in v):
out[k] = ",".join(v)
else:
out[k] = json.dumps(list(v), ensure_ascii=False)
elif isinstance(v, dict):
out[k] = json.dumps(v, ensure_ascii=False)
else:
out[k] = str(v)
return out
augmented_docs = []
metadatas_mod = []
for doc, meta in zip(documents, metadatas):
@ -777,11 +810,13 @@ def _collection_add(collection, documents: list[str], metadatas: list[dict], ids
syms = _symbol_hints(doc)
header = f"FILE:{path} | LANG:{ext} | SYMBOLS:{','.join(syms)}\n"
augmented_docs.append(header + (doc or ""))
m = dict(meta or {})
raw = dict(meta or {})
m = _sanitize_meta(dict(meta or {}))
if syms:
m["symbols"] = ",".join(syms[:8])
metadatas_mod.append(m)
embs = _EMBEDDER.embed_documents(augmented_docs)
collection.add(documents=augmented_docs, embeddings=embs, metadatas=metadatas_mod, ids=ids)
@ -2168,6 +2203,221 @@ async def _execute_tool(name: str, args: dict) -> dict:
return out
except Exception as e:
return {"error": f"Error for functioncall '{name}', while doing repo_query. errortext: {str(e)}"}
if name == "memory_upsert":
namespace = (args.get("namespace") or "").strip()
if not namespace:
return {"error": "memory_upsert: namespace is required"}
# accepteer: items[] of shorthand text/role
items = args.get("items")
if not items:
txt = (args.get("text") or "").strip()
if not txt:
return {"error": "memory_upsert: provide items[] or text"}
items = [{
"role": args.get("role", "user"),
"text": txt,
"ts_unix": args.get("ts_unix"),
"source": args.get("source", "openwebui"),
"tags": args.get("tags") or [],
"meta": {}
}]
chunk_chars = int(args.get("chunk_chars", MEMORY_CHUNK_CHARS))
overlap = int(args.get("overlap_chars", MEMORY_OVERLAP_CHARS))
collection_base = (args.get("collection_name") or MEMORY_COLLECTION).strip() or MEMORY_COLLECTION
collection_eff = _collection_effective(collection_base)
col = _get_collection(collection_eff)
docs_for_meili = []
chroma_docs = []
chroma_metas = []
chroma_ids = []
now = int(time.time())
for it in items:
role = (it.get("role") or "user").strip()
text0 = (it.get("text") or "").strip()
if not text0:
continue
ts_unix = int(it.get("ts_unix") or now)
source = (it.get("source") or "openwebui")
turn_id = it.get("turn_id")
tags = it.get("tags") or []
message_id = it.get("message_id") or uuid.uuid4().hex
chunks = _chunk_text(text0, chunk_chars=chunk_chars, overlap=overlap) if len(text0) > chunk_chars else [text0]
for ci, ch in enumerate(chunks):
chunk_id = f"{message_id}:{ci}"
meta = {
"namespace": namespace,
"role": role,
"ts_unix": ts_unix,
"source": source,
"turn_id": turn_id,
"tags": tags,
"message_id": message_id,
"chunk_id": chunk_id,
"chunk_index": ci,
"id": chunk_id,
}
chroma_docs.append(ch)
chroma_metas.append(meta)
chroma_ids.append(chunk_id)
docs_for_meili.append({
"id": chunk_id,
"namespace": namespace,
"role": role,
"ts_unix": ts_unix,
"source": source,
"turn_id": turn_id,
"tags": tags,
"message_id": message_id,
"chunk_index": ci,
"text": ch,
})
if chroma_docs:
_collection_add(col, chroma_docs, chroma_metas, chroma_ids)
if MEILI_ENABLED and docs_for_meili:
await _meili_memory_upsert(docs_for_meili)
return {
"status": "ok",
"namespace": namespace,
"collection_effective": collection_eff,
"chunks_indexed": len(chroma_docs),
"meili_indexed": bool(MEILI_ENABLED and docs_for_meili)
}
if name == "memory_query":
namespace = (args.get("namespace") or "").strip()
query = (args.get("query") or "").strip()
if not namespace or not query:
return {"error": "memory_query: namespace and query are required"}
k = int(args.get("k", 12))
meili_limit = int(args.get("meili_limit", 40))
chroma_limit = int(args.get("chroma_limit", 40))
max_total = int(args.get("max_total_chars", MEMORY_MAX_TOTAL_CHARS))
clip_chars = int(args.get("clip_chars", MEMORY_CLIP_CHARS))
collection_base = (args.get("collection_name") or MEMORY_COLLECTION).strip() or MEMORY_COLLECTION
collection_eff = _collection_effective(collection_base)
col = _get_collection(collection_eff)
# --- Chroma candidates ---
q_emb = _EMBEDDER.embed_query(query)
where = {"namespace": {"$eq": namespace}}
res = col.query(
query_embeddings=[q_emb],
n_results=max(k, chroma_limit),
where=where,
include=["metadatas", "documents", "distances"]
)
docs = (res.get("documents") or [[]])[0]
metas = (res.get("metadatas") or [[]])[0]
dists = (res.get("distances") or [[]])[0]
cands = []
for doc, meta, dist in zip(docs, metas, dists):
meta = meta or {}
cid = meta.get("chunk_id") or meta.get("id") or ""
emb_sim = 1.0 / (1.0 + float(dist or 0.0))
cands.append({
"id": cid,
"text": doc or "",
"meta": meta,
"emb_sim": emb_sim,
"bm25": 0.0,
"score": 0.0
})
# --- Meili hits ---
meili_hits = await _meili_memory_search(namespace, query, limit=meili_limit) if MEILI_ENABLED else []
bm25_map = {}
for h in meili_hits:
if h.get("id"):
bm25_map[h["id"]] = float(h.get("score") or 0.0)
# voeg meili-only hits toe als ze niet al in chroma zitten
cand_ids = set([c["id"] for c in cands if c.get("id")])
for h in meili_hits:
hid = h.get("id") or ""
if hid and hid not in cand_ids:
cands.append({
"id": hid,
"text": (h.get("text") or ""),
"meta": (h.get("meta") or {}),
"emb_sim": 0.0,
"bm25": float(h.get("score") or 0.0),
"score": 0.0
})
cand_ids.add(hid)
# bm25 invullen voor bestaande cands
for c in cands:
cid = c.get("id") or ""
if cid in bm25_map:
c["bm25"] = bm25_map[cid]
# normaliseer bm25
bm25_scores = [c["bm25"] for c in cands]
if bm25_scores:
mn, mx = min(bm25_scores), max(bm25_scores)
def _norm(s): return (s - mn) / (mx - mn) if mx > mn else 0.0
else:
def _norm(s): return 0.0
alpha = float(MEMORY_EMB_WEIGHT)
now = int(time.time())
half = max(1, int(MEMORY_HALF_LIFE_SEC))
bias = float(MEMORY_RECENCY_BIAS)
for c in cands:
bm25_n = _norm(c["bm25"])
base = alpha * float(c["emb_sim"]) + (1.0 - alpha) * bm25_n
ts = int((c.get("meta") or {}).get("ts_unix") or now)
age = max(0, now - ts)
rec = half / (half + age) # 0..1
c["score"] = base + bias * rec
ranked = sorted(cands, key=lambda x: x["score"], reverse=True)
# clip + max_total_chars
out = []
used = 0
for c in ranked:
txt = (c.get("text") or "")
if clip_chars and len(txt) > clip_chars:
txt = txt[:clip_chars] + ""
if max_total and used + len(txt) > max_total:
continue
used += len(txt)
out.append({
"id": c.get("id"),
"score": float(c.get("score") or 0.0),
"text": txt,
"meta": c.get("meta") or {}
})
if len(out) >= k:
break
return {
"namespace": namespace,
"query": query,
"k": k,
"collection_effective": collection_eff,
"results": out,
"stats": {
"meili_hits": len(meili_hits),
"chroma_hits": len(docs),
"returned_chars": used
}
}
# Console tools
@ -2352,6 +2602,58 @@ TOOLS_REGISTRY = {
"required":["query","repo"]
}
},
"memory_upsert": {
"description": "Sla chat/gesprekstekst op in memory (hybride RAG: Meili + Chroma). Gebruik namespace=chat_id.",
"parameters": {
"type": "object",
"properties": {
"namespace": {"type": "string"},
"items": {
"type": "array",
"items": {
"type": "object",
"properties": {
"message_id": {"type": "string"},
"turn_id": {"type": ["string","integer","null"]},
"role": {"type": "string", "default": "user"},
"text": {"type": "string"},
"ts_unix": {"type": ["integer","null"]},
"source": {"type": ["string","null"], "default": "openwebui"},
"tags": {"type": "array", "items": {"type":"string"}},
"meta": {"type": "object"}
},
"required": ["role","text"]
}
},
"text": {"type": "string"},
"role": {"type": "string", "default": "user"},
"ts_unix": {"type": ["integer","null"]},
"source": {"type": ["string","null"], "default": "openwebui"},
"tags": {"type": "array", "items": {"type":"string"}},
"chunk_chars": {"type": "integer", "default": 3500},
"overlap_chars": {"type": "integer", "default": 300},
"collection_name": {"type": "string", "default": "chat_memory"}
},
"required": ["namespace"]
}
},
"memory_query": {
"description": "Zoek relevante chat-memory snippets (hybride Meili + Chroma) binnen namespace.",
"parameters": {
"type": "object",
"properties": {
"namespace": {"type": "string"},
"query": {"type": "string"},
"k": {"type": "integer", "default": 12},
"meili_limit": {"type": "integer", "default": 40},
"chroma_limit": {"type": "integer", "default": 40},
"collection_name": {"type": "string", "default": "chat_memory"},
"max_total_chars": {"type": "integer", "default": 12000},
"clip_chars": {"type": "integer", "default": 1200}
},
"required": ["namespace","query"]
}
},
"web_search_xng": {
"description": "Search the web using SearXNG and get the content of the relevant pages.",
"parameters": {
@ -3624,6 +3926,124 @@ async def _meili_search_internal(query: str, *, repo_full: str | None, branch: s
best[p] = h
return list(best.values())
_MEILI_MEM_READY = False
_MEILI_MEM_LOCK = asyncio.Lock()
def _meili_headers() -> dict:
headers = {"Content-Type": "application/json"}
if MEILI_API_KEY:
headers["Authorization"] = f"Bearer {MEILI_API_KEY}"
headers["X-Meili-API-Key"] = MEILI_API_KEY
return headers
async def _meili_req(method: str, path: str, json_body=None, timeout: float = 15.0):
base = (MEILI_URL or "").rstrip("/")
url = f"{base}{path}"
client = app.state.HTTPX_PROXY if hasattr(app.state, "HTTPX_PROXY") else httpx.AsyncClient()
close_after = not hasattr(app.state, "HTTPX_PROXY")
try:
r = await client.request(
method,
url,
headers=_meili_headers(),
json=json_body,
timeout=httpx.Timeout(timeout, connect=min(5.0, timeout)),
)
return r
finally:
if close_after:
await client.aclose()
async def _ensure_meili_memory_index() -> bool:
"""Zorg dat chat_memory index bestaat + goede settings heeft (filter/sort/searchable)."""
global _MEILI_MEM_READY
if not MEILI_URL:
return False
if _MEILI_MEM_READY:
return True
async with _MEILI_MEM_LOCK:
if _MEILI_MEM_READY:
return True
uid = MEILI_MEMORY_INDEX
# 1) Bestaat index?
r = await _meili_req("GET", f"/indexes/{uid}", timeout=5.0)
if r.status_code == 404:
_ = await _meili_req("POST", "/indexes", json_body={"uid": uid, "primaryKey": "id"}, timeout=10.0)
# 2) Settings (idempotent)
settings = {
"filterableAttributes": ["namespace", "role", "ts_unix", "source", "tags", "turn_id", "message_id"],
"sortableAttributes": ["ts_unix"],
"searchableAttributes": ["text"],
"displayedAttributes": ["*"],
}
_ = await _meili_req("PATCH", f"/indexes/{uid}/settings", json_body=settings, timeout=10.0)
_MEILI_MEM_READY = True
return True
async def _meili_memory_upsert(docs: list[dict]) -> None:
if not docs or not MEILI_URL:
return
ok = await _ensure_meili_memory_index()
if not ok:
return
uid = MEILI_MEMORY_INDEX
path = f"/indexes/{uid}/documents?primaryKey=id"
CH = 500
for i in range(0, len(docs), CH):
chunk = docs[i:i+CH]
r = await _meili_req("POST", path, json_body=chunk, timeout=30.0)
# geen hard fail; toolserver moet niet crashen door Meili
if r.status_code >= 400:
logger.warning("Meili memory upsert failed: %s %s", r.status_code, r.text[:400])
async def _meili_memory_search(namespace: str, query: str, limit: int = 40) -> list[dict]:
if not MEILI_URL:
return []
ok = await _ensure_meili_memory_index()
if not ok:
return []
uid = MEILI_MEMORY_INDEX
# filter strikt op namespace
filt = f'namespace = "{namespace}"'
body = {
"q": query,
"limit": int(limit),
"filter": filt,
"showRankingScore": True,
}
r = await _meili_req("POST", f"/indexes/{uid}/search", json_body=body, timeout=20.0)
if r.status_code >= 400:
logger.warning("Meili memory search failed: %s %s", r.status_code, r.text[:400])
return []
data = r.json()
out = []
for h in data.get("hits", []) or []:
out.append({
"id": h.get("id"),
"text": h.get("text") or "",
"score": float(h.get("_rankingScore") or 0.0),
"meta": {
"namespace": h.get("namespace"),
"role": h.get("role"),
"ts_unix": h.get("ts_unix"),
"source": h.get("source"),
"turn_id": h.get("turn_id"),
"tags": h.get("tags") or [],
"message_id": h.get("message_id"),
"chunk_index": h.get("chunk_index"),
}
})
return out
async def _search_first_candidates(repo_url: str, branch: str, query: str, explicit_paths: list[str] | None = None, limit: int = 50) -> list[str]:
"""
Voorkeursvolgorde: expliciete paden Meilisearch grep (laatste redmiddel) RAG op bestandsniveau