Go+Redis实现延迟队列实操_Golang_脚本之家


本站和网页 https://www.jb51.net/article/262540.htm 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

Go+Redis实现延迟队列实操_Golang_脚本之家
脚本之家
服务器常用软件
手机版
投稿中心
关注微信
快捷导航
软件下载
android
MAC
驱动下载
字体下载
DLL
源码下载
PHP
ASP.NET
ASP
JSP
软件编程
C#
JAVA
C 语言
Delphi
Android
网络编程
PHP
ASP.NET
ASP
JavaScript
在线工具
CSS格式化
JS格式化
Html转化为Js
数据库
MYSQL
MSSQL
oracle
DB2
MARIADB
CMS
PHPCMS
DEDECMS
帝国CMS
WordPress
常用工具
PHP开发工具
python
Photoshop
必备软件
网站首页
网页制作
网络编程
脚本专栏
脚本下载
数据库
服务器
电子书籍
操作系统
网站运营
平面设计
其它
媒体动画
电脑基础
硬件教程
网络安全
vbs
DOS/BAT
hta
htc
python
perl
游戏相关
VBA
远程脚本
ColdFusion
ruby
autoit
seraphzone
PowerShell
linux shell
Lua
Golang
Erlang
其它
您的位置:首页 → 脚本专栏 → Golang → Go 延迟队列
Go+Redis实现延迟队列实操
更新时间:2022年09月14日 08:54:28 作者:jiaxwu​​​​​​​
这篇文章主要介绍了Go+Redis实现延迟队列实操,延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等
目录前言简单的实现定义消息PushConsume存在的问题多消费者实现定义消息PushConsume存在的问题总结前言
延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等等。
一般我们实现延迟消息都需要依赖底层的有序结构,比如堆,而Redis刚好提供了zset这种数据类型,它的底层实现是哈希表+跳表,也是一种有序的结构,所以这篇文章主要是使用Go+Redis来实现延迟队列。
当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,如果需要更加强大稳定的消息队列,可以使用RocketMQ等自带延迟消息的消息队列。
我们这里先定一下我们要实现的几个目标:
消息必须至少被消费一次多个生产者多个消费者
然后我们定义一个简单的接口:
Push(msg) error:添加消息到队列Consume(topic, batchSize, func(msg) error):消费消息
简单的实现
每个主题最多可以被一个消费者消费,因为不会对主题进行分区但是可以多个生产者同时进行生产,因为Push操作是原子的同时需要消费操作返回值error为nil才删除消息,保证消息至少被消费一次
定义消息
这个消息参考了Kafka的消息结构:
Topic可以是某个队列的名字Key是消息的唯一标识,在一个队列里面不可以重复Body是消息的内容Delay是消息的延迟时间ReadyTime是消息准备好执行的时间
// Msg 消息
type Msg struct {
Topic string // 消息的主题
Key string // 消息的Key
Body []byte // 消息的Body
Delay time.Duration // 延迟时间(秒)
ReadyTime time.Time // 消息准备好执行的时间(now + delay)
Push
由于我们需要把消息的Body存储到Hash,把消息的ReadyTime存储到ZSet,所以我们需要一个简单的Lua脚本来保证这两个操作是原子的。
同时我们不会覆盖已经存在的相同Key的消息。
const delayQueuePushRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: 消息的Key
-- ARGV[2]: 消息的Body
-- ARGV[3]: 消息准备好执行的时间
local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]
local body = ARGV[2]
local readyTime = tonumber(ARGV[3])
-- 添加readyTime到zset
local count = redis.call("zadd", topicZSet, readyTime, key)
-- 消息已经存在
if count == 0 then
return 0
end
-- 添加body到hash
redis.call("hsetnx", topicHash, key, body)
return 1
func (q *SimpleRedisDelayQueue) Push(ctx context.Context, msg *Msg) error {
// 如果设置了ReadyTime,就使用RedisTime
var readyTime int64
if !msg.ReadyTime.IsZero() {
readyTime = msg.ReadyTime.Unix()
} else {
// 否则使用Delay
readyTime = time.Now().Add(msg.Delay).Unix()
success, err := q.pushScript.Run(ctx, q.client, []string{q.topicZSet(msg.Topic), q.topicHash(msg.Topic)},
msg.Key, msg.Body, readyTime).Bool()
if err != nil {
return err
if !success {
return ErrDuplicateMessage
return nil
Consume
其中第二个参数batchSize表示用于批量获取已经准备好执行的消息,减少网络请求。
fn是对消息进行处理的函数,它有一个返回值error,如果是nil才表示消息消费成功,然后调用删除脚本把成功消费的消息给删除(需要原子的删除ZSet和Hash里面的内容)。
const delayQueueDelRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: 消息的Key
local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]
-- 删除zset和hash关于这条消息的内容
redis.call("zrem", topicZSet, key)
redis.call("hdel", topicHash, key)
return 1
func (q *SimpleRedisDelayQueue) Consume(topic string, batchSize int, fn func(msg *Msg) error) {
for {
// 批量获取已经准备好执行的消息
now := time.Now().Unix()
zs, err := q.client.ZRangeByScoreWithScores(context.Background(), q.topicZSet(topic), &redis.ZRangeBy{
Min: "-inf",
Max: strconv.Itoa(int(now)),
Count: int64(batchSize),
}).Result()
// 如果获取出错或者获取不到消息,则休眠一秒
if err != nil || len(zs) == 0 {
time.Sleep(time.Second)
continue
// 遍历每个消息
for _, z := range zs {
key := z.Member.(string)
// 获取消息的body
body, err := q.client.HGet(context.Background(), q.topicHash(topic), key).Bytes()
if err != nil {
continue
// 处理消息
err = fn(&Msg{
Topic: topic,
Key: key,
Body: body,
ReadyTime: time.Unix(int64(z.Score), 0),
})
if err != nil {
continue
// 如果消息处理成功,删除消息
q.delScript.Run(context.Background(), q.client, []string{q.topicZSet(topic), q.topicHash(topic)}, key)
存在的问题
如果多个线程同时调用Consume函数,那么多个线程会拉取相同的可执行的消息,造成消息重复的被消费。
多消费者实现
每个主题最多可以被分区个数个消费者消费,会对主题进行分区
定义消息
我们添加了一个Partition字段表示消息的分区号
// Msg 消息
type Msg struct {
Topic string // 消息的主题
Key string // 消息的Key
Body []byte // 消息的Body
Partition int // 分区号
Delay time.Duration // 延迟时间(秒)
ReadyTime time.Time // 消息准备好执行的时间
Push
代码与SimpleRedisDelayQueue的Push相同,只是我们会使用Msg里面的Partition字段对主题进行分区。
func (q *PartitionRedisDelayQueue) topicZSet(topic string, partition int) string {
return fmt.Sprintf("%s:%d:z", topic, partition)
func (q *PartitionRedisDelayQueue) topicHash(topic string, partition int) string {
return fmt.Sprintf("%s:%d:h", topic, partition)
Consume
代码与SimpleRedisDelayQueue的Consume相同,我们也只是对Consume多加了一个partition参数用于指定消费的分区。
func (q *PartitionRedisDelayQueue) Consume(topic string, batchSize, partition int, fn func(msg *Msg) error) {
// ...
存在的问题
一个比较大的问题就是我们需要手动指定分区而不是自动分配分区,这个问题对于Push操作解决起来比较容易,可以通过哈希算法对Key进行哈希取模进行分区,比如murmur3。但是对于Consume就比较复杂,因为我们必须记录哪个分区已经被消费者消费了。如果真的需要更加复杂的场景还是建议使用RocketMQ、Kafka等消息队列进行实现。
总结
使用Redis的ZSet可以很容易的实现一个高性能消息队列但是Redis的ZSet实现的消息队列不适合大量消息堆积的场景,同时如果需要实现自动分区消费功能会比较复杂适合消息量不是很大,且不是很复杂的场景如果需要大量堆积消息和稳定的多消费者功能,可以使用自带延迟消息的RocketMQ
到此这篇关于Go+Redis实现延迟队列实操的文章就介绍到这了,更多相关Go 延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
您可能感兴趣的文章:Golang实现基于Redis的可靠延迟队列百行代码实现基于Redis的可靠延迟队列分布式利器redis及redisson的延迟队列实践Redis延迟队列和分布式延迟队列的简答实现基于Redis延迟队列的实现代码SpringBoot集成Redisson实现延迟队列的场景分析php使用redis的有序集合zset实现延迟队列应用示例
Go
Redis
延迟
队列
相关文章
golang中实现给gif、png、jpeg图片添加文字水印这篇文章主要介绍了golang中实现给gif、png、jpeg图片添加文字水印,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧 2021-04-04
Go语言中log日志库的介绍本文给大家介绍Go语言中log日志库的概念使用技巧,log包定义了Logger类型,该类型提供了一些格式化输出的方法,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧 2021-10-10
golang中sync.Mutex的实现方法本文主要介绍了golang中sync.Mutex的实现方法,mutex 主要有两个 method: Lock() 和 Unlock(),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 2022-04-04
Golang爬虫框架colly使用浅析这篇文章主要介绍了Golang爬虫框架colly的使用,colly是Go实现的比较有名的一款爬虫框架,而且Go在高并发和分布式场景的优势也正是爬虫技术所需要的,感兴趣想要详细了解可以参考下文 2023-05-05
Go 语言 JSON 标准库的使用今天通过本文给大家介绍Go 语言 JSON 标准库的使用小结,包括序列化和反序列化的相关知识,感兴趣的朋友跟随小编一起看看吧 2021-10-10
Go语言对前端领域的入侵WebAssembly运行原理这篇文章主要为大家介绍了不安分的Go语言对Web 前端领域的入侵WebAssembly运行原理实例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2022-07-07
SpringEvent优雅解耦时连续两个bug的解决方案这篇文章主要为大家介绍了SpringEvent优雅解耦时连续两个bug的解决方法,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2022-12-12
Go gRPC服务进阶middleware使用教程这篇文章主要为大家介绍了Go gRPC服务进阶middleware的使用教程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪 2022-06-06
详解Go语言各种常见类型的默认值和判空方法本文主要介绍了详解Go语言各种常见类型的默认值和判空方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 2023-04-04
golang中"var"与":="的区别解析这篇文章主要介绍了golang中“var”与“:=”的区别,使用var关键字是Go最基本的定义变量方式,有时也会使用到:=来定义变量,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下 2023-04-04
最新评论
大家感兴趣的内容
1Goland激活码破解永久版及安装详细教程(亲测可以)2Go语言string,int,int64 ,float之间类3Go语言中的Array、Slice、Map和Set使用详解4Go语言的GOPATH与工作目录详解5Go语言转换所有字符串为大写或者小写的方法6Go语言interface详解7五步让你成为GO 语言高手8Go语言命令行操作命令详细介绍9Go语言编程中字符串切割方法小结10我放弃Python转Go语言的9大理由(附优秀书籍推荐)
最近更新的内容
Golang中常用的语法糖分享golang组件swagger生成接口文档实践示例Go语言并发编程基础上下文概念详解利用rpm打包上线部署golang代码的方法教程Go语言json编码驼峰转下划线、下划线转驼峰的实现Go prometheus metrics条目自动回收与Golang合并yaml文件过程逐步讲解go语言规范RESTful API业务错误处理golang如何用type-switch判断interface变量的实际存储Go语言单元测试超详细解析
常用在线小工具
微信
投稿
脚本任务
在线工具
关注微信公众号
关于我们 -
广告合作 -
联系我们 -
免责声明 -
网站地图 -
投诉建议 -
在线投稿
CopyRight 2006-2023 JB51.Net Inc All Rights Reserved. 脚本之家 版权所有