调度器
支持时间点、间隔、Cron、条件触发四种任务类型,同步/异步双 API。
快速开始
rust
use std::sync::Arc;
use std::time::Duration;
use foxcore_api::{TaskDefinition, TaskKind};
// 调度器实例由核心注入,插件在 on_start 中获取
// 以下示例展示注册任务的方式
fn register_tasks(scheduler: &SchedulerManager) -> Result<(), Box<dyn std::error::Error>> {
// 注册一个每 5 秒执行的间隔任务
scheduler.add(TaskDefinition {
name: "heartbeat".to_string(),
kind: TaskKind::Interval(Duration::from_secs(5)),
callback: Arc::new(|uuid| {
Box::pin(async move {
tracing::info!("心跳任务执行: {uuid}");
})
}),
})?;
Ok(())
}重要
必须持有 SchedulerManager 直到不再需要调度。drop 时会自动触发优雅关闭。
四种调度类型
1. 时间点任务 (TimePoint)
在指定的时间点执行一次:
rust
use chrono::Local;
let target = Local::now() + chrono::Duration::minutes(30);
scheduler.add(TaskDefinition {
name: "delayed-report".to_string(),
kind: TaskKind::TimePoint(target),
callback: Arc::new(|_uuid| {
Box::pin(async {
tracing::info!("30 分钟后的报告任务");
})
}),
})?;TIP
如果指定的时间已在过去,会返回 SchedulerError::InvalidTimePoint。
2. 间隔任务 (Interval)
按固定间隔重复执行:
rust
scheduler.add(TaskDefinition {
name: "cleanup".to_string(),
kind: TaskKind::Interval(Duration::from_secs(3600)), // 每小时
callback: Arc::new(|_uuid| {
Box::pin(async {
tracing::info!("执行清理任务");
})
}),
})?;3. Cron 表达式任务 (Cron)
使用标准 cron 表达式调度(支持 5/6/7 字段格式):
rust
// 每天凌晨 3 点执行
scheduler.add(TaskDefinition {
name: "daily-backup".to_string(),
kind: TaskKind::Cron("0 0 3 * * *".to_string()),
callback: Arc::new(|_uuid| {
Box::pin(async {
tracing::info!("每日备份");
})
}),
})?;
// 每 5 分钟执行(秒级 cron)
scheduler.add(TaskDefinition {
name: "monitor".to_string(),
kind: TaskKind::Cron("0 */5 * * * *".to_string()),
callback: Arc::new(|_uuid| {
Box::pin(async {
tracing::info!("监控检查");
})
}),
})?;WARNING
无效的 cron 表达式会返回 SchedulerError::InvalidCron。
4. 条件任务 (Condition)
调用方通过 ConditionHandle::notify() 通知调度器检测条件,条件满足时触发回调:
rust
use std::sync::atomic::{AtomicU32, Ordering};
let request_count = Arc::new(AtomicU32::new(0));
let count_ref = request_count.clone();
let handle = scheduler.add(TaskDefinition {
name: "rate-limit-alert".to_string(),
kind: TaskKind::Condition {
check: Arc::new(move || {
let c = count_ref.clone();
Box::pin(async move {
c.load(Ordering::SeqCst) > 1000
})
}),
once: false, // 每次条件满足都触发
},
callback: Arc::new(|_uuid| {
Box::pin(async {
tracing::warn!("请求量超过阈值!");
})
}),
})?;
// 持有 ConditionHandle,在合适的时机通知检测
let condition = handle.condition.unwrap();
// 当请求计数变化时通知
request_count.fetch_add(1, Ordering::SeqCst);
condition.notify(); // 非阻塞
// 异步环境中使用
// condition.notify_async().await;once: true 时条件满足后只触发一次,然后自动移除任务。
任务管理
rust
// 注册任务,获取句柄
let handle = scheduler.add(task_def)?;
// 查询任务信息
let info = scheduler.get(handle.id)?;
println!("任务: {} ({})", info.name, info.kind_desc);
// 列出所有任务
for task in scheduler.list() {
println!("{}: {} [{}]", task.id, task.name, task.kind_desc);
}
// 按 ID 移除
scheduler.remove(handle.id)?;
// 按名称移除(所有同名任务)
let removed_ids = scheduler.remove_by_name("heartbeat")?;配置文件
调度器配置位于 config/config.toml 的 [scheduler] 段:
toml
[scheduler.runtime]
# tokio 工作线程数 (0 = CPU 核心数)
worker_threads = 0
# 线程名前缀
thread_name = "scheduler"在异步环境中使用
如果调用方已有 tokio 运行时,使用 init_async:
rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = SchedulerConfig::default();
let scheduler = SchedulerManager::init_async(&config).await?;
// 使用 async 版本的 API
scheduler.add_async(task_def).await?;
scheduler.remove_async(id).await?;
Ok(())
}TIP
init() 创建独立的 tokio runtime,适用于同步代码。在 tokio runtime 内部调用 init() 会因 runtime 嵌套而 panic,此时必须使用 init_async()。
API 参考
SchedulerManager
| 方法 | 说明 |
|---|---|
init(config) | 同步创建调度器,内部启动独立 tokio runtime |
init_async(config) | 在已有 tokio runtime 中创建调度器 |
add(task) | 注册任务,返回 TaskHandle |
add_async(task) | 异步注册任务 |
remove(id) | 按 ID 移除任务 |
remove_async(id) | 异步按 ID 移除任务 |
remove_by_name(name) | 按名称移除所有同名任务,返回被移除的 ID 列表 |
get(id) | 查询单个任务信息 |
list() | 列出所有已注册任务 |
shutdown(self) | 优雅关闭调度器 |
TaskDefinition
| 字段 | 类型 | 说明 |
|---|---|---|
name | String | 任务名称 |
kind | TaskKind | 调度类型 |
callback | Arc<dyn Fn(Uuid) -> BoxFuture<()>> | 回调函数 |
TaskKind
| 变体 | 说明 |
|---|---|
TimePoint(DateTime<Local>) | 指定时间点执行一次 |
Interval(Duration) | 固定间隔重复执行 |
Cron(String) | cron 表达式调度 |
Condition { check, once } | 条件触发 |
TaskHandle
| 字段 | 类型 | 说明 |
|---|---|---|
id | Uuid | 任务唯一标识 |
name | String | 任务名称 |
condition | Option<ConditionHandle> | 条件任务的通知句柄 |
SchedulerError
| 变体 | 说明 |
|---|---|
InvalidCron | 无效的 cron 表达式 |
InvalidTimePoint | 时间点已在过去 |
TaskNotFound | 任务不存在 |
RuntimeError | tokio runtime 错误 |