From 55419443cdfc82e2485d2a3f513978828edf4422 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roog=20=28=E9=A1=BE=E6=96=B0=E5=9F=B9=29?= Date: Wed, 4 Feb 2026 11:58:56 +0800 Subject: [PATCH] =?UTF-8?q?main:=E6=96=B0=E5=A2=9E=E5=81=A5=E5=BA=B7?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E6=94=AF=E6=8C=81=E5=92=8C=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在 Worker 中引入轻量级 HTTP 服务器,支持健康检查和就绪检查端点。 - 在 Kubernetes 和 Docker 配置中新增健康检查探针,提升服务稳定性。 - 更新依赖,引入 `aiohttp` 用于健康检查服务。 - 优化部署配置,调整 Redis 主机配置和镜像地址以适配新环境。 --- deployment/docker-compose.yml | 9 ++++ deployment/kubernetes/deployment.yaml | 23 ++++++--- deployment/serverless/s.yaml | 15 ++++-- pyproject.toml | 2 + requirements.txt | 1 + src/functional_scaffold/worker.py | 71 +++++++++++++++++++++++++-- 6 files changed, 107 insertions(+), 14 deletions(-) diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index ab7efd1..9a21159 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -45,6 +45,9 @@ services: build: context: .. dockerfile: deployment/Dockerfile + platform: linux/amd64 + ports: + - "8112:8000" environment: - APP_ENV=development - LOG_LEVEL=INFO @@ -69,6 +72,12 @@ services: depends_on: redis: condition: service_healthy + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/healthz')"] + interval: 30s + timeout: 3s + retries: 3 + start_period: 10s deploy: replicas: 2 diff --git a/deployment/kubernetes/deployment.yaml b/deployment/kubernetes/deployment.yaml index e15a4a5..ff088f0 100644 --- a/deployment/kubernetes/deployment.yaml +++ b/deployment/kubernetes/deployment.yaml @@ -127,16 +127,25 @@ spec: limits: memory: "512Mi" cpu: "500m" - # Worker 没有 HTTP 端口,使用命令探针 + # Worker 现在有 HTTP 健康检查端点 + ports: + - containerPort: 8000 + name: http livenessProbe: - exec: - command: - - python - - -c - - "import redis; r = redis.Redis(host='functional-scaffold-redis'); r.ping()" + httpGet: + path: /healthz + port: 8000 initialDelaySeconds: 10 periodSeconds: 30 - timeoutSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /readyz + port: 8000 + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 failureThreshold: 3 --- diff --git a/deployment/serverless/s.yaml b/deployment/serverless/s.yaml index f1d0869..fab12a5 100644 --- a/deployment/serverless/s.yaml +++ b/deployment/serverless/s.yaml @@ -5,11 +5,11 @@ name: functional-scaffold access: default vars: - region: cn-hangzhou - image: registry.cn-hangzhou.aliyuncs.com/your-namespace/functional-scaffold:latest - redis_host: r-xxxxx.redis.rds.aliyuncs.com + region: cn-beijing + image: crpi-om2xd9y8cmaizszf-vpc.cn-beijing.personal.cr.aliyuncs.com/test-namespace-gu/fc-test:test-v1 + redis_host: 172.17.133.51 redis_port: "6379" - redis_password: "your-password" + redis_password: "roog-pass-redis" resources: # API 服务函数 @@ -78,6 +78,13 @@ resources: port: 8000 command: - /app/entrypoint.sh + healthCheckConfig: + httpGetUrl: /healthz + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 3 + failureThreshold: 3 + successThreshold: 1 environmentVariables: APP_ENV: production LOG_LEVEL: INFO diff --git a/pyproject.toml b/pyproject.toml index e8373ce..cc6e652 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,8 @@ dependencies = [ "pyyaml>=6.0.0", # HTTP 客户端(Webhook 回调) "httpx>=0.27.0", + # 轻量级 HTTP 服务器(Worker 健康检查) + "aiohttp>=3.9.0", ] [project.optional-dependencies] diff --git a/requirements.txt b/requirements.txt index 6337e8e..b196b02 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,6 +5,7 @@ pydantic>=2.5.0 pydantic-settings>=2.0.0 prometheus-client>=0.19.0 python-json-logger>=2.0.7 +aiohttp>=3.9.0 # Redis - 任务队列和指标存储 redis>=5.0.0 diff --git a/src/functional_scaffold/worker.py b/src/functional_scaffold/worker.py index 820e235..a9ae120 100644 --- a/src/functional_scaffold/worker.py +++ b/src/functional_scaffold/worker.py @@ -9,6 +9,8 @@ import signal import sys from typing import Optional +from aiohttp import web + from .config import settings from .core.job_manager import JobManager from .core.logging import setup_logging @@ -17,6 +19,53 @@ from .core.tracing import set_request_id logger = logging.getLogger(__name__) +class HealthCheckServer: + """轻量级健康检查 HTTP 服务器 + + 为 Worker 模式提供健康检查端点,满足 FC 3.0 容器健康检查要求。 + """ + + def __init__(self, host: str = "0.0.0.0", port: int = 8000): + self._host = host + self._port = port + self._app: Optional[web.Application] = None + self._runner: Optional[web.AppRunner] = None + self._site: Optional[web.TCPSite] = None + self._healthy = True + + async def start(self) -> None: + """启动健康检查服务器""" + self._app = web.Application() + self._app.router.add_get("/healthz", self._healthz_handler) + self._app.router.add_get("/readyz", self._readyz_handler) + + self._runner = web.AppRunner(self._app) + await self._runner.setup() + self._site = web.TCPSite(self._runner, self._host, self._port) + await self._site.start() + logger.info(f"健康检查服务器已启动: http://{self._host}:{self._port}") + + async def stop(self) -> None: + """停止健康检查服务器""" + if self._runner: + await self._runner.cleanup() + logger.info("健康检查服务器已停止") + + def set_healthy(self, healthy: bool) -> None: + """设置健康状态""" + self._healthy = healthy + + async def _healthz_handler(self, request: web.Request) -> web.Response: + """存活检查端点""" + return web.json_response({"status": "healthy", "mode": "worker"}) + + async def _readyz_handler(self, request: web.Request) -> web.Response: + """就绪检查端点""" + if self._healthy: + return web.json_response({"status": "ready", "mode": "worker"}) + return web.json_response({"status": "not ready"}, status=503) + + class JobWorker: """任务 Worker @@ -272,12 +321,21 @@ class JobWorker: logger.error(f"超时任务回收异常: {e}") -def setup_signal_handlers(worker: JobWorker, loop: asyncio.AbstractEventLoop) -> None: +def setup_signal_handlers( + worker: JobWorker, + health_server: HealthCheckServer, + loop: asyncio.AbstractEventLoop, +) -> None: """设置信号处理器""" + async def shutdown_all() -> None: + """关闭所有服务""" + await worker.shutdown() + await health_server.stop() + def signal_handler(sig: signal.Signals) -> None: logger.info(f"收到信号 {sig.name},准备关闭...") - loop.create_task(worker.shutdown()) + loop.create_task(shutdown_all()) for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler(sig, signal_handler, sig) @@ -288,13 +346,19 @@ async def main() -> None: # 设置日志 setup_logging(level=settings.log_level, format_type=settings.log_format) + # 创建健康检查服务器和 Worker + health_server = HealthCheckServer(port=8000) worker = JobWorker() # 设置信号处理 loop = asyncio.get_running_loop() - setup_signal_handlers(worker, loop) + setup_signal_handlers(worker, health_server, loop) try: + # 先启动健康检查服务器,确保 FC 健康检查能通过 + await health_server.start() + + # 初始化并运行 Worker await worker.initialize() await worker.run() except Exception as e: @@ -302,6 +366,7 @@ async def main() -> None: sys.exit(1) finally: await worker.shutdown() + await health_server.stop() if __name__ == "__main__":