Skip to content

调度器

支持时间点、间隔、Cron、条件触发四种任务类型,同步/异步双 API。

快速开始

rust
use std::sync::Arc;
use std::time::Duration;
use foxcore_api::{TaskDefinition, TaskKind};

// 调度器实例由核心注入,插件在 on_start 中获取
// 以下示例展示注册任务的方式

fn register_tasks(scheduler: &SchedulerManager) -> Result<(), Box<dyn std::error::Error>> {
    // 注册一个每 5 秒执行的间隔任务
    scheduler.add(TaskDefinition {
        name: "heartbeat".to_string(),
        kind: TaskKind::Interval(Duration::from_secs(5)),
        callback: Arc::new(|uuid| {
            Box::pin(async move {
                tracing::info!("心跳任务执行: {uuid}");
            })
        }),
    })?;
    Ok(())
}

重要

必须持有 SchedulerManager 直到不再需要调度。drop 时会自动触发优雅关闭。

四种调度类型

1. 时间点任务 (TimePoint)

在指定的时间点执行一次:

rust
use chrono::Local;

let target = Local::now() + chrono::Duration::minutes(30);
scheduler.add(TaskDefinition {
    name: "delayed-report".to_string(),
    kind: TaskKind::TimePoint(target),
    callback: Arc::new(|_uuid| {
        Box::pin(async {
            tracing::info!("30 分钟后的报告任务");
        })
    }),
})?;

TIP

如果指定的时间已在过去,会返回 SchedulerError::InvalidTimePoint

2. 间隔任务 (Interval)

按固定间隔重复执行:

rust
scheduler.add(TaskDefinition {
    name: "cleanup".to_string(),
    kind: TaskKind::Interval(Duration::from_secs(3600)), // 每小时
    callback: Arc::new(|_uuid| {
        Box::pin(async {
            tracing::info!("执行清理任务");
        })
    }),
})?;

3. Cron 表达式任务 (Cron)

使用标准 cron 表达式调度(支持 5/6/7 字段格式):

rust
// 每天凌晨 3 点执行
scheduler.add(TaskDefinition {
    name: "daily-backup".to_string(),
    kind: TaskKind::Cron("0 0 3 * * *".to_string()),
    callback: Arc::new(|_uuid| {
        Box::pin(async {
            tracing::info!("每日备份");
        })
    }),
})?;

// 每 5 分钟执行(秒级 cron)
scheduler.add(TaskDefinition {
    name: "monitor".to_string(),
    kind: TaskKind::Cron("0 */5 * * * *".to_string()),
    callback: Arc::new(|_uuid| {
        Box::pin(async {
            tracing::info!("监控检查");
        })
    }),
})?;

WARNING

无效的 cron 表达式会返回 SchedulerError::InvalidCron

4. 条件任务 (Condition)

调用方通过 ConditionHandle::notify() 通知调度器检测条件,条件满足时触发回调:

rust
use std::sync::atomic::{AtomicU32, Ordering};

let request_count = Arc::new(AtomicU32::new(0));
let count_ref = request_count.clone();

let handle = scheduler.add(TaskDefinition {
    name: "rate-limit-alert".to_string(),
    kind: TaskKind::Condition {
        check: Arc::new(move || {
            let c = count_ref.clone();
            Box::pin(async move {
                c.load(Ordering::SeqCst) > 1000
            })
        }),
        once: false, // 每次条件满足都触发
    },
    callback: Arc::new(|_uuid| {
        Box::pin(async {
            tracing::warn!("请求量超过阈值!");
        })
    }),
})?;

// 持有 ConditionHandle,在合适的时机通知检测
let condition = handle.condition.unwrap();

// 当请求计数变化时通知
request_count.fetch_add(1, Ordering::SeqCst);
condition.notify(); // 非阻塞

// 异步环境中使用
// condition.notify_async().await;

once: true 时条件满足后只触发一次,然后自动移除任务。

任务管理

rust
// 注册任务,获取句柄
let handle = scheduler.add(task_def)?;

// 查询任务信息
let info = scheduler.get(handle.id)?;
println!("任务: {} ({})", info.name, info.kind_desc);

// 列出所有任务
for task in scheduler.list() {
    println!("{}: {} [{}]", task.id, task.name, task.kind_desc);
}

// 按 ID 移除
scheduler.remove(handle.id)?;

// 按名称移除(所有同名任务)
let removed_ids = scheduler.remove_by_name("heartbeat")?;

配置文件

调度器配置位于 config/config.toml[scheduler] 段:

toml
[scheduler.runtime]
# tokio 工作线程数 (0 = CPU 核心数)
worker_threads = 0
# 线程名前缀
thread_name = "scheduler"

在异步环境中使用

如果调用方已有 tokio 运行时,使用 init_async

rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = SchedulerConfig::default();
    let scheduler = SchedulerManager::init_async(&config).await?;

    // 使用 async 版本的 API
    scheduler.add_async(task_def).await?;
    scheduler.remove_async(id).await?;

    Ok(())
}

TIP

init() 创建独立的 tokio runtime,适用于同步代码。在 tokio runtime 内部调用 init() 会因 runtime 嵌套而 panic,此时必须使用 init_async()

API 参考

SchedulerManager

方法说明
init(config)同步创建调度器,内部启动独立 tokio runtime
init_async(config)在已有 tokio runtime 中创建调度器
add(task)注册任务,返回 TaskHandle
add_async(task)异步注册任务
remove(id)按 ID 移除任务
remove_async(id)异步按 ID 移除任务
remove_by_name(name)按名称移除所有同名任务,返回被移除的 ID 列表
get(id)查询单个任务信息
list()列出所有已注册任务
shutdown(self)优雅关闭调度器

TaskDefinition

字段类型说明
nameString任务名称
kindTaskKind调度类型
callbackArc<dyn Fn(Uuid) -> BoxFuture<()>>回调函数

TaskKind

变体说明
TimePoint(DateTime<Local>)指定时间点执行一次
Interval(Duration)固定间隔重复执行
Cron(String)cron 表达式调度
Condition { check, once }条件触发

TaskHandle

字段类型说明
idUuid任务唯一标识
nameString任务名称
conditionOption<ConditionHandle>条件任务的通知句柄

SchedulerError

变体说明
InvalidCron无效的 cron 表达式
InvalidTimePoint时间点已在过去
TaskNotFound任务不存在
RuntimeErrortokio runtime 错误