#!/usr/bin/env python3 # ConsoleX 2.4.13 — lokale console-agent voor je LLM Proxy # -------------------------------------------------------- # Highlights t.o.v. 2.4.12: # - ask_user werkt nu ook in BG-modus: # * Tijdens een BG-job wordt de vraag met een ID getoond: # [ask_user] job vraag : # Jij kunt antwoorden met: # :answer # De job wacht op dit antwoord en gaat daarna verder. # - Foreground-modus (BG uit): ask_user vraagt nog steeds direct om input. # - Verder alles van 2.4.12: # * Uitgebreide salvage voor content-toolcalls (call_tool, directe toolkeys). # * 5-stappen werkwijze in system prompt (web_search → inspect → plan → execute → verify). # * run_shell ondersteunt optionele 'cwd'. # * Korte toolcall-previews in de logging. import asyncio import html as _html import json import os import re import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional from urllib.parse import urlencode, urlparse import httpx # -------- Optional rich UI -------- try: from rich import box from rich.console import Console from rich.panel import Panel from rich.prompt import Prompt from rich.table import Table RICH = True console = Console() except Exception: RICH = False class _C: def print(self, *a, **k): print(*a, **k) console = _C() # ---------------------------- Config ----------------------------- DEFAULT_UA = "ConsoleX/2.4.13 (httpx)" BASE_URL = os.getenv("CONSOLEX_BASE_URL", "http://localhost:8080/v1").rstrip("/") MODEL = os.getenv("CONSOLEX_MODEL", "magistral-24b") TIMEOUT_S = int(os.getenv("CONSOLEX_TIMEOUT_S", "600")) MAX_MSGS = int(os.getenv("CONSOLEX_MAX_MSGS", "24")) MAX_TOKENS = int(os.getenv("CONSOLEX_MAX_TOKENS", "13000") or "13000") VERBOSITY = max(0, min(3, int(os.getenv("CONSOLEX_VERBOSITY", "2")))) EVENTS = os.getenv("CONSOLEX_EVENTS", "1") == "1" # ticks + tool-logs DEBUG_HTTP = os.getenv("CONSOLEX_DEBUG_HTTP", "0") == "1" STREAM_DEFAULT = os.getenv("CONSOLEX_STREAM", "1") == "1" # Salvage staat standaard AAN SALVAGE = os.getenv("CONSOLEX_SALVAGE", "1") == "1" TOOL_MODE = os.getenv("CONSOLEX_TOOL_MODE", "auto").lower() # auto|native|content|off TOOL_RETURN_ROLE = os.getenv("CONSOLEX_TOOL_RETURN_ROLE", "tool") BAN_RM = os.getenv("CONSOLEX_BAN_RM", "1") == "1" BACKGROUND_DEFAULT = os.getenv("CONSOLEX_BG", "1") == "1" TICK_SECS = float(os.getenv("CONSOLEX_TICK_SECS", "3")) CONTEXT_PATH = Path(os.getenv("CONSOLEX_CONTEXT", str(Path.cwd() / "CONTEXT.json"))) LOG_DIR = Path(".consolex_log") LOG_DIR.mkdir(exist_ok=True) SEARCH_BACKEND = os.getenv("CONSOLEX_SEARCH_BACKEND", "auto").lower() SEARCH_PROXY_URL = os.getenv("CONSOLEX_SEARCH_PROXY", "").rstrip("/") SERPAPI_KEY = os.getenv("SERPAPI_API_KEY", "") GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID", "") GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "") WEB_BLOCK_LOCAL = os.getenv("CONSOLEX_WEB_BLOCK_LOCAL", "1") == "1" USER_AGENT = os.getenv("CONSOLEX_UA", DEFAULT_UA) def v(level: int, *args): if VERBOSITY >= level: try: console.print(*args) except Exception: print(*args, **{}) def dump_json(name: str, data: Any): try: (LOG_DIR / name).write_text(json.dumps(data, ensure_ascii=False, indent=2)) except Exception as e: v(3, f"[log] write {name} failed: {e}") def save_md(name: str, text: str): try: (LOG_DIR / name).write_text(text) except Exception as e: v(3, f"[log] write {name} failed: {e}") # ------------------ Context (history/goal) ----------------------- def load_context() -> Dict[str, Any]: if CONTEXT_PATH.exists(): try: return json.loads(CONTEXT_PATH.read_text(encoding="utf-8")) except Exception: pass return {"history": [], "goal": "Probeer te verifiëren of ik een werkzaam programma ben."} def save_context(ctx: Dict[str, Any]): hist = ctx.get("history", []) if len(hist) > MAX_MSGS: ctx["history"] = hist[-MAX_MSGS:] CONTEXT_PATH.write_text(json.dumps(ctx, ensure_ascii=False, indent=2), encoding="utf-8") CTX = load_context() # ------------------- Regex & salvage parsers --------------------- FENCE_RX = re.compile(r"```([a-z0-9_\-+.]*?)\s*\n(.*?)```", re.S | re.I) # Alle bekende toolnamen (voor salvage) TOOL_NAMES = [ "read_file", "write_file", "append_file", "list_tree", "search_text", "run_shell", "ask_user", "set_goal", "add_task", "update_task", "rag_upsert", "rag_search", "web_search", "web_get", ] def html_unescape_multi(s: str, rounds: int = 3) -> str: """Herhaald HTML-unescape (voor &#34; → \" enz.).""" if not s: return s for _ in range(rounds): ns = _html.unescape(s) if ns == s: break s = ns return s def _extract_tools_from_obj(obj: Any, sink: List[dict]): """ Recursief tools extraheren uit een JSON-structuur. Ondersteunt o.a.: - {"call_tool":{"name":"run_shell","arguments":{...}}} - {"call_tool":"run_shell","commands":["cmd1","cmd2",...]} - {"run_shell":{"command":"..."}} - lijsten met bovenstaande varianten. """ if isinstance(obj, dict): # 1) call_tool als dict (OpenAI-stijl) if "call_tool" in obj and isinstance(obj["call_tool"], dict): c = obj["call_tool"] name = c.get("name") args = c.get("arguments") or {} if name: sink.append({"name": name, "arguments": args}) # 1b) call_tool als string + commands/command elif "call_tool" in obj and isinstance(obj["call_tool"], str): name = obj["call_tool"] cmds: List[Dict[str, Any]] = [] if isinstance(obj.get("commands"), list) and obj["commands"]: for cmd in obj["commands"]: cmds.append({"command": str(cmd)}) elif "command" in obj: cmds.append({"command": str(obj["command"])}) else: cmds.append({}) if name: for a in cmds: sink.append({"name": name, "arguments": a}) # 2) directe toolnamen als key for tname in TOOL_NAMES: if tname in obj and isinstance(obj[tname], dict): sink.append({"name": tname, "arguments": obj[tname]}) elif isinstance(obj, list): for it in obj: _extract_tools_from_obj(it, sink) def find_call_tool_json(content: str) -> List[dict]: """ Zoek naar JSON-varianten met tools, zowel plain als HTML-escaped, eventueel in codefences. Geeft lijst terug met items: {"name": "", "arguments": {...}}. """ found: List[dict] = [] if not content: return found # 1) volledige content als JSON proberen (plain + multi-unescaped) for candidate in (content, html_unescape_multi(content)): try: obj = json.loads(candidate) except Exception: continue _extract_tools_from_obj(obj, found) # 2) fenced JSON/TOOL/OPENAI blokken (inclusief lege taal == onbekend) for lang, body in FENCE_RX.findall(content): lang_l = (lang or "").lower().strip() if lang_l in ("json", "tool", "openai", "chatml", ""): s = html_unescape_multi(body.strip()) try: obj = json.loads(s) except Exception: continue _extract_tools_from_obj(obj, found) return found def find_toolcall_xml_like(content: str) -> List[dict]: """ Parse pseudo-XML (ook HTML-escaped varianten). Geeft lijst van {name, arguments}-dicts terug. """ txt = html_unescape_multi(content) results: List[dict] = [] # match met attributen tussen /]*?(?:\{.*?\})?[^>]*)/?>", txt, re.I | re.S): attrs = m.group(1) or "" # name="..." mname = re.search(r'name\s*=\s*"([^"]+)"', attrs, re.I) if not mname: mname = re.search(r"name\s*=\s*'([^']+)'", attrs, re.I) name = mname.group(1) if mname else None # args={...} marg = re.search(r"args\s*=\s*({.*})", attrs, re.I | re.S) args_dict: Dict[str, Any] = {} if marg: raw = marg.group(1) try: args_dict = json.loads(raw) except Exception: # fallback: single quotes → double, nogmaals unescape try: fixed = html_unescape_multi(raw.replace("'", '"')) args_dict = json.loads(fixed) except Exception: args_dict = {} if name: results.append({"name": name, "arguments": args_dict}) return results # -------------------------- Tools impl --------------------------- from pathlib import Path as _P def _safe_path(p: str) -> _P: base = Path.cwd().resolve() path = (base / p).resolve() if not str(path).startswith(str(base)): raise ValueError("Pad buiten projectmap") return path def t_read_file(args: dict) -> str: return _safe_path(args["path"]).read_text(encoding="utf-8") def t_write_file(args: dict) -> dict: p = _safe_path(args["path"]) p.parent.mkdir(parents=True, exist_ok=True) if not p.exists() and not args.get("create", True): raise FileNotFoundError(str(p)) content = args["content"] p.write_text(content, encoding="utf-8") return {"ok": True, "path": str(p), "bytes": len(content)} def t_append_file(args: dict) -> dict: p = _safe_path(args["path"]) p.parent.mkdir(parents=True, exist_ok=True) with p.open("a", encoding="utf-8") as f: f.write(args["content"]) return {"ok": True, "path": str(p), "bytes_appended": len(args["content"])} def _is_bin(p: _P) -> bool: try: with p.open("rb") as f: return b"\0" in f.read(1024) except Exception: return False def t_list_tree(args: dict) -> dict: max_depth = int(args.get("max_depth", 3)) max_entries = int(args.get("max_entries", 4000)) root = Path.cwd() lines = ["./"] count = 0 for path in sorted(root.rglob("*")): if path == root: continue rel = path.relative_to(root) depth = len(rel.parts) if depth > max_depth: continue prefix = " " * (depth - 1) if depth > 0 else "" lines.append(prefix + str(rel)) count += 1 if count >= max_entries: break return {"tree": "\n".join(lines), "truncated": count >= max_entries, "lines": len(lines)} def t_search_text(args: dict) -> dict: pat = re.compile(args["pattern"]) glob_pat = args.get("glob", "**/*") ignore_bin = bool(args.get("ignore_bin", True)) results = [] for p in Path.cwd().glob(glob_pat): if p.is_file(): if ignore_bin and _is_bin(p): continue try: txt = p.read_text(encoding="utf-8", errors="ignore") for m in pat.finditer(txt): line_no = txt[: m.start()].count("\n") + 1 results.append( { "path": str(p), "line": line_no, "match": txt[m.start() : m.end()][:200], } ) except Exception: continue return {"count": len(results), "results": results[:500]} async def t_run_shell(args: dict) -> dict: """ Voert een shell-commando uit. Ondersteunt: - command: string (vereist) - timeout: int (default 600) - cwd: optioneel werkdirectory (relatief aan projectroot, of absoluut) """ cmd = args["command"] timeout = int(args.get("timeout", 600)) # Optionele cwd cwd_arg = args.get("cwd") workdir: Optional[Path] = None if isinstance(cwd_arg, str) and cwd_arg.strip(): p = Path(cwd_arg).expanduser() if not p.is_absolute(): # relatief aan projectmap p = _safe_path(cwd_arg) workdir = p if BAN_RM and re.search(r"\brm\s+-", cmd): return { "exit_code": 1, "stdout": "", "stderr": "rm is verboden", "duration": 0.0, "log_path": "", } start = time.time() proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(workdir) if workdir is not None else None, ) try: out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout) except asyncio.TimeoutError: try: proc.kill() except Exception: pass return { "exit_code": -1, "stdout": "", "stderr": "timeout", "duration": time.time() - start, "log_path": "", } return { "exit_code": proc.returncode, "stdout": out.decode(errors="ignore"), "stderr": err.decode(errors="ignore"), "duration": time.time() - start, "cwd": str(workdir) if workdir is not None else "", "log_path": "", } # -------- ask_user met BG-ondersteuning -------- async def t_ask_user(args: dict) -> dict: """ - In foreground (geen actieve BG-job): stel direct een vraag via stdin. - In BG-job: toon vraag + qid en wacht op :answer . """ q = args["question"] loop = asyncio.get_running_loop() # BG-job: vraag via :answer job = BG_STATE.active_job if job is not None: BG_STATE.q_counter += 1 qid = f"q{BG_STATE.q_counter:04d}" fut: asyncio.Future = loop.create_future() BG_STATE.pending_questions[qid] = { "job_id": job.id, "question": q, "future": fut, } msg = ( f"[ask_user] job {job.id} vraag {qid}: {q}\n" f"Antwoord met: :answer {qid} " ) if RICH: console.print(msg) else: print(msg) try: answer = await fut except asyncio.CancelledError: return {"answer": "", "note": f"cancelled ({qid})"} return {"answer": answer, "id": qid} # Foreground: gewoon via stdin if RICH: ans = await asyncio.to_thread(Prompt.ask, f"[bold cyan]Vraag[/] {q}") else: def _read() -> str: try: return input(f"Vraag {q}\n> ") except EOFError: return "" ans = await asyncio.to_thread(_read) return {"answer": ans} def t_set_goal(args: dict) -> dict: CTX["goal"] = args["title"] save_context(CTX) return {"ok": True, "title": args["title"]} def t_add_task(args: dict) -> dict: tasks = CTX.setdefault("tasks", {}) tid = f"{int(time.time()*1000)%0xffffffff:08x}" tasks[tid] = { "title": args["title"], "status": "doing", "parent_id": args.get("parent_id"), "notes": "", } save_context(CTX) return {"id": tid, "title": args["title"], "status": "doing"} def t_update_task(args: dict) -> dict: t = CTX.setdefault("tasks", {}).get(args["task_id"]) if not t: return {"ok": False, "error": "unknown task_id"} if args.get("status"): t["status"] = args["status"] if args.get("notes"): t["notes"] = args["notes"] save_context(CTX) return {"ok": True, "task": {"id": args["task_id"], **t}} def t_rag_upsert(args: dict) -> dict: rag = Path(".rag") rag.mkdir(exist_ok=True) out = rag / f"{args['namespace']}__{args['doc_id']}.txt" out.write_text( json.dumps({"text": args["text"], "meta": args.get("meta") or {}}, ensure_ascii=False, indent=2), encoding="utf-8", ) return {"ok": True, "path": str(out), "bytes": len(args["text"])} def t_rag_search(args: dict) -> dict: """ Eenvoudige RAG-zoektool: - zoekt in .rag/__*.txt - leest JSON: {"text": ..., "meta": {...}} - doet simpele substring- of regex-search in 'text' """ namespace = (args.get("namespace") or "").strip() query = (args.get("query") or "").strip() top_k = int(args.get("top_k", 5)) regex = bool(args.get("regex", False)) if not namespace: return {"ok": False, "error": "missing namespace"} if not query: return {"ok": False, "error": "missing query"} rag_dir = Path(".rag") if not rag_dir.exists(): return {"ok": True, "count": 0, "results": []} prefix = f"{namespace}__" results = [] pattern = None if regex: try: pattern = re.compile(query, re.I) except Exception as e: return {"ok": False, "error": f"invalid regex: {e}"} q_lower = query.lower() for p in rag_dir.glob(f"{prefix}*.txt"): try: raw = p.read_text(encoding="utf-8", errors="ignore") data = json.loads(raw) text = data.get("text") or "" if not isinstance(text, str): continue except Exception: # kapotte of niet-JSON bestanden overslaan continue if regex: matches = list(pattern.finditer(text)) if not matches: continue score = len(matches) first = matches[0].start() else: txt_lower = text.lower() score = txt_lower.count(q_lower) if score == 0: continue first = txt_lower.find(q_lower) start = max(0, first - 80) end = min(len(text), first + 80) snippet = text[start:end].replace("\n", " ") # doc_id afleiden uit bestandsnaam: namespace__docid(.txt) name = p.name rest = name[len(prefix) :] if rest.endswith(".txt"): rest = rest[:-4] doc_id = rest results.append( { "namespace": namespace, "doc_id": doc_id, "path": str(p), "score": score, "snippet": snippet, } ) results.sort(key=lambda r: r["score"], reverse=True) top = results[: max(1, top_k)] return {"ok": True, "count": len(results), "results": top} # -------------------- Web get/search tools ----------------------- from ipaddress import ip_address # noqa: F401 _PRIVATE_PREFIX = ( "localhost", "127.", "0.", "10.", "172.16.", "172.17.", "172.18.", "172.19.", "172.2", "192.168.", "169.254.", "[::1]", "::1", ) def _looks_private(host: Optional[str]) -> bool: if not host: return True h = (host or "").lower() return any(h.startswith(p) for p in _PRIVATE_PREFIX) try: from bs4 import BeautifulSoup except Exception: BeautifulSoup = None def _html_to_text(html: str) -> str: if not html: return "" if BeautifulSoup: try: soup = BeautifulSoup(html, "html.parser") for t in soup(["script", "style", "noscript"]): t.decompose() main = soup.find(["article", "main"]) or soup.body or soup text = main.get_text(separator="\n") text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() except Exception: pass txt = re.sub(r"<[^>]+>", " ", html) txt = re.sub(r"\s{2,}", " ", txt) return txt.strip() async def t_web_get(args: dict) -> dict: url = args.get("url") or "" max_bytes = int(args.get("max_bytes", 200_000)) timeout_s = int(args.get("timeout", 20)) if not url: return {"error": "missing url"} try: p = urlparse(url) if WEB_BLOCK_LOCAL and _looks_private(p.hostname): return {"error": f"blocked local url: {url}"} async with httpx.AsyncClient( headers={"User-Agent": USER_AGENT}, follow_redirects=True, timeout=timeout_s ) as client: r = await client.get(url) text = r.text or "" if len(text) > max_bytes: text = text[:max_bytes] title_m = re.search(r"]*>(.*?)", r.text or "", re.S | re.I) title = title_m.group(1).strip() if title_m else "" return { "url": str(r.url), "status": r.status_code, "title": title, "text": _html_to_text(text), } except Exception as e: return {"error": f"{type(e).__name__}: {e}"} async def _search_via_proxy( query: str, max_results: int, recency_days: Optional[int], site: Optional[str] ) -> Optional[dict]: if not SEARCH_PROXY_URL: return None payload = {"q": query, "max_results": max_results} if recency_days: payload["recency_days"] = recency_days if site: payload["site"] = site url = SEARCH_PROXY_URL + "/web/search" try: async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client: r = await client.post(url, json=payload) if r.status_code == 200: return r.json() except Exception: pass return None async def _search_via_google_cse(query: str, max_results: int) -> List[dict]: if not (GOOGLE_API_KEY and GOOGLE_CSE_ID): return [] try: params = { "key": GOOGLE_API_KEY, "cx": GOOGLE_CSE_ID, "q": query, "num": min(max_results, 10), } async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client: r = await client.get("https://www.googleapis.com/customsearch/v1", params=params) r.raise_for_status() data = r.json() out = [] for it in data.get("items", []): out.append( {"title": it.get("title", ""), "url": it.get("link", ""), "snippet": it.get("snippet", "")} ) return out[:max_results] except Exception: return [] async def _search_via_serpapi(query: str, max_results: int) -> List[dict]: if not SERPAPI_KEY: return [] try: params = {"engine": "google", "q": query, "api_key": SERPAPI_KEY, "num": min(max_results, 10)} async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client: r = await client.get("https://serpapi.com/search.json", params=params) r.raise_for_status() data = r.json() rows = [] for it in (data.get("organic_results") or []): rows.append( { "title": it.get("title", ""), "url": it.get("link", ""), "snippet": it.get("snippet", "") or "", } ) return rows[:max_results] except Exception: return [] async def _search_via_duckhtml(query: str, max_results: int) -> List[dict]: try: params = {"q": query} url = "https://duckduckgo.com/html/?" + urlencode(params) async with httpx.AsyncClient( headers={"User-Agent": USER_AGENT}, timeout=30.0, follow_redirects=True ) as client: r = await client.get(url) r.raise_for_status() html = r.text results = [] if BeautifulSoup: soup = BeautifulSoup(html, "html.parser") for res in soup.select(".result"): a = res.select_one("a.result__a") or res.select_one("a") if not a or not a.get("href"): continue title = a.get_text(strip=True) href = a.get("href") snip = res.select_one(".result__snippet") snippet = snip.get_text(" ", strip=True) if snip else "" results.append({"title": title, "url": href, "snippet": snippet}) if len(results) >= max_results: break else: for m in re.finditer( r']+class="[^"]*result__a[^"]*"[^>]+href="([^"]+)"[^>]*>(.*?)', html, re.I | re.S, ): href = m.group(1) title = re.sub(r"<[^>]+>", "", m.group(2)).strip() results.append({"title": title, "url": href, "snippet": ""}) if len(results) >= max_results: break return results except Exception: return [] async def t_web_search(args: dict) -> dict: query = (args.get("query") or "").strip() if not query: return {"error": "missing query"} max_results = int(args.get("max_results", 5)) recency_days = args.get("recency_days") site = args.get("site") backend = (args.get("backend") or SEARCH_BACKEND or "auto").lower() if backend in ("auto", "proxy"): data = await _search_via_proxy(query, max_results, recency_days, site) if data and isinstance(data, dict) and data.get("results"): return {"backend": "proxy", "results": data["results"][:max_results]} if backend in ("auto", "google") and GOOGLE_API_KEY and GOOGLE_CSE_ID: rows = await _search_via_google_cse(query, max_results) if rows: return {"backend": "google", "results": rows} if backend in ("auto", "serpapi") and SERPAPI_KEY: rows = await _search_via_serpapi(query, max_results) if rows: return {"backend": "serpapi", "results": rows} # duckhtml fallback q = query if not site else f"site:{site} {query}" rows = await _search_via_duckhtml(q, max_results) if rows: return {"backend": "duckhtml", "results": rows} return {"backend": backend, "results": [], "warning": "no results"} # -------------------- OpenAI-style client ------------------------ class OpenAIClient: def __init__(self, base_url: str, model: str): self.base_url = base_url self.model = model self._client = httpx.AsyncClient(timeout=TIMEOUT_S) async def close(self): await self._client.aclose() async def chat( self, messages: List[dict], tools: Optional[List[dict]] = None, tool_choice: Optional[str] = "auto", stream: bool = False, on_event=None, ) -> dict: payload = {"model": self.model, "messages": messages, "max_tokens": MAX_TOKENS} if tools is not None: payload["tools"] = tools if tool_choice: payload["tool_choice"] = tool_choice if stream: payload["stream"] = True if DEBUG_HTTP: dump_json("last_request.json", payload) url = f"{self.base_url}/chat/completions" async def _post(_payload): if not stream: r = await self._client.post(url, json=_payload) r.raise_for_status() return r.json() tool_builders: Dict[int, dict] = {} content_parts: List[str] = [] async with self._client.stream("POST", url, json=_payload) as resp: resp.raise_for_status() async for chunk in resp.aiter_text(): for line in chunk.splitlines(): s = line.strip() if not s.startswith("data:"): continue data = s[5:].strip() if data == "[DONE]": break try: evt = json.loads(data) except Exception: continue for ch in evt.get("choices", []): delta = ch.get("delta", {}) if on_event and delta: try: on_event(delta) except Exception: pass dc = delta.get("content") if dc: content_parts.append(dc) if "tool_calls" in delta: for t in delta["tool_calls"]: idx = t.get("index", 0) cur = tool_builders.setdefault( idx, { "type": "function", "function": {"name": "", "arguments": ""}, }, ) if t.get("id"): cur["id"] = t["id"] if t.get("type"): cur["type"] = t["type"] fn = t.get("function") or {} if "name" in fn and fn["name"]: cur["function"]["name"] = fn["name"] if "arguments" in fn and fn["arguments"]: cur["function"]["arguments"] += fn["arguments"] tool_calls = [] for k in sorted(tool_builders.keys()): b = tool_builders[k] tool_calls.append(b) return { "id": "streamed", "object": "chat.completion", "model": self.model, "choices": [ { "index": 0, "finish_reason": "stop", "message": { "role": "assistant", "content": "".join(content_parts) if content_parts else None, "tool_calls": tool_calls or None, }, } ], } try: res = await _post(payload) if DEBUG_HTTP: dump_json("last_response.json", res) return res except (httpx.ReadError, httpx.ProtocolError, httpx.HTTPStatusError): # fallback zonder stream if stream: payload.pop("stream", None) r = await self._client.post(url, json=payload) r.raise_for_status() res = r.json() if DEBUG_HTTP: dump_json("last_response.json", res) return res raise # -------------------- Tool schemas (OpenAI) ---------------------- def tool_schema() -> List[dict]: return [ { "type": "function", "function": { "name": "read_file", "description": "Lees een tekstbestand binnen de projectmap.", "parameters": { "type": "object", "properties": {"path": {"type": "string"}}, "required": ["path"], }, }, }, { "type": "function", "function": { "name": "write_file", "description": "Schrijf/overschrijf een tekstbestand binnen de projectmap.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, "create": {"type": "boolean", "default": True}, }, "required": ["path", "content"], }, }, }, { "type": "function", "function": { "name": "append_file", "description": "Voeg tekst toe aan een bestand binnen de projectmap.", "parameters": { "type": "object", "properties": { "path": {"type": "string"}, "content": {"type": "string"}, }, "required": ["path", "content"], }, }, }, { "type": "function", "function": { "name": "list_tree", "description": "Geef een boomoverzicht van de projectmap terug.", "parameters": { "type": "object", "properties": { "max_depth": {"type": "integer", "default": 3}, "max_entries": {"type": "integer", "default": 4000}, }, }, }, }, { "type": "function", "function": { "name": "search_text", "description": "Zoek regex in tekstbestanden (glob patroon).", "parameters": { "type": "object", "properties": { "pattern": {"type": "string"}, "glob": {"type": "string", "default": "**/*"}, "ignore_bin": {"type": "boolean", "default": True}, }, "required": ["pattern"], }, }, }, { "type": "function", "function": { "name": "run_shell", "description": "Voer een shell-commando uit (stdout/stderr).", "parameters": { "type": "object", "properties": { "command": {"type": "string"}, "timeout": {"type": "integer", "default": 600}, "cwd": { "type": "string", "description": "Optioneel werkdirectory (relatief of absoluut).", }, }, "required": ["command"], }, }, }, { "type": "function", "function": { "name": "ask_user", "description": "Stel een gerichte vraag aan de gebruiker. " "In BG-modus wacht de agent op een :answer van de gebruiker.", "parameters": { "type": "object", "properties": {"question": {"type": "string"}}, "required": ["question"], }, }, }, { "type": "function", "function": { "name": "set_goal", "description": "Stel of wijzig het Hoofddoel.", "parameters": { "type": "object", "properties": {"title": {"type": "string"}}, "required": ["title"], }, }, }, { "type": "function", "function": { "name": "add_task", "description": "Voeg een (sub)doel / taak toe.", "parameters": { "type": "object", "properties": { "title": {"type": "string"}, "parent_id": {"type": "string"}, }, "required": ["title"], }, }, }, { "type": "function", "function": { "name": "update_task", "description": "Werk status/notes bij.", "parameters": { "type": "object", "properties": { "task_id": {"type": "string"}, "status": {"type": "string"}, "notes": {"type": "string"}, }, "required": ["task_id"], }, }, }, { "type": "function", "function": { "name": "rag_upsert", "description": "Sla tekst op in RAG of archiveer lokaal.", "parameters": { "type": "object", "properties": { "namespace": {"type": "string"}, "doc_id": {"type": "string"}, "text": {"type": "string"}, "meta": {"type": "object"}, }, "required": ["namespace", "doc_id", "text"], }, }, }, { "type": "function", "function": { "name": "rag_search", "description": "Zoek in lokaal opgeslagen RAG-tekst.", "parameters": { "type": "object", "properties": { "namespace": {"type": "string"}, "query": {"type": "string"}, "top_k": {"type": "integer", "default": 5}, "regex": {"type": "boolean", "default": False}, }, "required": ["namespace", "query"], }, }, }, { "type": "function", "function": { "name": "web_search", "description": "Zoek op het web via proxy/google/serpapi/duckhtml.", "parameters": { "type": "object", "properties": { "query": {"type": "string"}, "max_results": {"type": "integer", "default": 5}, "recency_days": {"type": "integer"}, "site": {"type": "string"}, "backend": { "type": "string", "enum": ["auto", "proxy", "google", "serpapi", "duckhtml", "off"], }, }, "required": ["query"], }, }, }, { "type": "function", "function": { "name": "web_get", "description": "Haal raw tekst op van een URL.", "parameters": { "type": "object", "properties": { "url": {"type": "string"}, "max_bytes": {"type": "integer", "default": 200000}, "timeout": {"type": "integer", "default": 20}, }, "required": ["url"], }, }, }, ] async def dispatch_tool(name: str, args: dict) -> str: # Kleine preview-log per tool try: preview = name if isinstance(args, dict): if name in ("write_file", "read_file", "append_file") and "path" in args: preview += f" {args['path']}" elif name == "run_shell" and "command" in args: c = str(args["command"]).replace("\n", " ") if len(c) > 60: c = c[:57] + "..." preview += f" {c}" v(1, preview) except Exception: pass try: if name == "read_file": return t_read_file(args) if name == "write_file": return json.dumps(t_write_file(args), ensure_ascii=False) if name == "append_file": return json.dumps(t_append_file(args), ensure_ascii=False) if name == "list_tree": return json.dumps(t_list_tree(args), ensure_ascii=False) if name == "search_text": return json.dumps(t_search_text(args), ensure_ascii=False) if name == "run_shell": return json.dumps(await t_run_shell(args), ensure_ascii=False) if name == "ask_user": return json.dumps(await t_ask_user(args), ensure_ascii=False) if name == "set_goal": return json.dumps(t_set_goal(args), ensure_ascii=False) if name == "add_task": return json.dumps(t_add_task(args), ensure_ascii=False) if name == "update_task": return json.dumps(t_update_task(args), ensure_ascii=False) if name == "rag_upsert": return json.dumps(t_rag_upsert(args), ensure_ascii=False) if name == "rag_search": return json.dumps(t_rag_search(args), ensure_ascii=False) if name == "web_search": return json.dumps(await t_web_search(args), ensure_ascii=False) if name == "web_get": return json.dumps(await t_web_get(args), ensure_ascii=False) return json.dumps({"ok": False, "error": f"unknown tool {name}"}, ensure_ascii=False) except Exception as e: return json.dumps({"ok": False, "error": str(e)}, ensure_ascii=False) def append_tool_result( msgs: List[dict], tool_call_id: str, name: str, content: str, role_pref: str ): if role_pref == "tool": msgs.append( { "role": "tool", "tool_call_id": tool_call_id, "name": name, "content": content, } ) else: msgs.append( { "role": "user", "content": f'\n{content}\n', } ) # --------------------- System & history helpers ------------------ def system_preamble() -> List[dict]: rules = ( "Je bent ConsoleX, een autonome console-agent die code kan lezen/bewerken, " "shell-commando's uitvoert, en doelen beheert.\n" "Basisregels:\n" "1) Werk binnen de projectmap (cwd + submappen) tenzij expliciet anders aangegeven.\n" "2) Gebruik tools doelgericht. Lees shell-output en stuur bij.\n" "3) Houd 1 Hoofddoel aan + (sub)doelen; update status klaar/blocked/done.\n" "4) Context kort; archiveer lange stukken met rag_upsert.\n" "5) Vraag verduidelijking met ask_user; zie user-answers niet als nieuw hoofddoel.\n" "6) rm-commando's met flags zijn niet toegestaan.\n" "7) Bij creëer/maak/schrijf: gebruik write_file met volledige inhoud & pad in huidig project.\n" "8) Bij aanpassing zonder pad: list_tree/search_text → read_file → write_file.\n" "9) Test met run_shell en interpreteer output (apt/pip/docker/etc).\n" "10) Voor recente/online info: gebruik web_search en eventueel web_get voor detail.\n" "\n" "Werkmethode voor complexe, stapsgewijze taken (installaties, Docker, builds, config):\n" "11) Volg steeds deze cyclus:\n" " a) Oriëntatie: gebruik web_search om kort te kijken hoe dit normaal gaat (maximaal enkele resultaten).\n" " b) Inspectie: onderzoek de huidige situatie met list_tree, read_file en run_shell (bijv. ls, docker ps, git status).\n" " c) Plan: maak een beknopt stappenplan in bullet points voordat je tools gebruikt (kort, max ~10 regels).\n" " d) Uitvoering: voer de stappen één voor één uit met tools (run_shell, write_file, etc.).\n" " e) Controle: na elke belangrijke stap een check (bijv. opnieuw ls, docker ps, tests) om te verifiëren dat de stap gelukt is.\n" "12) Geef geen 'simulaties' van toolcalls in tekst als tools beschikbaar zijn. Gebruik echte tool_calls. " "Alleen als je echt geen tools hebt, mag je uitleg geven zonder tool_calls én zeg je dat expliciet.\n" ) if TOOL_MODE == "native": rules += ( "13) Gebruik het tools-mechanisme van de API (native tool_calls). " "Stop géén tool-aanroepen in normale tekst of codeblokken; gebruik altijd tools.\n" ) elif TOOL_MODE == "content": rules += ( "13) Gebruik géén native tool_calls. Encodeer tool-aanroepen als JSON in de content " 'met een sleutel zoals \"call_tool\".\n' ) elif TOOL_MODE == "off": rules += "13) Gebruik geen tools; antwoord uitsluitend in tekst.\n" else: # auto rules += ( "13) Geef de voorkeur aan native tool_calls van de API. Alleen als dat duidelijk niet werkt, " 'mag je tool-aanroepen als JSON in de content geven met \"call_tool\" of .\n' ) state = f"STATE\nHoofddoel: {CTX.get('goal', '(geen)')}\nLaatste taken:" return [ {"role": "system", "content": rules}, {"role": "system", "content": state}, ] def build_history(extra: Optional[List[dict]] = None) -> List[dict]: hist = system_preamble() + (CTX.get("history") or []) if extra: hist += extra return hist[-MAX_MSGS:] # ----------------------- Chat core -------------------------------- async def chat_with_tools( client: OpenAIClient, user_text: str, stream: bool, print_events: bool = True, ) -> str: history = build_history([{"role": "user", "content": user_text}]) role_pref = TOOL_RETURN_ROLE tools = tool_schema() if TOOL_MODE in ("native", "auto", "content") else None def on_event(delta: dict): if not print_events: return if "tool_calls" in delta: for t in delta["tool_calls"]: fnn = ((t.get("function") or {}).get("name")) or "" if fnn: v(1, f"→ tool {fnn} (delta)") final_assistant = "" for _ in range(8): # max 8 rondes tool-use res = await client.chat( history, tools=tools, tool_choice=("auto" if tools and TOOL_MODE != "off" else None), stream=stream, on_event=on_event, ) msg = (res.get("choices") or [{}])[0].get("message", {}) # 1) Native tool_calls via API if msg.get("tool_calls"): if print_events and EVENTS: for tc in msg["tool_calls"]: fnn = ((tc.get("function") or {}).get("name")) or "" if fnn: v(1, f"→ tool {fnn} (native)") for tc in msg["tool_calls"]: fn = ((tc.get("function") or {}).get("name")) or "" args_json = (tc.get("function") or {}).get("arguments") or "{}" try: args = json.loads(args_json) except Exception: args = {} try: m = re.findall(r'"(\w+)"\s*:\s*"([^"]*)"', args_json) if m: args = {k: v for (k, v) in m} except Exception: args = {} out = await dispatch_tool(fn, args) append_tool_result(history, tc.get("id", "tool1"), fn, out, role_pref) continue # 2) Geen native tools → salvage (JSON + ) content = (msg.get("content") or "").strip() if content and SALVAGE: items = find_call_tool_json(content) if not items: items = find_toolcall_xml_like(content) if items: for idx, it in enumerate(items, start=1): fn = it.get("name", "") if not fn: continue args = it.get("arguments") or {} if isinstance(args, str): try: args = json.loads(args) except Exception: args = {} v(1, f"[salvage] {fn}") out = await dispatch_tool(fn, args) append_tool_result(history, f"salvage{idx}", fn, out, role_pref) # na salvage: opnieuw naar model met tool_result in history continue # 3) Plain tekst-antwoord final_assistant = content break CTX["history"] = (CTX.get("history") or []) + [{"role": "user", "content": user_text}] if final_assistant: CTX["history"].append({"role": "assistant", "content": final_assistant}) save_context(CTX) if final_assistant: save_md("last_assistant.md", final_assistant) return final_assistant or "" # ---------------------- BG runtime (queue) ------------------------ @dataclass class Job: id: str prompt: str status: str = "queued" # queued|running|done|aborted|error result: str = "" err: str = "" created: float = field(default_factory=time.time) started: float = 0.0 finished: float = 0.0 class BGState: def __init__(self): self.enabled = BACKGROUND_DEFAULT self.queue: asyncio.Queue[Job] = asyncio.Queue() self.worker_task: Optional[asyncio.Task] = None self.current_task: Optional[asyncio.Task] = None self.jobs: Dict[str, Job] = {} # Nieuw: actieve job + pending ask_user-vragen self.active_job: Optional[Job] = None self.pending_questions: Dict[str, Dict[str, Any]] = {} self.q_counter: int = 0 BG_STATE = BGState() def bg_is_running() -> bool: return BG_STATE.worker_task is not None and not BG_STATE.worker_task.done() async def heartbeat(job: Job): k = 0 while job.status == "running": await asyncio.sleep(TICK_SECS) k += 1 if EVENTS and TICK_SECS > 0: v(1, f"[{job.id}] … tick {k}") async def bg_worker(): client = OpenAIClient(BASE_URL, MODEL) try: while True: try: if not BG_STATE.enabled and BG_STATE.queue.empty(): break job: Job = await BG_STATE.queue.get() except asyncio.CancelledError: break if job.status == "aborted": BG_STATE.queue.task_done() continue BG_STATE.jobs[job.id] = job BG_STATE.active_job = job job.status = "running" job.started = time.time() v(1, f"[{job.id}] start: {job.prompt}") hb = asyncio.create_task(heartbeat(job)) try: task = asyncio.create_task( chat_with_tools( client, job.prompt, stream=STREAM_DEFAULT, print_events=EVENTS, ) ) BG_STATE.current_task = task res = await task job.result = res job.status = "done" job.finished = time.time() if res: if RICH: console.print(Panel(res, title=f"Agent (job {job.id})", border_style="green")) else: print(f"\n[Agent {job.id}] {res}") except asyncio.CancelledError: job.status = "aborted" job.finished = time.time() v(1, f"[{job.id}] aborted") # openstaande vragen voor deze job annuleren for qid, meta in list(BG_STATE.pending_questions.items()): if meta.get("job_id") == job.id: fut = meta.get("future") if fut and not fut.done(): fut.set_exception(asyncio.CancelledError()) BG_STATE.pending_questions.pop(qid, None) break except Exception as e: job.status = "error" job.err = str(e) job.finished = time.time() v(1, f"[{job.id}] error: {e}") finally: BG_STATE.current_task = None BG_STATE.active_job = None hb.cancel() try: await hb except Exception: pass BG_STATE.queue.task_done() finally: await client.close() def start_bg(): BG_STATE.enabled = True if not bg_is_running(): BG_STATE.worker_task = asyncio.create_task(bg_worker()) v(1, "[bg] worker gestart") async def stop_bg(): BG_STATE.enabled = False # queue mag leeglopen maar niks nieuws if bg_is_running(): v(1, "[bg] worker stoppen…") BG_STATE.worker_task.cancel() try: await BG_STATE.worker_task except asyncio.CancelledError: pass BG_STATE.worker_task = None v(1, "[bg] worker gestopt") def add_job(prompt: str) -> Job: jid = f"{int(time.time()*1000)%0xffffffff:08x}" job = Job(id=jid, prompt=prompt) BG_STATE.jobs[jid] = job BG_STATE.queue.put_nowait(job) if BG_STATE.enabled: start_bg() return job def format_queue() -> str: pending = [j for j in BG_STATE.jobs.values() if j.status in ("queued", "running")] lines = [f"queue size: {BG_STATE.queue.qsize()} (pending: {len(pending)})"] for j in sorted(pending, key=lambda x: x.created): lines.append(f"- {j.id} [{j.status}] {j.prompt[:80]}") return "\n".join(lines) def format_jobs() -> str: rows = sorted(BG_STATE.jobs.values(), key=lambda x: x.created, reverse=True)[:30] lines = [] for j in rows: lines.append(f"- {j.id} [{j.status}] {j.prompt[:80]}") return "\n".join(lines) if lines else "(geen jobs)" def format_questions() -> str: if not BG_STATE.pending_questions: return "(geen openstaande vragen)" lines = [] for qid, meta in BG_STATE.pending_questions.items(): lines.append(f"- {qid} (job {meta.get('job_id')}): {meta.get('question')}") return "\n".join(lines) # -------------------------- Probe -------------------------------- async def probe_capabilities(): client = OpenAIClient(BASE_URL, MODEL) table = [ ["A native tool_calls", "FAIL", ""], ["B content JSON", "FAIL", "0 items"], ] try: r = await client.chat( messages=[ {"role": "system", "content": "You can call tools."}, { "role": "user", "content": "Create A.txt with 'ok' via tool.", }, ], tools=[t for t in tool_schema() if t["function"]["name"] == "write_file"], tool_choice="auto", stream=False, ) ch = r.get("choices", [{}])[0].get("message", {}) tc = ch.get("tool_calls") or [] if tc: table[0][1] = "PASS" table[0][2] = tc[0]["function"]["name"] except Exception: pass try: r = await client.chat( messages=[ { "role": "system", "content": ( 'Geef platte tekst met exact: ' '{\\"call_tool\\":{\\"name\\":\\"write_file\\",' '\\"arguments\\":{\\"path\\":\\"B.txt\\",' '\\"content\\":\\"ok\\"}}}' ), }, {"role": "user", "content": "Doe nu."}, ], stream=False, ) msg = r.get("choices", [{}])[0].get("message", {}).get("content", "") or "" items = find_call_tool_json(msg) if items: table[1][1] = "PASS" table[1][2] = str(len(items)) + " items" except Exception: pass await client.close() if RICH: t = Table(title=":probe — capability matrix", box=box.MINIMAL_DOUBLE_HEAD) t.add_column("Check") t.add_column("Status") t.add_column("Details") for row in table: t.add_row(*row) console.print(t) else: print(table) # --------------------------- UI / REPL --------------------------- HELP = """ :help Toon hulp :goal Stel Hoofddoel :status Toon status :history Toon laatste berichten :tasks Toon taken :add Voeg prompt toe aan BG-queue :queue Toon BG wachtrij :jobs Toon recente jobs :questions Toon openstaande ask_user-vragen (BG) :answer Beantwoord een ask_user-vraag :abort Annuleer queued/lopende job :start Start BG-worker :stop Stop BG-worker :bg on|off BG-mode aan/uit (input → queue of direct) :events on|off Events/ticks & tool-logs aan/uit :tick Heartbeat interval :v [0-3] | :quiet | :verbose | :debug :stream on|off SSE aan/uit :log http on|off HTTP dumps aan/uit :toolmode auto|native|content|off :dumb on|off Salvage aan/uit :probe Snelle capability-check :init Reset CONTEXT.json :exit Stop """.strip() def banner(): title = f"ConsoleX 2.4.13 — lokale console-agent voor je LLM Proxy" info = ( f"Model: {MODEL} • Proxy: {BASE_URL}\n" f"Verbosity: {VERBOSITY} (0 stil,1 normaal,2 verbose,3 debug)\n" f"BG-mode: {'aan' if BG_STATE.enabled else 'uit'} • " f"Tool mode: {TOOL_MODE} • Search: {SEARCH_BACKEND} • " f"Salvage: {1 if SALVAGE else 0} • Stream: {1 if STREAM_DEFAULT else 0} • " f"Events: {1 if EVENTS else 0}" ) if RICH: console.print(Panel(info, title=title, border_style="cyan")) else: print(title) print(info) def status_panel(): if RICH: t = Table(box=box.SIMPLE) t.add_column("Hoofddoel") t.add_column("Instellingen") t.add_row( CTX.get("goal", "(geen)"), f"Verb:{VERBOSITY} • BG:{'aan' if BG_STATE.enabled else 'uit'} " f"• Tool:{TOOL_MODE} • Stream:{1 if STREAM_DEFAULT else 0} " f"• Events:{1 if EVENTS else 0}", ) console.print(Panel(t, title="Status")) else: print(f"Hoofddoel: {CTX.get('goal')}") print( f"Verbosity:{VERBOSITY} BG:{'aan' if BG_STATE.enabled else 'uit'} " f"Tool:{TOOL_MODE} Stream:{1 if STREAM_DEFAULT else 0} " f"Events:{1 if EVENTS else 0}" ) async def ainput(prompt: str) -> str: def _read() -> str: try: return input(prompt) except EOFError: return ":exit" return await asyncio.to_thread(_read) async def repl(): global VERBOSITY, STREAM_DEFAULT, SALVAGE, TOOL_MODE, TICK_SECS, EVENTS, DEBUG_HTTP banner() if BG_STATE.enabled: start_bg() try: while True: try: line = (await ainput("\nConsoleX > : ")).strip() except KeyboardInterrupt: line = ":exit" if not line: continue # ----- BG / worker control ----- if line.strip() == ":start": start_bg() continue if line.strip() == ":stop": await stop_bg() continue if line.startswith(":bg "): val = line.split(" ", 1)[1].strip().lower() if val == "on": start_bg() else: await stop_bg() v(1, f"Background-mode: {'aan' if BG_STATE.enabled else 'uit'}") continue if line.startswith(":add "): prompt = line[5:].strip() job = add_job(prompt) v(1, f"Job toegevoegd: {job.id}") continue if line.strip() == ":queue": console.print(format_queue()) continue if line.strip() == ":jobs": console.print(format_jobs()) continue if line.strip() == ":questions": console.print(format_questions()) continue if line.startswith(":answer "): # :answer rest = line.split(" ", 1)[1].strip() if not rest: v(1, "Gebruik: :answer ") continue parts = rest.split(" ", 1) qid = parts[0].strip() answer = parts[1].strip() if len(parts) > 1 else "" meta = BG_STATE.pending_questions.get(qid) if not meta: v(1, f"Onbekende vraag-id: {qid}") continue fut = meta.get("future") if fut and not fut.done(): fut.set_result(answer) v(1, f"Antwoord verstuurd naar job {meta.get('job_id')} voor {qid}") BG_STATE.pending_questions.pop(qid, None) continue if line.startswith(":abort "): jid = line.split(" ", 1)[1].strip() j = BG_STATE.jobs.get(jid) if j: if j.status == "queued": j.status = "aborted" v(1, f"[{jid}] removed from queue") elif j.status == "running" and BG_STATE.current_task: BG_STATE.current_task.cancel() else: v(1, f"[{jid}] not found/finished") continue # ----- Info / status / config ----- if line.startswith(":help"): console.print(HELP) continue if line.startswith(":goal "): title = line[6:].strip() t_set_goal({"title": title}) v(1, f"Hoofddoel: {title}") continue if line.strip() == ":status": status_panel() continue if line.strip() == ":tasks": tasks = CTX.get("tasks") or {} if not tasks: console.print("(geen taken)") else: for tid, tsk in tasks.items(): console.print(f"- [{tsk.get('status')}] {tid} {tsk.get('title')}") continue if line.strip() == ":history": hist = CTX.get("history") or [] v(1, f"(laatste {len(hist)})") for m in hist[-10:]: who = m.get("role", "?") txt = (m.get("content", "") or "")[:100].replace("\n", " ") console.print(f"- {who}: {txt}{'…' if len(txt) >= 100 else ''}") continue if line.strip() == ":init": CTX["history"] = [] save_context(CTX) v(1, "CONTEXT.json gereset") continue if line.strip() == ":probe": await probe_capabilities() continue if line.startswith(":tick "): try: TICK_SECS = float(line.split()[1]) v(1, f"Tick: {TICK_SECS:.1f}s") except Exception: v(1, "Ongeldige waarde") continue if line.startswith(":events"): parts = line.split() if len(parts) == 1: v(1, f"Events: {'aan' if EVENTS else 'uit'}") else: val = parts[1].strip().lower() if val in ("on", "aan"): EVENTS = True elif val in ("off", "uit"): EVENTS = False v(1, f"Events: {'aan' if EVENTS else 'uit'}") continue if line.startswith(":v ") or line.strip() in (":quiet", ":verbose", ":debug"): if line.strip() == ":quiet": VERBOSITY = 0 elif line.strip() == ":verbose": VERBOSITY = 2 elif line.strip() == ":debug": VERBOSITY = 3 else: try: VERBOSITY = max(0, min(3, int(line.split()[1]))) except Exception: pass v(1, f"Verbosity: {VERBOSITY}") continue if line.startswith(":stream "): STREAM_DEFAULT = line.split(" ", 1)[1].strip().lower() == "on" v(1, f"Stream: {'aan' if STREAM_DEFAULT else 'uit'}") continue if line.startswith(":log http "): val = line.split(" ", 2)[2].strip().lower() DEBUG_HTTP = val == "on" v(1, f"HTTP-debug: {'aan' if DEBUG_HTTP else 'uit'}") continue if line.startswith(":toolmode "): TOOL_MODE = line.split(" ", 1)[1].strip().lower() v(1, f"Tool mode: {TOOL_MODE}") continue if line.startswith(":dumb "): SALVAGE = line.split(" ", 1)[1].strip().lower() == "on" v(1, f"Salvage: {'aan' if SALVAGE else 'uit'}") continue if line.strip() == ":exit": console.print("Afsluiten…") break # ---- Vrije prompt ---- if BG_STATE.enabled: job = add_job(line) v(1, f"[{job.id}] queued") continue else: client = OpenAIClient(BASE_URL, MODEL) try: res = await chat_with_tools( client, line, stream=STREAM_DEFAULT, print_events=EVENTS, ) if res: if RICH: console.print(Panel(res, title="Agent", border_style="green")) else: print(res) finally: await client.close() except Exception as e: console.print(str(e)) # --------------------------- Main -------------------------------- def main(): try: asyncio.run(repl()) except KeyboardInterrupt: pass if __name__ == "__main__": main()