Cron 内部
Hermes Agent 的计划任务系统允许你安排定期作业。本页详细介绍其内部实现。
概述
Cron 系统支持:
- 定期作业 — 按 cron 表达式调度
- 一次性作业 — 在指定时间执行
- 间隔作业 — 按固定间隔执行
- 依赖链 — 作业之间的依赖关系
核心组件
| 组件 | 文件 | 职责 |
|---|---|---|
CronScheduler | cron/scheduler.py | 主调度器,管理所有作业 |
CronJob | cron/job.py | 单个作业定义 |
CronExecutor | cron/executor.py | 执行上下文管理 |
CronTrigger | cron/trigger.py | 触发器实现 |
调度器架构
class CronScheduler:
def __init__(self):
self.jobs: dict[str, CronJob] = {}
self.executor = CronExecutor()
self.running = False
def add_job(self, job: CronJob):
"""添加新作业"""
self.jobs[job.id] = job
def remove_job(self, job_id: str):
"""移除作业"""
self.jobs.pop(job_id, None)
async def start(self):
"""启动调度器"""
self.running = True
while self.running:
await self._tick()
await asyncio.sleep(1)
async def _tick(self):
"""每个tick检查是否有作业应该运行"""
now = datetime.now()
for job in self.jobs.values():
if job.should_run(now):
await self.executor.run(job)
作业定义
@dataclass
class CronJob:
id: str
name: str
cron_expr: str # 6字段 cron 表达式
command: str # 要执行的命令
enabled: bool = True
max_retries: int = 3
timeout: Optional[int] = None # 秒
on_failure: Optional[str] = None # 失败时执行的命令
# 钩子
on_start: Optional[Callable] = None
on_success: Optional[Callable] = None
on_failure: Optional[Callable] = None
Cron 表达式
Hermes 使用 6 字段 cron 表达式:
┌───────────── 分钟 (0-59)
│ ┌───────────── 小时 (0-23)
│ │ ┌───────────── 日期 (1-31)
│ │ │ ┌───────────── 月份 (1-12)
│ │ │ │ ┌───────────── 星期 (0-6, 0 = 周日)
│ │ │ │ │ ┌───────────── 可选秒 (0-59)
│ │ │ │ │ │
* * * * * *
示例:
| 表达式 | 含义 |
|---|---|
0 * * * * | 每小时整点 |
0 9 * * 1-5 | 工作日 9:00 |
*/15 * * * * | 每 15 分钟 |
0 0 1 * * | 每月第一天 |
0 0 * * 0 | 每周日午夜 |
执行上下文
class CronExecutor:
def __init__(self):
self.max_concurrent = 5
self.semaphore = asyncio.Semaphore(self.max_concurrent)
async def run(self, job: CronJob):
async with self.semaphore:
ctx = ExecutionContext(job)
await ctx.execute()
执行上下文
@dataclass
class ExecutionContext:
job: CronJob
started_at: datetime
attempts: int = 0
output: str = ""
error: Optional[str] = None
status: Literal["pending", "running", "success", "failed"] = "pending"
async def execute(self):
self.status = "running"
try:
result = await asyncio.wait_for(
run_command(self.job.command),
timeout=self.job.timeout
)
self.output = result.stdout
self.status = "success"
self._trigger_hook("on_success")
except Exception as e:
self.error = str(e)
self.status = "failed"
self._trigger_hook("on_failure")
if self.job.on_failure:
await run_command(self.job.on_failure)
持久化
作业定义存储在 ~/.hermes/cron/jobs.json:
{
"jobs": [
{
"id": "daily-report",
"name": "每日报告",
"cron_expr": "0 9 * * *",
"command": "hermes run report --format html",
"enabled": true,
"max_retries": 3
}
]
}
执行历史存储在 ~/.hermes/cron/history/:
~/.hermes/cron/history/
├── daily-report/
│ ├── 2026-03-30-09-00-00.json
│ ├── 2026-03-31-09-00-00.json
│ └── 2026-04-01-09-00-00.json
CLI 命令
# 列出所有作业
hermes cron list
# 添加作业
hermes cron add "daily-report" "0 9 * * *" "hermes run report"
# 移除作业
hermes cron remove daily-report
# 暂停/恢复作业
hermes cron pause daily-report
hermes cron resume daily-report
# 查看作业历史
hermes cron history daily-report
# 手动运行作业
hermes cron run daily-report
作业类型
命令作业
hermes cron add my-job "0 * * * *" "echo 'Hello'"
脚本作业
hermes cron add backup "0 2 * * *" "bash /path/to/backup.sh"
Agent 作业
hermes cron add daily-briefing "0 8 * * *" "hermes run --skill daily-briefing"
依赖管理
class JobDependency:
def __init__(self, job_id: str, depends_on: list[str]):
self.job_id = job_id
self.depends_on = depends_on
async def can_run(self) -> bool:
"""检查所有依赖作业是否成功完成"""
for dep_id in self.depends_on:
status = await get_last_status(dep_id)
if status != "success":
return False
return True
通知
通知类型
| 类型 | 触发时机 | 配置 |
|---|---|---|
always | 始终 | cron notify always |
on_success | 仅成功 | cron notify on_success |
on_failure | 仅失败 | cron notify on_failure |
never | 从不 | cron notify never |
通知通道
# config.yaml
cron:
notifications:
channel: "telegram" # 或 "email", "discord", 等
recipients:
- "me"
错误处理
重试策略
async def execute_with_retry(job: CronJob):
last_error = None
for attempt in range(job.max_retries):
try:
return await job.execute()
except Exception as e:
last_error = e
wait_time = 2 ** attempt # 指数退避
await asyncio.sleep(wait_time)
raise CronExecutionError(f"Failed after {job.max_retries} retries", last_error)
死信队列
失败的作业进入死信队列:
hermes cron dlq list # 列出死信
hermes cron dlq retry <id> # 重试
hermes cron dlq purge # 清空