ConsoleX/ConsoleX_2_4_13.py

1904 lines
65 KiB
Python
Raw Permalink Normal View History

2026-02-23 15:19:52 +00:00
#!/usr/bin/env python3
# ConsoleX 2.4.13 — lokale console-agent voor je LLM Proxy
# --------------------------------------------------------
# Highlights t.o.v. 2.4.12:
# - ask_user werkt nu ook in BG-modus:
# * Tijdens een BG-job wordt de vraag met een ID getoond:
# [ask_user] job <jobid> vraag <qid>: <question>
# Jij kunt antwoorden met:
# :answer <qid> <tekst>
# De job wacht op dit antwoord en gaat daarna verder.
# - Foreground-modus (BG uit): ask_user vraagt nog steeds direct om input.
# - Verder alles van 2.4.12:
# * Uitgebreide salvage voor content-toolcalls (call_tool, directe toolkeys).
# * 5-stappen werkwijze in system prompt (web_search → inspect → plan → execute → verify).
# * run_shell ondersteunt optionele 'cwd'.
# * Korte toolcall-previews in de logging.
import asyncio
import html as _html
import json
import os
import re
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode, urlparse
import httpx
# -------- Optional rich UI --------
try:
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Prompt
from rich.table import Table
RICH = True
console = Console()
except Exception:
RICH = False
class _C:
def print(self, *a, **k):
print(*a, **k)
console = _C()
# ---------------------------- Config -----------------------------
DEFAULT_UA = "ConsoleX/2.4.13 (httpx)"
BASE_URL = os.getenv("CONSOLEX_BASE_URL", "http://localhost:8080/v1").rstrip("/")
MODEL = os.getenv("CONSOLEX_MODEL", "magistral-24b")
TIMEOUT_S = int(os.getenv("CONSOLEX_TIMEOUT_S", "600"))
MAX_MSGS = int(os.getenv("CONSOLEX_MAX_MSGS", "24"))
MAX_TOKENS = int(os.getenv("CONSOLEX_MAX_TOKENS", "13000") or "13000")
VERBOSITY = max(0, min(3, int(os.getenv("CONSOLEX_VERBOSITY", "2"))))
EVENTS = os.getenv("CONSOLEX_EVENTS", "1") == "1" # ticks + tool-logs
DEBUG_HTTP = os.getenv("CONSOLEX_DEBUG_HTTP", "0") == "1"
STREAM_DEFAULT = os.getenv("CONSOLEX_STREAM", "1") == "1"
# Salvage staat standaard AAN
SALVAGE = os.getenv("CONSOLEX_SALVAGE", "1") == "1"
TOOL_MODE = os.getenv("CONSOLEX_TOOL_MODE", "auto").lower() # auto|native|content|off
TOOL_RETURN_ROLE = os.getenv("CONSOLEX_TOOL_RETURN_ROLE", "tool")
BAN_RM = os.getenv("CONSOLEX_BAN_RM", "1") == "1"
BACKGROUND_DEFAULT = os.getenv("CONSOLEX_BG", "1") == "1"
TICK_SECS = float(os.getenv("CONSOLEX_TICK_SECS", "3"))
CONTEXT_PATH = Path(os.getenv("CONSOLEX_CONTEXT", str(Path.cwd() / "CONTEXT.json")))
LOG_DIR = Path(".consolex_log")
LOG_DIR.mkdir(exist_ok=True)
SEARCH_BACKEND = os.getenv("CONSOLEX_SEARCH_BACKEND", "auto").lower()
SEARCH_PROXY_URL = os.getenv("CONSOLEX_SEARCH_PROXY", "").rstrip("/")
SERPAPI_KEY = os.getenv("SERPAPI_API_KEY", "")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID", "")
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
WEB_BLOCK_LOCAL = os.getenv("CONSOLEX_WEB_BLOCK_LOCAL", "1") == "1"
USER_AGENT = os.getenv("CONSOLEX_UA", DEFAULT_UA)
def v(level: int, *args):
if VERBOSITY >= level:
try:
console.print(*args)
except Exception:
print(*args, **{})
def dump_json(name: str, data: Any):
try:
(LOG_DIR / name).write_text(json.dumps(data, ensure_ascii=False, indent=2))
except Exception as e:
v(3, f"[log] write {name} failed: {e}")
def save_md(name: str, text: str):
try:
(LOG_DIR / name).write_text(text)
except Exception as e:
v(3, f"[log] write {name} failed: {e}")
# ------------------ Context (history/goal) -----------------------
def load_context() -> Dict[str, Any]:
if CONTEXT_PATH.exists():
try:
return json.loads(CONTEXT_PATH.read_text(encoding="utf-8"))
except Exception:
pass
return {"history": [], "goal": "Probeer te verifiëren of ik een werkzaam programma ben."}
def save_context(ctx: Dict[str, Any]):
hist = ctx.get("history", [])
if len(hist) > MAX_MSGS:
ctx["history"] = hist[-MAX_MSGS:]
CONTEXT_PATH.write_text(json.dumps(ctx, ensure_ascii=False, indent=2), encoding="utf-8")
CTX = load_context()
# ------------------- Regex & salvage parsers ---------------------
FENCE_RX = re.compile(r"```([a-z0-9_\-+.]*?)\s*\n(.*?)```", re.S | re.I)
# Alle bekende toolnamen (voor salvage)
TOOL_NAMES = [
"read_file",
"write_file",
"append_file",
"list_tree",
"search_text",
"run_shell",
"ask_user",
"set_goal",
"add_task",
"update_task",
"rag_upsert",
"rag_search",
"web_search",
"web_get",
]
def html_unescape_multi(s: str, rounds: int = 3) -> str:
"""Herhaald HTML-unescape (voor &amp;#34; → \" enz.)."""
if not s:
return s
for _ in range(rounds):
ns = _html.unescape(s)
if ns == s:
break
s = ns
return s
def _extract_tools_from_obj(obj: Any, sink: List[dict]):
"""
Recursief tools extraheren uit een JSON-structuur.
Ondersteunt o.a.:
- {"call_tool":{"name":"run_shell","arguments":{...}}}
- {"call_tool":"run_shell","commands":["cmd1","cmd2",...]}
- {"run_shell":{"command":"..."}}
- lijsten met bovenstaande varianten.
"""
if isinstance(obj, dict):
# 1) call_tool als dict (OpenAI-stijl)
if "call_tool" in obj and isinstance(obj["call_tool"], dict):
c = obj["call_tool"]
name = c.get("name")
args = c.get("arguments") or {}
if name:
sink.append({"name": name, "arguments": args})
# 1b) call_tool als string + commands/command
elif "call_tool" in obj and isinstance(obj["call_tool"], str):
name = obj["call_tool"]
cmds: List[Dict[str, Any]] = []
if isinstance(obj.get("commands"), list) and obj["commands"]:
for cmd in obj["commands"]:
cmds.append({"command": str(cmd)})
elif "command" in obj:
cmds.append({"command": str(obj["command"])})
else:
cmds.append({})
if name:
for a in cmds:
sink.append({"name": name, "arguments": a})
# 2) directe toolnamen als key
for tname in TOOL_NAMES:
if tname in obj and isinstance(obj[tname], dict):
sink.append({"name": tname, "arguments": obj[tname]})
elif isinstance(obj, list):
for it in obj:
_extract_tools_from_obj(it, sink)
def find_call_tool_json(content: str) -> List[dict]:
"""
Zoek naar JSON-varianten met tools, zowel plain als HTML-escaped,
eventueel in codefences. Geeft lijst terug met items:
{"name": "<toolnaam>", "arguments": {...}}.
"""
found: List[dict] = []
if not content:
return found
# 1) volledige content als JSON proberen (plain + multi-unescaped)
for candidate in (content, html_unescape_multi(content)):
try:
obj = json.loads(candidate)
except Exception:
continue
_extract_tools_from_obj(obj, found)
# 2) fenced JSON/TOOL/OPENAI blokken (inclusief lege taal == onbekend)
for lang, body in FENCE_RX.findall(content):
lang_l = (lang or "").lower().strip()
if lang_l in ("json", "tool", "openai", "chatml", ""):
s = html_unescape_multi(body.strip())
try:
obj = json.loads(s)
except Exception:
continue
_extract_tools_from_obj(obj, found)
return found
def find_toolcall_xml_like(content: str) -> List[dict]:
"""
Parse pseudo-XML <toolcall name="..." args={...}/> (ook HTML-escaped varianten).
Geeft lijst van {name, arguments}-dicts terug.
"""
txt = html_unescape_multi(content)
results: List[dict] = []
# match <toolcall ...> met attributen tussen <toolcall en afsluiter
for m in re.finditer(r"<toolcall\s+([^>/]*?(?:\{.*?\})?[^>]*)/?>", txt, re.I | re.S):
attrs = m.group(1) or ""
# name="..."
mname = re.search(r'name\s*=\s*"([^"]+)"', attrs, re.I)
if not mname:
mname = re.search(r"name\s*=\s*'([^']+)'", attrs, re.I)
name = mname.group(1) if mname else None
# args={...}
marg = re.search(r"args\s*=\s*({.*})", attrs, re.I | re.S)
args_dict: Dict[str, Any] = {}
if marg:
raw = marg.group(1)
try:
args_dict = json.loads(raw)
except Exception:
# fallback: single quotes → double, nogmaals unescape
try:
fixed = html_unescape_multi(raw.replace("'", '"'))
args_dict = json.loads(fixed)
except Exception:
args_dict = {}
if name:
results.append({"name": name, "arguments": args_dict})
return results
# -------------------------- Tools impl ---------------------------
from pathlib import Path as _P
def _safe_path(p: str) -> _P:
base = Path.cwd().resolve()
path = (base / p).resolve()
if not str(path).startswith(str(base)):
raise ValueError("Pad buiten projectmap")
return path
def t_read_file(args: dict) -> str:
return _safe_path(args["path"]).read_text(encoding="utf-8")
def t_write_file(args: dict) -> dict:
p = _safe_path(args["path"])
p.parent.mkdir(parents=True, exist_ok=True)
if not p.exists() and not args.get("create", True):
raise FileNotFoundError(str(p))
content = args["content"]
p.write_text(content, encoding="utf-8")
return {"ok": True, "path": str(p), "bytes": len(content)}
def t_append_file(args: dict) -> dict:
p = _safe_path(args["path"])
p.parent.mkdir(parents=True, exist_ok=True)
with p.open("a", encoding="utf-8") as f:
f.write(args["content"])
return {"ok": True, "path": str(p), "bytes_appended": len(args["content"])}
def _is_bin(p: _P) -> bool:
try:
with p.open("rb") as f:
return b"\0" in f.read(1024)
except Exception:
return False
def t_list_tree(args: dict) -> dict:
max_depth = int(args.get("max_depth", 3))
max_entries = int(args.get("max_entries", 4000))
root = Path.cwd()
lines = ["./"]
count = 0
for path in sorted(root.rglob("*")):
if path == root:
continue
rel = path.relative_to(root)
depth = len(rel.parts)
if depth > max_depth:
continue
prefix = " " * (depth - 1) if depth > 0 else ""
lines.append(prefix + str(rel))
count += 1
if count >= max_entries:
break
return {"tree": "\n".join(lines), "truncated": count >= max_entries, "lines": len(lines)}
def t_search_text(args: dict) -> dict:
pat = re.compile(args["pattern"])
glob_pat = args.get("glob", "**/*")
ignore_bin = bool(args.get("ignore_bin", True))
results = []
for p in Path.cwd().glob(glob_pat):
if p.is_file():
if ignore_bin and _is_bin(p):
continue
try:
txt = p.read_text(encoding="utf-8", errors="ignore")
for m in pat.finditer(txt):
line_no = txt[: m.start()].count("\n") + 1
results.append(
{
"path": str(p),
"line": line_no,
"match": txt[m.start() : m.end()][:200],
}
)
except Exception:
continue
return {"count": len(results), "results": results[:500]}
async def t_run_shell(args: dict) -> dict:
"""
Voert een shell-commando uit.
Ondersteunt:
- command: string (vereist)
- timeout: int (default 600)
- cwd: optioneel werkdirectory (relatief aan projectroot, of absoluut)
"""
cmd = args["command"]
timeout = int(args.get("timeout", 600))
# Optionele cwd
cwd_arg = args.get("cwd")
workdir: Optional[Path] = None
if isinstance(cwd_arg, str) and cwd_arg.strip():
p = Path(cwd_arg).expanduser()
if not p.is_absolute():
# relatief aan projectmap
p = _safe_path(cwd_arg)
workdir = p
if BAN_RM and re.search(r"\brm\s+-", cmd):
return {
"exit_code": 1,
"stdout": "",
"stderr": "rm is verboden",
"duration": 0.0,
"log_path": "",
}
start = time.time()
proc = await asyncio.create_subprocess_shell(
cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workdir) if workdir is not None else None,
)
try:
out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
try:
proc.kill()
except Exception:
pass
return {
"exit_code": -1,
"stdout": "",
"stderr": "timeout",
"duration": time.time() - start,
"log_path": "",
}
return {
"exit_code": proc.returncode,
"stdout": out.decode(errors="ignore"),
"stderr": err.decode(errors="ignore"),
"duration": time.time() - start,
"cwd": str(workdir) if workdir is not None else "",
"log_path": "",
}
# -------- ask_user met BG-ondersteuning --------
async def t_ask_user(args: dict) -> dict:
"""
- In foreground (geen actieve BG-job): stel direct een vraag via stdin.
- In BG-job: toon vraag + qid en wacht op :answer <qid> <antwoord>.
"""
q = args["question"]
loop = asyncio.get_running_loop()
# BG-job: vraag via :answer
job = BG_STATE.active_job
if job is not None:
BG_STATE.q_counter += 1
qid = f"q{BG_STATE.q_counter:04d}"
fut: asyncio.Future = loop.create_future()
BG_STATE.pending_questions[qid] = {
"job_id": job.id,
"question": q,
"future": fut,
}
msg = (
f"[ask_user] job {job.id} vraag {qid}: {q}\n"
f"Antwoord met: :answer {qid} <tekst>"
)
if RICH:
console.print(msg)
else:
print(msg)
try:
answer = await fut
except asyncio.CancelledError:
return {"answer": "", "note": f"cancelled ({qid})"}
return {"answer": answer, "id": qid}
# Foreground: gewoon via stdin
if RICH:
ans = await asyncio.to_thread(Prompt.ask, f"[bold cyan]Vraag[/] {q}")
else:
def _read() -> str:
try:
return input(f"Vraag {q}\n> ")
except EOFError:
return ""
ans = await asyncio.to_thread(_read)
return {"answer": ans}
def t_set_goal(args: dict) -> dict:
CTX["goal"] = args["title"]
save_context(CTX)
return {"ok": True, "title": args["title"]}
def t_add_task(args: dict) -> dict:
tasks = CTX.setdefault("tasks", {})
tid = f"{int(time.time()*1000)%0xffffffff:08x}"
tasks[tid] = {
"title": args["title"],
"status": "doing",
"parent_id": args.get("parent_id"),
"notes": "",
}
save_context(CTX)
return {"id": tid, "title": args["title"], "status": "doing"}
def t_update_task(args: dict) -> dict:
t = CTX.setdefault("tasks", {}).get(args["task_id"])
if not t:
return {"ok": False, "error": "unknown task_id"}
if args.get("status"):
t["status"] = args["status"]
if args.get("notes"):
t["notes"] = args["notes"]
save_context(CTX)
return {"ok": True, "task": {"id": args["task_id"], **t}}
def t_rag_upsert(args: dict) -> dict:
rag = Path(".rag")
rag.mkdir(exist_ok=True)
out = rag / f"{args['namespace']}__{args['doc_id']}.txt"
out.write_text(
json.dumps({"text": args["text"], "meta": args.get("meta") or {}}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return {"ok": True, "path": str(out), "bytes": len(args["text"])}
def t_rag_search(args: dict) -> dict:
"""
Eenvoudige RAG-zoektool:
- zoekt in .rag/<namespace>__*.txt
- leest JSON: {"text": ..., "meta": {...}}
- doet simpele substring- of regex-search in 'text'
"""
namespace = (args.get("namespace") or "").strip()
query = (args.get("query") or "").strip()
top_k = int(args.get("top_k", 5))
regex = bool(args.get("regex", False))
if not namespace:
return {"ok": False, "error": "missing namespace"}
if not query:
return {"ok": False, "error": "missing query"}
rag_dir = Path(".rag")
if not rag_dir.exists():
return {"ok": True, "count": 0, "results": []}
prefix = f"{namespace}__"
results = []
pattern = None
if regex:
try:
pattern = re.compile(query, re.I)
except Exception as e:
return {"ok": False, "error": f"invalid regex: {e}"}
q_lower = query.lower()
for p in rag_dir.glob(f"{prefix}*.txt"):
try:
raw = p.read_text(encoding="utf-8", errors="ignore")
data = json.loads(raw)
text = data.get("text") or ""
if not isinstance(text, str):
continue
except Exception:
# kapotte of niet-JSON bestanden overslaan
continue
if regex:
matches = list(pattern.finditer(text))
if not matches:
continue
score = len(matches)
first = matches[0].start()
else:
txt_lower = text.lower()
score = txt_lower.count(q_lower)
if score == 0:
continue
first = txt_lower.find(q_lower)
start = max(0, first - 80)
end = min(len(text), first + 80)
snippet = text[start:end].replace("\n", " ")
# doc_id afleiden uit bestandsnaam: namespace__docid(.txt)
name = p.name
rest = name[len(prefix) :]
if rest.endswith(".txt"):
rest = rest[:-4]
doc_id = rest
results.append(
{
"namespace": namespace,
"doc_id": doc_id,
"path": str(p),
"score": score,
"snippet": snippet,
}
)
results.sort(key=lambda r: r["score"], reverse=True)
top = results[: max(1, top_k)]
return {"ok": True, "count": len(results), "results": top}
# -------------------- Web get/search tools -----------------------
from ipaddress import ip_address # noqa: F401
_PRIVATE_PREFIX = (
"localhost",
"127.",
"0.",
"10.",
"172.16.",
"172.17.",
"172.18.",
"172.19.",
"172.2",
"192.168.",
"169.254.",
"[::1]",
"::1",
)
def _looks_private(host: Optional[str]) -> bool:
if not host:
return True
h = (host or "").lower()
return any(h.startswith(p) for p in _PRIVATE_PREFIX)
try:
from bs4 import BeautifulSoup
except Exception:
BeautifulSoup = None
def _html_to_text(html: str) -> str:
if not html:
return ""
if BeautifulSoup:
try:
soup = BeautifulSoup(html, "html.parser")
for t in soup(["script", "style", "noscript"]):
t.decompose()
main = soup.find(["article", "main"]) or soup.body or soup
text = main.get_text(separator="\n")
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
except Exception:
pass
txt = re.sub(r"<[^>]+>", " ", html)
txt = re.sub(r"\s{2,}", " ", txt)
return txt.strip()
async def t_web_get(args: dict) -> dict:
url = args.get("url") or ""
max_bytes = int(args.get("max_bytes", 200_000))
timeout_s = int(args.get("timeout", 20))
if not url:
return {"error": "missing url"}
try:
p = urlparse(url)
if WEB_BLOCK_LOCAL and _looks_private(p.hostname):
return {"error": f"blocked local url: {url}"}
async with httpx.AsyncClient(
headers={"User-Agent": USER_AGENT}, follow_redirects=True, timeout=timeout_s
) as client:
r = await client.get(url)
text = r.text or ""
if len(text) > max_bytes:
text = text[:max_bytes]
title_m = re.search(r"<title[^>]*>(.*?)</title>", r.text or "", re.S | re.I)
title = title_m.group(1).strip() if title_m else ""
return {
"url": str(r.url),
"status": r.status_code,
"title": title,
"text": _html_to_text(text),
}
except Exception as e:
return {"error": f"{type(e).__name__}: {e}"}
async def _search_via_proxy(
query: str, max_results: int, recency_days: Optional[int], site: Optional[str]
) -> Optional[dict]:
if not SEARCH_PROXY_URL:
return None
payload = {"q": query, "max_results": max_results}
if recency_days:
payload["recency_days"] = recency_days
if site:
payload["site"] = site
url = SEARCH_PROXY_URL + "/web/search"
try:
async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client:
r = await client.post(url, json=payload)
if r.status_code == 200:
return r.json()
except Exception:
pass
return None
async def _search_via_google_cse(query: str, max_results: int) -> List[dict]:
if not (GOOGLE_API_KEY and GOOGLE_CSE_ID):
return []
try:
params = {
"key": GOOGLE_API_KEY,
"cx": GOOGLE_CSE_ID,
"q": query,
"num": min(max_results, 10),
}
async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client:
r = await client.get("https://www.googleapis.com/customsearch/v1", params=params)
r.raise_for_status()
data = r.json()
out = []
for it in data.get("items", []):
out.append(
{"title": it.get("title", ""), "url": it.get("link", ""), "snippet": it.get("snippet", "")}
)
return out[:max_results]
except Exception:
return []
async def _search_via_serpapi(query: str, max_results: int) -> List[dict]:
if not SERPAPI_KEY:
return []
try:
params = {"engine": "google", "q": query, "api_key": SERPAPI_KEY, "num": min(max_results, 10)}
async with httpx.AsyncClient(headers={"User-Agent": USER_AGENT}, timeout=30.0) as client:
r = await client.get("https://serpapi.com/search.json", params=params)
r.raise_for_status()
data = r.json()
rows = []
for it in (data.get("organic_results") or []):
rows.append(
{
"title": it.get("title", ""),
"url": it.get("link", ""),
"snippet": it.get("snippet", "") or "",
}
)
return rows[:max_results]
except Exception:
return []
async def _search_via_duckhtml(query: str, max_results: int) -> List[dict]:
try:
params = {"q": query}
url = "https://duckduckgo.com/html/?" + urlencode(params)
async with httpx.AsyncClient(
headers={"User-Agent": USER_AGENT}, timeout=30.0, follow_redirects=True
) as client:
r = await client.get(url)
r.raise_for_status()
html = r.text
results = []
if BeautifulSoup:
soup = BeautifulSoup(html, "html.parser")
for res in soup.select(".result"):
a = res.select_one("a.result__a") or res.select_one("a")
if not a or not a.get("href"):
continue
title = a.get_text(strip=True)
href = a.get("href")
snip = res.select_one(".result__snippet")
snippet = snip.get_text(" ", strip=True) if snip else ""
results.append({"title": title, "url": href, "snippet": snippet})
if len(results) >= max_results:
break
else:
for m in re.finditer(
r'<a[^>]+class="[^"]*result__a[^"]*"[^>]+href="([^"]+)"[^>]*>(.*?)</a>',
html,
re.I | re.S,
):
href = m.group(1)
title = re.sub(r"<[^>]+>", "", m.group(2)).strip()
results.append({"title": title, "url": href, "snippet": ""})
if len(results) >= max_results:
break
return results
except Exception:
return []
async def t_web_search(args: dict) -> dict:
query = (args.get("query") or "").strip()
if not query:
return {"error": "missing query"}
max_results = int(args.get("max_results", 5))
recency_days = args.get("recency_days")
site = args.get("site")
backend = (args.get("backend") or SEARCH_BACKEND or "auto").lower()
if backend in ("auto", "proxy"):
data = await _search_via_proxy(query, max_results, recency_days, site)
if data and isinstance(data, dict) and data.get("results"):
return {"backend": "proxy", "results": data["results"][:max_results]}
if backend in ("auto", "google") and GOOGLE_API_KEY and GOOGLE_CSE_ID:
rows = await _search_via_google_cse(query, max_results)
if rows:
return {"backend": "google", "results": rows}
if backend in ("auto", "serpapi") and SERPAPI_KEY:
rows = await _search_via_serpapi(query, max_results)
if rows:
return {"backend": "serpapi", "results": rows}
# duckhtml fallback
q = query if not site else f"site:{site} {query}"
rows = await _search_via_duckhtml(q, max_results)
if rows:
return {"backend": "duckhtml", "results": rows}
return {"backend": backend, "results": [], "warning": "no results"}
# -------------------- OpenAI-style client ------------------------
class OpenAIClient:
def __init__(self, base_url: str, model: str):
self.base_url = base_url
self.model = model
self._client = httpx.AsyncClient(timeout=TIMEOUT_S)
async def close(self):
await self._client.aclose()
async def chat(
self,
messages: List[dict],
tools: Optional[List[dict]] = None,
tool_choice: Optional[str] = "auto",
stream: bool = False,
on_event=None,
) -> dict:
payload = {"model": self.model, "messages": messages, "max_tokens": MAX_TOKENS}
if tools is not None:
payload["tools"] = tools
if tool_choice:
payload["tool_choice"] = tool_choice
if stream:
payload["stream"] = True
if DEBUG_HTTP:
dump_json("last_request.json", payload)
url = f"{self.base_url}/chat/completions"
async def _post(_payload):
if not stream:
r = await self._client.post(url, json=_payload)
r.raise_for_status()
return r.json()
tool_builders: Dict[int, dict] = {}
content_parts: List[str] = []
async with self._client.stream("POST", url, json=_payload) as resp:
resp.raise_for_status()
async for chunk in resp.aiter_text():
for line in chunk.splitlines():
s = line.strip()
if not s.startswith("data:"):
continue
data = s[5:].strip()
if data == "[DONE]":
break
try:
evt = json.loads(data)
except Exception:
continue
for ch in evt.get("choices", []):
delta = ch.get("delta", {})
if on_event and delta:
try:
on_event(delta)
except Exception:
pass
dc = delta.get("content")
if dc:
content_parts.append(dc)
if "tool_calls" in delta:
for t in delta["tool_calls"]:
idx = t.get("index", 0)
cur = tool_builders.setdefault(
idx,
{
"type": "function",
"function": {"name": "", "arguments": ""},
},
)
if t.get("id"):
cur["id"] = t["id"]
if t.get("type"):
cur["type"] = t["type"]
fn = t.get("function") or {}
if "name" in fn and fn["name"]:
cur["function"]["name"] = fn["name"]
if "arguments" in fn and fn["arguments"]:
cur["function"]["arguments"] += fn["arguments"]
tool_calls = []
for k in sorted(tool_builders.keys()):
b = tool_builders[k]
tool_calls.append(b)
return {
"id": "streamed",
"object": "chat.completion",
"model": self.model,
"choices": [
{
"index": 0,
"finish_reason": "stop",
"message": {
"role": "assistant",
"content": "".join(content_parts) if content_parts else None,
"tool_calls": tool_calls or None,
},
}
],
}
try:
res = await _post(payload)
if DEBUG_HTTP:
dump_json("last_response.json", res)
return res
except (httpx.ReadError, httpx.ProtocolError, httpx.HTTPStatusError):
# fallback zonder stream
if stream:
payload.pop("stream", None)
r = await self._client.post(url, json=payload)
r.raise_for_status()
res = r.json()
if DEBUG_HTTP:
dump_json("last_response.json", res)
return res
raise
# -------------------- Tool schemas (OpenAI) ----------------------
def tool_schema() -> List[dict]:
return [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Lees een tekstbestand binnen de projectmap.",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "Schrijf/overschrijf een tekstbestand binnen de projectmap.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
"create": {"type": "boolean", "default": True},
},
"required": ["path", "content"],
},
},
},
{
"type": "function",
"function": {
"name": "append_file",
"description": "Voeg tekst toe aan een bestand binnen de projectmap.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["path", "content"],
},
},
},
{
"type": "function",
"function": {
"name": "list_tree",
"description": "Geef een boomoverzicht van de projectmap terug.",
"parameters": {
"type": "object",
"properties": {
"max_depth": {"type": "integer", "default": 3},
"max_entries": {"type": "integer", "default": 4000},
},
},
},
},
{
"type": "function",
"function": {
"name": "search_text",
"description": "Zoek regex in tekstbestanden (glob patroon).",
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string"},
"glob": {"type": "string", "default": "**/*"},
"ignore_bin": {"type": "boolean", "default": True},
},
"required": ["pattern"],
},
},
},
{
"type": "function",
"function": {
"name": "run_shell",
"description": "Voer een shell-commando uit (stdout/stderr).",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string"},
"timeout": {"type": "integer", "default": 600},
"cwd": {
"type": "string",
"description": "Optioneel werkdirectory (relatief of absoluut).",
},
},
"required": ["command"],
},
},
},
{
"type": "function",
"function": {
"name": "ask_user",
"description": "Stel een gerichte vraag aan de gebruiker. "
"In BG-modus wacht de agent op een :answer <qid> <antwoord> van de gebruiker.",
"parameters": {
"type": "object",
"properties": {"question": {"type": "string"}},
"required": ["question"],
},
},
},
{
"type": "function",
"function": {
"name": "set_goal",
"description": "Stel of wijzig het Hoofddoel.",
"parameters": {
"type": "object",
"properties": {"title": {"type": "string"}},
"required": ["title"],
},
},
},
{
"type": "function",
"function": {
"name": "add_task",
"description": "Voeg een (sub)doel / taak toe.",
"parameters": {
"type": "object",
"properties": {
"title": {"type": "string"},
"parent_id": {"type": "string"},
},
"required": ["title"],
},
},
},
{
"type": "function",
"function": {
"name": "update_task",
"description": "Werk status/notes bij.",
"parameters": {
"type": "object",
"properties": {
"task_id": {"type": "string"},
"status": {"type": "string"},
"notes": {"type": "string"},
},
"required": ["task_id"],
},
},
},
{
"type": "function",
"function": {
"name": "rag_upsert",
"description": "Sla tekst op in RAG of archiveer lokaal.",
"parameters": {
"type": "object",
"properties": {
"namespace": {"type": "string"},
"doc_id": {"type": "string"},
"text": {"type": "string"},
"meta": {"type": "object"},
},
"required": ["namespace", "doc_id", "text"],
},
},
},
{
"type": "function",
"function": {
"name": "rag_search",
"description": "Zoek in lokaal opgeslagen RAG-tekst.",
"parameters": {
"type": "object",
"properties": {
"namespace": {"type": "string"},
"query": {"type": "string"},
"top_k": {"type": "integer", "default": 5},
"regex": {"type": "boolean", "default": False},
},
"required": ["namespace", "query"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Zoek op het web via proxy/google/serpapi/duckhtml.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
"recency_days": {"type": "integer"},
"site": {"type": "string"},
"backend": {
"type": "string",
"enum": ["auto", "proxy", "google", "serpapi", "duckhtml", "off"],
},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "web_get",
"description": "Haal raw tekst op van een URL.",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string"},
"max_bytes": {"type": "integer", "default": 200000},
"timeout": {"type": "integer", "default": 20},
},
"required": ["url"],
},
},
},
]
async def dispatch_tool(name: str, args: dict) -> str:
# Kleine preview-log per tool
try:
preview = name
if isinstance(args, dict):
if name in ("write_file", "read_file", "append_file") and "path" in args:
preview += f" {args['path']}"
elif name == "run_shell" and "command" in args:
c = str(args["command"]).replace("\n", " ")
if len(c) > 60:
c = c[:57] + "..."
preview += f" {c}"
v(1, preview)
except Exception:
pass
try:
if name == "read_file":
return t_read_file(args)
if name == "write_file":
return json.dumps(t_write_file(args), ensure_ascii=False)
if name == "append_file":
return json.dumps(t_append_file(args), ensure_ascii=False)
if name == "list_tree":
return json.dumps(t_list_tree(args), ensure_ascii=False)
if name == "search_text":
return json.dumps(t_search_text(args), ensure_ascii=False)
if name == "run_shell":
return json.dumps(await t_run_shell(args), ensure_ascii=False)
if name == "ask_user":
return json.dumps(await t_ask_user(args), ensure_ascii=False)
if name == "set_goal":
return json.dumps(t_set_goal(args), ensure_ascii=False)
if name == "add_task":
return json.dumps(t_add_task(args), ensure_ascii=False)
if name == "update_task":
return json.dumps(t_update_task(args), ensure_ascii=False)
if name == "rag_upsert":
return json.dumps(t_rag_upsert(args), ensure_ascii=False)
if name == "rag_search":
return json.dumps(t_rag_search(args), ensure_ascii=False)
if name == "web_search":
return json.dumps(await t_web_search(args), ensure_ascii=False)
if name == "web_get":
return json.dumps(await t_web_get(args), ensure_ascii=False)
return json.dumps({"ok": False, "error": f"unknown tool {name}"}, ensure_ascii=False)
except Exception as e:
return json.dumps({"ok": False, "error": str(e)}, ensure_ascii=False)
def append_tool_result(
msgs: List[dict], tool_call_id: str, name: str, content: str, role_pref: str
):
if role_pref == "tool":
msgs.append(
{
"role": "tool",
"tool_call_id": tool_call_id,
"name": name,
"content": content,
}
)
else:
msgs.append(
{
"role": "user",
"content": f'<tool_result id="{tool_call_id}" name="{name}">\n{content}\n</tool_result>',
}
)
# --------------------- System & history helpers ------------------
def system_preamble() -> List[dict]:
rules = (
"Je bent ConsoleX, een autonome console-agent die code kan lezen/bewerken, "
"shell-commando's uitvoert, en doelen beheert.\n"
"Basisregels:\n"
"1) Werk binnen de projectmap (cwd + submappen) tenzij expliciet anders aangegeven.\n"
"2) Gebruik tools doelgericht. Lees shell-output en stuur bij.\n"
"3) Houd 1 Hoofddoel aan + (sub)doelen; update status klaar/blocked/done.\n"
"4) Context kort; archiveer lange stukken met rag_upsert.\n"
"5) Vraag verduidelijking met ask_user; zie user-answers niet als nieuw hoofddoel.\n"
"6) rm-commando's met flags zijn niet toegestaan.\n"
"7) Bij creëer/maak/schrijf: gebruik write_file met volledige inhoud & pad in huidig project.\n"
"8) Bij aanpassing zonder pad: list_tree/search_text → read_file → write_file.\n"
"9) Test met run_shell en interpreteer output (apt/pip/docker/etc).\n"
"10) Voor recente/online info: gebruik web_search en eventueel web_get voor detail.\n"
"\n"
"Werkmethode voor complexe, stapsgewijze taken (installaties, Docker, builds, config):\n"
"11) Volg steeds deze cyclus:\n"
" a) Oriëntatie: gebruik web_search om kort te kijken hoe dit normaal gaat (maximaal enkele resultaten).\n"
" b) Inspectie: onderzoek de huidige situatie met list_tree, read_file en run_shell (bijv. ls, docker ps, git status).\n"
" c) Plan: maak een beknopt stappenplan in bullet points voordat je tools gebruikt (kort, max ~10 regels).\n"
" d) Uitvoering: voer de stappen één voor één uit met tools (run_shell, write_file, etc.).\n"
" e) Controle: na elke belangrijke stap een check (bijv. opnieuw ls, docker ps, tests) om te verifiëren dat de stap gelukt is.\n"
"12) Geef geen 'simulaties' van toolcalls in tekst als tools beschikbaar zijn. Gebruik echte tool_calls. "
"Alleen als je echt geen tools hebt, mag je uitleg geven zonder tool_calls én zeg je dat expliciet.\n"
)
if TOOL_MODE == "native":
rules += (
"13) Gebruik het tools-mechanisme van de API (native tool_calls). "
"Stop géén tool-aanroepen in normale tekst of codeblokken; gebruik altijd tools.\n"
)
elif TOOL_MODE == "content":
rules += (
"13) Gebruik géén native tool_calls. Encodeer tool-aanroepen als JSON in de content "
'met een sleutel zoals \"call_tool\".\n'
)
elif TOOL_MODE == "off":
rules += "13) Gebruik geen tools; antwoord uitsluitend in tekst.\n"
else: # auto
rules += (
"13) Geef de voorkeur aan native tool_calls van de API. Alleen als dat duidelijk niet werkt, "
'mag je tool-aanroepen als JSON in de content geven met \"call_tool\" of <toolcall>.\n'
)
state = f"STATE\nHoofddoel: {CTX.get('goal', '(geen)')}\nLaatste taken:"
return [
{"role": "system", "content": rules},
{"role": "system", "content": state},
]
def build_history(extra: Optional[List[dict]] = None) -> List[dict]:
hist = system_preamble() + (CTX.get("history") or [])
if extra:
hist += extra
return hist[-MAX_MSGS:]
# ----------------------- Chat core --------------------------------
async def chat_with_tools(
client: OpenAIClient,
user_text: str,
stream: bool,
print_events: bool = True,
) -> str:
history = build_history([{"role": "user", "content": user_text}])
role_pref = TOOL_RETURN_ROLE
tools = tool_schema() if TOOL_MODE in ("native", "auto", "content") else None
def on_event(delta: dict):
if not print_events:
return
if "tool_calls" in delta:
for t in delta["tool_calls"]:
fnn = ((t.get("function") or {}).get("name")) or ""
if fnn:
v(1, f"→ tool {fnn} (delta)")
final_assistant = ""
for _ in range(8): # max 8 rondes tool-use
res = await client.chat(
history,
tools=tools,
tool_choice=("auto" if tools and TOOL_MODE != "off" else None),
stream=stream,
on_event=on_event,
)
msg = (res.get("choices") or [{}])[0].get("message", {})
# 1) Native tool_calls via API
if msg.get("tool_calls"):
if print_events and EVENTS:
for tc in msg["tool_calls"]:
fnn = ((tc.get("function") or {}).get("name")) or ""
if fnn:
v(1, f"→ tool {fnn} (native)")
for tc in msg["tool_calls"]:
fn = ((tc.get("function") or {}).get("name")) or ""
args_json = (tc.get("function") or {}).get("arguments") or "{}"
try:
args = json.loads(args_json)
except Exception:
args = {}
try:
m = re.findall(r'"(\w+)"\s*:\s*"([^"]*)"', args_json)
if m:
args = {k: v for (k, v) in m}
except Exception:
args = {}
out = await dispatch_tool(fn, args)
append_tool_result(history, tc.get("id", "tool1"), fn, out, role_pref)
continue
# 2) Geen native tools → salvage (JSON + <toolcall ...>)
content = (msg.get("content") or "").strip()
if content and SALVAGE:
items = find_call_tool_json(content)
if not items:
items = find_toolcall_xml_like(content)
if items:
for idx, it in enumerate(items, start=1):
fn = it.get("name", "")
if not fn:
continue
args = it.get("arguments") or {}
if isinstance(args, str):
try:
args = json.loads(args)
except Exception:
args = {}
v(1, f"[salvage] {fn}")
out = await dispatch_tool(fn, args)
append_tool_result(history, f"salvage{idx}", fn, out, role_pref)
# na salvage: opnieuw naar model met tool_result in history
continue
# 3) Plain tekst-antwoord
final_assistant = content
break
CTX["history"] = (CTX.get("history") or []) + [{"role": "user", "content": user_text}]
if final_assistant:
CTX["history"].append({"role": "assistant", "content": final_assistant})
save_context(CTX)
if final_assistant:
save_md("last_assistant.md", final_assistant)
return final_assistant or ""
# ---------------------- BG runtime (queue) ------------------------
@dataclass
class Job:
id: str
prompt: str
status: str = "queued" # queued|running|done|aborted|error
result: str = ""
err: str = ""
created: float = field(default_factory=time.time)
started: float = 0.0
finished: float = 0.0
class BGState:
def __init__(self):
self.enabled = BACKGROUND_DEFAULT
self.queue: asyncio.Queue[Job] = asyncio.Queue()
self.worker_task: Optional[asyncio.Task] = None
self.current_task: Optional[asyncio.Task] = None
self.jobs: Dict[str, Job] = {}
# Nieuw: actieve job + pending ask_user-vragen
self.active_job: Optional[Job] = None
self.pending_questions: Dict[str, Dict[str, Any]] = {}
self.q_counter: int = 0
BG_STATE = BGState()
def bg_is_running() -> bool:
return BG_STATE.worker_task is not None and not BG_STATE.worker_task.done()
async def heartbeat(job: Job):
k = 0
while job.status == "running":
await asyncio.sleep(TICK_SECS)
k += 1
if EVENTS and TICK_SECS > 0:
v(1, f"[{job.id}] … tick {k}")
async def bg_worker():
client = OpenAIClient(BASE_URL, MODEL)
try:
while True:
try:
if not BG_STATE.enabled and BG_STATE.queue.empty():
break
job: Job = await BG_STATE.queue.get()
except asyncio.CancelledError:
break
if job.status == "aborted":
BG_STATE.queue.task_done()
continue
BG_STATE.jobs[job.id] = job
BG_STATE.active_job = job
job.status = "running"
job.started = time.time()
v(1, f"[{job.id}] start: {job.prompt}")
hb = asyncio.create_task(heartbeat(job))
try:
task = asyncio.create_task(
chat_with_tools(
client,
job.prompt,
stream=STREAM_DEFAULT,
print_events=EVENTS,
)
)
BG_STATE.current_task = task
res = await task
job.result = res
job.status = "done"
job.finished = time.time()
if res:
if RICH:
console.print(Panel(res, title=f"Agent (job {job.id})", border_style="green"))
else:
print(f"\n[Agent {job.id}] {res}")
except asyncio.CancelledError:
job.status = "aborted"
job.finished = time.time()
v(1, f"[{job.id}] aborted")
# openstaande vragen voor deze job annuleren
for qid, meta in list(BG_STATE.pending_questions.items()):
if meta.get("job_id") == job.id:
fut = meta.get("future")
if fut and not fut.done():
fut.set_exception(asyncio.CancelledError())
BG_STATE.pending_questions.pop(qid, None)
break
except Exception as e:
job.status = "error"
job.err = str(e)
job.finished = time.time()
v(1, f"[{job.id}] error: {e}")
finally:
BG_STATE.current_task = None
BG_STATE.active_job = None
hb.cancel()
try:
await hb
except Exception:
pass
BG_STATE.queue.task_done()
finally:
await client.close()
def start_bg():
BG_STATE.enabled = True
if not bg_is_running():
BG_STATE.worker_task = asyncio.create_task(bg_worker())
v(1, "[bg] worker gestart")
async def stop_bg():
BG_STATE.enabled = False # queue mag leeglopen maar niks nieuws
if bg_is_running():
v(1, "[bg] worker stoppen…")
BG_STATE.worker_task.cancel()
try:
await BG_STATE.worker_task
except asyncio.CancelledError:
pass
BG_STATE.worker_task = None
v(1, "[bg] worker gestopt")
def add_job(prompt: str) -> Job:
jid = f"{int(time.time()*1000)%0xffffffff:08x}"
job = Job(id=jid, prompt=prompt)
BG_STATE.jobs[jid] = job
BG_STATE.queue.put_nowait(job)
if BG_STATE.enabled:
start_bg()
return job
def format_queue() -> str:
pending = [j for j in BG_STATE.jobs.values() if j.status in ("queued", "running")]
lines = [f"queue size: {BG_STATE.queue.qsize()} (pending: {len(pending)})"]
for j in sorted(pending, key=lambda x: x.created):
lines.append(f"- {j.id} [{j.status}] {j.prompt[:80]}")
return "\n".join(lines)
def format_jobs() -> str:
rows = sorted(BG_STATE.jobs.values(), key=lambda x: x.created, reverse=True)[:30]
lines = []
for j in rows:
lines.append(f"- {j.id} [{j.status}] {j.prompt[:80]}")
return "\n".join(lines) if lines else "(geen jobs)"
def format_questions() -> str:
if not BG_STATE.pending_questions:
return "(geen openstaande vragen)"
lines = []
for qid, meta in BG_STATE.pending_questions.items():
lines.append(f"- {qid} (job {meta.get('job_id')}): {meta.get('question')}")
return "\n".join(lines)
# -------------------------- Probe --------------------------------
async def probe_capabilities():
client = OpenAIClient(BASE_URL, MODEL)
table = [
["A native tool_calls", "FAIL", ""],
["B content JSON", "FAIL", "0 items"],
]
try:
r = await client.chat(
messages=[
{"role": "system", "content": "You can call tools."},
{
"role": "user",
"content": "Create A.txt with 'ok' via tool.",
},
],
tools=[t for t in tool_schema() if t["function"]["name"] == "write_file"],
tool_choice="auto",
stream=False,
)
ch = r.get("choices", [{}])[0].get("message", {})
tc = ch.get("tool_calls") or []
if tc:
table[0][1] = "PASS"
table[0][2] = tc[0]["function"]["name"]
except Exception:
pass
try:
r = await client.chat(
messages=[
{
"role": "system",
"content": (
'Geef platte tekst met exact: '
'{\\"call_tool\\":{\\"name\\":\\"write_file\\",'
'\\"arguments\\":{\\"path\\":\\"B.txt\\",'
'\\"content\\":\\"ok\\"}}}'
),
},
{"role": "user", "content": "Doe nu."},
],
stream=False,
)
msg = r.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
items = find_call_tool_json(msg)
if items:
table[1][1] = "PASS"
table[1][2] = str(len(items)) + " items"
except Exception:
pass
await client.close()
if RICH:
t = Table(title=":probe — capability matrix", box=box.MINIMAL_DOUBLE_HEAD)
t.add_column("Check")
t.add_column("Status")
t.add_column("Details")
for row in table:
t.add_row(*row)
console.print(t)
else:
print(table)
# --------------------------- UI / REPL ---------------------------
HELP = """
:help Toon hulp
:goal <titel> Stel Hoofddoel
:status Toon status
:history Toon laatste berichten
:tasks Toon taken
:add <prompt> Voeg prompt toe aan BG-queue
:queue Toon BG wachtrij
:jobs Toon recente jobs
:questions Toon openstaande ask_user-vragen (BG)
:answer <qid> <txt> Beantwoord een ask_user-vraag
:abort <jobid> Annuleer queued/lopende job
:start Start BG-worker
:stop Stop BG-worker
:bg on|off BG-mode aan/uit (input queue of direct)
:events on|off Events/ticks & tool-logs aan/uit
:tick <secs> Heartbeat interval
:v [0-3] | :quiet | :verbose | :debug
:stream on|off SSE aan/uit
:log http on|off HTTP dumps aan/uit
:toolmode <m> auto|native|content|off
:dumb on|off Salvage aan/uit
:probe Snelle capability-check
:init Reset CONTEXT.json
:exit Stop
""".strip()
def banner():
title = f"ConsoleX 2.4.13 — lokale console-agent voor je LLM Proxy"
info = (
f"Model: {MODEL} • Proxy: {BASE_URL}\n"
f"Verbosity: {VERBOSITY} (0 stil,1 normaal,2 verbose,3 debug)\n"
f"BG-mode: {'aan' if BG_STATE.enabled else 'uit'}"
f"Tool mode: {TOOL_MODE} • Search: {SEARCH_BACKEND}"
f"Salvage: {1 if SALVAGE else 0} • Stream: {1 if STREAM_DEFAULT else 0}"
f"Events: {1 if EVENTS else 0}"
)
if RICH:
console.print(Panel(info, title=title, border_style="cyan"))
else:
print(title)
print(info)
def status_panel():
if RICH:
t = Table(box=box.SIMPLE)
t.add_column("Hoofddoel")
t.add_column("Instellingen")
t.add_row(
CTX.get("goal", "(geen)"),
f"Verb:{VERBOSITY} • BG:{'aan' if BG_STATE.enabled else 'uit'} "
f"• Tool:{TOOL_MODE} • Stream:{1 if STREAM_DEFAULT else 0} "
f"• Events:{1 if EVENTS else 0}",
)
console.print(Panel(t, title="Status"))
else:
print(f"Hoofddoel: {CTX.get('goal')}")
print(
f"Verbosity:{VERBOSITY} BG:{'aan' if BG_STATE.enabled else 'uit'} "
f"Tool:{TOOL_MODE} Stream:{1 if STREAM_DEFAULT else 0} "
f"Events:{1 if EVENTS else 0}"
)
async def ainput(prompt: str) -> str:
def _read() -> str:
try:
return input(prompt)
except EOFError:
return ":exit"
return await asyncio.to_thread(_read)
async def repl():
global VERBOSITY, STREAM_DEFAULT, SALVAGE, TOOL_MODE, TICK_SECS, EVENTS, DEBUG_HTTP
banner()
if BG_STATE.enabled:
start_bg()
try:
while True:
try:
line = (await ainput("\nConsoleX > : ")).strip()
except KeyboardInterrupt:
line = ":exit"
if not line:
continue
# ----- BG / worker control -----
if line.strip() == ":start":
start_bg()
continue
if line.strip() == ":stop":
await stop_bg()
continue
if line.startswith(":bg "):
val = line.split(" ", 1)[1].strip().lower()
if val == "on":
start_bg()
else:
await stop_bg()
v(1, f"Background-mode: {'aan' if BG_STATE.enabled else 'uit'}")
continue
if line.startswith(":add "):
prompt = line[5:].strip()
job = add_job(prompt)
v(1, f"Job toegevoegd: {job.id}")
continue
if line.strip() == ":queue":
console.print(format_queue())
continue
if line.strip() == ":jobs":
console.print(format_jobs())
continue
if line.strip() == ":questions":
console.print(format_questions())
continue
if line.startswith(":answer "):
# :answer <qid> <tekst>
rest = line.split(" ", 1)[1].strip()
if not rest:
v(1, "Gebruik: :answer <qid> <tekst>")
continue
parts = rest.split(" ", 1)
qid = parts[0].strip()
answer = parts[1].strip() if len(parts) > 1 else ""
meta = BG_STATE.pending_questions.get(qid)
if not meta:
v(1, f"Onbekende vraag-id: {qid}")
continue
fut = meta.get("future")
if fut and not fut.done():
fut.set_result(answer)
v(1, f"Antwoord verstuurd naar job {meta.get('job_id')} voor {qid}")
BG_STATE.pending_questions.pop(qid, None)
continue
if line.startswith(":abort "):
jid = line.split(" ", 1)[1].strip()
j = BG_STATE.jobs.get(jid)
if j:
if j.status == "queued":
j.status = "aborted"
v(1, f"[{jid}] removed from queue")
elif j.status == "running" and BG_STATE.current_task:
BG_STATE.current_task.cancel()
else:
v(1, f"[{jid}] not found/finished")
continue
# ----- Info / status / config -----
if line.startswith(":help"):
console.print(HELP)
continue
if line.startswith(":goal "):
title = line[6:].strip()
t_set_goal({"title": title})
v(1, f"Hoofddoel: {title}")
continue
if line.strip() == ":status":
status_panel()
continue
if line.strip() == ":tasks":
tasks = CTX.get("tasks") or {}
if not tasks:
console.print("(geen taken)")
else:
for tid, tsk in tasks.items():
console.print(f"- [{tsk.get('status')}] {tid} {tsk.get('title')}")
continue
if line.strip() == ":history":
hist = CTX.get("history") or []
v(1, f"(laatste {len(hist)})")
for m in hist[-10:]:
who = m.get("role", "?")
txt = (m.get("content", "") or "")[:100].replace("\n", " ")
console.print(f"- {who}: {txt}{'' if len(txt) >= 100 else ''}")
continue
if line.strip() == ":init":
CTX["history"] = []
save_context(CTX)
v(1, "CONTEXT.json gereset")
continue
if line.strip() == ":probe":
await probe_capabilities()
continue
if line.startswith(":tick "):
try:
TICK_SECS = float(line.split()[1])
v(1, f"Tick: {TICK_SECS:.1f}s")
except Exception:
v(1, "Ongeldige waarde")
continue
if line.startswith(":events"):
parts = line.split()
if len(parts) == 1:
v(1, f"Events: {'aan' if EVENTS else 'uit'}")
else:
val = parts[1].strip().lower()
if val in ("on", "aan"):
EVENTS = True
elif val in ("off", "uit"):
EVENTS = False
v(1, f"Events: {'aan' if EVENTS else 'uit'}")
continue
if line.startswith(":v ") or line.strip() in (":quiet", ":verbose", ":debug"):
if line.strip() == ":quiet":
VERBOSITY = 0
elif line.strip() == ":verbose":
VERBOSITY = 2
elif line.strip() == ":debug":
VERBOSITY = 3
else:
try:
VERBOSITY = max(0, min(3, int(line.split()[1])))
except Exception:
pass
v(1, f"Verbosity: {VERBOSITY}")
continue
if line.startswith(":stream "):
STREAM_DEFAULT = line.split(" ", 1)[1].strip().lower() == "on"
v(1, f"Stream: {'aan' if STREAM_DEFAULT else 'uit'}")
continue
if line.startswith(":log http "):
val = line.split(" ", 2)[2].strip().lower()
DEBUG_HTTP = val == "on"
v(1, f"HTTP-debug: {'aan' if DEBUG_HTTP else 'uit'}")
continue
if line.startswith(":toolmode "):
TOOL_MODE = line.split(" ", 1)[1].strip().lower()
v(1, f"Tool mode: {TOOL_MODE}")
continue
if line.startswith(":dumb "):
SALVAGE = line.split(" ", 1)[1].strip().lower() == "on"
v(1, f"Salvage: {'aan' if SALVAGE else 'uit'}")
continue
if line.strip() == ":exit":
console.print("Afsluiten…")
break
# ---- Vrije prompt ----
if BG_STATE.enabled:
job = add_job(line)
v(1, f"[{job.id}] queued")
continue
else:
client = OpenAIClient(BASE_URL, MODEL)
try:
res = await chat_with_tools(
client,
line,
stream=STREAM_DEFAULT,
print_events=EVENTS,
)
if res:
if RICH:
console.print(Panel(res, title="Agent", border_style="green"))
else:
print(res)
finally:
await client.close()
except Exception as e:
console.print(str(e))
# --------------------------- Main --------------------------------
def main():
try:
asyncio.run(repl())
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()