FastAPI/Flask 如何实现高并发:2025年实用技巧

FastAPI/Flask 如何实现高并发:2025年实用技巧

关键

flask高并发

  • 异步编程:用FastAPI的异步特性提升性能。
  • 负载均衡:分担流量,防止单点崩溃。
  • 缓存优化:减少数据库压力,提升响应速度。
  • Web服务器升级:Gunicorn+Uvicorn组合更高效。

引言:高并发不是梦

嘿,朋友们!最近用 FastAPI 或 Flask 开发 API,感觉并发量一上来就卡得不行?别慌,高并发问题其实有解!2025年,随着项目规模扩大,优化这些框架的性能成了热门话题。无论是处理几千个请求,还是应对突发流量,我今天就带你看看怎么让 FastAPI 和 Flask 跑得飞快!

基础认知:并发问题的根源

FastAPI 和 Flask 都是 Python 的 Web 框架,但默认情况下,它们处理并发能力有限。Flask 是同步框架,单线程处理请求;FastAPI 虽支持异步,但也得优化才能发挥潜力。2025年的服务器环境更复杂了,流量高峰、数据库瓶颈这些问题得提前解决。走,咱们一步步搞定!

优化方案:让 API 飞起来

1. 异步编程:FastAPI 的杀手锏

FastAPI 内置了对异步的支持,用 asyncawait 能让它处理多个请求不堵塞。比如:

1
2
3
4
5
6
7
from fastapi import FastAPI

app = FastAPI()

@app.get("/items/")
async def read_items():
return {"message": "Hello, async world!"}

用异步库如 aiohttp 处理外部请求,2025年最新的 httpx 也挺火,能异步调用 API,效率翻倍。Flask 想玩异步,得加 asyncio 或用 Flask 3.0 的新特性,但 FastAPI 天然占优。

负载均衡:分担压力

单台服务器扛不住大流量?试试负载均衡!用 Nginx 或 Traefik,把请求分到多台服务器上。2025年,Kubernetes 配合这些工具更流行,自动扩展实例,流量一多就加机器,稳得一批。配置 Nginx 很简单,改改配置文件就行:

1
2
3
4
5
6
7
8
9
10
11
upstream myapp {
server 127.0.0.1:8000;
server 127.0.0.1:8001;
}

server {
listen 80;
location / {
proxy_pass http://myapp;
}
}

缓存优化:快人一步

数据库查询是瓶颈?加个缓存!Redis 或 Memcached 是好选择,把常见数据存进去,请求直接从内存拿,秒回。FastAPI 里可以用 aioredis:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi import FastAPI
import aioredis

app = FastAPI()
redis = aioredis.from_url("redis://localhost")

@app.get("/data/")
async def get_data():
cached = await redis.get("my_key")
if cached:
return {"data": cached.decode()}
# 模拟数据库查询
data = "Hello from DB"
await redis.set("my_key", data, ex=3600) # 缓存1小时
return {"data": data}

Web 服务器升级:Gunicorn + Uvicorn

默认的开发服务器扛不住生产环境?用 Gunicorn 搭配 Uvicorn。Gunicorn 管理多个工作进程,Uvicorn 跑异步任务,完美组合。启动命令:

gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app

-w 4 设 4 个工作进程,2025年建议根据 CPU 核心数调整,发挥最大性能。Flask 也适用这个组合,记得加 gevent 异步支持。

数据库优化:异步驱动

数据库优化:对于频繁的数据库操作,可以优化数据库结构、索引等,提高查询性能。

数据库慢?用异步数据库驱动!FastAPI 配 asyncpg(PostgreSQL)或 aiomysql让查询不阻塞主线程。2025年,NoSQL, 如 MongoDB 的异步支持也更成熟,选对驱动事半功倍。

数据库其他解决思路

读写分离
分库分表
集群

分库分表

分库逻辑:按 user_id % 2 选库,扩展时可加更多。
注意:每次切换得重设 SQLALCHEMY_DATABASE_URI。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from fastapi import FastAPI, Depends
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
from redis.asyncio import Redis

app = FastAPI()

# 初始化 Redis(限流存储)
redis = Redis(host="localhost", port=6379, encoding="utf-8", decode_responses=True)
FastAPILimiter.init(redis)

@app.get("/api/data", dependencies=[Depends(RateLimiter(times=100, seconds=60))])
async def get_data():
return {"message": "Hello from FastAPI!"}


按用户 ID 取模分表:

1
2
3
4
class User(db1.Model):
__tablename__ = 'user_{id % 2}' # 动态表名
id = db1.Column(db1.Integer, primary_key=True)
name = db1.Column(db1.String(50))

限流

用 Flask 或 FastAPI 开发 API 时,流量一上来,服务器容易被挤爆,体验直接拉胯!2025年,限流成了保护系统的重要手段,既能防恶意攻击,也能优化资源使用。想象一下,恶意爬虫狂轰滥炸,或者用户不小心刷爆接口,服务器瞬间瘫痪。限流就是设定请求上限,比如每分钟 100 次,超了就拒绝,保护系统不崩。2025年,随着 DDoS 攻击更猖獗,限流更是必备技能。走,咱一起搞懂

pip install flask-limiter

加个限流装饰器,限制每分钟 100 次请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["100 per minute"]
)

@app.route('/api/data')
@limiter.limit("100 per minute")
def get_data():
return {"message": "Hello from Flask!"}

if __name__ == "__main__":
app.run(debug=True)

key_func:按 IP 限流(get_remote_address)。
default_limits:全局限制,超了返回 429 状态码。
效果:每分钟 100 次,超限用户会看到“Too Many Requests”。

如果用用户认证,可以按用户 ID 限流,改 key_func:

1
2
3
4
def get_user_id():
return getattr(g, 'user_id', get_remote_address())

limiter = Limiter(app=app, key_func=get_user_id)

fastapi中的使用

pip install fastapi-limiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fastapi import FastAPI, Depends
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
from redis.asyncio import Redis

app = FastAPI()

# 初始化 Redis(限流存储)
redis = Redis(host="localhost", port=6379, encoding="utf-8", decode_responses=True)
FastAPILimiter.init(redis)

@app.get("/api/data", dependencies=[Depends(RateLimiter(times=100, seconds=60))])
async def get_data():
return {"message": "Hello from FastAPI!"}

消息队列:异步处理解耦

高并发时,请求堆积容易崩盘,消息队列来救场!它把耗时任务(比如发送邮件、处理订单)丢到队列,API 快速响应用户。2025年,RabbitMQ、Kafka 和 Redis Queue(RQ)是热门选择。

FastAPI 示例(用 RQ):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from fastapi import FastAPI
from rq import Queue
from redis import Redis
from time import sleep

app = FastAPI()
redis_conn = Redis(host='localhost', port=6379)
q = Queue(connection=redis_conn)

def long_task(seconds):
sleep(seconds)
return f"Task done after {seconds} seconds"

@app.post("/task/")
async def create_task(seconds: int):
job = q.enqueue(long_task, seconds)
return {"message": "Task queued", "job_id": job.id}

Flask 示例(用 Celery):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

@celery.task
def long_task(seconds):
sleep(seconds)
return f"Task done after {seconds} seconds"

@app.route('/task/', methods=['POST'])
def create_task():
result = long_task.delay(5)
return {"message": "Task queued", "task_id": result.id}

优点:解耦主线程,异步处理耗时任务,2025年分布式队列更成熟,适合超高并发。
安装:pip install rq celery redis(按需选)。

使用rabbitmq

作为 削峰 解耦 异步处理的核心

在python中最常使用的rabbitmq库应该是pika,但是一般使用pika时都是一个py文件为一个消费者,需要全部手动启动。那么如何做到像java的Springboot一样可以直接使用注解完成消费者队列的监听,这里使用到的一个第三方库是flask_rabbitmq。

pip install flask_rabbitmq

其他方法

  • 压力测试:用 Locust 或 JMeter 模拟高并发,找瓶颈。
  • 日志监控:加 Sentry 或 ELK,实时看服务器状态。
  • 超时设置:设请求超时,防止死循环拖垮系统。
  • CDN 加速:用 Cloudflare,把静态资源甩给 CDN。对于静态资源(如图片、CSS、JavaScript等),可以使用CDN(内容分发网络)来加速资源的传输和加载,减轻服务器的负载。