Redis学习
# Redis学习
# 概要
- 基本数据结构
- Java API
- 持久化
# 基本数据类型
# 1.Strings字符串
概念:Redis Strings存储字节序列,包括文本、序列化对象和二进制数组。由于 Redis 键是字符串,因此当我们也使用字符串类型作为值时,我们是将一个字符串映射到另一个字符串。
限制:默认情况下,单个 Redis 字符串最大可为 512 MB。
# 基本命令:key-value
- 基本操作
# 创建
> set key value
# 获取
> get key
value/(nil)
2
3
4
5
- 用途
- 实现缓存
# 过期时间命令:SET key value [EX seconds] [PX milliseconds]
- 描述
可以使用EX
和PX
选项来为键设置过期时间,这样键在一定时间后会自动被删除。
- 基本用法
# 创建
> set a 1 ex 1 # 一秒钟后过期
> set a 1 px 1 # 一分钟后过期
2
3
- 用途
- 验证码功能中,验证码有效时间
- 文件有效期
# 锁命令:key-value [nx|xx]
- 描述
这种操作通常用于实现一些特定的原子性操作或者在多个客户端并发操作时确保一致性。
- 基本用法
# 创建方法1
# NX(Not eXists): 表示只有在键不存在时才进行设置。如果键已经存在,则不执行任何操作。
# XX(eXists): 表示只有在键已经存在时才进行设置。如果键不存在,则不执行任何操作。
> set key value [nx|xx]
# 当key存在时,使用nx将不执行操作,使用xx将执行操作
# 例1,key=a不存在
> set a 1 nx | > set a 1 xx
OK | (nil)
> set a 2 nx | > set a 1 nx
(nil) | OK
> set a 2 xx | > set a 2 xx
OK | OK
2
3
4
5
6
7
8
9
10
11
12
- 用法
1. 分布式锁:当使用SET key value NX
时,可以将该操作作为获取锁的尝试。只有当键不存在时(表示锁没有被其他客户端持有),才会成功获取锁。这可以用于实现分布式系统中的锁机制,防止多个节点同时修改共享资源。
2. 原子性操作:在某些情况下,可能需要确保一组操作是原子执行的。通过使用SET key value NX
,可以尝试设置一个键,如果成功,则说明没有其他客户端在同时进行相同的操作。这可以用于实现原子性的操作序列。
3. 乐观锁:在并发环境中,乐观锁是一种通过版本控制来处理并发更新的机制。通过SET key value XX
,可以检查某个键是否存在并且具有特定的值,然后进行相应的更新。这可以用于实现乐观锁的机制,以防止并发更新引起的问题。
4. 防止重复操作:通过SET key value NX
,可以确保一个操作在一定时间内只执行一次。例如,可以用于确保用户在一段时间内只能执行某个特定的操作一次,以防止重复提交。
# 计数器命令:key-value
- 描述
即使字符串是Redis的基本值,也可以使用其他的方式(incr)来进行数据的转换,然后将计算后的值作为新的字符串进行替换。
incr是原子的,即使多个客户端针对同一key发出INCR也永远不会进入竞争状态。例如,永远不会发生客户端 1 读取“10”,客户端 2 同时读取“10”,两者都递增到 11,并将新值设置为 11。最终值将始终为 12,并且读取的值当所有其他客户端不同时执行命令时执行增量设置操作。
- 基本用法
# 创建
> set a 1
# 自增+1
> incr a
(integer) 2
# 自定义增加
> incrby a 10
(integer) 12
> incrby a -10
(integer) 2
> incrbyfloat a 1.5
"3.50000000000000284"
2
3
4
5
6
7
8
9
10
11
12
- 用途
- 计数器足以概括所有需要计数的操作了,例如:点赞、关注、在线人数等功能。
# 2.Lists列表
概念:Redis Lists是字符串值的链接列表。Redis 列表经常用于:
- 实现堆栈和队列。
- 为后台工作系统构建队列管理。
限制:Redis 列表的最大长度为 2^32 - 1 (4,294,967,295) 个元素。
# 基本命令
LPUSH
将新元素添加到列表的头部;RPUSH
添加到尾巴。LPOP
从列表头部删除并返回一个元素;RPOP
做同样的事情,但是从列表的尾部开始。LLEN
返回列表的长度。LMOVE
以原子方式将元素从一个列表移动到另一个列表。需要安装插件Redis Labs RediSearch
LTRIM
将列表缩小到指定的元素范围。与类似LRANGE
,但不是显示指定范围的元素,而是将此范围设置为新的列表值。给定范围之外的所有元素都将被删除。
- 基本用法
# 创建(头插)
> lpush a 1
(integer) 1 # 返回值为当前列表的长度
> lpush a 3
(integer) 2
# 创建(尾插)
> rpush a 2
(integer) 3
# 查看列表全部元素
> lrange key start stop # 与Python中列表的切片一样 0是开头从左到右递增 -1是末尾从右到左递减
> lrange a 0 -1
1) "3"
2) "1"
3) "2"
# 获取元素
> lpop a # 从左往右取数据
"3"
> rpop a # 从右往左取数据
"2"
# 获取列表长度
> llen a
(integer) 1
# 移动元素(需要插件)
> LMOVE source destination LEFT|RIGHT LEFTINDEX RIGHTINDEX
# source: 源列表的键名。
# destination: 目标列表的键名。
# LEFT|RIGHT: 指定方向,是从左侧移动到右侧还是从右侧移动到左侧。
# LEFTINDEX: 要移动的元素的在源列表中的索引位置(从左侧开始的索引,0 表示第一个元素)。
# RIGHTINDEX: 要移动的元素在目标列表中的索引位置。
> rpush list1 1 2 3 4 5 6
(integer) 6
> rpush list2 11 22 33 44 55 66
(integer) 6
> lmove list1 list2 left 0 -1 # 将左边的列表中从索引为0的元素开始到最后一个,移动到右边列表的尾端。
# 限制列表长度
> ltrim key start stop # start 开始下标 stop 结束下标
> rpush a 1 2 3 4 5 6
(integer) 6
> lrange a 0 -1
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
> ltrim a 2 -2
OK
> lrange a 0 -1
1) "3"
2) "4"
3) "5"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
- 用途
- 实现队列和堆栈
上限列表
:只想使用列表来存储最新项目,无论它们是什么:社交网络更新、日志或其他任何内容。Redis 允许我们使用列表作为上限集合,仅记住最新的 N 个项目并使用命令丢弃所有最旧的项目LTRIM
。
# 队列、堆栈操作
概念:基于列表实现都是通过约束来进行实现,先进先出,先进后出。都是把方法定死只能这么用来实现对应的功能。
- 队列
# 入队(尾插)
> rpush a 1 2 3 4 5 6
(integer) 6
# 出队(头出)
> lpop a
"1"
2
3
4
5
6
- 堆栈
# 入栈(尾插)
> rpush a 1 2 3 4 5 6
(integer) 6
# 出栈(尾出)
> rpop a
"6"
2
3
4
5
6
# 阻塞列表操作
列表有一个特殊的功能,使它们适合实现队列,并且通常作为进程间通信系统的构建块:阻塞操作。
假设一下,您想要使用一个进程将项目推送到列表中,并使用不同的进程来实际对这些项目执行某种操作。这是通常的生产者/消费者设置,可以通过以下简单方式实现:
- 要将项目推入列表,生产者调用
LPUSH
。 - 要从列表中提取/处理项目,消费者调用
RPOP
.
然而,有时列表可能是空的,没有什么可处理的,所以RPOP
只返回 NULL。在这种情况下,消费者被迫等待一段时间并使用 重试RPOP
。这称为轮询,在这种情况下不是一个好主意,因为它有几个缺点:
- 强制 Redis 和客户端处理无用的命令(列表为空时的所有请求都不会完成任何实际工作,它们只会返回 NULL)。
- 增加了项目处理的延迟,因为工作线程收到 NULL 后,会等待一段时间。为了使延迟更小,我们可以在调用 之间等待更少的时间
RPOP
,从而放大问题 1,即对 Redis 进行更多无用的调用。
因此,Redis 实现了名为 和BRPOP
的命令BLPOP
,这些命令是的版本RPOP
并且LPOP
能够在列表为空时阻塞:仅当将新元素添加到列表中或达到用户指定的超时时,它们才会返回给调用者。
# 创建(头出阻塞)需要设置过期时间单位秒
> blpop key timeout
> blpop a 10 # 等待超时停止阻塞
(nil)
(10.06s)
# 一个终端使用blpop对a阻塞获取数据过期时间10s,另起一个终端对a进行尾插数据
> blpop a 10
1) "a"
2) "1"
(6.60s)
2
3
4
5
6
7
8
9
10
# 3.sets集合
描述:
Redis sets是唯一字符串(成员)的无序集合。您可以使用 Redis 集高效地:
- 跟踪唯一项目(例如,跟踪访问给定博客文章的所有唯一 IP 地址)。
- 表示关系(例如,具有给定角色的所有用户的集合)。
- 执行常见的集合运算,例如交集、并集和差集。
限制:Redis sets的最大大小为 2^32 - 1 (4,294,967,295) 个成员。
# 基本命令
SADD
将新成员添加到集合中。SREM
从集合中删除指定的成员。SISMEMBER
测试字符串的集合成员资格。SINTER
返回两个或多个集合共有的成员集(即交集)。SCARD
返回集合的大小(也称为基数)。
- 基本用法
# 创建
> sadd a 1
(integer) 1 # 返回添加成功的数据的个数
> sadd a 1 2
(integer) 1
> sadd a 2
(integer) 0
# 删除
> srem a 1
(integer) 1 # 返回删除成功的数据的个数
# 查找集合中是否存在铁定元素
> sismember a 1
(integer) 0 # 返回查找元素存在的个数
> sismember a 1 2
(integer) 1
# 获取集合大小
scard a
(integer) 2
# 获取集合的交集中的元素
> sinter a b
1) "1"
2) "2"
# 获取集合的差集元素,前一个与后一个的差异
> sadd set1 1 2 3 4 5
(integer) 5
> sadd set2 1 2 3 5 6 10 4
(integer) 7
> sdiff set1 set2
(empty list or set) # 因为set1是set2的子集,所以返回为空列表或者是集合
> sdiff set2 set1
1) "6"
2) "10"
# 获取并集
> sunion set1 set2
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "10"
# 随机取出一个元素,删除集合中的对应元素
> spop set1
"2"
# 随机返回一个元素,但不删除集合中对应元素
> srandmember set1
"3"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
- 用途
- 确保一个账号只能登陆一次,限制单个账号登陆设备数
- 获取当前在线人数
# 4.hashes哈希表
描述:
Redis hashes是结构化为字段值对集合的记录类型。可以使用哈希来表示基本对象并存储计数器分组等。
# 基本命令
HSET
(opens new window)设置哈希上一个或多个字段的值。HGET
(opens new window)返回给定字段的值。HMGET
(opens new window)返回一个或多个给定字段的值。HINCRBY
(opens new window)将给定字段的值增加所提供的整数。
- 操作
# 创建
> hset key field value # key是hashes的名字,field是key,value是value
> hset hash1 a 1 b 1 c 1 d 'hello world'
(integer) 3 # 返回创建成功的个数
# 取值(不会删除hashes中对应的key-value)
> hget hash1 d
"hello world"
# 取多个field的值
> hmget hash1 a d c f
1) "1"
2) "hello world"
3) "1"
4) (nil)
# 取出所有的值
> hgetall hash1
1) "a"
2) "1"
3) "b"
4) "1"
5) "c"
6) "1"
7) "d"
8) "hello world"
# 计算
> hincrby hash1 a 10
(integer) 11
> hincrby hash1 a -10
(integer) 1
# 获取所有的key
> hkeys hash1
1) "a"
2) "b"
3) "c"
4) "d"
# 获取所有的value
> hvals hash1
1) "1"
2) "1"
3) "1"
4) "hello world"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
- 用途
- 存储对象信息其中:key为对象名称、field为属性名称、value属性名
# 5.sorted sets排序集合
描述:
Redis sorted sets是按关联分数排序的唯一字符串(成员)的集合。当多个字符串具有相同分数时,字符串按字典顺序排序。排序集的一些用例包括:
- 排行榜。例如,您可以使用排序集轻松维护大型在线游戏中最高分数的有序列表。
- 速率限制器。特别是,您可以使用排序集构建滑动窗口速率限制器,以防止过多的 API 请求。
可以将排序集视为集合和哈希的混合。与集合类似,排序集合由唯一的、不重复的字符串元素组成,因此在某种意义上,排序集合也是一个集合。
然而,虽然集合内的元素没有排序,但排序集合中的每个元素都与一个浮点值相关联,称为分数 (这就是为什么该类型也类似于散列,因为每个元素都映射到一个值)。
此外,排序集中的元素是按顺序获取的(因此它们不是根据请求排序的,顺序是用于表示排序集的数据结构的特性)。它们根据以下规则排序:
- 如果 B 和 A 是具有不同分数的两个元素,则如果 A.score > B.score,则 A > B。
- 如果 B 和 A 的分数完全相同,则如果 A 字符串按字典顺序大于 B 字符串,则 A > B。B 和 A 字符串不能相等,因为排序集仅具有唯一元素。
# 基本命令
ZADD
将新成员和关联分数添加到排序集中。如果该成员已存在,则更新分数。ZRANGE
返回在给定范围内排序的有序集的成员。ZRANK
返回所提供成员的排名,假设按升序排序。ZREVRANK
返回所提供成员的排名,假设排序集按降序排列。
- 操作
# 创建
> zadd scores nx 10 a 1 b 6 c
(integer) 3
# 显示排名从低到高
> zrange scores 0 -1
1) "b"
2) "c"
3) "a"
# 显示排名从高到低
> zrevrange scores 0 -1
1) "a"
2) "c"
3) "b"
# 显示排名同时显示对应分数
> zrange scores 0 -1 withscores # 升序
1) "b"
2) "1"
3) "c"
4) "6"
5) "a"
6) "10"
> zrevrange scores 0 -1 withscores # 降序
1) "a"
2) "10"
3) "c"
4) "6"
5) "b"
6) "1"
# 显示对应分数内的数据
> zrangebyscore scores 0 8 (0 < score < 8)
1) "b"
2) "c"
# 删除使用member也就是key
> zrem scores b
(integer) 1
# 删除使用分数
> zremrangebyscore scores 0 8 # 删除分数0到8的元素
(integer) 2
# 获取排名
> zrank scores a # 从高到低
(integer) 1
> zrevrank scores a # 从低到高
(integer) 2
# 增加得分
> zrange scores 0 -1 withscores
1) "g"
2) "5"
3) "a"
4) "10"
5) "aa"
6) "12"
7) "bb"
8) "13"
> zincrby scores 10 a
"20"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
- 用途
- 实现排行榜
# 6.streams流
描述:
Redis 流是一种数据结构,其作用类似于仅追加日志,但也实现了多种操作来克服典型仅追加日志的一些限制。其中包括 O(1) 时间内的随机访问和复杂的消费策略,例如消费者组。您可以使用流来实时记录并同时聚合事件。Redis 流用例的示例包括:
- 事件溯源(例如,跟踪用户操作、点击等)
- 传感器监控(例如,现场设备的读数)
- 通知(例如,将每个用户的通知记录存储在单独的流中)
Redis 为每个流条目生成一个唯一的 ID。您可以使用这些 ID 稍后检索其关联条目或读取并处理流中的所有后续条目。请注意,由于这些 ID 与时间相关,因此此处显示的 ID 可能会有所不同,并且与您在自己的 Redis 实例中看到的 ID 不同。
Redis 流支持多种修剪策略(以防止流无限制增长)和多种消耗策略(请参阅XREAD
、XREADGROUP
和XRANGE
)。
# 基本命令
XADD
向流添加一个新条目。XREAD
读取一个或多个条目,从给定位置开始并及时向前移动。XRANGE
返回两个提供的条目 ID 之间的条目范围。XLEN
返回流的长度。
- 操作
# 创建
# 命令返回的条目 IDXADD明确地标识给定流中的每个条目,由两部分组成:
<millisecondsTime>-<sequenceNumber>
# millisecondsTime:生成流ID的本地Redis节点中的本地时间
# sequenceNumber:序列号用于在同一毫秒内创建的条目。由于序列号是64位宽,因此实际上同一毫秒内可以生成的条目数量没有限制。
> xadd key ID field string # id为自己设置或者使用*服务器为我们生成一个新的 ID。每个新的 ID 都会单调递增,因此更简单地说,与所有过去的条目相比,添加的每个新条目都将具有更高的 ID。由服务器自动生成 ID 几乎总是您想要的,并且显式指定 ID 的原因非常罕见。
> xadd log 1-0 name "chx" age 16 #不常用
"1-0"
> xadd log * name "c" age 16
"1703677399034-0"
> xadd log * name "chx" age 16
"1703677406316-0"
> xadd log * name "chx" age 15
"1703677442610-0"
# 读取一个或多个流条目
# 要按范围查询流,只需要指定两个 ID:start和end。返回的范围将包括以开始或结束作为 ID 的元素,因此该范围是包含在内的。两个特殊的 ID-和+分别表示可能的最小和最大 ID。
> xrange log - + # 读取全部条目
1) 1) "1-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
2) 1) "1703677399034-0"
2) 1) "name"
2) "c"
3) "age"
4) "16"
3) 1) "1703677406316-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
4) 1) "1703677442610-0"
2) 1) "name"
2) "chx"
3) "age"
4) "15"
> xrange log - + count 1 # 读取第一条数据
1) 1) "1-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
> xrange log - + count 2 # 读取前两条数据
1) 1) "1-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
2) 1) "1703677399034-0"
2) 1) "name"
2) "c"
3) "age"
4) "16"
> xrange log 1703677399034-0 + count 2 # 从指定id开始
1) 1) "1703677399034-0"
2) 1) "name"
2) "c"
3) "age"
4) "16"
2) 1) "1703677406316-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
> xrevrange log + - # 反向查看数据
1) 1) "1703677442610-0"
2) 1) "name"
2) "chx"
3) "age"
4) "15"
2) 1) "1703677406316-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
3) 1) "1703677399034-0"
2) 1) "name"
2) "c"
3) "age"
4) "16"
4) 1) "1-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
# 查看当前流的长度
> xlen log
(integer) 4
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# 使用 XREAD 监听新项目
当我们不想通过范围访问流中的项目时,通常我们想要的是订阅流中到达的新项目。这个概念可能与Redis Pub/Sub相关,您可以订阅一个频道,或者与Redis阻塞列表相关,您可以等待一个键获取新元素来提取,但在消费流的方式上存在根本性的差异:
流可以有多个客户端(消费者)等待数据。默认情况下,每个正在等待在给定流中的数据的消费者都将收到每个新项目。这种行为与阻塞列表不同,其中每个消费者将获得不同的元素。然而,向多个消费者进行扇出的能力类似于Pub/Sub。
在Pub/Sub中,消息是“发射并忘记”的,并且无论如何都不会被存储,而在使用阻塞列表时,当客户端接收到消息时,它会从列表中弹出(有效地删除)。然而,流的工作方式从根本上讲是不同的。所有消息都会无限期地追加到流中(除非用户明确要求删除条目):不同的消费者将通过记住其接收的最后一条消息的ID来知道什么是新消息。
流消费者组提供了Pub/Sub或阻塞列表无法实现的控制级别,为同一流提供不同的组,显式确认已处理的项目,检查待处理项目的能力,声明未处理的消息以及为每个单一客户端提供一致的历史可见性,该客户端仅能看到其私有的消息历史。 提供监听流中到达新消息的能力的命令称为XREAD。它比XRANGE要复杂一些,因此我们将首先展示简单的形式,稍后将提供整个命令布局。
提供侦听到达流的新消息的功能的命令称为XREAD
。它比XRANGE
复杂一点,因此我们将开始显示简单的表单,稍后将提供整个命令布局。
> xread count 2 streams log 0
1) 1) "log"
2) 1) 1) "1-0"
2) 1) "name"
2) "chx"
3) "age"
4) "16"
2) 1) "1703677399034-0"
2) 1) "name"
2) "c"
3) "age"
4) "16"
2
3
4
5
6
7
8
9
10
11
12
以上是XREAD的非阻塞形式。请注意,COUNT选项并非强制性的。事实上,该命令唯一强制性的选项是STREAMS选项,它指定了一个键列表,以及调用方已经为每个流看到的相应最大ID,以便该命令将只向客户端提供ID大于我们指定的那个消息。
在上面的命令中,我们写了STREAMS race:france 0,因此我们希望获取Stream race:france中所有ID大于0-0的消息。正如您在上面的示例中看到的,该命令返回键名,因为实际上可以使用多个键调用此命令以同时从不同的流中读取消息。例如,我可以写:STREAMS race:france race:italy 0 0。请注意,在STREAMS选项之后,我们需要提供键名称,然后提供ID。因此,STREAMS选项必须始终是最后一个选项。任何其他选项必须位于STREAMS选项之前。
除了XREAD可以同时访问多个流,并且我们能够指定我们拥有的最后一个ID以获取更新的消息之外,在这种简单的形式中,该命令与XRANGE并没有太大的区别。然而,有趣的部分是我们可以通过指定BLOCK参数轻松将XREAD转换为阻塞命令:
# block为阻塞时间,阻塞时间设置为0时,永远阻塞
> xread block 10000 streams log 1703677442610-0
(nil)
(10.10s)
> xread block 10000 streams log 1703677442610-0
1) 1) "log"
2) 1) 1) "1703681397111-0"
2) 1) "name"
2) "zxn"
(1.60s)
> xread block 0 streams log 1703681397111-0
1) 1) "log"
2) 1) 1) "1703681529495-0"
2) 1) "name"
2) "zxn"
3) "age"
4) "21"
(38.85s)
> xread block 0 streams log $ # 这个特殊的ID表示XREAD应该使用流mystream中已存储的最大ID作为最后的ID
1) 1) "log"
2) 1) 1) "1703681680957-0"
2) 1) "name"
2) "kbc"
3) "age"
4) "21"
(37.06s)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
请注意,在上面的示例中,除了删除COUNT之外,我使用了超时为0毫秒的新BLOCK选项(这意味着永不超时)。此外,我没有传递流mystream的普通ID,而是传递了特殊的ID $。这个特殊的ID表示XREAD应该使用流mystream中已存储的最大ID作为最后的ID,这样我们将只接收新消息,从我们开始监听的时间开始。这在某种程度上类似于Unix命令中的tail -f。
请注意,当使用BLOCK选项时,我们不必使用特殊的ID $。我们可以使用任何有效的ID。如果命令能够立即无阻塞地提供服务,它将这样做,否则将阻塞。通常,如果我们想从新条目开始消耗流,我们使用ID $开始,然后继续使用接收到的最后一条消息的ID进行下一次调用,以此类推。
XREAD的阻塞形式还能够监听多个流,只需指定多个键名即可。如果请求可以同步地服务,因为至少有一个流的元素大于我们指定的相应ID,它将返回结果。否则,命令将阻塞,并返回第一个收到新数据的流的项目(根据指定的ID)。
与阻塞列表操作类似,从客户端等待数据的角度来看,阻塞流读取是公平的,因为语义是FIFO样式的。第一个为给定流阻塞的客户端将是在有新项可用时解除阻塞的第一个客户端。
XREAD除了COUNT和BLOCK之外,没有其他选项,因此它是一个具有特定目的的相当基本的命令,用于将消费者连接到一个或多个流。通过使用消费者组API可以使用更强大的功能来消耗流,然而,通过消费者组读取的实现是通过一个称为XREADGROUP的不同命令完成的,将在本指南的下一部分介绍。
# 消费群体
当任务是从不同的客户端消耗同一流时,XREAD已经提供了一种向N个客户端进行扇出的方法,还可以使用副本来提供更多的读取可伸缩性。然而,在某些问题中,我们想做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一消息流的不同子集。这在处理速度较慢的消息的情况下特别有用:具有接收流的不同部分的N个不同工作器的能力允许我们通过将不同的消息路由到准备做更多工作的不同工作器来扩展消息处理。
在实际情况中,如果我们想象有三个消费者C1、C2、C3,以及包含消息1、2、3、4、5、6、7的流,那么我们想要根据以下图表提供消息:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
2
3
4
5
6
7
为了实现这一目标,Redis使用了一个称为消费者组的概念。非常重要的一点是,从实现的角度来看,Redis消费者组与Kafka(TM)消费者组无关。然而,它们在功能上相似,因此我决定保留Kafka(TM)的术语,因为它最初推广了这个想法。
消费者组类似于一个伪消费者,从流中获取数据,并实际上为多个消费者提供服务,提供了一定的保证:
- 每条消息都会传递给不同的消费者,以确保不可能将相同的消息传递给多个消费者。
- 消费者在消费者组内由一个名称标识,这是一个区分大小写的字符串,由实施消费者的客户端选择。这意味着即使在断开连接后,流消费者组仍保留所有状态,因为客户端将再次声明为相同的消费者。但是,这也意味着由客户端提供唯一标识符。
- 每个消费者组都有从未被消费的第一个ID的概念,因此当消费者请求新消息时,它只能提供先前未传递的消息。
- 然而,消费消息需要使用特定命令进行显式确认。Redis将确认解释为:此消息已正确处理,因此可以从消费者组中清除。
- 消费者组跟踪当前挂起的所有消息,即已传递给消费者组的某些消费者但尚未确认为已处理的消息。由于有了这个特性,当访问流的消息历史时,每个消费者只会看到传递给它的消息。
- 在某种程度上,可以将消费者组想象为关于流的某些状态的集合:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
2
3
4
5
6
7
8
9
10
11
12
从这个角度看,很容易理解消费者组能够做什么,它如何能够仅为消费者提供待处理消息的历史记录,以及请求新消息的消费者将只接收大于last_delivered_id的消息ID。同时,如果将消费者组视为Redis流的辅助数据结构,很明显一个单一的流可以有多个消费者组,它们有不同的消费者集合。实际上,同一个流可以有通过XREAD读取而没有消费者组的客户端,也可以有通过在不同的消费者组中使用XREADGROUP进行读取的客户端。
现在是时候放大看看基本的消费者组命令了。它们如下:
- XGROUP 用于创建、销毁和管理消费者组。
- XREADGROUP 用于通过消费者组从流中读取。
- XACK 是一个命令,允许消费者将挂起的消息标记为已正确处理。
# 创建消费者组
假设我已经有一个类型为stream的键log存在,为了创建一个消费者组,我只需要执行以下操作:
> xgroup create log users $
OK
2
正如您在上述命令中所看到的,当创建消费者组时,我们必须指定一个ID,在示例中只是$。这是必需的,因为消费者组在其他状态之间必须知道在第一个连接时要为下一个消息提供什么消息ID,即在组刚刚创建时的最后一个消息ID是什么。如果我们像上面那样提供$,那么只有从现在开始在流中到达的新消息才会提供给组中的消费者。如果我们指定0,那么消费者组将首先消耗流历史中的所有消息。当然,您可以指定任何其他有效的ID。您知道的是,消费者组将开始传递大于您指定的ID的消息。因为$表示流中的当前最大ID,指定$将只消耗新消息。
XGROUP CREATE还支持使用可选的MKSTREAM子命令作为最后一个参数自动创建流,如果流不存在:
> xgroup create student users $ mkstream
OK
2
现在消费者组已经创建,我们可以立即尝试通过使用XREADGROUP命令通过消费者组读取消息。我们将从名为Alice和Bob的消费者那里读取,以查看系统将如何向Alice或Bob返回不同的消息。
XREADGROUP与XREAD非常相似,并提供相同的BLOCK选项,否则它是一个同步命令。但是有一个必须始终指定的强制性选项,即GROUP,它有两个参数:消费者组的名称和尝试读取的消费者的名称。该选项COUNT也得到支持,并且与XREAD中的选项相同。
我们将在race:italy流中添加赛手,并尝试使用消费者组读取一些内容:注意:这里rider是字段名,name是关联的值。请记住,流项目是小字典。
> xreadgroup group users chx count 10 block 0 streams log >
1) 1) "log"
2) 1) 1) "1703746778586-0"
2) 1) "name"
2) "c"
(10.06s)
2
3
4
5
6
XREADGROUP的回复与XREAD的回复非常相似。但是请注意上面提供的GROUP < group-name > < consumer-name >。它表示我要使用消费者组mygroup从流中读取,我是消费者chx。每当消费者使用消费者组执行操作时,它必须指定其名称,以在组内唯一标识此消费者。
在上面的命令行中还有另一个非常重要的细节,在强制的STREAMS选项之后,对键mystream请求的ID是特殊ID >。此特殊ID仅在消费者组的上下文中有效,它表示:迄今为止从未传递给其他消费者的消息。
这几乎总是您想要的,但也可以在此处指定实际ID,例如0或任何其他有效ID。在这种情况下,但是,发生的情况是我们要求XREADGROUP只提供待处理消息的历史记录,并且在这种情况下,将永远不会看到组中的新消息。因此,基本上XREADGROUP根据我们指定的ID具有以下行为:
- 如果ID是特殊ID >,则该命令将仅返回到目前为止从未传递给其他消费者的新消息,并作为副作用,将更新消费者组的最后ID。
- 如果ID是任何其他有效的数字ID,则该命令将允许我们访问待处理消息的历史记录。也就是说,传递给此指定消费者(由提供的名称标识)的消息集,并且迄今为止从未使用XACK确认。
我们可以立即测试此行为,指定ID为0,而不使用任何COUNT选项:我们将只看到待处理消息。
> xreadgroup group users chx streams log 0
1) 1) "log"
2) 1) 1) "1703746590288-0"
2) 1) "name"
2) "chen hai xiao"
2) 1) "1703746734331-0"
2) 1) "name"
2) "chen"
3) 1) "1703746778586-0"
2) 1) "name"
2) "c"
2
3
4
5
6
7
8
9
10
11
然而,如果我们确认已处理该消息,它将不再是待处理消息历史的一部分,因此系统将不再报告任何内容:
# 处理消息
> xack log users 1703746590288-0
(integer) 1
> xreadgroup group users chx streams log 0
1) 1) "log"
2) 1) 1) "1703746734331-0"
2) 1) "name"
2) "chen"
2) 1) "1703746778586-0"
2) 1) "name"
2) "c"
2
3
4
5
6
7
8
9
10
11
如果您还不知道XACK的工作原理,不要担心,思路只是已处理的消息不再是我们可以访问的历史的一部分。
现在轮到zxn读取一些内容:
127.0.0.1:6379> xreadgroup group users zxn streams log >
1) 1) "log"
2) 1) 1) "1703812886399-0"
2) 1) "name"
2) "zxn"
> xreadgroup group users zxn streams log 0
1) 1) "log"
2) 1) 1) "1703812886399-0"
2) 1) "name"
2) "zxn"
> xack log users 1703812886399-0
(integer) 1
> xreadgroup group users zxn streams log 0
1) 1) "log"
2) (empty list or set)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
zxn请求了最多两条消息,并通过相同的消费者组users进行读取。因此,Redis仅报告新消息。正如您所见,“chen”消息未传递,因为它已经传递给Alice,因此zxn获取了zxn信息等等。
这样,chx、zxn和组中的任何其他消费者都能够从同一流中读取不同的消息,读取它们尚未处理的消息历史,或将消息标记为已处理。这允许从流中消耗消息的创建不同的拓扑和语义。
有几点需要记住:
- 消费者在第一次提及时会自动创建,无需显式创建。
- 即使使用XREADGROUP,您也可以同时从多个键中读取,但是为使其正常工作,您需要在每个流中使用相同的名称创建一个消费者组。这不是常见的需求,但值得一提的是技术上是可行的。
- XREADGROUP是一个写命令,因为即使它从流中读取,但作为读取的副作用,消费者组会被修改,因此只能在主实例上调用。
# 自动认领
XAUTOCLAIM命令是在Redis 6.2中添加的,实现了我们上面描述的认领过程。XPENDING和XCLAIM为不同类型的恢复机制提供了基本构建块。该命令通过由Redis管理来优化通用流程,并为大多数恢复需求提供了简单的解决方案。
XAUTOCLAIM识别处于空闲状态的待处理消息,并将其所有权转移到消费者。该命令的签名如下:
XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]
因此,在上面的示例中,我可以使用自动认领来认领单个消息,如下所示:
# 该命令在redis6.2.0之后安装
> XAUTOCLAIM log users chx 6000 > COUNT 1
2