Jade Dungeon

redis的Stream结构实现消息队列

消息队列的特点

  • Producer:消息生产者,负责产生和发送消息到 Broker;
  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;

List先进先出:LPUSH、RPOP

生产者使用LPUSH将消息插入到队列的头部, 如果key不存在则会创建一个空的队列再插入消息。

LPUSH key element[element...]

消费者使用RPOP key依次读取队列的消息,先进先出:

RPOP key

List实时消费:BLPOP、BRPOP

LPUSH、RPOP 存在一个性能风险,生产者向队列插入数据的时候, List 并不会主动通知消费者及时消费。

我们需要写一个while(true)不停地调用 RPOP 指令,当有新消息就会返回消息,否则返回空。

程序需要不断轮询并判断是否为空再执行消费逻辑, 这就会导致即使没有新消息写入到队列, 消费者也要不停地调用 RPOP 命令占用 CPU 资源。

Redis 提供了 BLPOP、BRPOP 阻塞读取的命令, 消费者在在读取队列没有数据的时候自动阻塞,直到有新的消息写入队列, 才会继续读取新消息执行业务逻辑。

BRPOP myQueue 0

参数0表示阻塞等待时间无无限制

List重复消费

  • 消息队列为每一条消息生成一个「全局 ID」;
  • 生产者为每一条消息创建一条「全局 ID」,消费者把一件处理过的消息 ID 记录下来判断是否重复。

其实这就是幂等,对于同一条消息,消费者收到后处理一次的结果和多次的结果是一致的。

List消息可靠性

本质就是消费者在处理消息的时候崩溃了,就无法再还原消息,缺乏一个消息确认机制。

Redis 提供了RPOPLPUSH、BRPOPLPUSH(阻塞)两个指令, 含义是从 List 从读取消息的同时把这条消息复制到另一个 List 中(备份), 并且是原子操作。

我们就可以在业务流程正确处理完成后再删除队列消息实现消息确认机制。 如果在处理消息的时候宕机了,重启后再从备份 List 中读取消息处理。

LPUSH queue01 msg01 msg02
BRPOPdisMQ queue02

生产者用LPUSH把消息插入到queue01队列中。消费者使用BRPOPLPUSH读取消息msg01, 同时该消息会被插入到queue02中。

如果消费成功则把queue02的消息删除即可,异常的话可以继续从queue02再次读取消息处理。

消息堆积

需要注意的是,如果生产者消息发送的很快,而消费者处理速度慢就会导致消息堆积, 给 Redis 的内存带来过大压力。

Java例子

添加依赖

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>

添加 Redis 配置:

spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false

RBlockingDeque继承java.util.concurrent.BlockingDeque, 在使用过程中我们完全可以根据接口文档来选择合适的 API 去实现业务逻辑:

主要方法如下 First Element (Head)

  Throws Exceptions Special Value Blocks Times out
Insert addFirst(e) offerFirst(e) putFirst(e) offerFirst(e, time, unit)
Remove removeFirst() pollFirst() takeFirst() pollFirst(time, unit)
examine getFirst() peekFirst()    

Last Element(Tail)

  Throws Exceptions Special Value Blocks Times out
Insert addLast(e) offerLast(e) putLast(e) offerLast(e, time, unit)
Remove removeLast() pollLast() takeLast() pollLast(time, unit)
examine getLast() peekLast()    

用双端队列来举例:

@Slf4j
@Service
public class QueueService {

	@Autowired
	private RedissonClient redissonClient;

	private static final String REDIS_MQ = "redisMQ";

	/**
	 * 发送消息到队列头部
	 *
	 * @param message
	 */
	public void sendMessage(String message) {
		RBlockingDeque<String> blockingDeque = 
			redissonClient.getBlockingDeque(REDIS_MQ);
		try {
			blockingDeque.putFirst(message);
			log.info("将消息: {} 插入到队列。", message);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	/**
	 * 从队列尾部阻塞读取消息,若没有消息,
	 * 线程就会阻塞等待新消息插入,防止 CPU 空转
	 */
	public void onMessage() {
		RBlockingDeque<String> blockingDeque = 
			redissonClient.getBlockingDeque(REDIS_MQ);
		while (true) {
			try {
				String message = blockingDeque.takeLast();
				log.info("从队列 {} 中读取到消息:{}.", REDIS_MQ, message);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

单元测试;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RedissionApplication.class)
public class RedissionApplicationTests {

	@Autowired
	private QueueService queueService;

	@Test
	public void testQueue() throws InterruptedException {
		new Thread(() -> {
				for (int i = 0; i < 1000; i++) {
					queueService.sendMessage("消息" + i);
				}
		}).start();

		new Thread(() -> queueService.onMessage()).start();

		Thread.currentThread().join();
	}

}

Stream结构的优点

上回说到使用 Redis 的 List 实现消息队列有很多局限性,比如:

  • 没有良好的 ACK 机制;
  • 没有 ConsumerGroup 消费组概念;
  • 消息堆积。
  • List 是线性结构,想要查询指定数据需要遍历整个列表;

Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型, Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;
  • 消息遍历;
  • 消息的阻塞和非阻塞读;
  • Consumer Groups 消费组;
  • ACK 确认机制。
  • 支持多播。

提供了很多消息队列操作命令,并且借鉴 Kafka 的 Consumer Groups 的概念,提供了消费组功能。

同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据, 并且能记住每一个客户端的访问位置,从而保证消息不丢失。

废话少说,先来看下如何使用,官网文档详见:https://redis.io/topics/streams-intro

XADD:插入消息

格式:

XADD <stream-name> <id> <key1> <value1> <key2> <value2> <key3> <value3> ...
  • stream-name指定队列的名称
  • id指定消息的ID,可以用*表示让Stream自动生成一个唯一的id
  • 队列中的一个消息是由一个或多个key-value组成的映射

消息 ID 由两部分组成:

  • 当前毫秒内的时间戳;
  • 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。

通过将元素ID与时间进行关联,并强制要求新元素的ID必须大于旧元素的ID, Redis从逻辑上将流变成了一种只执行追加操作(append only)的数据结构。

用户可以确信,新的消息和事件只会出现在已有消息和事件之后, 就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。

例子,返回值为自动生成的ID:

XADD myjobqueue01 * city shanghai cost 300  -- Result: 1646725804475-0
XADD myjobqueue01 * city chongqin cost 320  -- Result: 1646725810465-0
XADD myjobqueue01 * city shenzhen cost 500  -- Result: 1646725818677-0

XREAD:读取消息

XREAD [COUNT 读取数量] [BLOCK 阻塞毫秒数] STREAMS 流key [流key ...] ID [消息ID ...]

该指令可以同时对多个流进行读取,每个心法对应含义如下:

  • COUNT:表示每个流中最多读取的元素个数;
  • BLOCK:阻塞读取,当消息队列没有消息的时候,则阻塞等待, 0表示无限等待,单位是毫秒。
  • ID:消息 ID,在读取消息的时候可以指定 ID,并从这个 ID 的下一条消息开始读取, 0-0则表示从第一个元素开始读取。

如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息ID, 下次调用 XREAD 就将上一次返回的消息ID作为参数传递到下一次调用就可以继续消费后续的消息了。

例:

XREAD COUNT 1 BLOCK 0 STREAMS myjobqueue01 0-0
                                           -- Result:
                                           -- myjobqueue01
                                           -- 1646725804475-0
                                           -- city
                                           -- shanghai
                                           -- cost
                                           -- 300

XREAD COUNT 1 BLOCK 0 STREAMS myjobqueue01 1646725804475-0
                                           -- Result:
                                           -- myjobqueue01
                                           -- 1646725810465-0
                                           -- city
                                           -- chongqin
                                           -- cost
                                           -- 320

XREAD COUNT 1 BLOCK 0 STREAMS myjobqueue01 1646725810465-0
                                           -- Result:
                                           -- myjobqueue01
                                           -- 1646725818677-0
                                           -- city
                                           -- shenzhen
                                           -- cost
                                           -- 500

读取最新消息$

历史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻开始通过 XADD 发布的消息, 要使用$。等待过程中,队列有新追加消息,则会立即读取到。

XREAD COUNT 1 BLOCK 0 STREAMS myjobqueue01 $

通过 XREAD 读取的数据其实并没有被删除,当重新执行以下指令的时候又会重新读取到:

XREAD COUNT 2 BLOCK 0 STREAMS myjobqueue01 0-0
                                                -- Result:
                                                -- myjobqueue01
                                                -- 1646725804475-0
                                                -- city
                                                -- shanghai
                                                -- cost
                                                -- 300
                                                -- 
                                                -- 1646725810465-0
                                                -- city
                                                -- chongqin
                                                -- cost
                                                -- 320

所以我们还需要 ACK 机制,来实现一个真正的消息队列。

ConsumerGroup

Redis Stream 的 ConsumerGroup(消费者组)是一个强大的支持多播的可持久化的消息队列。 它允许用户将一个流从逻辑上划分为多个不同的流,并让 ConsumerGroup 的消费者去处理。

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别, 也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。

图例

Redis Stream 的结构如上图所示。有一个消息链表,每个消息都有一个唯一的 ID 和对应的内容;

  • 消息持久化;
  • 每个消费组的状态是独立的,不不影响,同一份的 Stream 消息会被所有的消费组消费;
  • 一个消费组可以有多个消费者组成,消费者之间是竞争关系, 任意一个消费者读取了消息都会使last_deliverd_id往前移动;
  • 每个消费者有一个pending_ids变量,用于记录当前消费者读取了但是还没 ack 的消息。 它用来保证消息至少被客户端消费了一次。

消费组实现的消息队列主要涉及以下三个指令:

  • XGROUP用于创建、销毁和管理消费者组。
  • XREADGROUP用于通过消费者组从流中读取。
  • XACK是允许消费者将待处理消息标记为已正确处理的命令。

创建消费组

Stream 通过XGROUP CREATE指令创建消费组(Consumer Group), 需要传递起始消息 ID 参数用来初始化last_delivered_id变量。

我们使用XADDbossStream队列插入一些消息:

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40

如下指令,为消息队列名为 bossStream 创建dep-hrdep-km两个消费组。

语法如下

XGROUP CREATE stream group start_id
  • stream:指定队列的名字;
  • group:指定消费组名字;
  • start_id:指定消费组在 Stream 中的起始 ID, 它决定了消费者组从哪个 ID 之后开始读取消息,0-0从第一条开始读取, $表示从最后一条向后开始读取,只接收新消息。
  • MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。 可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。

例:

XGROUP CREATE bossStream dep-hr 0-0 MKSTREAM
XGROUP CREATE bossStream dep-km 0-0 MKSTREAM

读取消息

语法如下:

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms]  \
	STREAMS streamName [stream ...] id [id ...]

该命令与XREAD大同小异,区别在于新增GROUP groupName consumerName选项。

该选项的两个参数分别用于指定被读取的消费者组以及负责处理消息的消费者。

其中:

  • >:命令的最后参数>,表示从尚未被消费的消息开始读取;
  • BLOCK:阻塞读取;

如果消息队列中的消息被消费组的一个消费者消费了, 这条消息就不会再被这个消费组的其他消费者读取到。例:

dep-hr消费组的 consumer1 从bossStream 阻塞读取一条消息:

XREADGROUP GROUP dep-hr consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
                                                    -- Result:
                                                    -- bossStream
                                                    -- 1646728882372-0
                                                    -- name
                                                    -- zhangsan
                                                    -- age
                                                    -- 26

consumer2再操作:

XREADGROUP GROUP dep-hr consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
                                                    -- Result:
                                                    -- bossStream
                                                    -- 1646728887677-0
                                                    -- name
                                                    -- lisi
                                                    -- age
                                                    -- 2

consumer2不能再读取到zhangsan了, 而是读取下一条lisi因为这条消息已经被consumer1读取了。

图例

使用消费者的另一个目的可以让组内的多个消费者分担读取消息, 也就是每个消费者读取部分消息,从而实现均衡负载。

查看已读未确认消息:XPENDING

为了保证消费者在消费的时候发生故障或者宕机重启后依然可以读取消息, Stream 内部有一个队列(pending List)保存每个消费者读取但是还没有执行 ACK 的消息。

如果消费者使用了XREADGROUP GROUP groupName consumerName读取消息, 但是没有给 Stream 发送XACK命令,消息依然保留。

比如查看bossStream中的 消费组dep-hr中各个消费者已读取未确认的消息信息:

XPENDING bossStream dep-hr
                               -- Time: 2022-03-08 16:50:54:722
                               -- Duration(ms): 1
                               -- Result:
                               -- 2                     -- 未确认消息条数
                               -- 1646728882372-0       -- dep-hr组中读取的最大ID
                               -- 1646728887677-0       -- dep-hr组中读取的最小ID
                               -- consumer1
                               -- 1
                               -- 
                               -- consumer2
                               -- 1

查看consumer1读取了哪些数据,使用以下命令:

XPENDING bossStream dep-hr - + 10 consumer1
                              -- 1646728882372-0
                              -- consumer1
                              -- 612898
                              -- 1

ACK 确认

所以当接收到消息并且消费成功以后,我们需要手动 ACK 通知 Streams, 这条消息就会被删除了。语法如下:

XACK key group-key ID [ID ...]

例:

XACK bossStream dep-hr 1646728882372-0 1646728887677-0

消费确认增加了消息的可靠性,一般在业务处理完成之后, 需要执行 ack 确认消息已经被消费完成。

Java例子

<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.16.7</version>
</dependency>
spring:
  application:
    name: redission
  redis:
    host: 127.0.0.1
    port: 6379
    ssl: false
	@Slf4j
@Service
public class QueueService {
	@Autowired
	private RedissonClient redissonClient;

	/* 发送消息到队列 */
	public void sendMessage(String message) {
		RStream<String, String> stream = 
			redissonClient.getStream("sensor#4921");
		stream.add("speed", "19");
		stream.add("velocity", "39%");
		stream.add("temperature", "10C");
	}

	/* 消费者消费消息 */
	public void consumerMessage(String message) {
		RStream<String, String> stream = redissonClient.getStream("sensor#4921");
		stream.createGroup("sensors_data", StreamMessageId.ALL);
		Map<StreamMessageId, Map<String, String>> messages = 
			stream.readGroup("sensors_data", "consumer_1");
		for (Map.Entry<StreamMessageId, Map<String, String>> entry : 
				messages.entrySet()) 
		{
			Map<String, String> msg = entry.getValue();
			System.out.println(msg);
			stream.ack("sensors_data", entry.getKey());
		}
	}

}