1. Redis简介
Redis(REmote DIctionary Server)是一种开源、高性能的键值对数据库,适用于多种用途,特别是用于存储非关系型数据。Redis支持多种数据结构,如字符串、列表、哈希、集合和有序集合等。
Redis特殊数据类型之一是stream(流),也是Redis 5.0版本中新增的数据类型。Stream数据类型类似于传统消息队列,但是为了增强其可靠性、可伸缩性和持久性,在Redis中以一种独有的方式实现。
2. Stream基本概念
Stream是Redis中一个非常重要的概念。Stream是一个由多个消息条目组成的日志数据结构,其中每个条目都有一个唯一的ID,用于标识条目在Stream中的位置。消息条目中的消息内容可以是任何二进制数据,而消息的键则是由用户指定的。每个条目可以有零个或多个键值对,Stream也可以有零个或多个条目。
2.1 Stream ID
Stream ID是Stream的一个很重要的概念,它唯一地标识了每个消息条目。Stream ID是一个二元组,由两个部分组成:
时间戳(timestamp):以秒为单位的整型数字,对应UNIX时间戳,表示消息的发生时间。
序列号(sequence number):是一个自然数,表示该时间戳内生成的消息的顺序。即序列号越大,说明该消息条目越靠后。
redis> XADD mystream 1000-1 temperature 25.4 humidity 80.0
"1589514469362-0"
redis> XADD mystream 1000-2 temperature 26.0 humidity 78.0
"1589514479574-0"
redis> XADD mystream 1000-3 temperature 27.2 humidity 75.0
"1589514485436-0"
在这个例子中,我们插入了三个消息到名为mystream的Stream中,在mystream中有三个条目。每个条目的ID都是由时间戳和序列号组成的,时间戳是Unix时间戳,而序列号在每个时间戳中都是递增的。
2.2 消费者组
Stream的消费者组(consumer group)是一个逻辑概念,它表示一组共享Stream中所有消息的消费者。消费者组由一个唯一的名称标识。消费者组可以理解为一个发现新Stream数据并进行消费的进程集合。可以将多个消费者分配到同一消费者组内,从而实现消息的负载均衡,从而更好地利用硬件资源。
每个消费者组都维护了一个“待处理条目列表”(pending list),表示当前待处理的条目集合。
3. Stream命令
3.1 XADD
redis> XADD mystream * temperature 30.0
"1589532353428-0"
在上面的例子中,我们使用*XADD*命令将一个名为mystream的Stream的一个新条目添加到Stream的末尾。第一个参数是Stream的名称,第二个参数是插入时的Stream ID,可以使用特殊的通配符*代替,系统会自动为其分配Stream ID。后面的参数是键值对,表示要添加到Stream中的数据。命令返回值是新插入的消息条目的ID。
3.2 XLEN
redis> XLEN mystream
(integer) 1
上述例子中,我们使用*XLEN*命令获取了名为mystream的Stream中的消息数。
3.3 XREAD
*XREAD*命令是从一个或多个Stream中读取数据的命令。
redis> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1589520653676-0"
2) 1) "sensor_id"
2) "sensor_0"
3) 3) "temperature"
4) "20.5"
3) 1) "1589520654340-0"
2) 1) "sensor_id"
2) "sensor_0"
3) 3) "temperature"
4) "20.6"
在这个例子中,我们从名为mystream的Stream中读取了两个消息条目。*COUNT*参数指定了我们想要读取的条目数量,*STREAMS*参数指定了我们要读取的Stream的名称和从哪个位置开始读取。
3.4 XGROUP
*XGROUP*命令用于管理消费者组。
redis> XGROUP CREATE mystream mygroup 0
OK
在上面的例子中,我们创建了一个名为mygroup的消费者组,将其关联到mystream Stream的头部。消费组使用构成Stream ID的时间戳和序列号来确定是否存在并处理任何给定的条目。
3.5 XREADGROUP
*XREADGROUP*命令类似于*XREAD*,但是它在消费者组级别而不是Stream级别上操作。它从指定的消费者组中读取数据。
redis> XGROUP CREATE mystream mygroup $
OK
redis> XREADGROUP GROUP mygroup consumer_1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1589522086747-0"
2) 1) "temperature"
2) "25"
在上述例子中,我们使用*XGROUP*命令创建了一个名为mygroup的消费者组,并将其关联到mystream的末尾。我们使用*XREADGROUP*命令从mygroup消费者组读取一条消息条目。
3.6 XPENDING
*XPENDING*命令用于查询在待处理条目列表中的条目的信息。
redis> XPENDING mystream mygroup
1) (integer) 1
2) 1) "1589525520144-0"
2) "consumer_1"
3) (integer) 15098
在上面的例子中,我们使用*XPENDING*命令查询了mygroup消费者组中已处理但尚未确认的消息。XPENDING命令返回未确认的消息数量,消息的最小和最大ID,以及未确认消息的消费状态等信息。
3.7 XACK
*XACK*命令用于将已处理的消息从待处理条目列表中移除。
redis> XADD mystream * temperature 32.0
"1589541373526-0"
redis> XPENDING mystream mygroup
1) (integer) 1
2) 1) "1589541373526-0"
2) "consumer_1"
3) (integer) 13742
redis> XACK mystream mygroup 1589541373526-0
(integer) 1
redis> XPENDING mystream mygroup
(nil)
在上面的例子中,我们首先使用*XADD*在mystream中插入了一条消息。然后我们使用*XPENDING*查看了mygroup消费组的状态,并使用*XACK*将该条消息标记为已处理。
3.8 XCLAIM
*XCLAIM*命令用于将以下未处理执行者(或将它们移动到另一个消费组):
出于某种原因没有被ack的消息
消费者已超出消费者组之前所声明的时间窗口
redis> XADD mystream * temperature 32.0 humidity 70.0
"1589541373526-0"
redis> XGROUP CREATE mystream mygroup $
OK
redis> XREADGROUP GROUP mygroup consumer_1 COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1589541373526-0"
2) 1) "temperature"
2) "32"
3) "humidity"
4) "70"
使用*XREADGROUP*命令从mygroup消费组中检索未处理的条目,并将它们分配给消费者*consumer_1*处理。仅在*XACK*命令执行之前*consumer_1*挂了,因此当我们在使用*XPENDING*命令时,可以看到未确认的消息:
redis> XPENDING mystream mygroup
1) (integer) 1
2) 1) "1589541373526-0"
2) "consumer_1"
3) (integer) 771
现在假设我们想将该未处理的消息重新分配给另一个消费者进行处理。此时,我们可以使用*XCLAIM*命令:
redis> XCLAIM mystream mygroup consumer_2 3600000 1589541373526-0
1) 1) "1589541373526-0"
2) 1) "temperature"
2) "32"
3) "humidity"
4) "70"
在上述例子中,我们使用*XCLAIM*命令将具有ID 1589541373526-0的挂起条目重新分配给具有名称consumer_2的另一个消费者名。*3600000*表示在取消选择消息之前等待的时间(以毫秒为单位)。最后,该命令返回一个包含重新分配的条目的数据结构。
4. 结论
以上是关于Redis特殊数据类型之Stream的一些基本概念、命令及使用实例。Stream提供了在Redis中处理多个未处理数据条目的完美方法,允许用户通过创建消费组来实现平衡消费者工作,这导致了更好的性能和可伸缩性。Stream是Redis中极具价值的数据类型之一,因此开发人员应该特别注意它。希望这篇文章有助于了解Redis Stream的基本知识。