Select + Channels 实现定时任务与高效调度

time 包 在与 channel 结合时提供了很多有用的功能,其中 time.Ticker 结构体能够处理定时事件,它会定期在指定 channel 上发送时间值在这篇文章中,我们深入探讨了如何使用 Go 语言中的 time 包与 channel 结合来处理定时事件和任务调度。通过 time.Ticker 结构体的周期性触发和 select 语句的多路复用能力,我们可以创建高效且响应迅速的定时任务处理系统

Select + Channels  实现定时任务与高效调度

Ticker 对于在 goroutine 中周期性执行任务(如日志记录或计算)非常方便。记得使用 Stop() 来停止它们,最好使用 defer 语句。这种功能与 select 语句结合得非常紧密。

ticker := time.NewTicker(updateInterval)
defer ticker.Stop()
...
select {
  case u:= <- ch1:
    ...
  case v:= <- ch2:
    ...
  case <- ticker.C:

   logState(status) // e.g. call some logging function logState

  default// no value ready to be received
    ...
}

ticker.C 是用于传递 tick 的通道。defer ticker.Stop() 确保在函数返回时停止 ticker。

time.NewTicker 返回一个 *time.Ticker 类型的值,其中包括一个用于接收 tick 的通道以及一个用于停止 ticker 的 Stop 方法。这使开发者可以更好地控制 ticker 的生命周期。当完成对 ticker 的使用时,可以停止它以释放资源。

time.Tick 也是一个方便的函数,它返回一个只读通道,以在指定的时间间隔发出时间。一旦启动,它就无法停止。底层的 ticker 将继续进行,直到程序退出。如果不恰当地使用,这可能导致内存泄漏,特别是在可以多次调用的函数内部。但如果不需要停止 ticker,time.Tick 使用起来会更简单一些。

下面是使用 select 语句和通道处理定时事件的简单演示:

package main
import (
"fmt"
"time"
)

func main() {
  tick := time.Tick(1e8)
  boom := time.After(5e8)
  for {
    select {
      case <-tick:
        fmt.Println("tick.")
      case <-boom:
        fmt.Println("BOOM!")
        return
      default:
        fmt.Println(" .")
        time.Sleep(5e7)
    }
  }
}

如何将这种模式应用到实际开发中?

Job Scheduling:

可以使用一个 ticker 来安排一个任务以固定时间间隔运行。例如,这里需要一个后台任务,每小时从数据库中清理旧数据。

tick := time.Tick(1 * time.Hour)
for {
    select {
    case <-tick:
        cleanupDatabase()
    // other cases...
    }
}

超时控制

我们可以使用定时器为操作设置超时。例如,如果网络请求花费的时间过长,就取消该请求。

boom := time.After(5 * time.Second)
select {
case response := <-networkRequest():
    processResponse(response)
case <-boom:
    fmt.Println("Request timed out")
    return
}

Rate Limiting:

可以使用 ticker 来限制执行操作的速率。例如,我们希望限制向第三方 API 发送请求的速率,以避免超过速率限制。

tick := time.Tick(1 * time.Second)
for request := range requests {
    select {
    case <-tick:
        sendRequest(request)
    // other cases...
    }
}

定期轮询:

可以使用 ticker 定期轮询资源以检查变化。例如,我们希望每隔几秒钟检查一次文件是否已更新。

tick := time.Tick(5 * time.Second)
for {
    select {
    case <-tick:
        checkFileUpdate()
    // other cases...
    }
}

在所有这些例子中,可以使用默认情况来处理如果定时事件尚未准备好要执行的其他操作。例如,它可以用于打印进度消息,或执行一个优先级较低的任务。

缓存数据

当应用程序处理来自数据库的数据时,它们通常会将这些数据存储在内存中,以避免昂贵的数据库检索。如果数据库数据没有变化,那么就没有问题。但对于可能发生变化的数据,我们需要一种定期刷新的方法。当缓存的数据过时时,我们不希望向用户显示旧值。一种解决方案是使用一个 goroutine 和一个 Ticker 对象。

package main

import (
 "fmt"
 "sync"
 "time"
)

type Cache struct {
 sync.RWMutex
 data map[string]string
}

func NewCache() *Cache {
 c := &Cache{
  data: make(map[string]string),
 }
 go c.refreshEvery(time.Minute)
 return c
}

func (c *Cache) refreshEvery(d time.Duration) {
 ticker := time.NewTicker(d)
 defer ticker.Stop()

 for {
  select {
  case <-ticker.C:
   c.refresh()
  }
 }
}

func (c *Cache) refresh() {
 c.Lock()
 defer c.Unlock()

 // Here you would fetch the new data from the database and update the cache.
 // For simplicity, we'll just print a message.
 fmt.Println("Refreshing cache...")
}

func (c *Cache) Get(key string) (stringbool) {
 c.RLock()
 defer c.RUnlock()

 value, ok := c.data[key]
 return value, ok
}

func (c *Cache) Set(key string, value string) {
 c.Lock()
 defer c.Unlock()

 c.data[key] = value
}

func main() {
 cache := NewCache()

 // Use the cache...
 cache.Set("foo""bar")
 value, ok := cache.Get("foo")
 fmt.Println(value, ok)
}

Reference:

  • https://pkg.go.dev/github.com/tjgq/ticker


原文始发于微信公众号(Go Official Blog):Select + Channels 实现定时任务与高效调度

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/269273.html

(0)
Java朝阳的头像Java朝阳

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!