robfig/cron
在 Go 语言中,可以使用第三方库robfig/cron来实现定时任务调度。robfig/cron是一个功能强大的库,支持多种调度方式,包括标准的 Cron 表达式。
以下是一个简单的示例,展示了如何使用robfig/cron库来创建和管理定时任务。
- 安装
robfig/cron库:
go get github.com/robfig/cron/v3
- 编写定时任务代码:
package main
import (
"fmt"
"time"
"github.com/robfig/cron/v3"
)
func main() {
// 创建一个新的Cron实例
c := cron.New()
// 添加一个定时任务,使用Cron表达式
// 这里的表达式表示每分钟执行一次
_, err := c.AddFunc("*/1 * * * *", func() {
fmt.Println("Task executed at", time.Now())
})
if err != nil {
fmt.Println("Error adding cron job:", err)
return
}
// 启动Cron调度器
c.Start()
// 等待一段时间,让定时任务可以执行几次
time.Sleep(5 * time.Minute)
// 停止Cron调度器
c.Stop()
}
在这个示例中,我们创建了一个新的 Cron 实例,并添加了一个定时任务,该任务使用 Cron 表达式*/1 * * * *来表示每分钟执行一次。然后启动 Cron 调度器,并在主程序中等待一段时间,以便观察定时任务的执行情况。最后,停止 Cron 调度器。
Cron 表达式
Cron 表达式由五个字段组成,分别表示分钟、小时、日、月和星期 。每个字段可以包含特定的值或范围,用于指定调度的时间。例如:
*/1 * * * *:每分钟执行一次0 0 * * *:每天午夜执行一次0 9 * * 1:每周一上午 9 点执行一次
其他功能
robfig/cron还支持多种调度方式,例如:
- 使用
AddFunc添加一个定时任务,指定一个函数来执行任务。 - 使用
AddJob添加一个定时任务,指定一个实现了cron.Job接口的对象来执行任务。 - 使用
Remove方法删除一个定时任务。
处理并发
robfig/cron库是线程安全的,可以在多个 goroutine 中安全地使用。如果你的定时任务涉及到并发操作,确保你的任务函数也适合并发执行。
完整示例
以下是一个更完整的示例,展示了如何使用robfig/cron库来调度多个定时任务,并处理任务的并发执行:
package main
import (
"fmt"
"sync"
"time"
"github.com/robfig/cron/v3"
)
// 定义一个任务结构体
type MyTask struct {
Name string
}
// 实现cron.Job接口
func (t MyTask) Run() {
fmt.Println("Task", t.Name, "executed at", time.Now())
}
func main() {
// 创建一个新的Cron实例
c := cron.New()
// 添加多个定时任务
tasks := []MyTask{
{Name: "Task1"},
{Name: "Task2"},
{Name: "Task3"},
}
for _, task := range tasks {
_, err := c.AddJob("*/1 * * * *", task)
if err != nil {
fmt.Println("Error adding cron job:", err)
return
}
}
// 启动Cron调度器
c.Start()
// 等待一段时间,让定时任务可以执行几次
time.Sleep(5 * time.Minute)
// 停止Cron调度器
c.Stop()
}
在这个示例中,我们定义了一个任务结构体MyTask,并实现了cron.Job接口。然后创建多个任务,并使用AddJob方法将它们添加到 Cron 调度器中。每个任务都会在每分钟执行一次。
希望这些示例能帮助你理解如何在 Go 语言中使用robfig/cron库来实现定时任务调度。如果你有更多问题或需要进一步的帮助,请随时提问。
案例
- 多用户,多任务
- 前端交互:任务增删改停
- 部署到多台服务器上,如何避免多个服务器同时执行任务,这样会造成任务重复(分布式锁(Distributed Lock)来解决)
常见的分布式锁实现方式包括使用 Redis、ZooKeeper 等 。
这里我们使用 Redis 来实现分布式锁,确保每个任务在同一时间只在一台服务器上执行。以下是一个示例,展示了如何在 Go 语言中使用 Redis 实现分布式锁。
后端部分
- 安装必要的库:
go get github.com/gin-gonic/gin
go get github.com/robfig/cron/v3
go get github.com/go-redis/redis/v8
- 编写后端代码,提供 API 接口,并使用 Redis 实现分布式锁:
package main
import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/go-redis/redis/v8"
"github.com/robfig/cron/v3"
)
var (
userCrons = make(map[int]*UserCron)
mu sync.Mutex
taskIDGen = 1
ctx = context.Background()
redisClient *redis.Client
)
// 定义一个结构体来存储任务计划
type Task struct {
ID int
Schedule string
Active bool
}
// 定义一个结构体来存储用户的Cron实例和任务计划
type UserCron struct {
c *cron.Cron
tasks map[int]cron.EntryID
}
// 初始化Redis客户端
func initRedis() {
redisClient = redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis服务器地址
})
}
// 定义任务函数
func task(userID int, taskID int) {
lockKey := fmt.Sprintf("lock:user:%d:task:%d", userID, taskID)
lockValue := fmt.Sprintf("%d", time.Now().UnixNano())
// 尝试获取锁
success, err := redisClient.SetNX(ctx, lockKey, lockValue, 10*time.Second).Result()
if err != nil {
fmt.Println("Error acquiring lock:", err)
return
}
if !success {
fmt.Println("Task is already running on another server")
return
}
// 确保锁在任务完成后释放
defer redisClient.Del(ctx, lockKey)
// 执行任务
fmt.Printf("Task executed for User %d, Task %d at %s\n", userID, taskID, time.Now())
}
// 添加任务计划
func addTask(userID int, schedule string) int {
mu.Lock()
defer mu.Unlock()
userCron, exists := userCrons[userID]
if !exists {
userCron = &UserCron{
c: cron.New(cron.WithSeconds()),
tasks: make(map[int]cron.EntryID),
}
userCrons[userID] = userCron
}
taskID := taskIDGen
taskIDGen++
var err error
entryID, err := userCron.c.AddFunc(schedule, func() {
task(userID, taskID)
})
if err != nil {
fmt.Println("Error adding cron job:", err)
return -1
}
userCron.tasks[taskID] = entryID
userCron.c.Start()
return taskID
}
// 删除任务计划
func removeTask(userID int, taskID int) {
mu.Lock()
defer mu.Unlock()
userCron, exists := userCrons[userID]
if !exists {
fmt.Println("Task not found")
return
}
entryID, exists := userCron.tasks[taskID]
if !exists {
fmt.Println("Task not found")
return
}
userCron.c.Remove(entryID)
delete(userCron.tasks, taskID)
fmt.Println("Task cancelled")
}
func main() {
initRedis()
r := gin.Default()
// 添加任务接口
r.POST("/add", func(c *gin.Context) {
userID, err := strconv.Atoi(c.Query("userID"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid userID"})
return
}
schedule := c.Query("schedule")
if schedule == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid schedule"})
return
}
taskID := addTask(userID, schedule)
if taskID == -1 {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to add task"})
return
}
c.JSON(http.StatusOK, gin.H{"status": "Task added", "taskID": taskID})
})
// 删除任务接口
r.POST("/remove", func(c *gin.Context) {
userID, err := strconv.Atoi(c.Query("userID"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid userID"})
return
}
taskID, err := strconv.Atoi(c.Query("taskID"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid taskID"})
return
}
removeTask(userID, taskID)
c.JSON(http.StatusOK, gin.H{"status": "Task removed"})
})
// 启动HTTP服务器
r.Run(":8080")
}
前端部分
前端部分保持不变,可以继续使用之前的 HTML 和 JavaScript 代码来调用后端 API。
关键点:
- 分布式锁:使用 Redis 的
SETNX命令实现分布式锁,确保任务在同一时间只在一台服务器上执行。 - 锁的管理:在任务执行前尝试获取锁,任务完成后释放锁。锁的过期时间设置为 10 秒,以防止任务意外中断后锁无法释放。
- Redis 客户端初始化:在程序启动时初始化 Redis 客户端。
通过这种方式,你可以在多台服务器上部署定时任务管理系统,并确保每个任务在同一时间只在一台服务器上执行。如果你有更多问题或需要进一步的帮助,请随时提问。