mistral-api/llm_client.py
2025-11-20 16:16:00 +01:00

132 lines
4.1 KiB
Python
Executable File

from __future__ import annotations
import os
import asyncio
import logging
from typing import List, Dict, Any, Optional
import httpx
from queue_helper import QueueManager
logger = logging.getLogger(__name__)
# -------------------------------------------------------------
# Configuration for the underlying LLM backend
# -------------------------------------------------------------
# This is NOT this service's own /v1/chat/completions endpoint, but the
# *actual* model backend (e.g. Ollama, vLLM, a Mistral server, ...).
# Every value below is read from the environment once, at import time.
LLM_API_BASE = os.environ.get("LLM_API_BASE", "http://127.0.0.1:11434")
# NOTE(review): the fallback model name looks like a hosted-API model while the
# fallback base URL points at a local server -- confirm this default is intended.
LLM_DEFAULT_MODEL = os.environ.get("LLM_MODEL", "gpt-4o-mini")
# Handed to httpx.Client(timeout=...) by _sync_model_infer.
LLM_REQUEST_TIMEOUT = float(os.environ.get("LLM_REQUEST_TIMEOUT", "120"))
# Set from app.py through init_llm_client(...); stays None until then.
LLM_QUEUE: QueueManager | None = None
def init_llm_client(queue: QueueManager) -> None:
    """Register the application's QueueManager as the module-wide ``LLM_QUEUE``.

    According to the original note, app.py MUST call this exactly once;
    ``_llm_call`` raises ``RuntimeError`` for as long as ``LLM_QUEUE`` is
    still ``None``.

    Args:
        queue: The QueueManager instance to store in ``LLM_QUEUE``.
    """
    global LLM_QUEUE
    LLM_QUEUE = queue
    logger.info("llm_client: LLM_QUEUE gekoppeld via init_llm_client.")
def _sync_model_infer(payload: Dict[str, Any]) -> Dict[str, Any]:
    """POST ``payload`` synchronously to the real LLM backend.

    The original note says this is the function app.py passes in when it
    constructs the QueueManager.

    Args:
        payload: JSON-serialisable request body, sent as-is to
            ``<LLM_API_BASE>/v1/chat/completions``.

    Returns:
        The backend's decoded JSON response. If anything goes wrong (transport
        error, non-2xx status, undecodable body, ...) the error is logged and a
        synthetic ``chat.completion`` dict is returned instead, with
        ``finish_reason`` set to ``"error"`` and the exception text as content.
        This function never raises for backend failures.
    """
    endpoint = LLM_API_BASE.rstrip("/") + "/v1/chat/completions"
    try:
        with httpx.Client(timeout=LLM_REQUEST_TIMEOUT) as http:
            reply = http.post(endpoint, json=payload)
            reply.raise_for_status()
            body = reply.json()
    except Exception as exc:
        # Deliberate best effort: callers always get a completion-shaped dict.
        logger.exception("LLM backend call failed: %s", exc)
        error_message = {
            "role": "assistant",
            "content": f"[LLM-fout] {exc}",
        }
        error_choice = {
            "index": 0,
            "finish_reason": "error",
            "message": error_message,
        }
        zero_usage = dict.fromkeys(
            ("prompt_tokens", "completion_tokens", "total_tokens"), 0
        )
        return {
            "object": "chat.completion",
            "choices": [error_choice],
            "usage": zero_usage,
        }
    return body
async def _llm_call(
messages: List[Dict[str, str]],
*,
stream: bool = False,
temperature: float = 0.2,
top_p: float = 0.9,
max_tokens: Optional[int] = None,
model: Optional[str] = None,
**extra: Any,
) -> Dict[str, Any]:
"""
Centrale helper voor tools/agents/smart_rag/repo-agent.
Belangrijk:
- Gebruikt de *bestaande* QueueManager uit app.py (via init_llm_client).
- Stuurt jobs in de agent-queue (lagere prioriteit dan users).
- GEEN wachtrij-meldingen ("u bent #...") voor deze interne calls.
"""
if stream:
# In deze agent gebruiken we geen streaming.
raise NotImplementedError("_llm_call(stream=True) wordt momenteel niet ondersteund.")
if LLM_QUEUE is None:
# Hard fail: dan weet je meteen dat init_llm_client nog niet is aangeroepen.
raise RuntimeError("LLM_QUEUE is niet geïnitialiseerd. Roep init_llm_client(...) aan in app.py")
payload: Dict[str, Any] = {
"model": model or LLM_DEFAULT_MODEL,
"messages": messages,
"stream": False,
"temperature": float(temperature),
"top_p": float(top_p),
}
if max_tokens is not None:
payload["max_tokens"] = int(max_tokens)
payload.update(extra)
loop = asyncio.get_running_loop()
try:
# request_agent_sync blokkeert → naar threadpool
response: Dict[str, Any] = await loop.run_in_executor(
None, lambda: LLM_QUEUE.request_agent_sync(payload)
)
return response
except Exception as exc:
logger.exception("_llm_call via agent-queue failed: %s", exc)
return {
"object": "chat.completion",
"choices": [{
"index": 0,
"finish_reason": "error",
"message": {
"role": "assistant",
"content": f"[LLM-queue-fout] {exc}",
},
}],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
},
}