When several vLLM inference nodes serve requests at the same time, the most straightforward setup is usually to put Nginx in front as a reverse proxy and let the upstream nodes share the load.

But plain static round-robin rarely works well in practice. Load on an LLM service is far from uniform: one node may already be close to its KV Cache limit, accumulating a waiting queue, or even starting to preempt requests, while another node is still relatively idle. If the load balancer cannot see backend state, it will keep sending requests to a node that is already overloaded.

This post walks through a practical setup: Nginx as the unified entry point, plus a periodically executed weight-adjustment script that rewrites the upstream configuration based on vLLM metrics and reloads Nginx automatically.

Overall Architecture

The setup is easiest to understand as two parallel paths: one carries business request traffic, the other handles metric collection and weight write-back.

Traffic path: requests come in through the unified entry point and are distributed to the inference nodes according to the upstream weights.

    Client / SDK  →  Nginx ingress (/v1/, unified entry)
                  →  upstream llm_cluster (least_conn + dynamic weights)
                  →  vLLM node-a / node-b (serving backends)

Metrics and scheduling path: a timer job scrapes node metrics, recomputes the weights, validates the configuration, and reloads Nginx.

    systemd timer (periodic trigger)
        →  vllm_nginx_lb.py (collect metrics + compute weights)
        →  GET node-a /metrics, GET node-b /metrics (kv / waiting / running)
        →  rewrite upstream  →  nginx -t && nginx -s reload (new weights applied)

1. Nginx Ingress Configuration

First, define a server block in Nginx that accepts OpenAI-compatible API requests:

server {
    listen 80;
    server_name 10.0.0.10;
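
    # NOTE: upstream_trace is a custom log_format and must be defined once in
    # the http {} context before it can be used here, e.g.:
    #   log_format upstream_trace '$remote_addr "$request" $status '
    #                             'upstream=$upstream_addr upstream_status=$upstream_status '
    #                             'rt=$request_time urt=$upstream_response_time';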

    access_log /var/log/nginx/llm_access.log upstream_trace;
    error_log /var/log/nginx/llm_error.log info;

    location /v1/ {
        proxy_pass http://llm_cluster;

        proxy_buffering off;
        proxy_cache off;

        proxy_http_version 1.1;
        proxy_set_header Connection "";

        client_body_buffer_size 8m;

        proxy_read_timeout 300s;
        proxy_send_timeout 300s;
        proxy_connect_timeout 5s;
        send_timeout 300s;

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        add_header X-Upstream-Addr $upstream_addr always;
        add_header X-Upstream-Status $upstream_status always;
    }
}

A few points here matter:

  • proxy_buffering off is essential for streaming output; it stops Nginx from buffering the response
  • proxy_http_version 1.1 combined with Connection "" keeps upstream keep-alive connections stable
  • echoing X-Upstream-Addr and X-Upstream-Status back in the response headers makes routing issues much easier to trace later
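
A quick way to verify the routing once this is wired up: send any /v1/ request through the ingress and inspect the extra headers, e.g. `curl -si http://10.0.0.10/v1/models | grep -i x-upstream` (add an Authorization header if your backends require the API key); the output shows exactly which backend served the call.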

2. Defining the Upstream Cluster

Leaving dynamic scheduling aside for a moment, start with the most basic upstream:

upstream llm_cluster {
    least_conn;
    server 10.0.0.10:18090 weight=100 max_fails=10 fail_timeout=60s;
    server 10.0.0.11:18090 weight=100 max_fails=10 fail_timeout=60s;
    keepalive 64;
}

least_conn suits inference workloads better than plain round-robin, because processing times vary wildly between requests. (Note that the keepalive directive only takes effect together with the proxy_http_version 1.1 and Connection "" settings shown above.) Even so, if the backends diverge noticeably in memory pressure, queue length, or preemption, least_conn alone is not enough.

3. Driving the Weight Adjustment with a systemd Timer

The idea is simple: every so often, scrape metrics from all backends and recompute each node's weight from its state; if any weight changed, rewrite the upstream configuration and reload Nginx.

To run this automatically, hand it to a systemd service and timer:

# /etc/systemd/system/vllm-nginx-lb.service
[Unit]
Description=Dynamic nginx load balancing for vLLM backends
After=network.target nginx.service

[Service]
Type=oneshot
ExecStart=/usr/local/bin/vllm_nginx_lb.py

# /etc/systemd/system/vllm-nginx-lb.timer
[Unit]
Description=Run dynamic nginx load balancing every 10 seconds

[Timer]
OnBootSec=10s
OnUnitActiveSec=10s
AccuracySec=1s
Unit=vllm-nginx-lb.service

[Install]
WantedBy=timers.target

The advantage of this approach is its simplicity: there is no extra long-running daemon, and problems are easy to diagnose with systemctl status and journalctl.
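
Installation is the usual routine: copy the two units into /etc/systemd/system/ as vllm-nginx-lb.service and vllm-nginx-lb.timer, run `systemctl daemon-reload`, then `systemctl enable --now vllm-nginx-lb.timer`; `systemctl list-timers vllm-nginx-lb.timer` will confirm when the next run fires.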

4. The Metric Collection and Weight Adjustment Script

Below is one version of such a script. Its core logic:

  • scrape /metrics from every vLLM node
  • extract the KV Cache usage, the waiting queue length, the number of running requests, and the cumulative preemption count
  • compute a new weight for each node from the current values and the previous state
  • rewrite the upstream configuration and reload Nginx
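
For reference, the regular expressions in the script assume Prometheus exposition lines of roughly this shape (label sets abbreviated here; the exact labels vary across vLLM versions and deployments):

vllm:kv_cache_usage_perc{model_name="..."} 0.42
vllm:num_requests_waiting{model_name="..."} 2.0
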
#!/usr/bin/env python3
import os
import re
import shutil
import subprocess
import sys
import tempfile
import urllib.request

API_KEY = 'YOUR_API_KEY'
STATE_DIR = '/var/lib/vllm-nginx-lb'
UPSTREAM_FILE = '/etc/nginx/conf.d/llm_upstream.conf'
HOSTS = {
    'node-a': '10.0.0.10',
    'node-b': '10.0.0.11',
}

METRIC_PATTERNS = {
    'kv_cache_usage': re.compile(r'^vllm:kv_cache_usage_perc\{.*\}\s+([0-9.eE+-]+)$'),
    'waiting': re.compile(r'^vllm:num_requests_waiting\{.*\}\s+([0-9.eE+-]+)$'),
    'running': re.compile(r'^vllm:num_requests_running\{.*\}\s+([0-9.eE+-]+)$'),
    'preemptions': re.compile(r'^vllm:num_preemptions_total\{.*\}\s+([0-9.eE+-]+)$'),
}

os.makedirs(STATE_DIR, exist_ok=True)


def fetch_metrics(host: str) -> str:
    req = urllib.request.Request(
        f'http://{host}:18090/metrics',
        headers={'Authorization': f'Bearer {API_KEY}'},
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return resp.read().decode('utf-8', errors='ignore')


def parse_metric(raw: str, key: str) -> float:
    pattern = METRIC_PATTERNS[key]
    for line in raw.splitlines():
        match = pattern.match(line)
        if match:
            return float(match.group(1))
    raise RuntimeError(f'metric {key} not found')


def load_prev(name: str) -> int:
    # last run's preemption counter, used to compute a per-interval delta
    path = os.path.join(STATE_DIR, f'{name}.preemptions')
    if not os.path.exists(path):
        return 0
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return int(f.read().strip() or '0')
    except Exception:
        return 0


def save_prev(name: str, value: int) -> None:
    path = os.path.join(STATE_DIR, f'{name}.preemptions')
    with open(path, 'w', encoding='utf-8') as f:
        f.write(str(value))


def downgrade(weight: int) -> int:
    return {100: 60, 60: 30, 30: 10}.get(weight, 10)


def compute_weight(kv_cache_usage: float, waiting: int, running: int, preemption_delta: int) -> int:
    if kv_cache_usage >= 0.92:
        weight = 10
    elif kv_cache_usage >= 0.85:
        weight = 30
    elif kv_cache_usage >= 0.75:
        weight = 60
    else:
        weight = 100

    if waiting > 0:
        weight = downgrade(weight)
    if preemption_delta > 0:
        weight = downgrade(weight)
    if running > 8:
        weight = downgrade(weight)
    return weight


def render_upstream(weights: dict) -> str:
    # regenerate the whole upstream block from HOSTS, so adding a node stays a one-line change
    lines = ['upstream llm_cluster {', '    least_conn;']
    for name, host in HOSTS.items():
        lines.append(f'    server {host}:18090 weight={weights[name]} max_fails=10 fail_timeout=60s;')
    lines.append('    keepalive 64;')
    lines.append('}')
    return '\n'.join(lines) + '\n'


def log_metrics(metrics: dict, weights: dict) -> None:
    # one stdout line per run; journalctl collects it via the systemd service
    parts = []
    for name in HOSTS:
        m = metrics[name]
        parts.append(
            f"{name}(weight={weights[name]},kv={m['kv_cache_usage']:.3f},"
            f"waiting={m['waiting']},running={m['running']},preempt_delta={m['preemption_delta']})"
        )
    print('vllm-nginx-lb ' + ' '.join(parts))


def main() -> int:
    metrics = {}
    for name, host in HOSTS.items():
        raw = fetch_metrics(host)
        current_preemptions = int(parse_metric(raw, 'preemptions'))
        previous_preemptions = load_prev(name)
        delta = max(0, current_preemptions - previous_preemptions)
        save_prev(name, current_preemptions)
        metrics[name] = {
            'kv_cache_usage': parse_metric(raw, 'kv_cache_usage'),
            'waiting': int(parse_metric(raw, 'waiting')),
            'running': int(parse_metric(raw, 'running')),
            'preemption_delta': delta,
        }

    weights = {name: compute_weight(**values) for name, values in metrics.items()}
    log_metrics(metrics, weights)
    new_content = render_upstream(weights)

    old_content = ''
    if os.path.exists(UPSTREAM_FILE):
        with open(UPSTREAM_FILE, 'r', encoding='utf-8') as f:
            old_content = f.read()
    if new_content == old_content:
        return 0

    fd, tmp_path = tempfile.mkstemp(prefix='llm_upstream.', text=True)
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as tmp:
            tmp.write(new_content)
        backup_path = UPSTREAM_FILE + '.bak'
        if os.path.exists(UPSTREAM_FILE):
            shutil.copy2(UPSTREAM_FILE, backup_path)
        shutil.move(tmp_path, UPSTREAM_FILE)
        subprocess.run(['nginx', '-t'], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        subprocess.run(['nginx', '-s', 'reload'], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return 0
    except Exception:
        backup_path = UPSTREAM_FILE + '.bak'
        if os.path.exists(backup_path):
            shutil.move(backup_path, UPSTREAM_FILE)
            subprocess.run(['nginx', '-t'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            subprocess.run(['nginx', '-s', 'reload'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        raise
    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


if __name__ == '__main__':
    try:
        raise SystemExit(main())
    except Exception as exc:
        print(f'vllm_nginx_lb failed: {exc}', file=sys.stderr)
        raise
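
Before enabling the timer, it is worth running the script once by hand (for example `python3 /usr/local/bin/vllm_nginx_lb.py`) to confirm that the metrics parse, the computed weights look sensible, and nginx -t passes.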

5. Understanding the Weight Logic

The script does not attempt a sophisticated scheduling algorithm; instead it uses a stepped weight policy that is easy to maintain:

  • with low KV Cache usage, the node gets a weight of 100
  • as usage climbs, the weight steps down through 60 / 30 / 10
  • a waiting queue, a preemption increase, or too many running requests each trigger one extra downgrade
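
To make the ladder concrete, here is how one call to the script's compute_weight would play out (the input values are made up for illustration):

compute_weight(kv_cache_usage=0.80, waiting=2, running=5, preemption_delta=0)
# base weight 60 (0.75 <= kv < 0.85); waiting > 0 triggers one downgrade -> 30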

The strength of this policy is that it is transparent, and its thresholds are easy to retune against what you observe on site. For LLM inference services, a scheduling policy that is simple but works dependably often beats a clever one.

6. Why Do It This Way

Looking only at CPU or system load rarely reveals the true pressure on an LLM service. What actually governs throughput and latency are metrics that sit much closer to the inference engine's internal state:

  • KV Cache usage
  • number of waiting requests
  • number of running requests
  • whether preemption has started to occur

Once a machine starts queueing requests or preempting frequently, it should not keep absorbing new traffic. Lowering its weight so that more requests land on comparatively idle nodes usually makes the overall experience noticeably more stable.

7. Notes for Rolling This Out

  • if /metrics requires authentication, the script must send the matching auth header
  • don't set the interval too short; 10 seconds is usually enough, and anything shorter makes Nginx reload too often
  • keep a backup before rewriting the upstream file, and roll back automatically if the reload fails
  • as the node count grows, move the host list and thresholds into a standalone config file (see the sketch below)
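
One possible shape for that split, assuming a hypothetical /etc/vllm-nginx-lb/config.json (the path and keys here are illustrative, not something the script above already reads):

import json

# expected file contents, e.g.:
# {"hosts": {"node-a": "10.0.0.10", "node-b": "10.0.0.11"},
#  "kv_thresholds": [0.75, 0.85, 0.92]}
with open('/etc/vllm-nginx-lb/config.json', encoding='utf-8') as f:
    cfg = json.load(f)
HOSTS = cfg['hosts']
KV_THRESHOLDS = cfg['kv_thresholds']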

At its core, this setup does not try to turn Nginx into a "smart scheduler"; it simply puts the runtime metrics vLLM already exposes to real use. For a small inference cluster this is practical enough, and far easier to keep maintaining over time.

Tags: vllm, LLM, nginx, load balancing, ops
