FastAPI/Flask 如何实现高并发:2025年实用技巧
FastAPI/Flask 如何实现高并发:2025年实用技巧 关键
异步编程 :用FastAPI的异步特性提升性能。
负载均衡 :分担流量,防止单点崩溃。
缓存优化 :减少数据库压力,提升响应速度。
Web服务器升级 :Gunicorn+Uvicorn组合更高效。
引言:高并发不是梦 嘿,朋友们!最近用 FastAPI 或 Flask 开发 API,感觉并发量一上来就卡得不行?别慌,高并发问题其实有解!2025年,随着项目规模扩大,优化这些框架的性能成了热门话题。无论是处理几千个请求,还是应对突发流量,我今天就带你看看怎么让 FastAPI 和 Flask 跑得飞快!
基础认知:并发问题的根源 FastAPI 和 Flask 都是 Python 的 Web 框架,但默认情况下,它们处理并发能力有限。Flask 是同步框架,单线程处理请求;FastAPI 虽支持异步,但也得优化才能发挥潜力。2025年的服务器环境更复杂了,流量高峰、数据库瓶颈这些问题得提前解决。走,咱们一步步搞定!
优化方案:让 API 飞起来 1. 异步编程:FastAPI 的杀手锏 FastAPI 内置了对异步的支持,用 async 和 await 能让它处理多个请求不堵塞。比如:
1 2 3 4 5 6 7 from fastapi import FastAPIapp = FastAPI() @app.get("/items/" ) async def read_items (): return {"message" : "Hello, async world!" }
用异步库如 aiohttp 处理外部请求,2025年最新的 httpx 也挺火,能异步调用 API,效率翻倍。Flask 想玩异步,得加 asyncio 或用 Flask 3.0 的新特性,但 FastAPI 天然占优。
负载均衡:分担压力 单台服务器扛不住大流量?试试负载均衡!用 Nginx 或 Traefik,把请求分到多台服务器上。2025年,Kubernetes 配合这些工具更流行,自动扩展实例,流量一多就加机器,稳得一批。配置 Nginx 很简单,改改配置文件就行:
1 2 3 4 5 6 7 8 9 10 11 upstream myapp { server 127.0.0.1:8000 ; server 127.0.0.1:8001 ; } server { listen 80 ; location / { proxy_pass http://myapp; } }
缓存优化:快人一步 数据库查询是瓶颈?加个缓存!Redis 或 Memcached 是好选择,把常见数据存进去,请求直接从内存拿,秒回。FastAPI 里可以用 aioredis:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from fastapi import FastAPIimport aioredisapp = FastAPI() redis = aioredis.from_url("redis://localhost" ) @app.get("/data/" ) async def get_data (): cached = await redis.get("my_key" ) if cached: return {"data" : cached.decode()} data = "Hello from DB" await redis.set ("my_key" , data, ex=3600 ) return {"data" : data}
Web 服务器升级:Gunicorn + Uvicorn 默认的开发服务器扛不住生产环境?用 Gunicorn 搭配 Uvicorn。Gunicorn 管理多个工作进程,Uvicorn 跑异步任务,完美组合。启动命令:
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
-w 4 设 4 个工作进程,2025年建议根据 CPU 核心数调整,发挥最大性能。Flask 也适用这个组合,记得加 gevent 异步支持。
数据库优化:异步驱动 数据库优化:对于频繁的数据库操作,可以优化数据库结构、索引等,提高查询性能。
数据库慢?用异步数据库驱动!FastAPI 配 asyncpg(PostgreSQL)或 aiomysql让查询不阻塞主线程。2025年,NoSQL, 如 MongoDB 的异步支持也更成熟,选对驱动事半功倍。
数据库其他解决思路 读写分离 分库分表 集群
分库分表 分库逻辑:按 user_id % 2 选库,扩展时可加更多。 注意:每次切换得重设 SQLALCHEMY_DATABASE_URI。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 from fastapi import FastAPI, Dependsfrom fastapi_limiter import FastAPILimiterfrom fastapi_limiter.depends import RateLimiterfrom redis.asyncio import Redisapp = FastAPI() redis = Redis(host="localhost" , port=6379 , encoding="utf-8" , decode_responses=True ) FastAPILimiter.init(redis) @app.get("/api/data" , dependencies=[Depends(RateLimiter(times=100 , seconds=60 ) )] ) async def get_data (): return {"message" : "Hello from FastAPI!" }
按用户 ID 取模分表:
1 2 3 4 class User (db1.Model ): __tablename__ = 'user_{id % 2}' id = db1.Column(db1.Integer, primary_key=True ) name = db1.Column(db1.String(50 ))
限流 用 Flask 或 FastAPI 开发 API 时,流量一上来,服务器容易被挤爆,体验直接拉胯!2025年,限流成了保护系统的重要手段,既能防恶意攻击,也能优化资源使用。想象一下,恶意爬虫狂轰滥炸,或者用户不小心刷爆接口,服务器瞬间瘫痪。限流就是设定请求上限,比如每分钟 100 次,超了就拒绝,保护系统不崩。2025年,随着 DDoS 攻击更猖獗,限流更是必备技能。走,咱一起搞懂
pip install flask-limiter
加个限流装饰器,限制每分钟 100 次请求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from flask import Flask from flask_limiter import Limiter from flask_limiter.util import get_remote_address app = Flask(__name__) limiter = Limiter( app=app, key_func=get_remote_address, default_limits=["100 per minute"] ) @app.route('/api/data') @limiter.limit("100 per minute") def get_data(): return {"message": "Hello from Flask!"} if __name__ == "__main__": app.run(debug=True)
key_func:按 IP 限流(get_remote_address)。 default_limits:全局限制,超了返回 429 状态码。 效果:每分钟 100 次,超限用户会看到“Too Many Requests”。
如果用用户认证,可以按用户 ID 限流,改 key_func:
1 2 3 4 def get_user_id(): return getattr(g, 'user_id', get_remote_address()) limiter = Limiter(app=app, key_func=get_user_id)
fastapi中的使用 pip install fastapi-limiter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from fastapi import FastAPI, Dependsfrom fastapi_limiter import FastAPILimiterfrom fastapi_limiter.depends import RateLimiterfrom redis.asyncio import Redisapp = FastAPI() redis = Redis(host="localhost" , port=6379 , encoding="utf-8" , decode_responses=True ) FastAPILimiter.init(redis) @app.get("/api/data" , dependencies=[Depends(RateLimiter(times=100 , seconds=60 ) )] ) async def get_data (): return {"message" : "Hello from FastAPI!" }
消息队列:异步处理解耦 高并发时,请求堆积容易崩盘,消息队列来救场!它把耗时任务(比如发送邮件、处理订单)丢到队列,API 快速响应用户。2025年,RabbitMQ、Kafka 和 Redis Queue(RQ)是热门选择。
FastAPI 示例(用 RQ):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 from fastapi import FastAPIfrom rq import Queuefrom redis import Redisfrom time import sleepapp = FastAPI() redis_conn = Redis(host='localhost' , port=6379 ) q = Queue(connection=redis_conn) def long_task (seconds ): sleep(seconds) return f"Task done after {seconds} seconds" @app.post("/task/" ) async def create_task (seconds: int ): job = q.enqueue(long_task, seconds) return {"message" : "Task queued" , "job_id" : job.id }
Flask 示例(用 Celery):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 from flask import Flaskfrom celery import Celeryapp = Flask(__name__) app.config['CELERY_BROKER_URL' ] = 'redis://localhost:6379/0' app.config['CELERY_RESULT_BACKEND' ] = 'redis://localhost:6379/0' celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL' ]) @celery.task def long_task (seconds ): sleep(seconds) return f"Task done after {seconds} seconds" @app.route('/task/' , methods=['POST' ] ) def create_task (): result = long_task.delay(5 ) return {"message" : "Task queued" , "task_id" : result.id }
优点:解耦主线程,异步处理耗时任务,2025年分布式队列更成熟,适合超高并发。 安装:pip install rq celery redis(按需选)。
使用rabbitmq 作为 削峰 解耦 异步处理的核心
在python中最常使用的rabbitmq库应该是pika,但是一般使用pika时都是一个py文件为一个消费者,需要全部手动启动。那么如何做到像java的Springboot一样可以直接使用注解完成消费者队列的监听,这里使用到的一个第三方库是flask_rabbitmq。
pip install flask_rabbitmq
其他方法
压力测试:用 Locust 或 JMeter 模拟高并发,找瓶颈。
日志监控:加 Sentry 或 ELK,实时看服务器状态。
超时设置:设请求超时,防止死循环拖垮系统。
CDN 加速:用 Cloudflare,把静态资源甩给 CDN。对于静态资源(如图片、CSS、JavaScript等),可以使用CDN(内容分发网络)来加速资源的传输和加载,减轻服务器的负载。