diff --git a/app.py b/app.py index 1a9574f..3667cfc 100644 --- a/app.py +++ b/app.py @@ -478,6 +478,20 @@ MEILI_API_KEY = os.getenv("MEILI_API_KEY", "0xipOmfgi_zMgdFplSdv7L8mlx0RPMQCNxV MEILI_INDEX = os.getenv("MEILI_INDEX", "code_chunks") MEILI_ENABLED = bool(MEILI_URL) +# --- Chat memory (hybride RAG: Meili + Chroma) --- +MEILI_MEMORY_INDEX = os.getenv("MEILI_MEMORY_INDEX", "chat_memory") +MEMORY_COLLECTION = os.getenv("MEMORY_COLLECTION", "chat_memory") + +MEMORY_CHUNK_CHARS = int(os.getenv("MEMORY_CHUNK_CHARS", "3500")) +MEMORY_OVERLAP_CHARS = int(os.getenv("MEMORY_OVERLAP_CHARS", "300")) + +MEMORY_EMB_WEIGHT = float(os.getenv("MEMORY_EMB_WEIGHT", os.getenv("RAG_EMB_WEIGHT", "0.6"))) +MEMORY_RECENCY_BIAS = float(os.getenv("MEMORY_RECENCY_BIAS", "0.10")) +MEMORY_HALF_LIFE_SEC = int(os.getenv("MEMORY_HALF_LIFE_SEC", "259200")) # 3 dagen +MEMORY_MAX_TOTAL_CHARS = int(os.getenv("MEMORY_MAX_TOTAL_CHARS", "12000")) +MEMORY_CLIP_CHARS = int(os.getenv("MEMORY_CLIP_CHARS", "1200")) + + # Repo summaries (cache on demand) _SUMMARY_DIR = os.path.join("/rag_db", "repo_summaries") os.makedirs(_SUMMARY_DIR, exist_ok=True) @@ -768,7 +782,26 @@ def _collection_add(collection, documents: list[str], metadatas: list[dict], ids if len(out) >= 6: break return out - + + def _sanitize_meta(meta: dict) -> dict: + out = {} + for k, v in (meta or {}).items(): + if v is None: + continue # DROP None: Rust Chroma accepteert dit niet + if isinstance(v, (str, int, float, bool)): + out[k] = v + elif isinstance(v, (list, tuple, set)): + if not v: + continue + if all(isinstance(x, str) for x in v): + out[k] = ",".join(v) + else: + out[k] = json.dumps(list(v), ensure_ascii=False) + elif isinstance(v, dict): + out[k] = json.dumps(v, ensure_ascii=False) + else: + out[k] = str(v) + return out augmented_docs = [] metadatas_mod = [] for doc, meta in zip(documents, metadatas): @@ -777,11 +810,13 @@ def _collection_add(collection, documents: list[str], metadatas: list[dict], ids syms = 
_symbol_hints(doc) header = f"FILE:{path} | LANG:{ext} | SYMBOLS:{','.join(syms)}\n" augmented_docs.append(header + (doc or "")) - m = dict(meta or {}) + raw = dict(meta or {}) + m = _sanitize_meta(dict(meta or {})) if syms: m["symbols"] = ",".join(syms[:8]) metadatas_mod.append(m) + embs = _EMBEDDER.embed_documents(augmented_docs) collection.add(documents=augmented_docs, embeddings=embs, metadatas=metadatas_mod, ids=ids) @@ -2168,6 +2203,221 @@ async def _execute_tool(name: str, args: dict) -> dict: return out except Exception as e: return {"error": f"Error for functioncall '{name}', while doing repo_query. errortext: {str(e)}"} + if name == "memory_upsert": + namespace = (args.get("namespace") or "").strip() + if not namespace: + return {"error": "memory_upsert: namespace is required"} + + # accepteer: items[] of shorthand text/role + items = args.get("items") + if not items: + txt = (args.get("text") or "").strip() + if not txt: + return {"error": "memory_upsert: provide items[] or text"} + items = [{ + "role": args.get("role", "user"), + "text": txt, + "ts_unix": args.get("ts_unix"), + "source": args.get("source", "openwebui"), + "tags": args.get("tags") or [], + "meta": {} + }] + + chunk_chars = int(args.get("chunk_chars", MEMORY_CHUNK_CHARS)) + overlap = int(args.get("overlap_chars", MEMORY_OVERLAP_CHARS)) + collection_base = (args.get("collection_name") or MEMORY_COLLECTION).strip() or MEMORY_COLLECTION + collection_eff = _collection_effective(collection_base) + col = _get_collection(collection_eff) + + docs_for_meili = [] + chroma_docs = [] + chroma_metas = [] + chroma_ids = [] + + now = int(time.time()) + for it in items: + role = (it.get("role") or "user").strip() + text0 = (it.get("text") or "").strip() + if not text0: + continue + ts_unix = int(it.get("ts_unix") or now) + source = (it.get("source") or "openwebui") + turn_id = it.get("turn_id") + tags = it.get("tags") or [] + message_id = it.get("message_id") or uuid.uuid4().hex + + chunks = 
_chunk_text(text0, chunk_chars=chunk_chars, overlap=overlap) if len(text0) > chunk_chars else [text0] + for ci, ch in enumerate(chunks): + chunk_id = f"{message_id}:{ci}" + meta = { + "namespace": namespace, + "role": role, + "ts_unix": ts_unix, + "source": source, + "turn_id": turn_id, + "tags": tags, + "message_id": message_id, + "chunk_id": chunk_id, + "chunk_index": ci, + "id": chunk_id, + } + + chroma_docs.append(ch) + chroma_metas.append(meta) + chroma_ids.append(chunk_id) + + docs_for_meili.append({ + "id": chunk_id, + "namespace": namespace, + "role": role, + "ts_unix": ts_unix, + "source": source, + "turn_id": turn_id, + "tags": tags, + "message_id": message_id, + "chunk_index": ci, + "text": ch, + }) + + if chroma_docs: + _collection_add(col, chroma_docs, chroma_metas, chroma_ids) + + if MEILI_ENABLED and docs_for_meili: + await _meili_memory_upsert(docs_for_meili) + + return { + "status": "ok", + "namespace": namespace, + "collection_effective": collection_eff, + "chunks_indexed": len(chroma_docs), + "meili_indexed": bool(MEILI_ENABLED and docs_for_meili) + } + + if name == "memory_query": + namespace = (args.get("namespace") or "").strip() + query = (args.get("query") or "").strip() + if not namespace or not query: + return {"error": "memory_query: namespace and query are required"} + + k = int(args.get("k", 12)) + meili_limit = int(args.get("meili_limit", 40)) + chroma_limit = int(args.get("chroma_limit", 40)) + max_total = int(args.get("max_total_chars", MEMORY_MAX_TOTAL_CHARS)) + clip_chars = int(args.get("clip_chars", MEMORY_CLIP_CHARS)) + + collection_base = (args.get("collection_name") or MEMORY_COLLECTION).strip() or MEMORY_COLLECTION + collection_eff = _collection_effective(collection_base) + col = _get_collection(collection_eff) + + # --- Chroma candidates --- + q_emb = _EMBEDDER.embed_query(query) + where = {"namespace": {"$eq": namespace}} + res = col.query( + query_embeddings=[q_emb], + n_results=max(k, chroma_limit), + where=where, + 
include=["metadatas", "documents", "distances"] + ) + docs = (res.get("documents") or [[]])[0] + metas = (res.get("metadatas") or [[]])[0] + dists = (res.get("distances") or [[]])[0] + + cands = [] + for doc, meta, dist in zip(docs, metas, dists): + meta = meta or {} + cid = meta.get("chunk_id") or meta.get("id") or "" + emb_sim = 1.0 / (1.0 + float(dist or 0.0)) + cands.append({ + "id": cid, + "text": doc or "", + "meta": meta, + "emb_sim": emb_sim, + "bm25": 0.0, + "score": 0.0 + }) + + # --- Meili hits --- + meili_hits = await _meili_memory_search(namespace, query, limit=meili_limit) if MEILI_ENABLED else [] + bm25_map = {} + for h in meili_hits: + if h.get("id"): + bm25_map[h["id"]] = float(h.get("score") or 0.0) + + # voeg meili-only hits toe als ze niet al in chroma zitten + cand_ids = set([c["id"] for c in cands if c.get("id")]) + for h in meili_hits: + hid = h.get("id") or "" + if hid and hid not in cand_ids: + cands.append({ + "id": hid, + "text": (h.get("text") or ""), + "meta": (h.get("meta") or {}), + "emb_sim": 0.0, + "bm25": float(h.get("score") or 0.0), + "score": 0.0 + }) + cand_ids.add(hid) + + # bm25 invullen voor bestaande cands + for c in cands: + cid = c.get("id") or "" + if cid in bm25_map: + c["bm25"] = bm25_map[cid] + + # normaliseer bm25 + bm25_scores = [c["bm25"] for c in cands] + if bm25_scores: + mn, mx = min(bm25_scores), max(bm25_scores) + def _norm(s): return (s - mn) / (mx - mn) if mx > mn else 0.0 + else: + def _norm(s): return 0.0 + + alpha = float(MEMORY_EMB_WEIGHT) + now = int(time.time()) + half = max(1, int(MEMORY_HALF_LIFE_SEC)) + bias = float(MEMORY_RECENCY_BIAS) + + for c in cands: + bm25_n = _norm(c["bm25"]) + base = alpha * float(c["emb_sim"]) + (1.0 - alpha) * bm25_n + ts = int((c.get("meta") or {}).get("ts_unix") or now) + age = max(0, now - ts) + rec = half / (half + age) # 0..1 + c["score"] = base + bias * rec + + ranked = sorted(cands, key=lambda x: x["score"], reverse=True) + + # clip + max_total_chars + out = [] + 
used = 0 + for c in ranked: + txt = (c.get("text") or "") + if clip_chars and len(txt) > clip_chars: + txt = txt[:clip_chars] + " …" + if max_total and used + len(txt) > max_total: + continue + used += len(txt) + out.append({ + "id": c.get("id"), + "score": float(c.get("score") or 0.0), + "text": txt, + "meta": c.get("meta") or {} + }) + if len(out) >= k: + break + + return { + "namespace": namespace, + "query": query, + "k": k, + "collection_effective": collection_eff, + "results": out, + "stats": { + "meili_hits": len(meili_hits), + "chroma_hits": len(docs), + "returned_chars": used + } + } # Console tools @@ -2352,6 +2602,58 @@ TOOLS_REGISTRY = { "required":["query","repo"] } }, + "memory_upsert": { + "description": "Sla chat/gesprekstekst op in memory (hybride RAG: Meili + Chroma). Gebruik namespace=chat_id.", + "parameters": { + "type": "object", + "properties": { + "namespace": {"type": "string"}, + "items": { + "type": "array", + "items": { + "type": "object", + "properties": { + "message_id": {"type": "string"}, + "turn_id": {"type": ["string","integer","null"]}, + "role": {"type": "string", "default": "user"}, + "text": {"type": "string"}, + "ts_unix": {"type": ["integer","null"]}, + "source": {"type": ["string","null"], "default": "openwebui"}, + "tags": {"type": "array", "items": {"type":"string"}}, + "meta": {"type": "object"} + }, + "required": ["role","text"] + } + }, + "text": {"type": "string"}, + "role": {"type": "string", "default": "user"}, + "ts_unix": {"type": ["integer","null"]}, + "source": {"type": ["string","null"], "default": "openwebui"}, + "tags": {"type": "array", "items": {"type":"string"}}, + "chunk_chars": {"type": "integer", "default": 3500}, + "overlap_chars": {"type": "integer", "default": 300}, + "collection_name": {"type": "string", "default": "chat_memory"} + }, + "required": ["namespace"] + } + }, + "memory_query": { + "description": "Zoek relevante chat-memory snippets (hybride Meili + Chroma) binnen namespace.", + 
_MEILI_MEM_READY = False
_MEILI_MEM_LOCK = asyncio.Lock()


def _meili_headers() -> dict:
    """Build request headers for Meilisearch.

    Sends both the modern ``Authorization: Bearer`` header and the legacy
    ``X-Meili-API-Key`` header so old and new server versions both accept
    the key.
    """
    headers = {"Content-Type": "application/json"}
    if MEILI_API_KEY:
        headers["Authorization"] = f"Bearer {MEILI_API_KEY}"
        headers["X-Meili-API-Key"] = MEILI_API_KEY
    return headers


async def _meili_req(method: str, path: str, json_body=None, timeout: float = 15.0):
    """Perform one HTTP request against the Meilisearch base URL.

    Reuses the application-wide httpx client when one is attached to
    ``app.state``; otherwise creates a throwaway client and always closes
    it afterwards so connections cannot leak.
    """
    base = (MEILI_URL or "").rstrip("/")
    url = f"{base}{path}"
    # Single getattr instead of two hasattr probes on app.state.
    client = getattr(app.state, "HTTPX_PROXY", None)
    owns_client = client is None
    if owns_client:
        client = httpx.AsyncClient()
    try:
        return await client.request(
            method,
            url,
            headers=_meili_headers(),
            json=json_body,
            timeout=httpx.Timeout(timeout, connect=min(5.0, timeout)),
        )
    finally:
        if owns_client:
            await client.aclose()


async def _ensure_meili_memory_index() -> bool:
    """Ensure the chat-memory index exists with filter/sort/search settings.

    Best effort: create/settings failures are not treated as fatal, matching
    the rest of the Meili integration, which must never crash the toolserver.
    Returns False only when no MEILI_URL is configured.
    """
    global _MEILI_MEM_READY
    if not MEILI_URL:
        return False
    if _MEILI_MEM_READY:
        return True

    async with _MEILI_MEM_LOCK:
        # Double-checked under the lock so concurrent callers set up once.
        if _MEILI_MEM_READY:
            return True

        uid = MEILI_MEMORY_INDEX

        # 1) Create the index if it does not exist yet.
        # NOTE(review): index creation is an async task in Meilisearch; the
        # settings PATCH below may race it on a cold start — confirm whether
        # waiting on the enqueued task is needed for the deployed version.
        r = await _meili_req("GET", f"/indexes/{uid}", timeout=5.0)
        if r.status_code == 404:
            _ = await _meili_req("POST", "/indexes", json_body={"uid": uid, "primaryKey": "id"}, timeout=10.0)

        # 2) Apply settings (idempotent PATCH).
        settings = {
            "filterableAttributes": ["namespace", "role", "ts_unix", "source", "tags", "turn_id", "message_id"],
            "sortableAttributes": ["ts_unix"],
            "searchableAttributes": ["text"],
            "displayedAttributes": ["*"],
        }
        _ = await _meili_req("PATCH", f"/indexes/{uid}/settings", json_body=settings, timeout=10.0)

        _MEILI_MEM_READY = True
    return True
+ r = await _meili_req("GET", f"/indexes/{uid}", timeout=5.0) + if r.status_code == 404: + _ = await _meili_req("POST", "/indexes", json_body={"uid": uid, "primaryKey": "id"}, timeout=10.0) + + # 2) Settings (idempotent) + settings = { + "filterableAttributes": ["namespace", "role", "ts_unix", "source", "tags", "turn_id", "message_id"], + "sortableAttributes": ["ts_unix"], + "searchableAttributes": ["text"], + "displayedAttributes": ["*"], + } + _ = await _meili_req("PATCH", f"/indexes/{uid}/settings", json_body=settings, timeout=10.0) + + _MEILI_MEM_READY = True + return True + +async def _meili_memory_upsert(docs: list[dict]) -> None: + if not docs or not MEILI_URL: + return + ok = await _ensure_meili_memory_index() + if not ok: + return + + uid = MEILI_MEMORY_INDEX + path = f"/indexes/{uid}/documents?primaryKey=id" + CH = 500 + for i in range(0, len(docs), CH): + chunk = docs[i:i+CH] + r = await _meili_req("POST", path, json_body=chunk, timeout=30.0) + # geen hard fail; toolserver moet niet crashen door Meili + if r.status_code >= 400: + logger.warning("Meili memory upsert failed: %s %s", r.status_code, r.text[:400]) + +async def _meili_memory_search(namespace: str, query: str, limit: int = 40) -> list[dict]: + if not MEILI_URL: + return [] + ok = await _ensure_meili_memory_index() + if not ok: + return [] + + uid = MEILI_MEMORY_INDEX + # filter strikt op namespace + filt = f'namespace = "{namespace}"' + body = { + "q": query, + "limit": int(limit), + "filter": filt, + "showRankingScore": True, + } + r = await _meili_req("POST", f"/indexes/{uid}/search", json_body=body, timeout=20.0) + if r.status_code >= 400: + logger.warning("Meili memory search failed: %s %s", r.status_code, r.text[:400]) + return [] + + data = r.json() + out = [] + for h in data.get("hits", []) or []: + out.append({ + "id": h.get("id"), + "text": h.get("text") or "", + "score": float(h.get("_rankingScore") or 0.0), + "meta": { + "namespace": h.get("namespace"), + "role": h.get("role"), + 
"ts_unix": h.get("ts_unix"), + "source": h.get("source"), + "turn_id": h.get("turn_id"), + "tags": h.get("tags") or [], + "message_id": h.get("message_id"), + "chunk_index": h.get("chunk_index"), + } + }) + return out + async def _search_first_candidates(repo_url: str, branch: str, query: str, explicit_paths: list[str] | None = None, limit: int = 50) -> list[str]: """