总结redis的zset数据结构实战用法

一 背景

之前做了项目中,很多地方都使用了redis,都使用了不同的redis数据结构来实现不同的需求,但是做完之后,却没有及时总结,现在,将设计到redis各数据结构的使用从项目中剥离出来,进行分析,本篇先着重讲zset数据结构的用途与常见指令,底层原理解析留在以后的笔记中学习记录!

我使用的后端框架Django,原生的redis操作集,redis版本6.0.6


二 zset数据结构

我们都直到redis中的有常用的五大数据结构,分别为string, list, hash, set, zset,可以看到set是集合,而zset是有序集合.本篇所要讲的zset是基于集合上一种有顺序(分值)的的数据结构,有序集合中的每一个元素项会对应一个分数值,并且分数是可以重复的,它会按照分值进行排序,从小到大,或从大到小,我使用zset的频率比set要多得多.接下来我将通过自己项目中的例子来分析zset中常见的操作.


三 开启讲解

1.搜索历史记录栏和热搜词汇排行榜

搜索历史记录栏可以说对于每个网络应用程序都是必不可少的功能,像存储用户搜索历史这样的热点数据,如果存储在像mysql这样关系型数据,纯I/O读写的速度是较慢的,存储在mysql中,每次添加会给数据库带来过大的压力,而使用mongodb非关系性数据库来说,它使用内存映射存储引擎,它会把磁盘I/O操作转化为内存操作,因而大大提升了读写性能.因此该功能实现可以基于mongodb,而我这里选了历史搜索记录,因为在我(小菜鸟)看来,第一时间想到的实现方法就是基于redis的zset数据结构,并且搜索记录不必一直存在,可以持续1个月或几个月就可以自动删除,还有一方面的原因就是自己对mongodb用法,使用场景,原理不是特别了解.


项目关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 上下文管理器代码
@contextlib.contextmanager
def manage_redis(db, redis_class=BaseRedis, redis=None):
try:
# redis = get_redis_connection(db) # redis实例链接
redis = redis_class.choice_redis_db(db).redis # 单例获取redis实例对象
yield redis
except Exception as e:
redis_logger.error(e)
raise RedisOperationError()
finally:
redis.close() # 其实可以不要,除非single client connection, 每条执行执行完都会调用conn.release()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

# 记录用户搜索的关键词和维护热搜数据
@client_key # 自定义的装饰器,用于获取唯一key标识
def save_search(self, sender, key, **kwargs):
"""
记录某用户的浏览记录
有效时间1个月
"""
with manage_redis(self.DB, type(self)) as redis:
# 为每个用户维护一个搜索有序集合
# 为所有关键词维护一个有序集合,用于分析
with redis.pipeline() as pipe:
pipe.zadd(self.user_key(sender), {key: self.score})
# 60*60*24*30 = 25920000 30天存活
pipe.expire(self.user_key(sender), 25920000)
pipe.zincrby(self.heat_key(datetime.datetime.today()), 1, key) # 将该关键字添加到热搜有序集合中,如果存在key,则+1,不存在设置为1
pipe.execute()


@client_key
def delete_search_single(self, sender, key, **kwargs):
"""
单删某条搜索历史记录
"""
with manage_redis(self.DB, type(self)) as redis:
redis.zrem(self.user_key(sender), key)


@client_key
def delete_search_all(self, sender, **kwargs):
"""
群删所有搜索历史记录
"""
with manage_redis(self.DB, type(self)) as redis:
redis.delete(self.user_key(sender))


@client_key
def retrieve_last_ten(self, sender, key, **kwargs):
"""根据分页获取最新的10条搜索记录"""

# with 生存周期持续到函数结束
with manage_redis(self.DB, type(self)) as redis:
page = kwargs.get('page')
count = kwargs.get('count')
result = redis.zrevrange(self.user_key(sender), page * count, (page + 1) * count) # 返回分数从高到低的前十个(时间最近的前十个)
return result


def heat_search(self, sender, key, **kwargs):
"""
每日热搜
动态更新每日的前十位
"""
with manage_redis(self.DB, type(self)) as redis:
date = datetime.datetime.today()
result = redis.zrevrange(self.heat_key(date), 0, 10) # 前十大热搜
return result

总体分析:

1).我这里使用了上下文管理器来获取redis和自动释放redis,通过传递配置中使用的DB和类型,获取单例操作类的redis实例.

2).因为涉及到的redis指令较多,我使用了pipeline来传输命令,统一操作,这样一方面可以减少与redis通信多次命令执行的RTT(往返时间),另一方面可以可以保证数据的顺序,按照队列先进先出的顺序.


save_search()函数方法分析

1).首先向pipeline推送zadd命令,zadd命令是向制定name的有序集合中添加一条数据{key:score}, name用于唯一表示有序集合,key用于唯一标识一条数据项的键,score表示其对应的分数值,这里我将时间戳作为score.因此score越小表示浏览的时间越早,这样在从redis取出显示在前端就可以按照时间从前往后的顺序进行展示,有效的减少了时间复杂度.zadd命令时间复杂度为0(log(N))

2).然后为该键设置过期时间,通过expire命令设定30天存活

3).借着使用zincrby方法维护一个有序集合,用于记录每天的热搜词汇,每当有一个用户搜索词汇时,将该词汇添加到集合中,每次加1,如果一开始没有该词汇,则默认为1,时间复杂度为0(log(N))

4).最后,执行execute(),依次执行pipeline中的指令.


** heat_search() 函数方法分析**

1)使用zrevrange()操作来查找按照score值递减来排列,具有相同score的成员按照字典序的反序排序.该函数,返回热搜sorted set中排名前十的关键词返回.时间复杂度为O(N+log(M)),M表示当前sorted set中所具有的元素个数,M表示返回的数据个数.
ps:zrevrange()中可以添加score=True属性,来获取带有分数值的列表


2.足迹记录

足迹记录和搜索历史记录有者异曲同工的作用.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

def add_foot_commodity_id(self, user_id, validated_data):
"""
消费者浏览某个商品,添加足迹
:param validated_data: 验证后的数据
:param user_id:用户id
:return:boolean
"""
# add_foot.apply_async(args=(pickle.dumps(self), user_id, validated_data)) # can't pickle _thread.lock objects
with manage_redis(self.db) as redis:
try:
key = self.key('foot', user_id)
timestamp = self.score # 毫秒级别的时间戳
commodity_id = validated_data['pk']
# pipe = self.redis.pipeline() # 添加管道,减少客户端和服务端之间的TCP包传输次数
redis.zadd(key, {commodity_id: timestamp}) # 分别表示用户id(加密),当前日期+时间戳(分数值),商品id
# 每个用户最多记录100条历史记录
if redis.zcard(key) >= 100: # 集合中key为键的数量
redis.zremrangebyrank(key, 0, 0) # 移除时间最早的那条记录
# pipe.execute()
return True
except Exception as e:
consumer_logger.error(e)
raise RedisOperationError()

分析:

这个项目中添加足迹的相关操作我也是使用了sorted key, 因为大多数用户可能并不会在意自己的浏览足迹,而且用户每浏览一个商品都会添加足迹,但是多次浏览同一个商品,商品不变只是浏览的时间变化了,因此用sorted key 来模拟这个功能还是比较合适的,时间作为score,商品id作为key,key唯一,而分数值可以相同,替换旧的分数值.

zard()操作用于计算当前集合中数据项的个数,我这里限定了记录100条足迹记录,一旦某次添加超过了,就自动删除时间最早的那条记录,使用zremrangebyrank方法,按照排名区间删除,start和stop分别设置为0,0处表示分数值最小(即时间最早).


3.运动排行榜

排行榜在像游戏APP,运动APP中很常见,我们熟悉的就是微信运动排行榜,用redis实现排行榜毋庸置疑,每日更新,非常nice!话不多说,上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def retrieve_cur_rank_user(self, member, mold, today=None, redis_name='default'):
"""
当天计数,获取当前用户在全服运动榜中的排名和运动值,从大到小

:param redis_name: redis name in config
:param mold:运动项目类型
:param today:当天日期
:param member: 用户id
:return [rank ,score]

键:'rank'-mold-date

数据结构:sorted set
"""
member = str(member)
today = today or (datetime.datetime.now()).strftime('%Y-%m-%d')
with manager_redis(redis_name) as redis:
name = self.key('rank', mold, today)
pipe = redis.pipeline()
pipe.zrevrank(name, member) # 获取当前用户的排名
pipe.zscore(name, member) # 获取当前用户的运动值
result = pipe.execute()
return result


分析

1.使用revrank从有序集成员中按score值从小到大进行排序,排名以0为底,socre分数值最大的成员排名为0

2.使用zscore来从有序集成员中获取当前用户的运动值

3.使用pipeline减少RTT的时间消耗


四 总结:

以上是我几个项目中使用redis的zset的部分典型实例.由于代码太多,则就不全不贴出来,有兴趣的可以去看我github上的项目.

1.第一个项目地址:https://github.com/syz247179876/Django-Mall

2.第二个项目地址:https://github.com/syz247179876/Flask-Sports


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!