一.Redis 用作轻量级消息队列的使用还是很方便的,而 5.0 的版本下增加了 Stream 的类型,先简单比较下 Redis 的 List 和 Pub/Sub 与 Stream 来实现消息队列的区别

(重新复习下使用消息队列的主要目的,主要还是为了减少高并发请求的压力,进行流量的削峰填谷处理,比如应用在秒杀,抢购活动等业务场景)

1.List

是否存在消费竞争:是(即使多个客户端进行 [BRPop] 同一个 List ,获取到的消息都是不同的)

消息持久化:是(未消费的消息会存在 List 中)

消息留存:无(消费完成后,消息会被删除掉)

ACK机制:无(如果某条消息处理异常,无法重复消费异常的消息)

 

2.Pub/Sub

是否存在消费竞争:否(多个客户端都订阅的情况下,是可收到相同的数据,不存在消息竞争关系)

消息持久化:否(消息队列的消息不会进行保存,如果客户端没有订阅,但是有消息推送过来的话,这时候推过来的消息并无法处理并就会丢失)

消息留存:无

ACK机制:无(如果某条消息处理异常,无法重复消费异常的消息)

 

3.Stream

是否存在消费竞争:

[xread] -消息队列独立消费模式下 否(多个客户端都 xread 的情况下,是可收到相同的数据)

[xreadgroup] -消费组内消费组消费模式 是(但是不同组接收到的消息是相同的,只有同一个组内的消费者才竞争消息)

消息持久化:是(未消费的消息会存在 Stream 中 )

消息留存:有(即便消费消息后[xread命令],消息还是会存在创建的队列里面,不会移除,如果需要移除可用[xdel命令],担心队列长度过大,可以 [xtrim命令] 限制下最大长度)

ACK机制:有(消费组消费完成并处理无异常后,可[xack命令]进行对此条消息做应答处理,说明该消息已完成处理,对于异常处理的消息,可通过 [xpending命令]进行获取并重新消费处理并重新[xack命令]处理)

Stream 类型命令简单介绍(详细用法自行查找 相关文档 即可):

1.简单消息队列(无ack机制)

XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表

2.消费组消息队列(有ack机制)

XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息

 

二. .Net Core 3.1 + CSRedisCore/FreeRedis 具体实现(后续会写个队列帮助类来进行封装,放到 github 上):

这里先简单说下 CSRedisCore 和 FreeRedis,后者是前者的升级版,但是移除了 RedisHelper 类,并且通过作者了解到,CSRedisCore 的 RedisHelper 类是不支持 Stream 类型操作的,并且也不再更新,作者直接建议使用 FreeRedis(方法名和redis cli一致)

(如果原项目一直使用 CSRedisCore.RedisHelper,因为 CSRedisCore 是支持 Stream 操作,所以可以直接自己扩展该类,将 Stream 类型的操作加上去,我自己就是这么做的)

1.基于 xread 的简单消息队列实现(独立消费)
(这种模式我觉得功能挺鸡肋的,大部分使用场景和 list 差不多)

直接使用 xread 独立消费对我的感觉来说好像还没 list 实现队列功能来的方便
使用上和 list 有个明显的区别,如果 list 里面存在消息,我进行出队消费后能直接读取到消息,消费完成后该 list 消息就会自动被删除掉

而 stream 的话,如果通过以下命令,是从队列尾部开始消费,即便此时 stream 有数据,还是不会有消息弹出能被消费到,而是需要等待新的消费者产生消息才能被消费(有点像 pub/sub )

//初始化 redis 
RedisClient redis= new FreeRedis.RedisClient($"127.0.0.1:6379,password=*,defaultDatabase=0");
redis cli:
XREAD count 1 block 1000 streams mymq $ //$表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
c# code: 
whlie(true){
 redis.xread(0,"mymq","$"); //阻塞模式
}

 

如果你想消费生产者已经生产的消息,则需要通过以下命令来获取队列头部的第一条消息,然后再 xdel 掉该条消息,来继续获取下一条未被消费的消息,再 xdel ,直到消费完成(其实不使用 xdel 也可以,而是把上一次消费的消息的 id 传到命令里面,消费该id后面的消息,只是需要记录上次消费消息的id值供下一次消息消费去使用)。从这一点来说,我觉得 list 比他方便很多,只是 stream 提供了这种方式,可以更多的适应其他业务场景

redis cli:
XREAD count 1 streams mymq 0或者[id] //0代表从队列头部取出消息,id则为消费比该id更大值的消息
XDEL mymq [id] //xread 后会返回该条消息的id

c# code:
while(true){
 var data=redis.xread(1,"mymq","0");
 if(data!=null)
  redis.xdel("mymq",data.id);
 sleep(1000);//队列为空进行阻塞
}

 

2.基于 xreadgroup 的分组消息队列(群组消费)
我觉得这才是 Stream 类型最能体现出比较完美的消息队列实现,因为拥有 ack 机制(其实不知道为什么 xread 独立消费的时候不增加 ack 机制)

消息组消费组消费消息后,会将消息记录在 Pending列表 里面,当消息确认完成处理后,进行 xack 命令,把该消息从 Pending列表 中移除,等待消费者再次上线后,可以读取该 Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。

redis cli:
xgroup create mymq mygroup 0 //创建消费组,0 从头部开始消费 $从尾部开始消费
xreadgroup group mygroup customerA count 1 streams mymq> //创建消费组A用来消费队列里面的消息,支持同个消费组支持多个消费消息,并且是消费竞争
xack mymq mygroup [id] //消息处理完成后可 xack 处理
xpending mymq mygroup //获取已消费但是待处理的消息

c# code:
redis.XGroupCreate("mymq","mygroup","0"); 
redis.XGroupCreateConsumer("ordermq", "mygroup2","customerA");

while(true){
 var data = redis.XReadGroup("mygroup", "customerA", 5000, "mymq", ">");
 //todo
 redis.XAck("mymq", "mygroup", data.id);
}

//可以给一个定时任务,比如每隔一分钟,获取 pending 列表,然后根据 IDLE 已读取时长较长(比如60s+,具体根据业务场景来设置时间)的数据进行业务处理(防止上线的时候也有新的消息进行消费导致 pending 列表新旧数据都有)
var pending = redis.XPending("mymq", "mygroup");

//获取消息
var rangeData= redis.XRange("ordermq", pending.minId, pending.minId);
//对应业务处理,或者直接将消息重新插入队列
//应答处理
redis.XAck("mymq", "mygroup", rangeData.id);

 

三.总结:

Redis 5.0 提供了 Stream 类型后,可让开发者根据业务场景,可选择更多适合使用的队列实现方式,不局限于之前的 sub/pub 和 list 的方式,不过个人觉得 Pending 列表获取数据有麻烦的地方,无法直接获取到队列内容,而且一旦设置了队列最大长度,如果队列内消息超出长度,容易造成目标消息丢失,XRange 就无法继续获取消息内容了,不知道其他消息队列的处理方式是不是也是如此,之后会去看看 RabbitMQ 的处理方式是怎样的再做个对比