main:新增健康检查支持和服务优化
- 在 Worker 中引入轻量级 HTTP 服务器,支持健康检查和就绪检查端点。 - 在 Kubernetes 和 Docker 配置中新增健康检查探针,提升服务稳定性。 - 更新依赖,引入 `aiohttp` 用于健康检查服务。 - 优化部署配置,调整 Redis 主机配置和镜像地址以适配新环境。
This commit is contained in:
@@ -9,6 +9,8 @@ import signal
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from aiohttp import web
|
||||
|
||||
from .config import settings
|
||||
from .core.job_manager import JobManager
|
||||
from .core.logging import setup_logging
|
||||
@@ -17,6 +19,53 @@ from .core.tracing import set_request_id
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HealthCheckServer:
|
||||
"""轻量级健康检查 HTTP 服务器
|
||||
|
||||
为 Worker 模式提供健康检查端点,满足 FC 3.0 容器健康检查要求。
|
||||
"""
|
||||
|
||||
def __init__(self, host: str = "0.0.0.0", port: int = 8000):
|
||||
self._host = host
|
||||
self._port = port
|
||||
self._app: Optional[web.Application] = None
|
||||
self._runner: Optional[web.AppRunner] = None
|
||||
self._site: Optional[web.TCPSite] = None
|
||||
self._healthy = True
|
||||
|
||||
async def start(self) -> None:
|
||||
"""启动健康检查服务器"""
|
||||
self._app = web.Application()
|
||||
self._app.router.add_get("/healthz", self._healthz_handler)
|
||||
self._app.router.add_get("/readyz", self._readyz_handler)
|
||||
|
||||
self._runner = web.AppRunner(self._app)
|
||||
await self._runner.setup()
|
||||
self._site = web.TCPSite(self._runner, self._host, self._port)
|
||||
await self._site.start()
|
||||
logger.info(f"健康检查服务器已启动: http://{self._host}:{self._port}")
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""停止健康检查服务器"""
|
||||
if self._runner:
|
||||
await self._runner.cleanup()
|
||||
logger.info("健康检查服务器已停止")
|
||||
|
||||
def set_healthy(self, healthy: bool) -> None:
|
||||
"""设置健康状态"""
|
||||
self._healthy = healthy
|
||||
|
||||
async def _healthz_handler(self, request: web.Request) -> web.Response:
|
||||
"""存活检查端点"""
|
||||
return web.json_response({"status": "healthy", "mode": "worker"})
|
||||
|
||||
async def _readyz_handler(self, request: web.Request) -> web.Response:
|
||||
"""就绪检查端点"""
|
||||
if self._healthy:
|
||||
return web.json_response({"status": "ready", "mode": "worker"})
|
||||
return web.json_response({"status": "not ready"}, status=503)
|
||||
|
||||
|
||||
class JobWorker:
|
||||
"""任务 Worker
|
||||
|
||||
@@ -272,12 +321,21 @@ class JobWorker:
|
||||
logger.error(f"超时任务回收异常: {e}")
|
||||
|
||||
|
||||
def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None:
|
||||
def setup_signal_handlers(
|
||||
worker: JobWorker,
|
||||
health_server: HealthCheckServer,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> None:
|
||||
"""设置信号处理器"""
|
||||
|
||||
async def shutdown_all() -> None:
|
||||
"""关闭所有服务"""
|
||||
await worker.shutdown()
|
||||
await health_server.stop()
|
||||
|
||||
def signal_handler(sig: signal.Signals) -> None:
|
||||
logger.info(f"收到信号 {sig.name},准备关闭...")
|
||||
loop.create_task(worker.shutdown())
|
||||
loop.create_task(shutdown_all())
|
||||
|
||||
for sig in (signal.SIGTERM, signal.SIGINT):
|
||||
loop.add_signal_handler(sig, signal_handler, sig)
|
||||
@@ -288,13 +346,19 @@ async def main() -> None:
|
||||
# 设置日志
|
||||
setup_logging(level=settings.log_level, format_type=settings.log_format)
|
||||
|
||||
# 创建健康检查服务器和 Worker
|
||||
health_server = HealthCheckServer(port=8000)
|
||||
worker = JobWorker()
|
||||
|
||||
# 设置信号处理
|
||||
loop = asyncio.get_running_loop()
|
||||
setup_signal_handlers(worker, loop)
|
||||
setup_signal_handlers(worker, health_server, loop)
|
||||
|
||||
try:
|
||||
# 先启动健康检查服务器,确保 FC 健康检查能通过
|
||||
await health_server.start()
|
||||
|
||||
# 初始化并运行 Worker
|
||||
await worker.initialize()
|
||||
await worker.run()
|
||||
except Exception as e:
|
||||
@@ -302,6 +366,7 @@ async def main() -> None:
|
||||
sys.exit(1)
|
||||
finally:
|
||||
await worker.shutdown()
|
||||
await health_server.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user