FastAPI 驱动的 Amazon Bedrock 代理服务:架构设计与实现
FastAPI 驱动的 Amazon Bedrock 代理服务:架构设计与实现
本文将介绍一个基于 FastAPI 的轻量级代理服务,它将 Amazon Bedrock 的强大功能封装在简洁的 HTTP API 背后。我们将深入探讨 FastAPI 的核心原理与优势,并详细说明为什么选择 FastAPI 作为此项目的 Web 框架。
项目地址:https://github.com/mingyu110/AI/tree/main/bedrock-proxy
一、FastAPI 框架:现代 Python Web 开发的优选方案
1.1 FastAPI 的核心原理
FastAPI 是一个现代、高性能的 Python Web 框架,基于以下关键技术构建:
- Starlette:提供 ASGI 支持和 Web 框架基础功能
- Pydantic:提供数据验证和序列化
- 类型提示:利用 Python 3.6+ 的类型注解实现自动数据验证和文档生成
FastAPI 的核心工作原理可以概括为:
# FastAPI 自动处理请求-响应循环
# 1. 接收 HTTP 请求
# 2. 验证请求数据(基于 Pydantic 模型)
# 3. 执行业务逻辑
# 4. 序列化响应数据
# 5. 返回 HTTP 响应
1.2 FastAPI 的核心功能特性
1.2.1 自动 API 文档生成
FastAPI 最强大的特性之一是能够自动生成 OpenAPI 规范的交互式文档:
# 访问 /docs 路径即可获得 Swagger UI 界面
# 访问 /redoc 路径可获得 ReDoc 界面
1.2.2 类型安全与自动验证
利用 Python 类型提示,FastAPI 在运行时自动验证请求和响应数据:
from pydantic import BaseModel
from typing import List, Optional
class ChatRequest(BaseModel):
mode: str = "chat"
model: Optional[str] = None
payload: dict
stream: bool = False
1.2.3 异步高性能
基于 asyncio 和 ASGI,FastAPI 能够处理大量并发请求:
# 支持 async/await 语法
async def handle_request(request: Request):
# 非阻塞 IO 操作
result = await external_service.call()
return result
1.2.4 依赖注入系统
FastAPI 提供优雅的依赖注入机制,便于管理共享资源:
# 共享 Bedrock 客户端实例
def get_bedrock_client() -> boto3.client:
# 单例模式创建客户端
pass
二、为什么选择 FastAPI 作为 Bedrock 代理框架
2.1 性能考量:应对 AI 推理的高延迟场景
Amazon Bedrock 调用通常存在较高延迟(100-2000ms),使用 FastAPI 的异步特性可以:
- 避免请求阻塞:当一个请求在等待 Bedrock 响应时,服务器可以处理其他请求
- 支持高并发:单进程可处理数百个并发连接
- 资源高效利用:非阻塞 IO 减少线程上下文切换开销
# 在传统同步框架中
# 每个请求占用一个线程,线程池耗尽后无法处理新请求
# 在 FastAPI 异步模式中
# 单个线程可处理多个并发请求,通过事件循环调度
2.2 开发效率:快速迭代 AI 应用
2.2.1 自动生成的 API 文档
团队成员可以直接通过浏览器测试 API,无需额外工具:
# 启动服务后访问 http://localhost:8000/docs
# 即可看到完整的 API 文档和测试界面
2.2.2 类型安全减少错误
在构建复杂的 AI 请求负载时,类型提示能及早发现错误:
# 错误示例:将字符串传入需要字典的字段
payload = "text content" # 类型错误将在开发时被发现
# 正确示例
payload = {"messages": [{"role": "user", "content": "text"}]}
2.3 生态系统集成
FastAPI 与现代 Python 工具链无缝集成:
- boto3:AWS SDK 的异步兼容
- Pydantic:与 Bedrock 的 JSON 请求/响应完美匹配
- pytest:支持异步测试
- Docker:轻量化容器部署
2.4 项目长期可维护性
2.4.1 清晰的代码结构
api/ # HTTP 层:路由、请求处理
├── main.py # FastAPI 应用实例
├── chat_handler.py # 请求路由和编排
└── response.py # 响应辅助函数
core/ # 核心层:配置、客户端
├── settings.py # 集中配置管理
└── bedrock_client.py # 共享 Bedrock 客户端
services/ # 服务层:业务逻辑
├── chat_service.py # 聊天服务
└── embedding_service.py # 嵌入服务
2.4.2 配置集中管理
所有配置集中在 core/settings.py,便于管理和修改:
@dataclass
class AppSettings:
aws_region: str = "us-east-1"
bedrock_model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0"
# 修改一处,全局生效
三、Bedrock 代理服务核心架构
3.1 整体架构概览
┌─────────────────┐ HTTP ┌─────────────────┐ AWS SDK ┌─────────────────┐
│ 您的应用 │ ───────────→ │ FastAPI 代理 │ ───────────→ │ Amazon Bedrock │
│ (Web/移动/API) │ /chat │ (本服务) │ boto3 │ (AI 服务) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
↑ ↑
│ 无需 AWS SDK │ 统一错误处理
│ 无需管理凭据 │ 连接池复用
│ 简单 HTTP │ 自动重试
3.2 FastAPI 应用核心结构
3.2.1 应用入口 (api/main.py)
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
title="Amazon Bedrock 代理服务",
description="统一的 HTTP API 接口,用于访问 Bedrock 的聊天和嵌入功能",
version="1.0.0"
)
# CORS 配置,支持前端跨域调用
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境应配置具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/chat")
async def chat(request: Request):
"""统一的聊天和嵌入接口"""
body = await request.json()
return await handle_chat_request(body)
@app.get("/health")
async def health():
"""健康检查端点"""
return {"status": "ok", "service": "bedrock-proxy"}
3.2.2 请求处理核心 (api/chat_handler.py)
async def handle_chat_request(body: Dict[str, Any]) -> JSONResponse | StreamingResponse:
"""
主要路由逻辑:
1. 判断模式:chat(聊天)还是 embed(嵌入)
2. 判断响应方式:流式还是非流式
3. 调用相应的服务
4. 返回格式化的响应
"""
mode = body.get("mode", "chat")
if mode == "embed":
# 处理嵌入请求
return await handle_embedding(body)
else:
# 处理聊天请求
stream = body.get("stream", False)
if stream:
return await handle_streaming_chat(body)
else:
return await handle_sync_chat(body)
3.3 Bedrock 客户端管理 (core/bedrock_client.py)
def get_bedrock_client() -> boto3.client:
"""
创建并复用 Bedrock 客户端实例
关键配置:
- max_pool_connections: 连接池大小(默认 50)
- retries: 自适应重试策略
- region: AWS 区域
"""
global _bedrock_client
if _bedrock_client is None:
settings = AppSettings.instance()
config = Config(
region_name=settings.aws_region,
max_pool_connections=settings.max_pool_connections,
retries={
"max_attempts": settings.max_retry_attempts,
"mode": "adaptive"
}
)
_bedrock_client = boto3.client(
"bedrock-runtime",
config=config
)
return _bedrock_client
3.4 聊天服务实现 (services/chat_service.py)
3.4.1 同步聊天模式
适用于不需要实时响应的场景:
def chat_sync(self, body: dict) -> dict:
"""
同步聊天流程:
1. 准备请求负载
2. 调用 invoke_model
3. 解析响应
4. 返回完整结果
"""
response = self.client.invoke_model(**invoke_kwargs)
response_body = json.loads(response["body"].read())
return {
"answer": self._extract_text(response_body),
"usage": response_body.get("usage", {}),
"model": model_id
}
3.4.2 流式聊天模式
适用于需要实时显示生成内容的场景:
def chat_stream(self, prepared: tuple):
"""
流式聊天流程:
1. 准备请求负载
2. 调用 invoke_model_with_response_stream
3. 生成器逐块返回内容
4. 客户端实时接收并显示
"""
response = self.client.invoke_model_with_response_stream(**invoke_kwargs)
def generate():
for event in response["body"]:
chunk = json.loads(event["chunk"]["bytes"])
yield {
"text": chunk.get("text", ""),
"is_final": False
}
return generate()
3.5 嵌入服务实现 (services/embedding_service.py)
def embed(self, text: str, model_id: Optional[str] = None) -> dict:
"""
文本嵌入流程:
1. 构建请求负载(支持 Titan 和 Cohere)
2. 调用 invoke_model
3. 解析嵌入向量
4. 返回标准化格式
"""
model = model_id or self.settings.bedrock_embed_model_id
payload = self._build_payload(text, model)
response = self.client.invoke_model(
modelId=model,
body=json.dumps(payload),
contentType="application/json",
accept="application/json"
)
body = json.loads(response["body"].read())
embedding = self._parse_embedding(body, model)
return {
"embedding": embedding,
"dimensions": len(embedding),
"model": model
}
四、配置管理最佳实践
4.1 环境变量驱动的配置
# core/settings.py
@dataclass
class AppSettings:
# AWS 配置
aws_region: str = os.getenv("AWS_REGION", "us-east-1")
bedrock_model_id: str = os.getenv(
"BEDROCK_MODEL_ID",
"anthropic.claude-3-sonnet-20240229-v1:0"
)
# Guardrails 配置
guardrail_id: Optional[str] = os.getenv("BEDROCK_GUARDRAIL_ID")
guardrail_version: str = os.getenv("BEDROCK_GUARDRAIL_VERSION", "1")
# HTTP 调优
max_pool_connections: int = int(os.getenv("MAX_POOL_CONNECTIONS", "50"))
max_retry_attempts: int = int(os.getenv("MAX_RETRY_ATTEMPTS", "5"))
# 单例模式,全局复用
_settings = None
@classmethod
def instance(cls) -> "AppSettings":
if cls._settings is None:
cls._settings = cls()
return cls._settings
4.2 .env 文件示例
# AWS 配置
AWS_REGION=us-east-1
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
# Bedrock 模型配置
BEDROCK_MODEL_ID=anthropic.claude-3-sonnet-20240229-v1:0
BEDROCK_EMBED_MODEL_ID=amazon.titan-embed-text-v2:0
# Guardrails(可选)
BEDROCK_GUARDRAIL_ID=gr-xxxxxxxx
BEDROCK_GUARDRAIL_VERSION=1
# CORS 配置
ALLOWED_ORIGINS=http://localhost:3000,https://your-app.com
# 性能调优
MAX_POOL_CONNECTIONS=50
MAX_RETRY_ATTEMPTS=5
五、部署与使用
5.1 本地开发环境
# 1. 克隆项目
git clone https://github.com/loaynaser3/bedrock-proxy.git
cd bedrock-proxy
# 2. 创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或 venv\Scripts\activate # Windows
# 3. 安装依赖
pip install -r requirements.txt
# 4. 配置 AWS 凭据
# 方式 1: 使用 .env 文件
cp .env.example .env
# 编辑 .env 文件填写 AWS 凭据
# 方式 2: 使用 AWS CLI
aws configure
# 5. 启动服务
uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload
# 6. 访问文档
# 浏览器打开: http://localhost:8000/docs
5.2 Docker 部署
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000"]
# 构建镜像
docker build -t bedrock-proxy .
# 运行容器
docker run -d \
--name bedrock-proxy \
-p 8000:8000 \
--env-file .env \
bedrock-proxy
5.3 Kubernetes 部署示例
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: bedrock-proxy
spec:
replicas: 3
selector:
matchLabels:
app: bedrock-proxy
template:
metadata:
labels:
app: bedrock-proxy
spec:
containers:
- name: proxy
image: your-registry/bedrock-proxy:latest
ports:
- containerPort: 8000
env:
- name: AWS_REGION
value: "us-east-1"
- name: BEDROCK_MODEL_ID
value: "anthropic.claude-3-sonnet-20240229-v1:0"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: bedrock-proxy-service
spec:
selector:
app: bedrock-proxy
ports:
- port: 80
targetPort: 8000
type: LoadBalancer
六、API 使用示例
6.1 聊天补全(同步模式)
import requests
response = requests.post(
"http://localhost:8000/chat",
json={
"mode": "chat",
"model": "anthropic.claude-3-sonnet-20240229-v1:0",
"payload": {
"anthropic_version": "bedrock-2023-05-31",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "解释 DevOps 概念"}
]
}
],
"max_tokens": 256
}
}
)
result = response.json()
print(f"回答: {result['answer']}")
print(f"用量: {result['usage']}")
6.2 流式聊天(实时响应)
import requests
import json
response = requests.post(
"http://localhost:8000/chat",
json={
"mode": "chat",
"stream": True,
"model": "anthropic.claude-3-sonnet-20240229-v1:0",
"payload": {
"anthropic_version": "bedrock-2023-05-31",
"messages": [
{
"role": "user",
"content": "写一段关于机器学习的介绍"
}
],
"max_tokens": 512
}
},
stream=True
)
# 逐块接收并显示
for line in response.iter_lines():
if line:
chunk = json.loads(line)
if "text" in chunk:
print(chunk["text"], end="", flush=True)
6.3 文本嵌入
import requests
response = requests.post(
"http://localhost:8000/chat",
json={
"mode": "embed",
"model": "amazon.titan-embed-text-v2:0",
"text": "这是一个测试句子"
}
)
result = response.json()
print(f"嵌入维度: {result['dimensions']}")
print(f"前5个值: {result['embedding'][:5]}")
七、产品级增强建议
7.1 认证与授权
# 添加 API Key 认证
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
if credentials.credentials not in VALID_API_KEYS:
raise HTTPException(status_code=401, detail="Invalid API Key")
return credentials.credentials
@app.post("/chat")
async def chat(request: Request, api_key: str = Depends(verify_api_key)):
# 业务逻辑
pass
7.2 速率限制
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
@app.post("/chat", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def chat(request: Request):
# 每个用户每分钟最多 10 次请求
pass
7.3 请求日志与监控
import logging
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_requests(request: Request, call_next):
start_time = datetime.now()
response = await call_next(request)
duration = (datetime.now() - start_time).total_seconds()
logger.info(
f"{request.method} {request.url.path} "
f"Status: {response.status_code} "
f"Duration: {duration:.3f}s"
)
return response
7.4 缓存策略
from functools import lru_cache
@lru_cache(maxsize=128)
def get_embedding_cached(text: str, model: str):
# 缓存频繁请求的嵌入结果
return embedding_service.embed(text, model)
7.5 负载均衡与高可用
在生产环境建议:
- 部署多个实例:使用 Kubernetes 或 ECS 部署 3+ 实例
- 配置健康检查:定期探测 /health 端点
- 自动扩缩容:根据 CPU/内存指标自动调整实例数
- 使用 ALB:AWS Application Load Balancer 分发流量
八、为什么选择代理架构
8.1 安全性优势
传统直接访问模式:
前端/移动端 → 暴露 AWS 凭据 → 直接调用 Bedrock
❌ 凭据泄露风险
❌ 无法控制访问权限
❌ 难以审计使用量
代理模式:
前端/移动端 → HTTP 调用 → FastAPI 代理 → 安全凭据 → Bedrock
✅ 凭据不暴露
✅ 可添加自定义认证
✅ 完整使用日志
✅ 可实施速率限制
8.2 开发效率提升
| 对比项 | 直接调用 Bedrock | 使用代理服务 |
|---|---|---|
| 前端代码复杂度 | 需要 AWS SDK,处理认证 | 简单 HTTP 请求 |
| 多应用维护 | 各应用独立配置 | 统一配置 |
| 模型切换 | 每个应用修改代码 | 修改代理配置 |
| 错误处理 | 各自实现 | 统一处理 |
| 连接池管理 | 各自实现 | 集中优化 |
8.3 可扩展性设计
代理架构为 future enhancement 提供了扩展点:
# 示例:添加自定义工具调用
class CustomToolService:
def __init__(self, bedrock_client):
self.client = bedrock_client
def register_tool(self, name: str, function: Callable):
# 注册自定义工具
pass
def execute_tool_call(self, tool_call: dict):
# 执行工具调用
pass
# 集成到 ChatService
chat_service = ChatService(bedrock_client, settings)
chat_service.tool_executor = CustomToolService(bedrock_client)
# 示例:添加请求拦截器
class RequestInterceptor:
async def before_request(self, request: dict):
# 日志记录
# 内容审核
# 格式转换
pass
async def after_response(self, response: dict):
# 结果缓存
# 后处理
pass
九、性能优化技巧
9.1 连接池调优
# core/bedrock_client.py
config = Config(
region_name=settings.aws_region,
max_pool_connections=100, # 根据并发量调整
retries={
"max_attempts": 3,
"mode": "adaptive" # 自适应重试策略
}
)
9.2 流式响应优化
# 设置合适的分块大小
@app.post("/chat")
async def chat(request: Request):
response = await chat_service.chat_stream(prepared)
# 使用 StreamingResponse 减少内存占用
return StreamingResponse(
response,
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # 禁用代理缓冲
}
)
9.3 模型预热
# 应用启动时预热连接
@app.on_event("startup")
async def startup_event():
# 初始化 Bedrock 客户端
_ = get_bedrock_client()
# 可选:发送一个测试请求预热模型
logger.info("Bedrock 代理服务启动完成")
十、总结
FastAPI 作为现代化 Python Web 框架,为构建 Amazon Bedrock 代理服务提供了理想的 foundation:
技术优势:
- 高性能:异步 IO 支持高并发,适合 AI 推理场景
- 自动文档:Swagger UI 降低团队协作成本
- 类型安全:Pydantic 模型确保数据完整性
- 易于扩展:中间件和依赖注入支持业务需求演变
- 轻量级:最小化运行时开销
业务价值:
- 安全性:AWS 凭据不再暴露给前端
- 一致性:统一模型配置和 Guardrails 策略
- 可观测性:集中日志和监控
- 简化开发:客户端无需 AWS SDK 知识
通过本文介绍的架构,开发团队可以:
- 快速集成:任何支持 HTTP 的语言/框架均可调用
- 专注业务:屏蔽 AWS SDK 实现细节
- 灵活演进:在代理层添加新功能不影响客户端
- 生产就绪:易于扩展认证、监控、缓存等企业级特性
无论您是构建 AI 驱动的 Web 应用、移动应用还是微服务架构,这个 FastAPI + Amazon Bedrock 的代理模式都能显著提升开发效率和系统可维护性。