2025-11-06 13:49:56 +00:00
|
|
|
|
# -------------------------------------------------------------
|
|
|
|
|
|
# queue_helper.py – minimalistisch thread‑based wachtrij‑manager
|
|
|
|
|
|
# -------------------------------------------------------------
|
|
|
|
|
|
import logging
import threading, queue, uuid, time

from typing import Callable, Any, Dict
|
|
|
|
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
# Configuratie – pas eventueel aan
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
# Capacity of the user queue; enqueue_user raises once this many jobs wait.
USER_MAX_QUEUE: int = 20

# Capacity of the agent queue. Agent jobs are "silent": they never get
# queue-position messages.
AGENT_MAX_QUEUE: int = 50

# Seconds between two queue-position messages sent to a waiting user.
UPDATE_INTERVAL: float = 10.0

# Default number of seconds request_agent_sync waits for a result.
# NOTE: this bounds the caller's wait only; the worker does not abort an
# inference that runs longer.
WORKER_TIMEOUT: float = 30.0
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
class _Job:
|
|
|
|
|
|
__slots__ = ("job_id","payload","callback","created_at","event","result","error")
|
|
|
|
|
|
def __init__(self, payload: Dict, callback: Callable[[Dict], None]):
|
|
|
|
|
|
self.job_id = str(uuid.uuid4())
|
|
|
|
|
|
self.payload = payload
|
|
|
|
|
|
self.callback = callback
|
|
|
|
|
|
self.created_at = time.time()
|
|
|
|
|
|
self.event = threading.Event()
|
|
|
|
|
|
self.result = None
|
|
|
|
|
|
self.error = None
|
|
|
|
|
|
def set_success(self, answer: Dict):
|
|
|
|
|
|
self.result = answer
|
|
|
|
|
|
self.event.set()
|
|
|
|
|
|
self.callback(answer)
|
|
|
|
|
|
def set_error(self, exc: Exception):
|
|
|
|
|
|
self.error = str(exc)
|
|
|
|
|
|
self.event.set()
|
|
|
|
|
|
self.callback({"error": self.error})
|
|
|
|
|
|
|
|
|
|
|
|
class QueueManager:
    """Serve a bounded user queue and a bounded agent queue with one worker thread.

    User jobs always take priority: the worker only looks at the agent queue
    when the user queue is empty. Outcomes are delivered through each job's
    callback.
    """

    def __init__(self, model_infer_fn: Callable[[Dict], Dict]):
        """Create both queues and start the daemon worker thread.

        Args:
            model_infer_fn: Called on the worker thread with a job's payload.
                Its return value becomes the job's answer; an exception it
                raises is reported to the job as ``{"error": str(exc)}``.
        """
        self._infer_fn = model_infer_fn
        self._user_q = queue.Queue(maxsize=USER_MAX_QUEUE)
        self._agent_q = queue.Queue(maxsize=AGENT_MAX_QUEUE)
        self._shutdown = threading.Event()
        self._worker = threading.Thread(target=self._run_worker,
                                        daemon=True,
                                        name="LLM‑worker")
        self._worker.start()

    # ---------- public API ----------

    def enqueue_user(
        self,
        payload: Dict,
        progress_cb: Callable[[Dict], None],
        *,
        notify_position: bool = False,
    ) -> tuple[str, int]:
        """Queue a user job without blocking.

        Args:
            payload: Passed unchanged to the inference function.
            progress_cb: Receives the final answer or ``{"error": ...}``, plus
                periodic ``{"info": ...}`` messages when *notify_position* is set.
            notify_position: Start a background notifier that periodically
                reports the job's place in the user queue.

        Returns:
            ``(job_id, position)``: *position* is the job's 1-based place in the
            user queue when checked, or 0 if the worker had already taken it.

        Raises:
            RuntimeError: If the user queue is full.
        """
        job = _Job(payload, progress_cb)
        try:
            self._user_q.put_nowait(job)
        except queue.Full:
            raise RuntimeError(f"User‑queue vol (≥{USER_MAX_QUEUE})")
        # Locate this exact job: qsize() would also count jobs enqueued
        # concurrently behind it and misreport a job the worker already took.
        position = self._queue_position(self._user_q, job)
        if notify_position:
            # Separate thread that periodically reports the queue position.
            start_position_notifier(job, self._user_q)
        return job.job_id, position

    def enqueue_agent(self, payload: Dict, progress_cb: Callable[[Dict], None]) -> str:
        """Queue an agent job without blocking; no position messages are sent.

        Args:
            payload: Passed unchanged to the inference function.
            progress_cb: Receives the final answer or ``{"error": ...}``.

        Returns:
            The job id.

        Raises:
            RuntimeError: If the agent queue is full.
        """
        job = _Job(payload, progress_cb)
        try:
            self._agent_q.put_nowait(job)
        except queue.Full:
            raise RuntimeError(f"Agent‑queue vol (≥{AGENT_MAX_QUEUE})")
        return job.job_id

    # ---------- sync helper for agents/tools ----------

    def request_agent_sync(self, payload: Dict, timeout: float = WORKER_TIMEOUT) -> Dict:
        """Run an internal (agent/tool) inference and block until it finishes.

        The job goes into the agent queue, so it has lower priority than user
        jobs, and it never receives queue-position messages.

        Args:
            payload: Passed unchanged to the inference function.
            timeout: Maximum number of seconds to wait for the result.

        Returns:
            The inference result, or ``{}`` if the result was falsy.

        Raises:
            RuntimeError: If the agent queue is full or the inference raised.
            TimeoutError: If no result arrived within *timeout* seconds. The
                job is not removed and may still be run by the worker later.
        """
        # The answer is read from job.result, which _Job stores before it sets
        # the event; a callback-filled box could still be empty when we wake.
        job = _Job(payload, lambda _msg: None)
        try:
            self._agent_q.put_nowait(job)
        except queue.Full:
            raise RuntimeError(f"Agent-queue vol (≥{AGENT_MAX_QUEUE})")

        if not job.event.wait(timeout):
            raise TimeoutError(f"LLM-inference duurde langer dan {timeout} seconden.")
        # `is not None`: an exception with an empty message (e.g. ValueError())
        # must still be reported as a failure, not as an empty success.
        if job.error is not None:
            raise RuntimeError(job.error)
        return job.result or {}

    # ---------- worker ----------

    def _run_worker(self) -> None:
        """Worker loop: take a user job if any, else an agent job, and run it."""
        while not self._shutdown.is_set():
            job = self._pop_job(self._user_q)
            if job is None:
                job = self._pop_job(self._agent_q)
            if job is None:
                # Idle: back off briefly. Waiting on the shutdown event instead
                # of sleeping lets stop() end the loop immediately.
                self._shutdown.wait(0.1)
                continue
            try:
                answer = self._infer_fn(job.payload)
            except Exception as exc:
                self._complete(job.set_error, exc)
            else:
                self._complete(job.set_success, answer)

    @staticmethod
    def _complete(finisher: Callable[[Any], None], value: Any) -> None:
        """Call a job's completion method, isolating failures of the client callback.

        There is only one worker thread; an exception escaping here would
        end it and leave every later job waiting forever.
        """
        try:
            finisher(value)
        except Exception:
            logging.getLogger(__name__).exception(
                "Job callback raised; worker continues with the next job")

    @staticmethod
    def _queue_position(q: queue.Queue, job: "_Job") -> int:
        """Return the 1-based position of *job* in *q*, or 0 if it is no longer queued."""
        # Relies on CPython's queue.Queue internals (.mutex, .queue); holding
        # the mutex prevents the worker from popping while we scan.
        with q.mutex:
            for index, queued in enumerate(q.queue, start=1):
                if queued is job:
                    return index
        return 0

    def _pop_job(self, q: queue.Queue) -> "_Job | None":
        """Return the next job from *q* without blocking, or None if *q* is empty."""
        try:
            return q.get_nowait()
        except queue.Empty:
            return None

    def stop(self) -> None:
        """Ask the worker to exit and wait up to 5 seconds for it.

        A job that is already running is finished first. Jobs still queued are
        not processed, and their callbacks/waiters are not notified.
        """
        self._shutdown.set()
        self._worker.join(timeout=5)
|
|
|
|
|
|
|
2025-11-20 15:16:00 +00:00
|
|
|
|
def start_position_notifier(
    job: _Job,
    queue_ref: queue.Queue,
    interval: float = UPDATE_INTERVAL,
):
    """Start a daemon thread that reports *job*'s queue position every *interval* seconds.

    Each message is ``{"info": "U bent #<n> in de wachtrij. Even geduld…"}``
    sent to ``job.callback``, where ``<n>`` is the job's 1-based position in
    *queue_ref*. The thread stops once the job's event is set or the job has
    left the queue.

    Args:
        job: The queued job to report on.
        queue_ref: The queue *job* was put into.
        interval: Seconds between two messages; must be positive.

    Returns:
        The started ``threading.Thread``.

    Raises:
        ValueError: If *interval* is not positive. ``Event.wait`` would then
            return immediately, and the thread would spin and flood the callback.
    """
    if interval <= 0:
        raise ValueError(f"interval must be positive, got {interval!r}")

    def _position() -> int:
        # Scan under the queue's own lock (CPython queue.Queue internals) so
        # the worker cannot pop while we look; 0 means "no longer queued".
        with queue_ref.mutex:
            for index, queued in enumerate(queue_ref.queue, start=1):
                if queued is job:
                    return index
        return 0

    def _notifier():
        # Event.wait returns True as soon as the job has finished -> stop.
        while not job.event.wait(interval):
            pos = _position()
            if pos == 0:
                # The job is no longer waiting: no more updates needed.
                break
            if job.event.is_set():
                # Finished in the meantime; narrows (does not fully close) the
                # window for a stale position arriving after the answer.
                break
            job.callback({"info": f"U bent #{pos} in de wachtrij. Even geduld…" })

    t = threading.Thread(target=_notifier, daemon=True,
                         name=f"queue-notifier-{job.job_id}")
    t.start()
    return t
|