跳到主要内容

Cron 内部

Hermes Agent 的计划任务系统允许你安排定期作业。本页详细介绍其内部实现。

概述

Cron 系统支持:

  • 定期作业 — 按 cron 表达式调度
  • 一次性作业 — 在指定时间执行
  • 间隔作业 — 按固定间隔执行
  • 依赖链 — 作业之间的依赖关系

核心组件

组件文件职责
CronSchedulercron/scheduler.py主调度器,管理所有作业
CronJobcron/job.py单个作业定义
CronExecutorcron/executor.py执行上下文管理
CronTriggercron/trigger.py触发器实现

调度器架构

class CronScheduler:
def __init__(self):
self.jobs: dict[str, CronJob] = {}
self.executor = CronExecutor()
self.running = False

def add_job(self, job: CronJob):
"""添加新作业"""
self.jobs[job.id] = job

def remove_job(self, job_id: str):
"""移除作业"""
self.jobs.pop(job_id, None)

async def start(self):
"""启动调度器"""
self.running = True
while self.running:
await self._tick()
await asyncio.sleep(1)

async def _tick(self):
"""每个tick检查是否有作业应该运行"""
now = datetime.now()
for job in self.jobs.values():
if job.should_run(now):
await self.executor.run(job)

作业定义

@dataclass
class CronJob:
id: str
name: str
cron_expr: str # 6字段 cron 表达式
command: str # 要执行的命令
enabled: bool = True
max_retries: int = 3
timeout: Optional[int] = None # 秒
on_failure: Optional[str] = None # 失败时执行的命令

# 钩子
on_start: Optional[Callable] = None
on_success: Optional[Callable] = None
on_failure: Optional[Callable] = None

Cron 表达式

Hermes 使用 6 字段 cron 表达式:

┌───────────── 分钟 (0-59)
│ ┌───────────── 小时 (0-23)
│ │ ┌───────────── 日期 (1-31)
│ │ │ ┌───────────── 月份 (1-12)
│ │ │ │ ┌───────────── 星期 (0-6, 0 = 周日)
│ │ │ │ │ ┌───────────── 可选秒 (0-59)
│ │ │ │ │ │
* * * * * *

示例:

表达式含义
0 * * * *每小时整点
0 9 * * 1-5工作日 9:00
*/15 * * * *每 15 分钟
0 0 1 * *每月第一天
0 0 * * 0每周日午夜

执行上下文

class CronExecutor:
def __init__(self):
self.max_concurrent = 5
self.semaphore = asyncio.Semaphore(self.max_concurrent)

async def run(self, job: CronJob):
async with self.semaphore:
ctx = ExecutionContext(job)
await ctx.execute()

执行上下文

@dataclass
class ExecutionContext:
job: CronJob
started_at: datetime
attempts: int = 0
output: str = ""
error: Optional[str] = None
status: Literal["pending", "running", "success", "failed"] = "pending"

async def execute(self):
self.status = "running"
try:
result = await asyncio.wait_for(
run_command(self.job.command),
timeout=self.job.timeout
)
self.output = result.stdout
self.status = "success"
self._trigger_hook("on_success")
except Exception as e:
self.error = str(e)
self.status = "failed"
self._trigger_hook("on_failure")
if self.job.on_failure:
await run_command(self.job.on_failure)

持久化

作业定义存储在 ~/.hermes/cron/jobs.json

{
"jobs": [
{
"id": "daily-report",
"name": "每日报告",
"cron_expr": "0 9 * * *",
"command": "hermes run report --format html",
"enabled": true,
"max_retries": 3
}
]
}

执行历史存储在 ~/.hermes/cron/history/

~/.hermes/cron/history/
├── daily-report/
│ ├── 2026-03-30-09-00-00.json
│ ├── 2026-03-31-09-00-00.json
│ └── 2026-04-01-09-00-00.json

CLI 命令

# 列出所有作业
hermes cron list

# 添加作业
hermes cron add "daily-report" "0 9 * * *" "hermes run report"

# 移除作业
hermes cron remove daily-report

# 暂停/恢复作业
hermes cron pause daily-report
hermes cron resume daily-report

# 查看作业历史
hermes cron history daily-report

# 手动运行作业
hermes cron run daily-report

作业类型

命令作业

hermes cron add my-job "0 * * * *" "echo 'Hello'"

脚本作业

hermes cron add backup "0 2 * * *" "bash /path/to/backup.sh"

Agent 作业

hermes cron add daily-briefing "0 8 * * *" "hermes run --skill daily-briefing"

依赖管理

class JobDependency:
def __init__(self, job_id: str, depends_on: list[str]):
self.job_id = job_id
self.depends_on = depends_on

async def can_run(self) -> bool:
"""检查所有依赖作业是否成功完成"""
for dep_id in self.depends_on:
status = await get_last_status(dep_id)
if status != "success":
return False
return True

通知

通知类型

类型触发时机配置
always始终cron notify always
on_success仅成功cron notify on_success
on_failure仅失败cron notify on_failure
never从不cron notify never

通知通道

# config.yaml
cron:
notifications:
channel: "telegram" # 或 "email", "discord", 等
recipients:
- "me"

错误处理

重试策略

async def execute_with_retry(job: CronJob):
last_error = None
for attempt in range(job.max_retries):
try:
return await job.execute()
except Exception as e:
last_error = e
wait_time = 2 ** attempt # 指数退避
await asyncio.sleep(wait_time)
raise CronExecutionError(f"Failed after {job.max_retries} retries", last_error)

死信队列

失败的作业进入死信队列:

hermes cron dlq list       # 列出死信
hermes cron dlq retry <id> # 重试
hermes cron dlq purge # 清空

相关文档