"""FastAPI 应用入口""" from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import Response import logging import time from .api import router from .config import settings from .core.logging import setup_logging from .core.tracing import generate_request_id, set_request_id, get_request_id from .core.metrics_unified import ( get_metrics_manager, incr, observe, gauge_incr, gauge_decr, export, ) from .core.job_manager import get_job_manager, shutdown_job_manager # 设置日志 setup_logging( level=settings.log_level, format_type=settings.log_format, file_path=settings.log_file_path if settings.log_file_enabled else None, ) logger = logging.getLogger(__name__) # 创建 FastAPI 应用 app = FastAPI( title=settings.app_name, description="算法工程化 Serverless 脚手架 - 提供标准化的算法服务接口", version=settings.app_version, docs_url="/docs", redoc_url="/redoc", openapi_url="/openapi.json", ) # CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 请求日志中间件 @app.middleware("http") async def log_requests(request: Request, call_next): """记录所有HTTP请求""" # 从请求头获取或生成 request_id request_id = request.headers.get("x-request-id") or generate_request_id() set_request_id(request_id) logger.info(f"Request: {request.method} {request.url.path}") response = await call_next(request) logger.info(f"Response: {response.status_code}") return response def normalize_path(path: str) -> str: """ 规范化路径,将路径参数替换为模板形式 Args: path: 原始路径 Returns: 规范化后的路径 Examples: /jobs/a1b2c3d4e5f6 -> /jobs/{job_id} /invoke -> /invoke """ # 匹配 /jobs/{任意字符串} 模式 if path.startswith("/jobs/") and len(path) > 6: return "/jobs/{job_id}" return path # 指标跟踪中间件 @app.middleware("http") async def track_metrics(request: Request, call_next): """记录所有HTTP请求的指标""" if not settings.metrics_enabled: return await call_next(request) # 跳过不需要记录指标的端点 skip_paths = {"/metrics", "/readyz", "/healthz"} if request.url.path in skip_paths: return await call_next(request) gauge_incr("http_requests_in_progress") start_time = time.time() status = "success" try: response = await call_next(request) # 根据 HTTP 状态码判断成功或失败 if response.status_code >= 400: status = "error" return response except Exception as e: status = "error" raise e finally: elapsed = time.time() - start_time # 使用规范化后的路径记录指标 normalized_path = normalize_path(request.url.path) incr( "http_requests_total", {"method": request.method, "endpoint": normalized_path, "status": status}, ) observe( "http_request_duration_seconds", {"method": request.method, "endpoint": normalized_path}, elapsed, ) gauge_decr("http_requests_in_progress") # 注册路由 app.include_router(router, tags=["Algorithm"]) # Prometheus 指标端点 @app.get( "/metrics", tags=["Monitoring"], summary="Prometheus 指标", description="导出 Prometheus 格式的监控指标", ) async def metrics(): """ Prometheus 指标端点 返回应用的监控指标,供 Prometheus 抓取 """ if not settings.metrics_enabled: return Response(content="Metrics disabled", status_code=404) return Response( content=export(), media_type="text/plain; version=0.0.4; charset=utf-8", ) # 启动事件 @app.on_event("startup") async def startup_event(): """应用启动时执行""" logger.info(f"Starting {settings.app_name} v{settings.app_version}") logger.info(f"Environment: {settings.app_env}") logger.info(f"Metrics enabled: {settings.metrics_enabled}") # 初始化指标管理器 if settings.metrics_enabled: manager = get_metrics_manager() if manager.is_available(): logger.info("Redis 指标收集已启用") else: logger.warning("Redis 不可用,指标将不会被收集") # 初始化任务管理器 try: job_manager = await get_job_manager() if job_manager.is_available(): logger.info("异步任务管理器已启用") else: logger.warning("Redis 不可用,异步任务功能将不可用") except Exception as e: logger.warning(f"任务管理器初始化失败: {e}") # 关闭事件 @app.on_event("shutdown") async def shutdown_event(): """应用关闭时执行""" logger.info(f"Shutting down {settings.app_name}") # 关闭任务管理器 try: await shutdown_job_manager() logger.info("任务管理器已关闭") except Exception as e: logger.warning(f"任务管理器关闭失败: {e}") if __name__ == "__main__": import uvicorn uvicorn.run( "functional_scaffold.main:app", host=settings.host, port=settings.port, reload=settings.app_env == "development", log_level=settings.log_level.lower(), )