From 6f533fc15a9032faad6efba87b608109eb9428d9 Mon Sep 17 00:00:00 2001 From: admin Date: Mon, 2 Feb 2026 11:03:25 +0100 Subject: [PATCH] Initial proxy MVP --- Dockerfile | 18 + README.md | 40 ++ requirements.txt | 5 + run.sh | 17 + src/queuegate_proxy/__init__.py | 0 .../__pycache__/app.cpython-311.pyc | Bin 0 -> 28533 bytes src/queuegate_proxy/app.py | 472 ++++++++++++++++++ 7 files changed, 552 insertions(+) create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 requirements.txt create mode 100755 run.sh create mode 100644 src/queuegate_proxy/__init__.py create mode 100644 src/queuegate_proxy/__pycache__/app.cpython-311.pyc create mode 100644 src/queuegate_proxy/app.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ceb3166 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Runtime deps +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +COPY src ./src + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8080 + +CMD ["uvicorn", "src.queuegate_proxy.app:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..ce46ef0 --- /dev/null +++ b/README.md @@ -0,0 +1,40 @@ +# QueueGate + +QueueGate is an OpenAI-compatible **LLM proxy** that: +- routes requests to multiple upstream OpenAI-compatible backends (e.g. llama.cpp) +- provides a simple **priority queue** (user > agent) +- supports **stream** and **non-stream** (no fake streaming) +- supports **sticky worker affinity** (same chat -> same upstream when possible) + +## Quick start + +### 1) Configure + +Minimal env: +- `LLM_UPSTREAMS` (comma-separated URLs) + - e.g. `http://llama0:8000/v1/chat/completions,http://llama1:8000/v1/chat/completions` + +Optional: +- `LLM_MAX_CONCURRENCY` (defaults to number of upstreams) +- `STICKY_HEADER` (default: `X-Chat-Id`) +- `AFFINITY_TTL_SEC` (default: `60`) +- `QUEUE_NOTIFY_USER` = `auto|always|never` (default: `auto`) +- `QUEUE_NOTIFY_MIN_MS` (default: `1200`) + +### 2) Run + +```bash +uvicorn queuegate_proxy.app:app --host 0.0.0.0 --port 8080 +``` + +### 3) Health + +`GET /healthz` + +### 4) OpenAI endpoint + +`POST /v1/chat/completions` + +## Notes +- Tool calls are detected and suppressed in streaming output (to prevent leakage). +- This first version is a **proxy-only MVP**; tool execution can be wired in later. diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bb9702b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +fastapi==0.115.8 +uvicorn[standard]==0.34.0 +httpx==0.27.2 +pydantic==2.10.6 +python-dotenv==1.0.1 diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..8b0cb36 --- /dev/null +++ b/run.sh @@ -0,0 +1,17 @@ +export HTTP_PROXY=http://192.168.100.2:8118 +export HTTPS_PROXY=http://192.168.100.2:8118 +export http_proxy=http://192.168.100.2:8118 +export https_proxy=http://192.168.100.2:8118 +docker build -t queuegate:proxy . --build-arg http_proxy=http://192.168.100.2:8118 --build-arg https_proxy=http://192.168.100.2:8118 + +docker run --rm -p 8081:8080 \ + -e LLM_UPSTREAMS="http://192.168.100.1:8000/v1/chat/completions,http://192.168.100.1:8001/v1/chat/completions" \ + -e AFFINITY_TTL_SEC=240 \ + -e HTTP_PROXY=http://192.168.100.2:8118 \ + -e HTTPS_PROXY=http://192.168.100.2:8118 \ + -e NO_PROXY=localhost,127.0.0.1,192.168.100.1 \ + -e QUEUE_NOTIFY_USER=auto \ + -e QUEUE_NOTIFY_MIN_MS=1200 \ + -e LLM_READ_TIMEOUT=3600 \ + -e TEXT_TOOLCALL_DETECT=1 \ + queuegate:proxy diff --git a/src/queuegate_proxy/__init__.py b/src/queuegate_proxy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/queuegate_proxy/__pycache__/app.cpython-311.pyc b/src/queuegate_proxy/__pycache__/app.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..c2bbea5e6865c36eb60a1fbcdb7dec49ab23e31f GIT binary patch literal 28533 zcmc(I32+?OnO^ry&pkM201WOS4iZBW;3@Gw0p27ik)TDvmc}4@NPq+eQ1?J2;eid? z@(R$RqLBg>AuU~+B(@e>b8E4?PQp}@3T?|?rfO5wjp_|1T|tW3#9QlBQh|!vL{ZM~ ze?3H6?_! zDb^$1CRW-8l(w;;v~6N=&6FXu;|8SMjMfREb(&z`B6f<+Xyq3CTGvRm(Of5_q;@cs z>AG9ogr40hwz1x-##chfxjpTs-rEd!$C~+XK^v~hF~3bk9__dl>9?`^wo`p0Z68_D z-}o6%Z<~dSl#3nWwg=k_+G4xN_w7I(Hstb*>=bt#L_&n@LP(n_WG970wkLO|vo;yx zu8(+p7eC_>yTsjZ2nD$J+`6fsTb26xRjHpx>OGnK^G$J2LF%=7!XoZnmE+!ZbKF;u zBjLeXxeDU`Rk`j%%r@ZA0rbp%@nFQCPH7q;==nnkKY$WYv#xr?!{!p87t6#WWCTIzvB8mO-;i$W85W)j1*Ty1p3?J~Gy?n9zQbZmdiOLbZ>~c(s^bHS2Z)Chq>egEk~zn#63x@aGsTm~QV!?j-Bd~CRB_7VeS6Pad#3lz z?0fsjTSrt+o#v@ig8LQEenr^-6iGQ#V^hsoW4{i|U%#BoqY(yShKakQqXMKNM>xh%t=iM3h_Xm!Qsfq zoD_u9sE<85ayuN8r4W3`S1txXbvz|A`=$@f98iQx7JKb7>_p^(GGxbzxePb6BO_B9 zt1gU<4T`#r;BMVI5V<2!o$?EJbL3ihP?S30-GZM?IBgVe|MbC`gPNyS5o%ciR4sBR z)}Q@4B2xFWmbwGWdZmnWYmZrx7cCF0=4s4~VoAz-)nl$T#<0-+g>5W$y?w84AG{GA zks{~@dxWq{#wd0S^vMH5gV%IH9_ZVFfN1}S7|~s53~}%VQ8r%MhL6Zpu^{U*={Z;# zpw^P9ZKz4gXAaH@A9_CUBu_1zp7W@ILt5aF>N%`=4lj95FM3X^p0k?gtRkFU-JhA7 zJe@^m_qYg0ENidj3Ky|PguIE_Ch}~`w)EN~Lc}IonM>$(M4WKhn9JVlVqp&Ea`w7e zn2WjGy&mTBFc;WbO#J5ji&zL4-AD;?EHUDt6-#x`3q+hcN21pUZ|JVEQII?kc=-$0 z)%MPTzF7MiQ7@8XgZ;Peh6f^jVnq54CDu#(u3sOF4#w_=W3i#I9O>7EzOmSdUdCu( zI64v=ynZ)4CL{iWfcQR>KPHgyu>6}+3<|vz-xI?$fJVk*lvB4}90Peo($^zWcuX2P zsrv~D!{poFH#8I$L7w|#D7-Tqj`j^l!eQMV4iArrV?$*3hQqIo^$i&zl7otCMYZ&j zaJVmulJ(JKD2Kxm)kHc>pqao?0>=q-5ja7>N#G=aE-;#H9!b(Z3MQ}>Klyh6CiDKD zl=u>zGj|msfL9d)PpVrcFC=y;LJMA1Xi3#ID1txHuLv+3RH5NXeX}A|BxFT^*{ll9 zsYY0;=6qR?Cw0Me5|}|%2tH|Po9s@Up1VAIK@r;UszO_8!$y2?4qt(}QDqPog91=# z`|-;@w9Fz*efmw%lJ&;3$eTy$D8}*HBiuYvC%8B#m&B+O(vN60=kz^Z6y89rB+z8_ zBihWdy9#FX2|nh@rHb=w^A@7roC`|tc$U&ryjW%#iqD8n(PgGx>wM3>>U*AN`Cc-w zopQA@x2UN+Ua_d4Jo%+97E203^XPbv^Abx}mG+tX7Olg@XzEBmVwqTO{@w%MM;Jgq zp+UqRP#k}^UUdHah47UZF85qIaqPlnog3e7@(2#fK?(flpcD~d&e@f1v4xSe@q=U zWLR`d$9A0`lW7VL8k5Ku?t??%xKZfQ!{d!ZBcN>kjI?$f92x=bk`Kd@iEtF@G`|(I75h+r59pq}R+qwvNmO301?JOqUNo~W5mr5|nx|FqY?&`#a_?Mp?^NBp zHTUkx&XlK^7|4tKa&e7PdqFMk){46oXLpKqPqOoO&;0!}^ZkFX`$ygA4z={CR(f>u z%&$F_sq)(8s_NOh$wswmhgP*isXDsw^`**Fi%{Lt9ch1RrEYPkCrRsT?~FDVv7i(g_@W6 z8G~zyfP)bQ=I4N~Ghe)e5XQ?yD8yP2V$bHoIGhN$9nndCq6@qZd+Un!p(XX9=7GfUp6>EGC^N4?6)`G#YHw zO|so0Ky+UkB0$)rJNo3i(f+{^-F|WmGJ=Q{C12Z%v}%?{DB36iqOk_6LMub5F9R;X zB74|4p2S!-{+@VCCeKZt`|i0^X$4p(ysA)y9a(NHRaBN}PRNP2nOCve!K(_jsZ!tM#mS4`y_l*Xo-NI`mDfz3oa$7B8oa7d zld5f;JTbKwtlc3+XvC`ujj2sr5a)EFXRdipo@<+ZRr7-V+JaXVwyfB>VC&r2!b?AW zRcY)}8@sf|uF2C=H)f)W(1lkOx>60zbCHEpKfRzd99J8TYYoR)gkeQEj#m|qr)q=X z^AJ1D_Zb$Wov4XSEFJX-Jx{6Ya)VdoBUaG@)U-xyH#2cKNFW8su^gg}2Bm0+-6?t@ z&lY-JFiD2(7F|HxBHjLyfpFGA@s94zEEb4I*X{j7gOO;gKSMX@^E5Fs&hh~(Mx2jx zM39XqZh;HEnRZ#>qWJ8|=NE5gbNmQlIWaim6V9>c8RKWGEP_V}XL&8=B3xyU2tkIi zu6Ied;Um#9SGr3;A`k=6?XN-7*4FEN_8BjZ7RD$_D#l1uq2r-T`2s;+HH(45t&$~rhb%bRyRX#aTI zgKf;QW#@xKA0K&e1P*VieFsIXO>Nx{r)vs-2Cfi=GO9hMWlES>lf)2YiFR$R5ce@=+`%$R9Xn?fwmYR3cP>4Cz}iblYq4;CMuelm7(3 z$Gr3%7nYbjhW{Gd+TT3Ou#gd>}(xjC%C3}AK zjiuH-i>-Uq)_q#*KDA`uqZaM>73I)39=`bSMdjoUgDpB)K4nw(e$2=gVa8 zHi_T}FTao9Qxd_Q<9Iad+Utb%q$Kfz)s~2nlWVfy;eKd&U@>#rz~}5ZAq+-CjysI$ zp`8R&v=|QA7zXPeW6}u|it0jG8jBhT`fZdpWMOS9{TA#dwuXKc0CLz~Z{g_)^t}&$`Iryrl@OzLnTXJk$GH>abODYy@ zBbeLjW%Toe_1o5{(=c!I8JRVk1hNwf>zgbyCknS14oNj8yz=F~nzNZ}iQcI;;nK6E zxwWcatUUKJ7Y%1h+l+eTW7gkZd2gGjbMl$Wdz*Fz%y`7gdpo-$VC)Md_?f>5*}8jl z2WUSzUUEPX(7=!GSG?vkRiE)=AoO;-#*U1O4e|K$CXaJW)cHM8-hsg9$2jEP^vmofx7#ir9k^)pj{1Y(*oNR z_CA5PcPY@Z80b&~+qJ-UC9oZ}s|r#{t8*ab4aU&$sg@i@2ZfaChyp{`|nPj;;>tCm|cWKIyQ*>yuqpvL8Nt;t==C zL++Dq>#tlq!6N6WR_m`STTWF7zuM1}{XjMBf8r;8%SGKb5Q~l8(LKj#@z==~3DP8m{(!&?0U9)g%RXYABm&p) zlPQ|9;3K$VaoBdub*^wQ=4EC)JfR)Dz-)z}oVUHo&mk6>Yd|}1yTbenpuOIP&Rz#F zK9Chpx{jjhHsY^j6DK3UA3O~A^dtcIYQBqmr5>BwToUfpPvK7Mgf(uJgt)ce!h@Zg zX~->0cvir5^(5d{nB3X{JSbG010n(}q^{QW8<#rD$m;xTqY|z=nNOfGIxZ z^}T)Wt#fnyboWd*oE|S4%Td0+EUWp zQe<=_%siwYA&&GmK*-9Pnh7SEu$@TcHD&5Apel$KoE7hT&`*G|o~bIG-L(Y04~?blrUCy%EbE~RLv;-EKG zQacq-c`Bz4um;zy{nu|u5oFp=mB>8AKr%FY3k_*rXV7{(Ps)!)mT0{pm|01NW|p{h zeJu-7FiXcE^|vMSOOPo^;lgm8J%rVJF*m@>h>joRDW9#U@gX|RWf5KXtrG%N7V_BN zo3>1W;*c4No(EpDI@N?N+6ZyT5a_!2Q%w{SR(|=Kh&DD`8wl`(A7fG9}Hf&!;h8U9SPS6q=xG4XI z*iP){gFgZ;K7d2|eFA>~05Ln%f8-G<7KTtz*1eIa453=MPwpQa9JdVYhipAbkxB5F z!(&~zDMRPQCXNk5dOk~GM5fuscgQZylbsP3!Zb!zNQMy+b!+TSOt!<5Bo`UkMX7=K zIpL)Aj}YVU;V1tE0M>-g%9OJt!U5&zWyRU^smLx`;5B zrAkVtoDi1zs_wPD+cvjHEpOJ!o0W1JGG*Hr%eJd!J2fy171j5yzI%1xK*@lzoJIj~$@mS~wCoEfC>DJ$ub1TfWu2ENylklzW-hE%2~A=4cA zvx8N~{oK#})yKD6f4+?e?1yjm5z@A&)YxW;9As`5$k+tYBidlvV3G{Z4qZ|QBy~>c zfjQxJK?lt(T15eFlFfOD&V{_jEKJ>Intf;xFtYIPV0n|xKht@_w^-f)&dF~~zS$UY zKDU}R55ETmVhVgAJNTx!>mWF9IPO_xSYYVC^?BtS0ygOTJJB#L_Zt&5=tp(ZKd!U1s~w!sdNl zAif5(@QcV==Le-rFb%mB^)*Jwq@h-F8Lsh7IZ;toy0_Mr;6FiRc?-bj*b{8+H{PGM zJlgQ+#UEdve{q3d*!|Oe>c&Ib#zWa)Hama=V?+0MkXE`6pgVdDl?p?d;h&P1D-siH!`-)70J6@ZZPpX%B!QJj!3NGX{+Ih1X;`3iS`of`fbp%p&~@ z{?RHwG$JU{d1&ZT!FC^OqEs4V#UO}Xkpm1W*kr)Qqp=&yV+2{?# z&X>R1n766Q%;#qmZzn*y${@ajOT`U~#SLn4P%929&UIf_J?RG0ng|>9F_EUH1FQHQ zb4`fV&JE|aS;#^;E8~Vv$E}P&2op94&}`!%Hu)l)95KAb@JNCZc3kMU+_9wH0h2}w zm)mGDle}A~1hA+q3I;z~92er2>!uRNnNC=om|1?p4sn$OrJ(tjJ;3En6VABvSu{o_ zTye*D?Qy{LH(hZTa&z9ylGq9ROk3Q(P6_OoW-Q%?MBc>|8l^`lgTYcTqH`Ed-8vMB z8Uy186iQnsA-5qfp-$AT(1@hnO;RP(h1;})BtUN&I?C4~fG&bz+Pjy2LD2=W%0%cv zO4E;_TIm@@7U`|e@yfhGmgoNo(#iiG031lMe|lhMV9HA4t$wMjX|b#+xnC{o(8@Zd z>?uzfxC2*Rs=9Id>{M5xB?YC@p6T(qp8Hp|`sWu*o>xkqf4Jk3pd7f0cagzg%8z&d z=|T0>Rqa&T#|Q<)zRSX{`EvfC@` zj46*XOx{OTO~ZtS2pcAkVKMLOU>0VGivTXSIga7N=Qsh8PfI*&A#tUA*psBex9TYz zL_lYf?PfWFXHCVHD_I23=M{*9B~tV;(2%XmoCcx|z@N$J45^_TB0Fdv-tL3ONSuOx zMu0?z(A{Q&J8C9|UaORk*$(=6c^)=JqRcOaByt}BCN^*B%=tN=R=rj8w$lo^cxG?n zTZ^71#nY7X7R{Vq@-{Ad8p(LWgnG&k^qyL{EBSEOd6Z+3fZWb(w`A%BM>CO_R!xX8-ag_ zUxAw9ip6HzGo^X66>1sNx=a)Zu00nNY8o;D z!nmCj`}YS=Lh0)dk@O3T+b!*Y1;yomJPnGc!DxLZ zZ>RMp7K)LfSf5@*+P?BY7)mHuKKADm0!qz_(+vxe8gsAYFDgDlj2y3Fo)00u!S+}j z+fVncH=q-GpF}ve?(quR80a94x}gXu?soJKj74u5(~(uTLF57j_hn=((NYJ;Uo235 zw*zsYBCI>o`Za9r4A;QOV1I=8+h$4-q67k?5XRygd8Ry|_D0gh4sdi-ypUM^Z+=w|YWaqrhj!!JxK zewSYvFZxy!X<~;-ZYDQD2w@+>77WhGxEYr#-yVz&poOV+Ixpw*#d`J7$jB`@JT!>b z%xH7VVN&jvs7XJ=FC(0Ed8euqmL+G+qO(SI)@jZ<9E!nFn1gQ}oGVs6n>5cR#dCIk z#~&R0y@Sf$GwPPJ+LoLaetui5JzU0*w^0e&sTM&)-zg}MjgCTPN{)yJf)pqRp)DSa zMEkA{LGyGl8XP5E`Sy|M(B0sLue}gr;|l|yQC0d(TBF0y+M}?B{}~bf2h}h$+%77f zi)^^**1rCJ)AUW_i($ePEYB`b#gdj%rJuMZmN!n#azCjGc#s!2lZLjN&Et}ezJdoO zodQcY^w6?hkC8)yXx%+14@S}T(f$ayHac%+M~6Zm12mEisQ*H-tg(LCJnPo?_l=So z7;7g3o5I(nkzuI!gW^bp4F4NH`8WWmle^^Y&|9IzdDXo^b8ndJTy{Wx>%ExjsM8#@ zjIsq%PS4x!x7>+c(%?^ww5B6g zkCPPnrAz`%*JOxh%k;#H5i2V^MVgqlziHHKVHpWu^N7xyA zqdU_30AO53qM)cT={mw30AV?e7$aJ1E zJxPFQwzNWktsZFJm1t@*_5y~;c9Q_WP*tJB3# z$}49>_d4F~NLFeac0b&m=upd#YUM{2Z{dq&n$wlfMJUcX(2uuYf9v(>J2Q8t1W5B| z_G+FsY#X~Opd(OJk*ch@H}LMj+}-(|^DS!SHm!2o0T&jG2vGVzayN{aG$`e}U ziG(#(Q~%+n4>l!3YRy)yW^2N+?5m%<@nc!_ZP9#NFzu9e@_)Ad%;Qb!_DkCKONPgC z>%NDZ)Yju#>+wYAoa>$LwB`g~L?>9&0J|6o0h3oz1?e!@xyqU;>x?^9QOk^y@|k$P z_9D``HmAHLGv}AQ&5PdVWS8m_8vRf`5gC)=iFTu>nA)9 zXrh@O#9=h^4bY#zG0n8Dv(73|A_I>rtW8tRqm1?)CxjauqjyBr*|Rn!6(P&Y$;v>F z9cFR>f51Da?=w|EfEM?6P6+LFkZg%C*n4p!A+K00=`DJeA4>NnQao0Ln z<~8RxYM~iVEXqwS6YjWsowNn#K}PvOZo(s$#63_iD2uh`Y7zHCeQL4#d|v16X+eZo(Vel1YVD^v1U4T;{K#G$c_-KkYPiBNbU|817?cnnLWZ|~OP#UCpo5x?=CMELcS$SG4n<)C~+QoKc z+K1y}yK;7<%;GdO1IP-VSuD^CE z_T*Z?Otro&UlFGy!&!a`b*<0jk|g=!s>htO++a%4xGFk4#40f`Sg@U($0JpD_b?vG z5Yt>l0^I>&41~Q=U3lT*J3`{A=892*9L&13xCMld?ROu_FDi6hLgs_?bK@ z03ABTCIpuH*T!OL0YIM=F$n?gMq(rghz&#vg#o&Yj%Cq0IQFqc7o?G)2>S%oCt@R` zkP?u1iZN7-zsj7uiin}0MC)9Gonzb?;U#12j5@OeDE}9Jx(A|d=ot+SMkBKHmlW$U z0b(2JmJ)237+l;4`Q9TyqDkXe6|JX@J>rvOE8qZi@*~lfAr1StWHYmY62>4GDqf5K zGkS$JDvwAc1%OtqW3=xs+s`4nXh!z7oppfD_Y*^I+=`Lm5)IQ3Sig%T<8`ZfMR2*e zoq!J6Y@p-~7$a-)iS-7bi0zhpfp-IQo7A!brG3i%1WLL4mVC{NzUJh~`IFk#BacMI*R1-^X+B8DgTz368F>5}_5hZ=4U66e z)f?2jLB$({81Td44~E|#xj(YPH54I-F9Qh?yG6t#)~Eb+u-^O5yWdIPR;#vYRok-O zH4Q(MlY8Ee-;d8t%uQ%D?MpSg7Hf8?HP2}^&m|lwU(MXWqHm+(+nDlKzjI@ z*EHPUoP0^GX-E7-XQI&)YRYKx>U1yv1app*TVB3Us&35Y;nu6N1F`E znp$&Ct2w9m*LYJQdfV4_siqSq6z?%T*ha8-9FQtqN&XA+%P-hCk~#^_W*X&T1WJnfu=af=_Y^ z#q+}a>mL^_Z925L>CnR)>ZX(0rju&rDXsF9Qo6>QY7HfGeT`F%{EN}Yp z-rqmFApdmy(R0e>SCv~)2HlEtj-%N%5}nu9?r{CgmH~l&$^h z)_x7JS=5?EwLGGgM-=ZG?~~%PnIU8M|A%+pAHP4Y7H`&yH!IFH-Y;9~9L!(pT%8|R zTCjU^K=U3@GB2%n4x-at2jvr>BtP2`It{((8vE%+;pf3)UO0Zy?(eb)So^^KiRGB% zgxB#)mj#YrdU&#XJNUH)k{OR;%qRp~ci0Zj zW1|hmL+m)fc!d2n80!L7$nlaF=@{-Q=mr@}^l{tvjvXD_A>Oye24h1Y=cKL(iJorl zJb6mDUG6#7b3%78k^k6eKkAcxtgx}=;|wgTmyc%VppVbLZz`4*vq2n&hWZJ9#?xob zzjrDL!oT`4=WJ$;a;hvgb}NA7N7jb4wt{R_)`C96$;@>1qyVCP9X}?FHfEf^vU8rw z>Gqj+h@!of@5$2_XD&{jScaGy>b~TUi(A%vS(AeU7#NF*U|~$HM=iiwuNRhd>*ZAy z;;}qe5@lNbd{sN-+QK5S%Kr*^6jt69&J<6g%M)m$NFW%1b8f@Q_MzCoxT)^ctE}#Gu&l1TT$aDRA!5U@P|!-3$(Zm7 z`DC7)9)lUPpmoXRw`ne3p;n@3zYj&}kfZxI{~fLIg^PBF=v4k)Fnk_E+J*zDxM#|a z8;be{hkB^dvJXiZeE2;jelD$wxQBM`h|e;f_4ek>9hzT3q_$>Ko2!M8>>v@j`vO!L z_oDDTwr&1BQh+@(j?6;;W=zI~0GZR=I3;=;;l}Zd^JJSKK!wJx-iu`a1wj73o^%-= z+klq0vFWdI|j%}oIx$RqkxRPzc zgfUK(gbjz?DA(U?IG7MSOI+8M85L?k5^fd@Io#l}ddTHt8tgWdnmw@F^Xyp(8~mwe z?8%QW+B3&2^Gm(9)T~hH2uH{r$jhC{S9IQY0NY*NQa##EY9P=EFgOb^?n$p5gJ^YW z5NQl`0*A5qVe6iL^1Vjja{#^EymuAGI*}b-gx+lq$$m!B*$R&7PZZ=cK%tCNAT>OU zB^K#@*z?Dd?j}qyh>5frg`L1p{`-ja)i{aCXJ+%;9dC6&``sPX+`;UPHl1k{zL3>< zSDeS6fH&MsjNN8jdok;z({@ofwer@~fy4i7|Kq^offtnnFa9vH)Ub20VdugwwPBCe zut&lB($!R%f3_MpPDpKl^=^?N`tW}X7fT9cUP$475MAHm$4XvAFW7YyuT zY+0~Nq*!A{q^)Sjx2Y|gQ3AROX#KH`u*vg`M(9>#Vj7b~Q(28@Ied>BfNb&~@Dn5< zB<;;i<+x{=n4!b@3ez;j8J4&;j=M?ai}7`&<^?lAc3c6a8uA&V9L--_wrS3XE?#(+ zcs+G7hp44Po*CgGd))S{=D`fUdb$gH8TrP9Lb)rfXKN(5I12psWcaC;7kblXFR4oaqBc^ z7Nbtsri<3zU@6=dov`O0tuiC)gd^rdTcscK7Ku3~q3JY-A!Uo(F+y!o^K{bDJaiYl;bHbmOXxk?ILoEGrx(g+tL6cd-jQ?B@0dr^-#Z zzU|80KoH9ljK-bVvI2(Z)<00eamIqVLSMH`aZ{ct?^F>KRa|d)C)_b2Z0X0s#ogvM z5#0|w<`Q7|db^o~kuGdwlM$Qm`jLkrc#M#Z#xGODmuLbT{BJ1Lcxi@N46=)+_6OPA z3K=KKRG9)RWs-dewS2(vr;`Bh2?l~sJ95-Aqi&qwCHu5Jhuos&mX&gm{%1atn=3ZK zAtx4#Vv5!@tqQN7BF>RcKXgGsW$-nk=uGkvjGED?m2)dL1pgA5%B05pxxsKg$*Q@lo~ER%Z94eKvAF4!vgy=v zS=Bp%*}#;+NG6?&h7de8jeQsD4n;IJAvss)ZJfeVk$By6e5gA2Qq%7ez6 zur60N{BV4}`N8Ie$fH}zt9{C$Yk1Yheyy=ztrWFNQ7O&6D}vyvN>$h0uUo1PEmnur z>MdIJ7R6Wi(jgBY4tAGT;EUyzQ)i7o2+mzqjOQFLBZb&)K+b5a-PrKJ`9}9}L>%l3 z$L@|sLgkE^H8ykD3Z0=1aYgL(IXiH?zh8T6RBxT;t(!an70{W`WaqDOZ(j>LAiOtuMJ?Z=m2Xjt zwrWLNCr@IXK6gqjZqbTcCQso?_{0s>)1Y}8CcBo)AYe`2UJ%r>=d`ltCeK1i+k6&i z^6anOrPFOQZII`=i*O8R*;oC}-r2plCNNb}L9nzc<*QExns9-phc3|c&;^e zpR>8&yzspX3EbISsR@<1-Tet2EVA1+uLxGx3p~RM#n)Q!PIXOneZE{mXO0mmA1;?S z&UY$%yC3fWptt;@T7FS0zo>XGQsa<>`vo4BUzgM-yOffUS`yMqLW(oAVlT5*y$4Y- zjAsRZwUP7I&t0Co{Qi~uSCnnX6;G$?>C`;9^RQD6pbLNDE$Xb{eo^D@JY@Yvi@$TP z^%r|>u!s1QQWN$4zXk{$Hg3q?05+&_{p?^^;`#T>}4zz?%TN?b_WKYQe}f+b#Vy z1^p)i|CzvN6v>MJ`CzPf&XFIjRdpxsCSqfo36+^_)sPvG0E(^njnU19^Z%h_G%^aL zvtKw58rPj4md+p)FDD1lbU+K9PjSVQ>A$Qs#d#*#Uy5^0vcEi^Y*3rRt?{O~UCOh) zPdM)+`%7{Al&^YI+#zMHcf~I72e9?QA4t$%2!9|cFk=Dd^D`lNwqgfgmGD7*kFTOz zUSK2*gW2s%i|2y zYVaxfeMaC@E8@LTZNzLS;q8g9uW&F5K@-cajK?Xf)!8*H%lNuEs6fJ45Ed>!Y+ktfQzU@%k@$F*^1|iEdq0UXxAO8gm{|zgLVU{# z1eEG5Slf&9!Xqv)L zc?qZmvk(+_^ZdaTZf%&T`ILoxmLGo2!a0g3Zz%rO`P+(pk80ne+4oFZpVV)dv`$`~ zx~17_6mA1vmD`YNXqvQ5-buJMN3Fs&;Z?aNXs}vrZ`OUUZfa2F0~*R?5vX7pKxkZ@ z^)AjtzNzx?#bB}0kjwyzT$AUUv`Ff47U$GSP_PNU|9E@boeDl0K-}a+dnKK6{kA_U-l_p#OHi!KT7=^&= z1f*U=jx{aSC|gN9cHRKe~}0YS^iSO~_@%l1pcVFb4Dg0YHbS0RR91 literal 0 HcmV?d00001 diff --git a/src/queuegate_proxy/app.py b/src/queuegate_proxy/app.py new file mode 100644 index 0000000..68d4c13 --- /dev/null +++ b/src/queuegate_proxy/app.py @@ -0,0 +1,472 @@ +import asyncio +import hashlib +import json +import os +import time +import uuid +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple + +import httpx +from fastapi import Body, FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, StreamingResponse + + +def env_bool(key: str, default: bool = False) -> bool: + v = os.getenv(key) + if v is None: + return default + return v.strip().lower() in {"1", "true", "yes", "on"} + + +def env_int(key: str, default: int) -> int: + v = os.getenv(key) + if v is None: + return default + try: + return int(v) + except ValueError: + return default + + +def now_ts() -> float: + return time.time() + + +def job_id() -> str: + return uuid.uuid4().hex + + +def sha1(text: str) -> str: + return hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + + +@dataclass +class ProxyConfig: + upstreams: List[str] + + sticky_header: str = "X-Chat-Id" + affinity_ttl_sec: int = 60 + + queue_notify_user: str = "auto" # auto|always|never + queue_notify_min_ms: int = 1200 + + read_timeout_sec: int = 3600 + + toolserver_url: Optional[str] = None # for later + text_toolcall_detect: bool = False + + +def load_config() -> ProxyConfig: + ups = (os.getenv("LLM_UPSTREAMS") or "").strip() + if not ups: + raise RuntimeError("LLM_UPSTREAMS is required (comma-separated URLs)") + upstreams = [u.strip() for u in ups.split(",") if u.strip()] + + return ProxyConfig( + upstreams=upstreams, + sticky_header=(os.getenv("STICKY_HEADER") or "X-Chat-Id").strip() or "X-Chat-Id", + affinity_ttl_sec=env_int("AFFINITY_TTL_SEC", 60), + queue_notify_user=(os.getenv("QUEUE_NOTIFY_USER") or "auto").strip().lower(), + queue_notify_min_ms=env_int("QUEUE_NOTIFY_MIN_MS", 1200), + read_timeout_sec=env_int("LLM_READ_TIMEOUT", 3600), + toolserver_url=(os.getenv("TOOLSERVER_URL") or "").strip() or None, + text_toolcall_detect=env_bool("TEXT_TOOLCALL_DETECT", False), + ) + +@dataclass +class Job: + job_id: str + created_ts: float + kind: str # user_chat | agent_call + stream: bool + body: Dict[str, Any] + headers: Dict[str, str] + thread_key: str + assigned_worker: int + + status: str = "queued" # queued|running|done|error + error: Optional[str] = None + result: Optional[Dict[str, Any]] = None + + # For waiting results + done_fut: asyncio.Future = field(default_factory=asyncio.Future) + stream_q: asyncio.Queue = field(default_factory=asyncio.Queue) + saw_any_output: bool = False + + +class Worker: + def __init__(self, idx: int, upstream_url: str, client: httpx.AsyncClient): + self.idx = idx + self.upstream_url = upstream_url + self.client = client + self.user_q: asyncio.Queue[Job] = asyncio.Queue() + self.agent_q: asyncio.Queue[Job] = asyncio.Queue() + self.current_job_id: Optional[str] = None + self.task: Optional[asyncio.Task] = None + + def pending_count(self) -> int: + return self.user_q.qsize() + self.agent_q.qsize() + (1 if self.current_job_id else 0) + + async def start(self, state: "ProxyState") -> None: + self.task = asyncio.create_task(self._run(state), name=f"worker-{self.idx}") + + async def _run(self, state: "ProxyState") -> None: + while True: + if not self.user_q.empty(): + job = await self.user_q.get() + else: + job = await self.agent_q.get() + + self.current_job_id = job.job_id + job.status = "running" + + try: + if job.stream: + await state.handle_stream_job(job, self) + else: + await state.handle_non_stream_job(job, self) + except Exception as e: + job.status = "error" + job.error = str(e) + if not job.done_fut.done(): + job.done_fut.set_exception(e) + await job.stream_q.put(None) + finally: + self.current_job_id = None + try: + if job.kind == "user_chat": + self.user_q.task_done() + else: + self.agent_q.task_done() + except Exception: + pass + + +def get_header_any(headers: Dict[str, str], names: List[str]) -> Optional[str]: + for n in names: + v = headers.get(n) + if v: + return v + return None + + +def infer_kind(headers: Dict[str, str]) -> str: + jk = (headers.get("X-Job-Kind") or "").strip().lower() + if jk in {"agent", "agent_call", "repo_agent"}: + return "agent_call" + return "user_chat" + + +def infer_thread_key(cfg: ProxyConfig, headers: Dict[str, str], body: Dict[str, Any]) -> str: + v = get_header_any( + headers, + [cfg.sticky_header, "X-OpenWebUI-Chat-Id", "X-Chat-Id", "X-Conversation-Id"], + ) + if v: + return v + + seed = { + "model": (body.get("model") or "").strip(), + "messages": (body.get("messages") or [])[:2], + "user": body.get("user"), + } + try: + txt = json.dumps(seed, sort_keys=True, ensure_ascii=False) + except Exception: + txt = str(seed) + return "h:" + sha1(txt) + + +class ProxyState: + def __init__(self, cfg: ProxyConfig): + self.cfg = cfg + self.http = httpx.AsyncClient(timeout=httpx.Timeout(cfg.read_timeout_sec)) + self.workers: List[Worker] = [Worker(i, u, self.http) for i, u in enumerate(cfg.upstreams)] + self.affinity: Dict[str, Tuple[int, float]] = {} + self.jobs: Dict[str, Job] = {} + + async def start(self) -> None: + for w in self.workers: + await w.start(self) + + async def close(self) -> None: + await self.http.aclose() + + def pick_worker(self, thread_key: str) -> int: + now = now_ts() + sticky = self.affinity.get(thread_key) + if sticky: + widx, last = sticky + if now - last <= self.cfg.affinity_ttl_sec and 0 <= widx < len(self.workers): + self.affinity[thread_key] = (widx, now) + return widx + + best = 0 + best_load = None + for w in self.workers: + load = w.pending_count() + if best_load is None or load < best_load: + best_load = load + best = w.idx + + self.affinity[thread_key] = (best, now) + return best + + def enqueue(self, job: Job) -> None: + w = self.workers[job.assigned_worker] + if job.kind == "user_chat": + w.user_q.put_nowait(job) + else: + w.agent_q.put_nowait(job) + self.jobs[job.job_id] = job + + + +def sse_pack(obj: Dict[str, Any]) -> bytes: + return ("data: " + json.dumps(obj, ensure_ascii=False) + "\n\n").encode("utf-8") + + +def sse_done() -> bytes: + return b"data: [DONE]\n\n" + + +def make_chunk(job_id: str, model: str, delta: Dict[str, Any], finish_reason: Optional[str] = None) -> Dict[str, Any]: + return { + "id": job_id, + "object": "chat.completion.chunk", + "created": int(now_ts()), + "model": model, + "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}], + } + + +def looks_like_toolcalls_text(txt: str) -> bool: + s = (txt or "").lstrip() + return s.startswith("[TOOL_CALLS]") or s.startswith("{\"tool_calls\"") + + +def strip_toolcalls_text(txt: str) -> str: + return "(tool-call output suppressed; tools not enabled in proxy-only MVP)" + + +def capture_tool_calls_from_delta(delta: Dict[str, Any], acc: List[Dict[str, Any]]) -> None: + tcs = delta.get("tool_calls") + if isinstance(tcs, list): + acc.extend(tcs) + + + +class ProxyState(ProxyState): # type: ignore + def queue_position(self, job: Job) -> int: + w = self.workers[job.assigned_worker] + if job.kind == "user_chat": + return w.user_q.qsize() + return w.agent_q.qsize() + + async def handle_non_stream_job(self, job: Job, worker: Worker) -> None: + body = dict(job.body) + body["stream"] = False + + r = await self.http.post(worker.upstream_url, json=body) + r.raise_for_status() + data = r.json() + + if self.cfg.text_toolcall_detect: + try: + msg = data.get("choices", [{}])[0].get("message", {}) + content = msg.get("content") + if isinstance(content, str) and looks_like_toolcalls_text(content): + msg["content"] = strip_toolcalls_text(content) + except Exception: + pass + + job.result = data + job.status = "done" + if not job.done_fut.done(): + job.done_fut.set_result(data) + + async def handle_stream_job(self, job: Job, worker: Worker) -> None: + body = dict(job.body) + body["stream"] = True + model = (body.get("model") or "").strip() or "unknown" + + tool_calls: List[Dict[str, Any]] = [] + + async with self.http.stream("POST", worker.upstream_url, json=body) as r: + r.raise_for_status() + async for line in r.aiter_lines(): + if not line: + continue + if not line.startswith("data:"): + continue + payload = line[len("data:"):].strip() + if payload == "[DONE]": + break + + try: + obj = json.loads(payload) + except Exception: + # non-json chunk; pass through + await job.stream_q.put((line + "\n\n").encode("utf-8")) + job.saw_any_output = True + continue + + choice0 = (obj.get("choices") or [{}])[0] + delta = choice0.get("delta") or {} + + if "tool_calls" in delta: + capture_tool_calls_from_delta(delta, tool_calls) + continue + + if self.cfg.text_toolcall_detect: + c = delta.get("content") + if isinstance(c, str) and looks_like_toolcalls_text(c): + continue + + await job.stream_q.put((line + "\n\n").encode("utf-8")) + job.saw_any_output = True + + if tool_calls and not job.saw_any_output: + msg = "(tool-call requested but tools are not enabled yet in the proxy-only MVP)" + await job.stream_q.put(sse_pack(make_chunk(job.job_id, model, {"role": "assistant", "content": msg}))) + await job.stream_q.put(sse_pack(make_chunk(job.job_id, model, {}, finish_reason="stop"))) + + await job.stream_q.put(sse_done()) + job.status = "done" + if not job.done_fut.done(): + job.done_fut.set_result({"status": "streamed"}) + + +app = FastAPI(title="QueueGate Proxy", version="0.1.0") + +CFG: Optional[ProxyConfig] = None +STATE: Optional[ProxyState] = None + + +@app.on_event("startup") +async def _startup() -> None: + global CFG, STATE + CFG = load_config() + STATE = ProxyState(CFG) + await STATE.start() + + +@app.on_event("shutdown") +async def _shutdown() -> None: + global STATE + if STATE: + await STATE.close() + + +@app.get("/healthz") +async def healthz() -> Dict[str, Any]: + if not STATE: + raise HTTPException(status_code=503, detail="not ready") + return { + "ok": True, + "upstreams": len(STATE.workers), + "workers": [ + { + "id": w.idx, + "pending": w.pending_count(), + "busy": bool(w.current_job_id), + "upstream": w.upstream_url, + } + for w in STATE.workers + ], + } + + +@app.get("/v1/jobs/{job_id}") +async def job_status(job_id: str) -> Dict[str, Any]: + if not STATE: + raise HTTPException(status_code=503, detail="not ready") + job = STATE.jobs.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="unknown job") + return { + "job_id": job.job_id, + "status": job.status, + "kind": job.kind, + "created_ts": job.created_ts, + "worker": job.assigned_worker, + "queue_position_est": STATE.queue_position(job) if job.status == "queued" else 0, + "error": job.error, + } + + +def _require_state() -> ProxyState: + if not STATE: + raise HTTPException(status_code=503, detail="not ready") + return STATE + + +def _copy_headers(req: Request) -> Dict[str, str]: + # keep it simple: copy headers for sticky routing + internal kind + keep = { + "X-Job-Kind", + "X-Chat-Id", + "X-Conversation-Id", + "X-OpenWebUI-Chat-Id", + } + out: Dict[str, str] = {} + for k, v in req.headers.items(): + if k in keep or k.lower() == (CFG.sticky_header.lower() if CFG else "").lower(): + out[k] = v + return out + + +@app.post("/v1/chat/completions") +async def chat_completions(body: Dict[str, Any] = Body(...), request: Request = None): + state = _require_state() + cfg = state.cfg + + stream = bool(body.get("stream", False)) + headers = _copy_headers(request) + kind = infer_kind(headers) + thread_key = infer_thread_key(cfg, headers, body) + worker_idx = state.pick_worker(thread_key) + + jid = job_id() + job = Job( + job_id=jid, + created_ts=now_ts(), + kind=kind, + stream=stream, + body=body, + headers=headers, + thread_key=thread_key, + assigned_worker=worker_idx, + ) + state.enqueue(job) + + if not stream: + # wait for result + try: + data = await job.done_fut + except Exception as e: + raise HTTPException(status_code=502, detail=f"upstream error: {e}") + return JSONResponse(content=data) + + # streaming + async def gen() -> Any: + # Optional: queue notice (user jobs only) + if job.kind == "user_chat" and cfg.queue_notify_user != "never": + t0 = now_ts() + # wait until running or until threshold + while job.status == "queued" and (now_ts() - t0) * 1000 < cfg.queue_notify_min_ms: + await asyncio.sleep(0.05) + if job.status == "queued" and cfg.queue_notify_user in {"auto", "always"}: + pos = state.queue_position(job) + model = (body.get("model") or "unknown").strip() or "unknown" + msg = f"⏳ In wachtrij (positie ~{pos})…" + yield sse_pack(make_chunk(job.job_id, model, {"role": "assistant", "content": msg})) + + while True: + item = await job.stream_q.get() + if item is None: + break + yield item + + return StreamingResponse(gen(), media_type="text/event-stream")