专业编程基础技术教程

网站首页 > 基础教程 正文

一、Redis基础和应用篇 redis相关知识

ccvgpt 2024-10-12 13:48:57 基础教程 5 ℃

一、引子

Reids能做什么?缓存,实现分布式锁

二、Redis基础数据结构

Reids有5种基础数据结构,分别是:字符串(string)、列表(list)、字典(hash)、集合(set)和有序集合(zset)

一、Redis基础和应用篇 redis相关知识

2.1、字符串(string)

Redis的字符串是动态字符串,是可以修改的字符串,内部结构的实现类似于Java的ArrayList,就是一个字符数组,采用预分配冗余空间的方式来减少内存的频繁分配。当字符串长度小于1MB时,扩容都是加倍现有的空间。如果长度超过1MB,扩容时一次只会多扩1MB的空间。最大长度是512MB。可以用来缓存用户信息。常用的命令有:

set key value

get key

exists key

del key

mget key1 key2 key3

mset key1 value1 key2 value2 key3 value3

expire key

setex key 5 value # 5s 后过期

setnx key value

incr key

incrby key 5

2.2、列表(list)

Redis的列表相当于Java中的LinkedList,注意它是链表而不是数组。列表中的每个元素都使用双向指针,串起来可以同时支持向前向后遍历。当列表弹出最后一个元素之后,该数据结构被自动删除,内存被回收。再深入一点,Redis底层存储的不是一个简单的LinkedList,而是称之为“快速链表”(quicklist)的一个结构。这种结构在列表元素较少的情况下,会使用一块连续的内存存储,这个结构是压缩列表,叫ziplist。当数据量比较多的时候才会改成quicklist。因为普通的链表需要的附件指针空间太大,会浪费空间,还会加重内存的碎片化,比如某普通链表中存的只是int类型的数据,结构上还需要两个额外的指针prev和next。所以Redis将链表和ziplist结合起来组成了quicklist,也就是将多个ziplist使用双向指针串起来使用。quicklist既满足了快速的插入删除性能,又不会出现太大的空间冗余。

Redis的列表结构常用来做异步队列使用。将需要延迟处理的任务结构体序列化成字符串,塞进Redis的列表,另一个线程从这个列表中轮询数据进行处理。

常用的命令有:

rpush key v1 v2 v3

llen key

lpop、rpop key

lindex key 1 # O(n) 慎用

lrange key 0 -1 # 获取所有元素,O(n)慎用

ltrim key 1 -1 # O(n)慎用,

其中lindex相当于Java链表的get(int index)方法,他需要对链表进行遍历,性能随着参数index增大而变差。ltrim是保留,可以通过ltrim来实现一个定长的链表。

2.3、字典(hash)

Redis的字典实现结构上与Java的HashMap也是一样的,都是“数组+链表”二维结构。不同的是,Redis的字典的值只能是字符串,另外它们rehash的方式不一样,因为Java的HashMap在字典很大时,rehash是个耗时的操作,需要一次性全部rehash。Redis为了追求高性能,不能堵塞服务,所以采用了渐进式rehash策略。渐进式rehash会在rehash的同时,保留新旧两个hash结构,查询时会同时查询两个hash结构,然后在后续的定时任务以及hash操作指令中,循序渐进的将旧hash的内容一点点的迁移到新的hash结构中。当搬迁完成了,就会使用新的hash结构取而代之。当hash移除了最后一个元素之后,该数据结构被自动删除,内存被回收。

hash结构也可以用来存储用户信息,与字符串需要一次性全部序列化整个对象不同,hash可以对用户结构中的每个字段单独存储。这样当我们需要获取用户信息时可以进行部分获取。而以整个字符串的形式去保存用户信息的话,就只能一次性全部读取,这样就会浪费网络流量。

hash也有缺点,hash结构的存储消耗要高于单个字符串。到底该使用hash还是字符串,需要根据实际情况再三权衡。

常用的命令有:

hset key k1 v1

hgetall key

hlen key

hget key k1

hmset key k1 v1 k2 v2

hincrby key k1 1

2.4、集合(set)

Redis的集合相当于Java里的HashSet,它内部的键值对是无序的、唯一的。它的内部实现相当于一个特殊的字典,字典中所有的value都是一个值NULL。

set可以用来存储某个活动中中奖的用户ID,因为有去重功能,可以保证同一个用户不会中奖两次

常用的命令有:

sadd key v1

smembers key # 无序

sismember key v1 # contains(v1)

scard key # count

spop key # 弹出一个

2.5、有序集合(zset)

它类似于Java的SortedSet和HashMap的结合体,一方面它是一个Set,保证了内部value的唯一性,另一方面它可以给每个value赋予一个score,代表这个value的排序权重。它的内部实现用的是一种叫作“跳跃列表”的数据结构。

zset可以用来存储粉丝列表,value值是粉丝的用户ID,score是关注时间。我们可以对粉丝列表按关注时间进行排序。

常用的命令有:

zadd key score1 v1

zrange key 0 -1 # 按score排序列出,参数区间为排名范围

zrevrange key 0 -1 # 按score逆序列出,参数区间为排名范围

zcard key

zscore key v1

zrank key v1

zrangebyscore key 0 9 # 根据分值区间遍历zset

zrangebyscore key -inf 9 withscores # 根据分值区间(-无穷大,9] 遍历zset

zrem key v1

2.6、过期时间

Redis所有的数据结构都可以设置过期时间,时间到了,Redis会自动删除相应的对象。需要注意的是,如果一个字符串已经设置了过期时间,然后调用set方法修改了它,它的过期时间会消失。

三、分布式锁

3.1、分布式锁的奥义

分布式锁本质上要实现的目标就是在Redis里面占一个“坑”,当别的线程也要来占坑时,发现那里已经有一根“大萝卜”了,就只好放弃或者稍后再试。占坑一般使用setnx指令,用完了,再调用del指令释放“坑”。

这两个指令不是原子的,为了解决加入expire自动释放锁,但是可能会出现机器掉电或者人为因素,导致expire无法执行产生的死锁。此时不能使用Redis事务来解决,因为epire是依赖于sexnx的执行结果的,如果setnx没抢到锁,expire是不应该执行的。事务里没有if-else分支逻辑,事务的特点是一口气执行,要么全部执行,要么一个都不执行。

redis2.8版本中加入:set key val ex time nx 解决该问题

3.2、超时问题

Redis分布式锁不要用于较长时间的任务,如果真的偶尔出现了问题,造成的数据小错乱可能需要人工接入解决。

有一个稍微安全一点的方案是将set指令的value参数设置为一个随机数,释放锁时先匹配随机数是否一致,然后再删除key,这是为了确保当前线程占有的锁不会被其他线程释放,除非这个锁是因为过期了而被服务器自动释放的。

但是匹配value和删除key不是一个原子操作,这就需要Lua脚本来处理了,因为Lua脚本可以保证连续多个指令的原子操作。但是这也不是一个完美的方案,它只是相对安全一点,因为如果真的超时了,当前线程的逻辑没有执行完,其他线程也会乘虚而入。

if	redis.call("get",KEYS[1]==ARGV[1] then
	return redis.call("del",KEYS[1])
else
	return 0
end

四、布隆过滤器

4.1、布隆过滤器是什么

当布隆过滤器说某个值存在时,这个值可能不存在;当它说某个值不存在时,那就肯定不存在。

4.2、布隆过滤器的基本用法

bf.add、bf.exists、bf.madd、bf.mexists

五、简单限流

简单限流使用了滑动窗口,由于操作都是针对同一个key,所以使用了pipeline。但这种方案不适合一个period范围内有大量操作的情况,会消耗大量的存储空间

public class SimpleRateLimiter {

    public static void main(String[] args) {
       Jedis jedis = new Jedis();
       SimpleRateLimiter simpleRateLimiter = new SimpleRateLimiter(jedis);
        for (int i = 0; i < 60; i++) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(simpleRateLimiter.isActionAllowed("leo","reply",60,5));
        }
    }

    private Jedis jedis;

    public SimpleRateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
        String key = String.format("hist:%s:%s", userId, actionKey);
        long nowTs = System.currentTimeMillis();
        Pipeline pipe = jedis.pipelined();
        pipe.multi();
        pipe.zadd(key, nowTs, "" + nowTs);
        pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
        Response<Long> count = pipe.zcard(key);
        pipe.expire(key, period + 1);
        pipe.exec();
        pipe.close();
        return count.get() <= maxCount;
    }
}

六、漏斗限流

public class FunnelRateLimiter {

    static class Funnel {
        int capacity;// 漏斗容量
        float leakingRate;// 漏嘴流水速率
        int leftQuota;// 漏斗剩余空间
        long leakingTs;// 上一次漏水时间

        public Funnel(int capacity, float leakingRate) {
            this.capacity = capacity;
            this.leakingRate = leakingRate;
            this.leftQuota = capacity;
            this.leakingTs = System.currentTimeMillis();
        }

        void makeSpace() {
            long nowTs = System.currentTimeMillis();
            long deltaTs = nowTs - leakingTs;
            int deltaQuota = (int) (deltaTs * leakingRate);
            System.out.println(deltaQuota);
            // 间隔时间太长,整数数字过大溢出
            if (deltaQuota < 0) {
                this.leftQuota = capacity;
                this.leakingTs = nowTs;
            }
            // 腾出空间太小,最小单位是1
            if (deltaQuota < 1) {
                return;
            }
            this.leftQuota += deltaQuota;
            this.leakingTs = nowTs;
            if (this.leftQuota > this.capacity) {
                this.leftQuota = capacity;
            }
        }


        boolean watering(int quota) {
            makeSpace();
            if (this.leftQuota >= quota) {
                this.leftQuota -= quota;
                return true;
            }
            return false;
        }

    }

    private Map<String, Funnel> funnels = new HashMap<String, Funnel>();

    public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
        String key = String.format("%s:%s", userId, actionKey);
        Funnel funnel = funnels.get(key);
        if (funnel == null) {
            funnel = new Funnel(capacity, leakingRate);
            funnels.put(key, funnel);
        }
        return funnel.watering(1);
    }

    public static void main(String[] args) {
        FunnelRateLimiter funnelRateLimiter = new FunnelRateLimiter();
        for (int i = 0; i < 60; i++) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(funnelRateLimiter.isActionAllowed("leo", "reply", 5, 0.001f));
        }
    }
}

上面这个使用redis实现很复杂,可以使用Redis-Cell来实现

cl.throttle leo:reply 15 30 60 1:表示初始容量15,每60秒最多30次,最小是空间是1

七、Scan

7.1、如何从海量的Key中,找出满足特定前缀的key列表?

使用keys *aaa key,但是这样做有2个问题:

1、没有offset、limit参数,一次性突出所有满足条件的key,万一实例中有几百万个key满足条件,满屏幕的返回结果,很难受;

2、keys算法是遍历算法,复杂度是O(n),如果实例中有千万级别以上的key,这个指令就会导致Redis服务卡顿。

可以采用scan指令解决这个问题。

7.2、scan的一些特点

1、复杂度虽然也是O(n),但它是通过游标分步进行的,不会阻塞线程

2、提供limit参数,可以控制每次返回结果的最大条数,limit只是一个hint,返回的结果可多可少

3、同keys一样,它也提供模式匹配功能

4、服务器不需要为游标保存状态,游标的唯一状态就是scan返回给客户端的游标整数

5、返回的结果可能会有重复,需要客户端去重,这点非常重要

6、遍历的过程中如果有数据修改,改动后的数据能不能遍历到是不确定的

7、单次返回的结果是空的并不意味着遍历结束,而要看返回的游标值是否为零

7.3、scan命令

scan 0 match key99* count 1000

limit限定的是服务器单次遍历的字典槽位数量,而不是返回结果的数量。

更多的指令如:hscan、zscan、sscan

7.4、scan的原理

字典的结构:

在Redis中所有的key都存储在一个很大的字典中,这个字典的结构和Java中的HashMap一样,它是一维数组,二维链表结构。scan指令返回的游标就是第一个数组的位置索引,limit参数就表示需要遍历的索引槽位数,之所以返回的结果可能多可能少,是因为不是所有的槽位上都会挂接链表,有些多,有些可能没有。每一次遍历都会将limit数量的槽位上挂接的所有链表元素进行模式匹配过滤后,一次性返回给客户端。

scan遍历顺序:

它不是从第一维数组的第0位一直遍历到末尾,而是采用了高位进位加法来遍历。之所以使用这样特殊的方式进行遍历,是考虑到字典的扩容和缩容时避免槽位的遍历重复和遗漏,如下图:

7.5、大key问题

如果某个key太大,会导致数据迁移卡顿。另外内存分配上,如果一个key太大,那么当它需要扩容时,会一次性申请更大的内存,这也会导致卡顿。如果这个key被删除,内存会被一次性回收,卡顿现象也会再次产生。

如果观察到Redis的内存大起大落,这极有可能是因为大key导致的,这时候就需要定位出具体是哪个key,进一步定位出具体的业务来源,然后在改进相关业务代码设计。那么如何定位大key呢?可以使用scan指令,对于扫描出来的每一个key,使用type指令获得key的类型,然后使用相应数据结构的size或者len方法得到它的大小,对于每一种类型,将大小排名的前若干名作为扫描结果展示出来。

Redis官方给出的命令:redis-cli -h 127.0.0.1 -p 6379 --bigkeys -i 0.1每次休眠0.1s,ops不会剧烈抬升

最近发表
标签列表