Bootstrap

用 Redis 实现消息队列是一个好主意么?

《Redis 核心技术与实战》学习笔记 09,部分已经作为留言发布,但是留言太多,排在后面的一般很难被大家看到,所以集中发布在这里,欢迎讨论。

题图来自《Redis 核心技术与实战》专栏

15 | 消息队列的考验:Redis有哪些解决方案?

在学习专栏内容之前,我觉得消息队列需要保证消息的可靠性,但是 Redis 明显是无法保证这个的。当然,对于那些不需要严格可靠的消息队列,Redis 的高性能显然是有优势的。

消息队列需要满足三个需求,分别是消息保序、处理重复消息和保证消息可靠性。

对于消息保序里面,商品扣减库存的例子,有一点凑巧了。

假设有 10 件库存,如果按时间顺序,分别减少 2、8、1,结果因为顺序的关系,先减了 2 和 1,但是 8 就没法扣减了,估计业主会不开心,坚持要上 Kafka。

利用 List 类型的先进先出特性,可以在一定程度上保证顺序,但是考虑到重复消息和可靠性,还是要增加不少额外的操作。

> LPUSH mq "101030001:stock:5"
(integer) 1
> LPUSH mq "101030001:stock:3"
(integer) 2
> RPOP mq
"101030001:stock:5"
> RPOP mq
"101030001:stock:3"
> RPOP mq
(nil)

> LPUSH mq "101030001:stock:5"
(integer) 1
> LPUSH mq "101030001:stock:3"
(integer) 2
> BRPOPLPUSH mq mqback 100
"101030001:stock:5"
> BRPOPLPUSH mq mqback 100
"101030001:stock:3"
> BRPOPLPUSH mq mqback 100
(nil)
(100.05s)
// 这里是秒!而不是微秒

回到最开始的问题,为什么不用一个轻量型的消息队列?然后就有了 Streams 类型。

> xadd mqstream * repo 5
"1616596359048-0"
> xadd mqstream * repo 3
"1616596373016-0"
> xadd mqstream * repo 2
"1616596375028-0"
> xread block 100 streams mqstream 1616596359048-0
1) 1) "mqstream"
   2) 1) 1) "1616596373016-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1616596375028-0"
         2) 1) "repo"
            2) "2"
> xread block 10000 streams mqstream $
(nil)
(10.31s)

我可以吐槽一下 xread 返回值的显示格式么?难道是某位 Lisper 大神写的

感觉上 Streams 类型其实就是在 Redis 的基础上实现了轻量型的消息队列。

> xgroup create mqstream group1 0
OK
> xadd mqstream * repo 5
"1616596629703-0"
> xadd mqstream * repo 3
"1616596631747-0"
> xadd mqstream * repo 2
"1616596633347-0"
> xreadgroup group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1616596359048-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1616596373016-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1616596375028-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1616596629703-0"
         2) 1) "repo"
            2) "5"
      5) 1) "1616596631747-0"
         2) 1) "repo"
            2) "3"
      6) 1) "1616596633347-0"
         2) 1) "repo"
            2) "2"
> xreadgroup group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
   2) (empty list or set)

这里发现了我的一个误解,因为 mqstream 里面的消息虽然已经被 xread 读过了,但是并没有发 xack,所以消息还在 mqstream 中。

> xgroup create mqstream group2 0
OK
> xreadgroup group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1616596359048-0"
         2) 1) "repo"
            2) "5"
> xreadgroup group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1616596373016-0"
         2) 1) "repo"
            2) "3"
> xreadgroup group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1616596375028-0"
         2) 1) "repo"
            2) "2"

> xpending mqstream group2
1) (integer) 3
2) "1616596359048-0"
3) "1616596375028-0"
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"
> xpending mqstream group2 - + 10 consumer2
1) 1) "1616596373016-0"
   2) "consumer2"
   3) (integer) 109485
   4) (integer) 1

> xack mqstream group2 1616596373016-0
(integer) 1
> xpending mqstream group2 - + 10 consumer2
(empty list or set)

对于课后题,如果多个消费者需要读取消息队列中的消息,我觉的仍然可以使用 Streams 类型,但是发送 XACK 的机制需要处理一下,所有消费者都消费完了再统一结账(XACK)。

或者如果数据量不是特别大,也可以考虑用多个 Streams 并行处理,比如实时计算有一个 Streams,分布式文件系统留存也有一个 Streams,写的时候同时写入。

课代表 @Kaito 的回答更清晰一些,多个消费者组就可以解决问题,另外对于 Redis 做消息队列的分析也是专栏内容很好的补充。