2025
请求返回一部分内容后中断,但服务器本身没有主动断开连接
git config pull.rebase false
20250514
UUID vs 雪花ID Base62 编码:唯一标识符在系统设计中的全面对比与选型指南
OpenAPI 到 FastAPI 服务端代码生成项目
ollama Segmentation fault
在 Temporal Python SDK 中系统化管理 RetryPolicy
本文档使用 MrDoc 发布
-
+
首页
在 Temporal Python SDK 中系统化管理 RetryPolicy
在 **Temporal Python SDK** 项目里“系统化管理 `RetryPolicy`”的可落地做法与示例。目标是:**统一、可配置、可测试、易观测**,并避免把策略分散在各个 workflow/activity 调用点。 --- # 一、放在哪里设置 `RetryPolicy` * **Activity 调用处**(最常见): 在 workflow 里 `await workflow.execute_activity(...)` 传入 `retry_policy=RetryPolicy(...)`。 * **子工作流(Child Workflow)**: 在 `workflow.start_child_workflow(...)` 里传入 `retry_policy`。 * **不要依赖默认值**: 不同版本/语言默认值可能不同,且容易被忽略。**强烈建议显式设置**所有关键字段(初始间隔、回退系数、最大间隔、最大尝试次数、非重试错误类型等)。 --- # 二、统一“策略档位”(Profiles) 把常见场景抽象为少数几个可复用的“档位”,并集中管理,避免每次手写一堆数值。 ```python # retry_policies.py from datetime import timedelta from temporalio.common import RetryPolicy from dataclasses import dataclass from typing import Dict, List, Optional @dataclass(frozen=True) class RetryProfile: initial_interval: float backoff_coefficient: float max_interval: float maximum_attempts: int non_retryable_error_types: Optional[List[str]] = None def to_retry_policy(self) -> RetryPolicy: return RetryPolicy( initial_interval=timedelta(seconds=self.initial_interval), backoff_coefficient=self.backoff_coefficient, maximum_interval=timedelta(seconds=self.max_interval), maximum_attempts=self.maximum_attempts, non_retryable_error_types=self.non_retryable_error_types or [], ) PROFILES: Dict[str, RetryProfile] = { # 对外API:易抖动、可幂等,次数稍多但要控住最大间隔 "external_api": RetryProfile( initial_interval=1.0, backoff_coefficient=2.0, max_interval=60.0, maximum_attempts=8, non_retryable_error_types=["ValidationError", "PermissionDeniedError", "NotFoundError"], ), # 内部IO:通常稳定,快速失败+少次重试 "internal_io": RetryProfile( initial_interval=0.5, backoff_coefficient=2.0, max_interval=10.0, maximum_attempts=5, ), # 关键但幂等的长任务:允许更长间隔+更多次 "long_idempotent": RetryProfile( initial_interval=2.0, backoff_coefficient=2.0, max_interval=120.0, maximum_attempts=12, ), # 业务错误不可逆:基本不重试 "no_retry": RetryProfile( initial_interval=1.0, backoff_coefficient=1.0, max_interval=1.0, maximum_attempts=1, ), } def get_retry_policy(name: str) -> RetryPolicy: return PROFILES[name].to_retry_policy() ``` **使用:** ```python # workflows.py from temporalio import workflow from datetime import timedelta from retry_policies import get_retry_policy @workflow.defn class OrderWorkflow: @workflow.run async def run(self, order_id: str) -> None: # 对外API活动使用统一档位 await workflow.execute_activity( "charge_payment", order_id, start_to_close_timeout=timedelta(seconds=30), retry_policy=get_retry_policy("external_api"), ) ``` --- # 三、支持配置化(YAML/ENV) 为了按环境(dev/staging/prod)或按租户动态调整,给 `PROFILES` 加一层 **配置加载**。建议:**代码给默认档位**,再用配置覆盖。 ```python # retry_config.py import os, json from retry_policies import PROFILES, RetryProfile def load_overrides_from_env(): # 例如:export TEMPORAL_RETRY_OVERRIDES='{"external_api":{"maximum_attempts":6}}' raw = os.getenv("TEMPORAL_RETRY_OVERRIDES") if not raw: return data = json.loads(raw) for name, patch in data.items(): base = PROFILES.get(name) if not base: continue updated = RetryProfile( initial_interval=patch.get("initial_interval", base.initial_interval), backoff_coefficient=patch.get("backoff_coefficient", base.backoff_coefficient), max_interval=patch.get("max_interval", base.max_interval), maximum_attempts=patch.get("maximum_attempts", base.maximum_attempts), non_retryable_error_types=patch.get("non_retryable_error_types", base.non_retryable_error_types), ) PROFILES[name] = updated # 在应用启动时调用 load_overrides_from_env() ``` > 也可以用 YAML(如 `retry_policies.yaml`)+ `pyyaml`,或用更成熟的配置系统(Dynaconf、Pydantic Settings)。 --- # 四、异常分类与不重试错误 * 在 **Activity 内**对“业务不可逆错误”抛出带标记的异常,并纳入 `non_retryable_error_types`。 * Temporal Python 可抛 `ApplicationError` 并设置为不重试(如:`non_retryable=True`),或抛出特定异常类名并在策略里列入。 ```python # activities.py from temporalio import activity from temporalio.exceptions import ApplicationError class ValidationError(Exception): ... class PermissionDeniedError(Exception): ... @activity.defn async def charge_payment(order_id: str): if not is_valid(order_id): # 方式1:异常类名放到 non_retryable_error_types raise ValidationError("invalid order_id") if not has_permission(order_id): # 方式2:直接抛 ApplicationError 并标 non-retryable raise ApplicationError("no permission", non_retryable=True) # ... 调第三方支付 ``` **建议分类:** * **不重试**:Validation/Permission/NotFound/Conflict 等“重试也不会成功”的业务错误。 * **可重试**:超时、连接故障、限流、暂时性 5xx 等瞬时故障。 * **谨慎重试**:写操作非幂等时(如扣款),务必**先实现幂等**(幂等键/去重表/幂等接口)再放开重试。 --- # 五、把“每次都传策略”变成强制约束 避免漏配策略可用两种手段: 1. **封装调用**: 写一个 helper,所有 activity/child-workflow 调用都从这里走,强制要求传入 profile 名称。 ```python # exec_helpers.py from temporalio import workflow from retry_policies import get_retry_policy from datetime import timedelta async def exec_activity(name: str, *args, profile: str, timeout_s: int = 30): return await workflow.execute_activity( name, *args, start_to_close_timeout=timedelta(seconds=timeout_s), retry_policy=get_retry_policy(profile), ) ``` 2. **静态/单测约束**: * 用代码搜索(lint/grep)禁止出现直接调用 `workflow.execute_activity` 的模式。 * 写**单元测试**确保各关键 activity 都使用期望的 profile(见下一节)。 --- # 六、可测试性示例 ```python # test_retry_policies.py from retry_policies import get_retry_policy def test_external_api_policy_shape(): p = get_retry_policy("external_api") assert p.maximum_attempts == 8 assert p.backoff_coefficient == 2.0 assert "ValidationError" in (p.non_retryable_error_types or []) ``` 对关键活动/工作流的调用也可以用 **回放测试**(Workflow Replay)验证重试轨迹是否满足预期。 --- # 七、观测与排障 * 在 **Activity** 里记录 `activity.info().attempt`(第几次尝试)、`activity.info().scheduled_time` 等,方便排障。 * 结合日志/指标: * 指标:`activity_retry_attempts{activity="charge_payment"}`、`activity_failed_non_retryable_total` * 告警:达到最大尝试次数仍失败、连续多起非重试失败等。 * 在第三方调用返回里区分 **可重试/不可重试** 错误码,映射到对应异常。 ```python # activities.py from temporalio import activity @activity.defn async def charge_payment(order_id: str): info = activity.info() activity.logger.info("attempt=%s", info.attempt) # ... ``` --- # 八、避免常见坑 1. **重复重试**:不要在 Activity 里再套自定义重试,与 Temporal 的重试叠加可能导致指数爆炸。 2. **幂等性不到位**:对外部“写操作”,务必先实现幂等(幂等键/去重)再开放重试。 3. **统一超时与重试匹配**:`start_to_close_timeout` 要合理覆盖一次尝试的最长执行时间;`maximum_interval` 不应大于上游/下游SLA。 4. **无上限**:避免无限次重试;为每个 profile 设定清晰的 `maximum_attempts`。 5. **非重试错误漏网**:定期审计日志,把频繁出现但“重试无意义”的错误加入 `non_retryable_error_types` 或改为 `ApplicationError(non_retryable=True)`。 --- # 九、按租户/用例动态覆盖(可选) * 在 **输入参数**或 **搜索属性**里带上租户/优先级,然后在调用 helper 时按规则选择 profile,或对某些字段做小幅覆盖(例如对 VIP 客户提高 `maximum_attempts`)。 ```python # exec_helpers.py (变体) def choose_profile(tenant: str) -> str: return "external_api_premium" if tenant in VIPS else "external_api" # workflow 中: await exec_activity("charge_payment", order_id, profile=choose_profile(tenant)) ``` --- # 十、Jitter(抖动) Temporal 的重试退避是确定性的;如需降低碰撞,可在**调用侧**为不同实例随机化 `initial_interval` 的微小偏移(例如 ±10%),并在 profile 里留出该参数以可配置。 --- **一句话落地**: * **集中定义档位**(profiles)→ **统一 helper** 强制使用 → **配置化覆盖**(ENV/YAML)→ **单测+观测**兜底 → **定期审计 non-retryable** 列表。 需要我把上面的骨架打成一个最小可运行样例仓库结构(`workflows/`, `activities/`, `retry/`, `tests/`)给你吗?
幻翼
2025年9月25日 13:47
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
PDF文档(打印)
分享
链接
类型
密码
更新密码