Jade Dungeon

Redis的分布式锁

基于Redis的分布式锁

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁, 也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端, 客户端自己不能把别人加的锁给解了。

组件依赖

首先我们要通过Maven引入Jedis开源组件:

<dependency>
	<groupId>redis.clients</groupId>
	<artifactId>jedis</artifactId>
	<version>2.9.0</version>
</dependency>

加锁代码

正确实现

public class RedisTool {

	private static final String LOCK_SUCCESS = "OK";
	private static final String SET_IF_NOT_EXIST = "NX";
	private static final String SET_WITH_EXPIRE_TIME = "PX";

	/**
	 * 尝试获取分布式锁
	 * @param jedis Redis客户端
	 * @param lockKey 锁
	 * @param requestId 请求标识
	 * @param expireTime 超期时间
	 * @return 是否获取成功
	 */
	public static boolean tryGetDistributedLock(
		Jedis jedis, String lockKey, String requestId, int expireTime) 
	{
		String result = jedis.set(lockKey, requestId, 
						SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
		if (LOCK_SUCCESS.equals(result)) {
			return true;
		} else {
			return false;
		}
	}
}

可以看到,我们加锁就一行代码:jedis.set()一共有五个形参:

  1. key,用来当锁,因为key是唯一的。
  2. value,传的是requestId,光有key作为锁不够,还要用到value。 分布式锁要满足可靠性。通过给value赋值为requestId,可以区分锁是哪个请求加的, 在解锁的时候作为依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  3. nxxx,这个参数填的是NX,代表SET IF NOT EXIST,即当key不存在时, 我们进行set操作;若key已经存在,则不做任何操作;
  4. expx,这个参数传的是PX,给这个key加一个过期的设置,具体时间由第五个参数决定。
  5. time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:

  1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期, 同时value表示加锁的客户端。
  2. 已有锁存在,不做任何操作。

加锁代码满足我们可靠性里描述的三个条件:

  1. 首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功, 也就是只有一个客户端能持有锁,满足互斥性。
  2. 其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁, 锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。
  3. 最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识, 那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。 由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

错误示例1

比较常见的错误示例就是使用jedis.setnx()jedis.expire()组合实现加锁, 代码如下:

public static void wrongGetLock1(
	Jedis jedis, String lockKey, String requestId, int expireTime) 
{
	Long result = jedis.setnx(lockKey, requestId);

	if (result == 1) {
		// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
		jedis.expire(lockKey, expireTime);
	} else { 
		// do nothing
	}
}

setnx()方法作用就是「SET IF NOT EXIST」,expire()方法就是给锁加一个过期时间 。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令, 不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。 那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的 set()方法。

错误示例2

这一种错误示例就比较难以发现问题,而且实现也比较复杂。实现思路: 使用jedis.setnx()命令实现加锁,其中key是锁,value是锁的过期时间。 执行过程:

  1. 通过setnx()方法尝试加锁,如果当前锁不存在,返回加锁成功。
  2. 如果锁已经存在则获取锁的过期时间,和当前时间比较,如果锁已经过期, 则设置新的过期时间,返回加锁成功。

代码如下:

public static boolean wrongGetLock2(
		Jedis jedis, String lockKey, int expireTime) 
{
	long   expires   = System.currentTimeMillis() + expireTime;
	String expiresStr = String.valueOf(expires);

	// 如果当前锁不存在,返回加锁成功
	if (jedis.setnx(lockKey, expiresStr) == 1) { return true; }

	// 如果锁存在,获取锁的过期时间
	String currentValueStr = jedis.get(lockKey);

	if (currentValueStr != null &&  //
			Long.parseLong(currentValueStr) < System.currentTimeMillis()) 
	{
		// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
		String oldValueStr = jedis.getSet(lockKey, expiresStr);
		if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
			// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,
			// 它才有权利加锁
			return true;
		}
	}

	// 其他情况,一律返回加锁失败
	return false;
}

那么这段代码的问题:

  • 由于是客户端自己生成过期时间, 所以需要强制要求分布式下每个客户端的时间必须同步。
  • 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法, 那么虽然最终只有一个客户端可以加锁, 但是这个客户端的锁的过期时间可能被其他客户端覆盖。
  • 锁不具备拥有者标识,即任何客户端都可以解锁。

解锁代码

正确例子

public class RedisTool {

	private static final Long RELEASE_SUCCESS = 1L;

	/**
	 * 释放分布式锁
	 * @param jedis Redis客户端
	 * @param lockKey 锁
	 * @param requestId 请求标识
	 * @return 是否释放成功
	 */
	public static boolean releaseDistributedLock(
			Jedis jedis, String lockKey, String requestId) 
	{
		Object result = jedis.eval(
			" if redis.call('get', KEYS[1]) == ARGV[1] " + // Lua 脚本
				" then return redis.call('del', KEYS[1]) else return 0 end", 
			Collections.singletonList(lockKey), 
			Collections.singletonList(requestId));

		if (RELEASE_SUCCESS.equals(result)) {
			return true;
		} else {
			return false;
		}
	}

}

解锁的过程是将Lua代码传到jedis.eval()方法里,并使参数KEYS[1]赋值为lockKeyARGV[1]赋值为requestId。Lua脚本取得锁对应的value值是否与requestId相等。 Redis保证eval()函数执行的Lua脚本完成以后,Redis才会执行其他命令。

错误示例1

最常见的解锁代码就是直接使用jedis.del()方法删除锁, 这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁, 即使这把锁不是它的:

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
	jedis.del(lockKey);
}

错误示例2

分成两条命令去执行,并发环境出容易出错:

public static void wrongReleaseLock2(
		Jedis jedis, String lockKey, String requestId) 
{

    // 判断加锁与解锁是不是同一个客户端
    if (requestId.equals(jedis.get(lockKey))) {
        // 两步之间,其他客户端抢占了锁的占有权,则会误解锁
        jedis.del(lockKey);
    }
}

如代码注释,问题在于如果调用jedis.del()方法的时候, 这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景? 答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前 ,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法, 则将客户端B的锁给解除了。

多机部署Redisson实现分布式锁

如果你的项目中Redis是多机部署的,那么可以尝试使用Redisson实现分布式锁, 这是Redis官方提供的Java组件,参考链接:https://github.com/redisson/redisson

Redis实现布隆过滤器

布隆过滤器是一种概率数据结构,用来高效地测试集合中是否存在某个元素。 使用布隆过滤器有助于减少在磁盘中查找键值的次数,从而降低开销。

Redisson是一款超快速轻量级Redis Java客户端,提供了许多常用的Java对象和功能,包括布隆过滤器。

下面的示例代码演示了如何用Redisson的RBloomFilter接口使用布隆过滤器:

RBloomFilter<SomeObject> bloomFilter = redisson.getBloomFilter("sample");
// 初始化布隆过滤器
// expectedInsertions = 55000000
// falseProbability = 0.03
bloomFilter.tryInit(55000000L, 0.03);

bloomFilter.add(new SomeObject("field1Value", "field2Value"));

bloomFilter.add(new SomeObject("field5Value", "field8Value"));

bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

bloomFilter.count();

布隆过滤器是一种概率数据结构:能确认元素不存在于集合中,但只能提供元素出现在集合中的概率。

  • falseProbability参数定义了使用给定RBloomFilter发生误报的概率。
  • expectedInsertions参数定义了每个元素的预期插入次数。

RBloomFilter对象最多支持\(2^32\) bit。 Redisson还能通过RClusteredBloomFilter接口在Redis中支持分布式布隆过滤器。 RClusteredBloomFilter的内存效率更高,可以缩小所有Redis节点使用的内存。 RClusteredBloomFilter对象最多支持2^64 bit。 请注意,RClusteredBloomFilter只支持Redisson集群模式使用。

以下示例代码演示了如何使用RClusteredBloomFilter接口:

RClusteredBloomFilter<SomeObject> bloomFilter = redisson.getClusteredBloomFilter("sample");
// 初始化布隆过滤器
// expectedInsertions = 255000000
// falseProbability = 0.03
bloomFilter.tryInit(255000000L, 0.03);
bloomFilter.add(new SomeObject("field1Value", "field2Value"));
bloomFilter.add(new SomeObject("field5Value", "field8Value"));
bloomFilter.contains(new SomeObject("field1Value", "field8Value"));

限流算法

固定窗口法

固定窗口法是限流算法里面最简单的,比如我想限制1分钟以内请求为100个, 从现在算起的一分钟内,请求就最多就是100个,这分钟过完的那一刻把计数器归零, 重新计算,周而复始。

这个方法虽然简单,但有个大问题是无法应对两个时间边界内的突发流量。 如果在计数器清零的前1秒以及清零的后1秒都进来了100个请求, 那么在短时间内服务器就接收到了两倍的(200个)请求,这样就有可能压垮系统。

伪代码实现:

def can_pass_fixed_window(user, action, time_zone=60, times=30):
    """
    :param user: 用户唯一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动作)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内允许多少请求通过
    """
    key = '{}:{}'.format(user, action)
    # redis_conn 表示redis连接对象
    count = redis_conn.get(key)
    if not count:
        count = 1
        redis_conn.setex(key, time_zone, count)
    if count < times:
        redis_conn.incr(key)
        return True

    return False

会导致上面的问题是因为我们的统计精度还不够,为了将临界问题的影响降低, 我们可以使用滑动窗口法。

滑动窗口法

滑动窗口法,简单来说就是随着时间的推移,时间窗口也会持续移动, 有一个计数器不断维护着窗口内的请求数量,这样就可以保证任意时间段内, 都不会超过最大允许的请求数。例如当前时间窗口是0s~60s,请求数是40, 10s后时间窗口就变成了10s~70s,请求数是60。

时间窗口的滑动和计数器可以使用redis的有序集合(sorted set)来实现。 score的值用毫秒时间戳来表示,可以利用 当前时间戳 - 时间窗口的大小 来计算出窗口的边界,然后根据score的值做一个范围筛选就可以圈出一个窗口; value的值仅作为用户行为的唯一标识,也用毫秒时间戳就好。 最后统计一下窗口内的请求数再做判断即可。

虽然滑动窗口法避免了时间界限的问题, 但是依然无法很好解决细时间粒度上面请求过于集中的问题, 就例如限制了1分钟请求不能超过60次,请求都集中在59s时发送过来, 这样滑动窗口的效果就大打折扣。

伪代码实现:

def can_pass_slide_window(user, action, time_zone=60, times=30):
    """
    :param user: 用户唯一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动作)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内允许多少请求通过
    """
    key = '{}:{}'.format(user, action)
    now_ts = time.time() * 1000
    # value是什么在这里并不重要,只要保证value的唯一性即可,这里使用毫秒时间戳作为唯一值
    value = now_ts 
    # 时间窗口左边界
    old_ts = now_ts - (time_zone * 1000)
    # 记录行为
    redis_conn.zadd(key, value, now_ts)
    # 删除时间窗口之前的数据
    redis_conn.zremrangebyscore(key, 0, old_ts)
    # 获取窗口内的行为数量
    count = redis_conn.zcard(key)
    # 设置一个过期时间免得占空间
    redis_conn.expire(key, time_zone + 1)
    if not count or count < times:
        return True
    return False

为了使流量更加平滑,我们可以使用更加高级的令牌桶算法和漏桶算法。

令牌桶法

令牌桶算法的思路不复杂,它先以固定的速率生成令牌,把令牌放到固定容量的桶里, 超过桶容量的令牌则丢弃,每来一个请求则获取一次令牌, 规定只有获得令牌的请求才能放行,没有获得令牌的请求则丢弃。

令牌桶法限制的是请求的平均流入速率,优点是能应对一定程度上的突发请求, 也能在一定程度上保持流量的来源特征,实现难度不高,适用于大多数应用场景。

伪代码实现:

# 令牌桶法,具体步骤:
# 请求来了就计算生成的令牌数,生成的速率有限制
# 如果生成的令牌太多,则丢弃令牌
# 有令牌的请求才能通过,否则拒绝
def can_pass_token_bucket(user, action, time_zone=60, times=30):
    """
    :param user: 用户唯一标识
    :param action: 用户访问的接口标识(即用户在客户端进行的动作)
    :param time_zone: 接口限制的时间段
    :param time_zone: 限制的时间段内允许多少请求通过
    """
    # 请求来了就倒水,倒水速率有限制
    key = '{}:{}'.format(user, action)
    rate = times / time_zone # 令牌生成速度
    capacity = times # 桶容量
    tokens = redis_conn.hget(key, 'tokens') # 看桶中有多少令牌
    last_time = redis_conn.hget(key, 'last_time') # 上次令牌生成时间
    now = time.time()
    tokens = int(tokens) if tokens else capacity
    last_time = int(last_time) if last_time else now
    delta_tokens = (now - last_time) * rate # 经过一段时间后生成的令牌
    if delta_tokens > 1:
        tokens = tokens + tokens # 增加令牌
        if tokens > tokens:
            tokens = capacity
        last_time = time.time() # 记录令牌生成时间
        redis_conn.hset(key, 'last_time', last_time)

    if tokens >= 1:
        tokens -= 1 # 请求进来了,令牌就减少1
        redis_conn.hset(key, 'tokens', tokens)
        return True
    return False

漏桶算法

漏桶算法的思路与令牌桶算法有点相反。大家可以将请求想象成是水流, 水流可以任意速率流入漏桶中,同时漏桶以固定的速率将水流出。 如果流入速度太大会导致水满溢出,溢出的请求被丢弃。

通过上图可以看出漏桶法的特点是:不限制请求流入的速率,但是限制了请求流出的速率。

这样突发流量可以被整形成一个稳定的流量,不会发生超频。

关于漏桶算法的实现方式有一点值得注意, 我在浏览相关内容时发现网上大多数对于漏桶算法的伪代码实现的伪代码, 实现只实现了水流入桶的部分,没有实现关键的水从桶中漏出的部分。 如果只实现了前半部分,其实跟令牌桶没有大的区别。

根据维基百科,漏桶算法的实现理论有两种,分别是基于 meter 的和 基于 queue 的, 他们实现的具体思路不同。

基于meter的漏桶

基于 meter 的实现相对来说比较简单,其实它就有一个计数器, 然后有消息要发送的时候,就看计数器够不够,如果计数器没有满的话, 那么这个消息就可以被处理,如果计数器不足以发送消息的话, 那么这个消息将会被丢弃。

那么这个计数器是怎么来的呢,基于 meter 的形式的计数器就是发送的频率, 例如你设置得频率是不超过 5条/s ,那么计数器就是 5, 在一秒内你每发送一条消息就减少一个,当你发第 6 条的时候计时器就不够了, 那么这条消息就被丢弃了。

这种实现有点类似最开始介绍的固定窗口法,只不过时间粒度再小一些,伪代码就不上了。

基于queue的漏桶

基于 queue 的实现起来比较复杂,但是原理却比较简单,它也存在一个计数器, 这个计数器却不表示速率限制,而是表示 queue 的大小, 这里就是当有消息要发送的时候看 queue 中是否还有位置,如果有, 那么就将消息放进 queue 中,这个 queue 以 FIFO 的形式提供服务; 如果 queue 没有位置了,消息将被抛弃。

在消息被放进 queue 之后,还需要维护一个定时器, 这个定时器的周期就是我们设置的频率周期,例如我们设置得频率是 5条/s, 那么定时器的周期就是 200ms,定时器每 200ms 去 queue 里获取一次消息, 如果有消息,那么就发送出去,如果没有就轮空。

这种实现方式比较复杂,限于篇幅这里就没有实现了

熟悉python的朋友可以参考 aiolimiter 的实现

熟悉go的朋友可以参考uber的 ratelimit 的实现

redis-cell

如果觉得上面的都太难,不好实现,那么我墙裂建议你尝试一下redis-cell这个模块!

Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法, 并提供了原子的限流指令。有了这个模块,限流问题就非常简单了。 这个模块需要单独安装,安装教程网上很多,它只有一个指令CL.THROTTLE,参数:

  1. key:操作的key
  2. max_burst:漏桶的容量
  3. operations:漏水的速度,数量/秒数中的数量部分
  4. seconds:漏水的速率,数量/秒数中的秒数部分
  5. apply:每次请求消耗的水滴

例子:

CL.THROTTLE user123 15 30 60 1
                ▲    ▲  ▲  ▲ ▲
                |    |  |  | └───── apply 1 operation (default if omitted) 每次请求消耗的水滴
                |    |  └──┴─────── 30 operations / 60 seconds 漏水的速率
                |    └───────────── 15 max_burst 漏桶的容量
                └─────────────────── key “user123” 用户行为

执行以上命令之后,redis会返回如下信息:

> cl.throttle laoqian:reply 15 30 60
1) (integer) 0   # 0 表示允许,1表示拒绝
2) (integer) 16  # 漏桶容量
3) (integer) 15  # 漏桶剩余空间left_quota
4) (integer) -1  # 如果拒绝了,需要多长时间后再试(漏桶有空间了,单位秒)
5) (integer) 2   # 多长时间后,漏桶完全空出来(单位秒)

有了上面的redis模块,就可以轻松对付大多数的限流场景了,简直太方便了有木有!