在上一篇文章中,我们解释了 Redis 中 Cuckoo Filter 操作用法。在这篇文章中,我们将深入了解 Redis 的 Stream 数据类型及其操作方法。Stream 是 Redis 在 5.0 版本引入的一种强大的数据结构,用于实现消息队列、事件日志等实时数据流处理场景。在这篇文章中,我们将学习如何使用 go-redis 操作 Redis Stream,并通过一个完整的生产者和消费者的示例来演示各个方法的具体用法。

👉 点击查看《go-redis使用指南》系列文章目录

在《go-redis使用指南》系列文章中,我们将详细介绍如何在 Golang 项目中使用 redis/go-redis 库与 Redis 进行交互。以下是该系列文章的全部内容:

  1. Golang 操作 Redis:快速上手 - go-redis 使用指南
  2. Golang 操作 Redis:连接设置与参数详解 - go-redis 使用指南
  3. Golang 操作 Redis:基础的字符串键值操作 - go-redis 使用指南
  4. Golang 操作 Redis:如何设置 key 的过期时间 - go-redis 使用指南
  5. Golang 操作 Redis:Hash 哈希数据类型操作用法 - go-redis 使用指南
  6. Golang 操作 Redis:Set 集合数据类型操作用法 - go-redis 使用指南
  7. Golang 操作 Redis:为 Hash 中的字段设置过期时间 - go-redis 使用指南
  8. Golang 操作 Redis:List 列表数据类型操作用法 - go-redis 使用指南
  9. Golang 操作 Redis:SortedSet 有序集合数据类型操作用法 - go-redis 使用指南
  10. Golang 操作 Redis:bitmap 数据类型操作用法 - go-redis 使用指南
  11. Golang 操作 Redis:事务处理操作用法 - go-redis 使用指南
  12. Golang 操作 Redis:地理空间数据类型操作用法 - go-redis 使用指南
  13. Golang 操作 Redis:HyperLogLog 操作用法 - go-redis 使用指南
  14. Golang 操作 Redis:Pipeline 操作用法 - go-redis 使用指南
  15. Golang 操作 Redis:PubSub发布订阅用法 - go-redis 使用指南
  16. Golang 操作 Redis:布隆过滤器(Bloom Filter)操作用法 - go-redis 使用指南
  17. Golang 操作 Redis:Cuckoo Filter操作用法 - go-redis 使用指南
  18. Golang 操作 Redis:Stream操作用法 - go-redis 使用指南
golang redis go-redis

Redis Stream 简介

Redis Stream 是一种日志数据结构,类似于 Kafka 中的 Topic,可以用来存储和管理时间序列数据。它支持消息的持久化、消费组、消息确认等特性,因此非常适用于实现消息队列、事件处理和日志聚合等场景。

常见的使用场景包括:

  • 消息队列:Stream 允许多个生产者和消费者进行消息的读写,并支持消费组功能,方便实现多消费者消费不同消息。
  • 事件日志:Stream 可以记录发生的事件,并且支持按照时间顺序查询和消费这些事件。
  • 实时数据处理:Stream 适合实时处理数据流,如物联网数据、用户行为日志等。

相关阅读推荐:Redis Streams 实践指南:在 Golang 中使用 Redis 作为消息队列

go-redis 中 Stream 操作的方法

在 go-redis 中,Stream 操作提供了丰富的接口来处理消息的生产、消费以及管理。以下是这些方法的名称及其功能描述:

  • XAdd - 向 Stream 添加一条消息。
  • XDel - 删除 Stream 中指定 ID 的消息。
  • XLen - 获取 Stream 中消息的数量。
  • XRange - 按消息 ID 范围获取 Stream 中的消息。
  • XRangeN - 按消息 ID 范围获取 Stream 中的指定数量的消息。
  • XRevRange - 按消息 ID 范围逆序获取 Stream 中的消息。
  • XRevRangeN - 按消息 ID 范围逆序获取 Stream 中的指定数量的消息。
  • XRead - 从多个 Stream 中读取消息。
  • XReadStreams - 从多个指定的 Stream 中读取消息。
  • XGroupCreate - 创建消费组。
  • XGroupCreateMkStream - 创建消费组并在 Stream 不存在时自动创建。
  • XGroupSetID - 设置消费组的 ID。
  • XGroupDestroy - 删除消费组。
  • XGroupCreateConsumer - 为消费组创建消费者。
  • XGroupDelConsumer - 删除消费组中的消费者。
  • XReadGroup - 从消费组中读取消息。
  • XAck - 确认消费组中的消息。
  • XPending - 获取消费组中待处理消息的概要信息。
  • XPendingExt - 获取消费组中待处理消息的详细信息。
  • XClaim - 将待处理消息分配给另一个消费者。
  • XClaimJustID - 只返回待处理消息的 ID,不返回消息内容。
  • XAutoClaim - 自动分配超时的待处理消息给当前消费者,并返回消息内容。
  • XAutoClaimJustID - 自动分配超时的待处理消息给当前消费者,并只返回消息 ID。
  • XTrimMaxLen - 按最大长度修剪 Stream。
  • XTrimMaxLenApprox - 按近似最大长度修剪 Stream。
  • XTrimMinID - 按最小 ID 修剪 Stream。
  • XTrimMinIDApprox - 按近似最小 ID 修剪 Stream。
  • XInfoGroups - 获取 Stream 中所有消费组的信息。
  • XInfoStream - 获取 Stream 的基本信息。
  • XInfoStreamFull - 获取 Stream 的详细信息。
  • XInfoConsumers - 获取消费组中的消费者信息。

go-redis Stream 操作方法详细讲解及示例代码

接下来,我们将通过一个完整的生产者和消费者的示例来演示这些方法的具体用法。示例代码展示了如何在 Redis 中实现一个简单的消息队列,包含消息的生产、消费和管理。

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()

	// 创建Redis客户端
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	stream := "mystream"
	group := "mygroup"
	consumer := "consumer1"

	// XGroupCreateMkStream - 创建一个消费者组,并在必要时创建流
	err := rdb.XGroupCreateMkStream(ctx, stream, group, "0").Err()
	if err != nil && err != redis.Nil {
		log.Fatalf("XGroupCreateMkStream error: %v", err)
	}

	// XAdd - 向流中添加多条消息
	for i := 1; i <= 5; i++ {
		msgID, err := rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: stream,
			Values: map[string]interface{}{
				"field1": fmt.Sprintf("value%d", i),
				"field2": fmt.Sprintf("more_value%d", i),
			},
		}).Result()
		if err != nil {
			log.Fatalf("XAdd error: %v", err)
		}
		fmt.Printf("Message ID added: %s\n", msgID)
	}

	// XReadGroup - 读取消息
	msgs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    group,
		Consumer: consumer,
		Streams:  []string{stream, ">"},
		Count:    2,
		Block:    0,
	}).Result()
	if err != nil {
		log.Fatalf("XReadGroup error: %v", err)
	}
	fmt.Printf("Messages read: %+v\n", msgs)

	// XAcknowledge - 确认消息已被处理
	if len(msgs) > 0 {
		var msgIDs []string
		for _, msg := range msgs[0].Messages {
			msgIDs = append(msgIDs, msg.ID)
		}
		ackCount, err := rdb.XAck(ctx, stream, group, msgIDs...).Result()
		if err != nil {
			log.Fatalf("XAcknowledge error: %v", err)
		}
		fmt.Printf("Messages acknowledged: %d\n", ackCount)
	}

	// XPendingExt - 查看消费者组的未决消息的详细信息
	pendingExt, err := rdb.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  "-",
		End:    "+",
		Count:  10,
	}).Result()
	if err != nil {
		log.Fatalf("XPendingExt error: %v", err)
	}
	fmt.Printf("Pending messages detail: %+v\n", pendingExt)

	// XClaim - 获取未确认的消息
	if len(pendingExt) > 0 {
		var pendingIDs []string
		for _, p := range pendingExt {
			pendingIDs = append(pendingIDs, p.ID)
		}
		claimedMsgs, err := rdb.XClaim(ctx, &redis.XClaimArgs{
			Stream:   stream,
			Group:    group,
			Consumer: consumer,
			MinIdle:  time.Second * 30,
			Messages: pendingIDs,
		}).Result()
		if err != nil {
			log.Fatalf("XClaim error: %v", err)
		}
		fmt.Printf("Claimed messages: %+v\n", claimedMsgs)
	}

	// XDel - 删除一条消息
	delCount, err := rdb.XDel(ctx, stream, msgs[0].Messages[0].ID).Result()
	if err != nil {
		log.Fatalf("XDel error: %v", err)
	}
	fmt.Printf("Messages deleted: %d\n", delCount)

	// XLen - 获取流中的消息数量
	length, err := rdb.XLen(ctx, stream).Result()
	if err != nil {
		log.Fatalf("XLen error: %v", err)
	}
	fmt.Printf("Stream length: %d\n", length)

	// XRange - 获取流中的消息
	messages, err := rdb.XRange(ctx, stream, "-", "+").Result()
	if err != nil {
		log.Fatalf("XRange error: %v", err)
	}
	fmt.Printf("Messages in stream: %+v\n", messages)

	// XRangeN - 获取指定数量的消息
	limitedMessages, err := rdb.XRangeN(ctx, stream, "-", "+", 3).Result()
	if err != nil {
		log.Fatalf("XRangeN error: %v", err)
	}
	fmt.Printf("Limited messages in stream: %+v\n", limitedMessages)

	// XRevRange - 反向获取流中的消息
	revMessages, err := rdb.XRevRange(ctx, stream, "+", "-").Result()
	if err != nil {
		log.Fatalf("XRevRange error: %v", err)
	}
	fmt.Printf("Reversed messages in stream: %+v\n", revMessages)

	// XRevRangeN - 反向获取指定数量的消息
	revLimitedMessages, err := rdb.XRevRangeN(ctx, stream, "+", "-", 2).Result()
	if err != nil {
		log.Fatalf("XRevRangeN error: %v", err)
	}
	fmt.Printf("Reversed limited messages in stream: %+v\n", revLimitedMessages)

	// XInfoStream - 获取流的信息
	info, err := rdb.XInfoStream(ctx, stream).Result()
	if err != nil {
		log.Fatalf("XInfoStream error: %v", err)
	}
	fmt.Printf("Stream info: %+v\n", info)

	// XInfoGroups - 获取流的消费者组信息
	groupsInfo, err := rdb.XInfoGroups(ctx, stream).Result()
	if err != nil {
		log.Fatalf("XInfoGroups error: %v", err)
	}
	fmt.Printf("Stream groups info: %+v\n", groupsInfo)

	// XTrimMaxLen - 修剪流,保留最多maxLen条消息
	trimCount, err := rdb.XTrimMaxLen(ctx, stream, 3).Result()
	if err != nil {
		log.Fatalf("XTrimMaxLen error: %v", err)
	}
	fmt.Printf("Trimmed messages count: %d\n", trimCount)

	// XGroupDestroy - 删除消费者组
	groupDelCount, err := rdb.XGroupDestroy(ctx, stream, group).Result()
	if err != nil {
		log.Fatalf("XGroupDestroy error: %v", err)
	}
	fmt.Printf("Group destroyed count: %d\n", groupDelCount)
}

执行代码输出结果:

Message ID added: 1724849944534-0
Message ID added: 1724849944535-0
Message ID added: 1724849944535-1
Message ID added: 1724849944536-0
Message ID added: 1724849944537-0
Messages read: [{Stream:mystream Messages:[{ID:1724849944534-0 Values:map[field1:value1 field2:more_value1]} {ID:1724849944535-0 Values:map[field1:value2 field2:more_value2]}]}]
Messages acknowledged: 2
Pending messages detail: []
Messages deleted: 1
Stream length: 4
Messages in stream: [{ID:1724849944535-0 Values:map[field1:value2 field2:more_value2]} {ID:1724849944535-1 Values:map[field1:value3 field2:more_value3]} {ID:1724849944536-0 Values:map[field1:value4 field2:more_value4]} {ID:1724849944537-0 Values:map[field1:value5 field2:more_value5]}]
Limited messages in stream: [{ID:1724849944535-0 Values:map[field1:value2 field2:more_value2]} {ID:1724849944535-1 Values:map[field1:value3 field2:more_value3]} {ID:1724849944536-0 Values:map[field1:value4 field2:more_value4]}]
Reversed messages in stream: [{ID:1724849944537-0 Values:map[field1:value5 field2:more_value5]} {ID:1724849944536-0 Values:map[field1:value4 field2:more_value4]} {ID:1724849944535-1 Values:map[field1:value3 field2:more_value3]} {ID:1724849944535-0 Values:map[field1:value2 field2:more_value2]}]
Reversed limited messages in stream: [{ID:1724849944537-0 Values:map[field1:value5 field2:more_value5]} {ID:1724849944536-0 Values:map[field1:value4 field2:more_value4]}]
Stream info: &{Length:4 RadixTreeKeys:1 RadixTreeNodes:2 Groups:1 LastGeneratedID:1724849944537-0 MaxDeletedEntryID:1724849944534-0 EntriesAdded:5 FirstEntry:{ID:1724849944535-0 Values:map[field1:value2 field2:more_value2]} LastEntry:{ID:1724849944537-0 Values:map[field1:value5 field2:more_value5]} RecordedFirstEntryID:1724849944535-0}
Stream groups info: [{Name:mygroup Consumers:1 Pending:0 LastDeliveredID:1724849944535-0 EntriesRead:2 Lag:3}]
Trimmed messages count: 1
Group destroyed count: 1

常见问题

未决消息是什么意思?没有 ack 的消息会如何处理?

未决消息(Pending Messages) 指的是那些已被某个消费者读取但尚未被确认处理(acknowledged)的消息。这些消息已经从流中被读取并分配给了一个消费者组中的某个消费者,但因为还没有收到该消费者的确认,Redis 会将这些消息标记为未决消息。

在 Redis 的流(Stream)中,当使用 XReadGroup 读取消息时,这些消息会被分配给指定的消费者,并标记为“已读取但未确认”。如果消费者成功处理了消息,它应该使用 XAck 命令来确认消息已被处理,这样 Redis 就会从未决消息列表中移除这些消息。

如果消费者在处理消息时失败或断开连接,未决消息将继续存在于未决消息列表中。其他消费者可以使用 XPending 或 XPendingExt 命令查看未决消息的状态,包括哪些消息未决、哪个消费者持有这些消息、消息的等待时间等。然后可以使用 XClaim 命令将这些未决消息重新分配给另一个消费者,确保消息能够被及时处理。

所以,总结一下:

  • 已确认消息(Acknowledged Messages):已被消费者读取并确认处理的消息。
  • 未决消息(Pending Messages):已被消费者读取但尚未确认处理的消息。

通过适当使用这些命令,Redis Stream 可以帮助实现可靠的消息处理机制,避免消息的丢失或重复处理。

XClaimXAck 的作用是什么?

XClaimXAck 是 Redis Stream 中用于处理消费者组内消息的两个关键命令,它们的功能和用途有所不同:

  • XAck 用于确认消息已被成功处理,使消息从未决状态中移除。
  • XClaim 用于将未确认的消息从一个消费者转移到另一个消费者,确保消息不会因为消费者的失效而丢失。

它们在消费者组的消息处理流程中扮演着不同的角色,XAck 主要用于消息的正常确认,XClaim 则用于异常情况的消息重新分配。

XAck - 确认消息已被处理

XAck(Acknowledge)命令用于确认消费者已经成功处理了一条或多条消息。当一个消费者从流中读取消息并完成相应的处理后,需要调用 XAck 来确认这些消息。确认后,这些消息会从未决消息列表中移除,表示它们已经成功处理,不再需要重发。如果不调用 XAck,Redis 会将未确认的消息标记为未决状态,可能会在系统恢复或重新分配时再次投递这些消息。调用 XAck 可以防止消息重复投递给消费者。

  • 使用场景:当消费者处理完消息后,确保这些消息不会被重新处理。
  • 示例
    result, err := rdb.XAck(ctx, "mystream", "mygroup", "message-id-1", "message-id-2").Result()
    if err != nil {
        log.Fatalf("XAck error: %v", err)
    }
    log.Printf("Number of messages acknowledged: %d", result)
    

XClaim - 重新分配未确认消息

XClaim 命令用于将未确认的消息从一个消费者转移到另一个消费者。这通常发生在一个消费者从流中读取消息后未能在预定时间内确认处理(例如消费者进程崩溃或超时),此时可以使用 XClaim 将这些未决消息分配给其他消费者以确保消息不会丢失。

  • 使用场景:在一个消费者组中,如果某个消费者处理消息超时或者意外失败,其他消费者可以使用 XClaim 来接管这些未确认的消息。
  • 示例
    msgs, err := rdb.XClaim(ctx, &redis.XClaimArgs{
        Stream:   "mystream",
        Group:    "mygroup",
        Consumer: "consumer-2",
        MinIdle:  time.Duration(5 * time.Minute),
        Messages: []string{"message-id-1"},
    }).Result()
    if err != nil {
        log.Fatalf("XClaim error: %v", err)
    }
    log.Printf("Claimed messages: %v", msgs)
    

XLen 返回的长度为何是 4,而不是 2?

XLen 返回的是流中的消息总数,和消息是否被读取无关。消息读取后不会自动删除,只有通过 XDel 手动删除或使用自动裁剪功能时才会删除消息。你可以选择在适当的时机删除不再需要的消息,或者使用自动裁剪功能来控制流的大小。

假设你向流中添加了 5 条消息,然后进行如下操作:

  1. 添加消息:向流中添加 5 条消息后,XLen 返回 5,因为流中有 5 条消息。
  2. 读取消息:如果你使用 XReadXReadGroup 读取了 2 条消息,消息仍然保留在流中,只是被标记为已被某个消费者读取,XLen 仍然返回 5。
  3. 删除消息:之后,如果使用 XDel 删除了其中的 1 条消息,XLen 将返回 4,因为流中只剩下 4 条消息了。

消息的删除时机

如果已消费的消息不进行删除,流中会不断积累旧的消息,这可能导致存储空间的浪费。为了管理这种情况,Redis 提供了几种机制:

  1. 手动删除:通常,你需要手动调用 XDel 来删除消息。例如,当消息被处理并确认(XAck)后,你可以选择删除这些消息以释放存储空间。
  2. 自动裁剪:你可以通过设置流的最大长度(如使用 XTrimMaxLenXTrimMinID)来自动删除旧的消息,确保流不会无限增长。

通常在消费者确认消息处理完成后,使用 XAck 命令确认消息。如果你确定这些消息不再需要(例如,所有消费者都已处理这些消息),你可以使用 XDel 删除它们。在某些情况下,使用自动裁剪策略可能更有效,例如设置流的最大长度或最小 ID,以定期清理过期的消息。这样可以避免手动删除每一条消息。

自动裁剪清理流的方式

Redis 提供了几种自动裁剪策略来定期清理流:

  1. XTrimMaxLen:设置流的最大长度,当流中的消息数超过该长度时,自动删除最旧的消息。
    rdb.XTrimMaxLen(ctx, "mystream", 1000) // 保持最多 1000 条消息
    
  2. XTrimMaxLenApprox:类似于 XTrimMaxLen,但允许一定的近似误差,以提高性能。
    rdb.XTrimMaxLenApprox(ctx, "mystream", 1000, 100) // 保持最多 1000 条消息,误差范围 100
    
  3. XTrimMinID:根据消息 ID 删除流中的消息,删除 ID 小于指定值的所有消息。
    rdb.XTrimMinID(ctx, "mystream", "1609459200000-0") // 删除 ID 小于指定值的消息
    
  4. XTrimMinIDApprox:类似于 XTrimMinID,允许一定的近似误差。
    rdb.XTrimMinIDApprox(ctx, "mystream", "1609459200000-0", 100) // 删除 ID 小于指定值的消息,误差范围 100
    

XReadGroupArgsStreams 字段中的 ">" 代表什么意思?

在 Redis Stream 中,XReadGroupArgs 中的 Streams 字段用于指定消费者组读取消息的流和起始点。符号 “>” 和其他符号在这个字段中有特定的含义,用于控制消息的读取行为。

  • ">":表示从流中最新的消息开始读取,不包括已存在的旧消息。
  • "0":表示从流的开头开始读取所有消息。
  • <message-id>:表示从指定的消息 ID 开始读取。

同一个组内多个消费者时的消费策略

在 Redis Stream 中,消费者组(Consumer Group)允许多个消费者并行处理消息。消费策略如下:

  1. 轮询:消息在消费者组内轮询分配给不同的消费者。Redis 保证每条消息只被消费者组中的一个消费者处理一次。
  2. 消息确认:每个消费者处理消息后调用 XAck 确认,确保消息不会重复投递。如果消费者处理失败或超时,消息会被重新分配给其他消费者。
  3. 消息重试:未确认的消息会被标记为未决(Pending),可以使用 XPending 查询这些消息。消费者可以使用 XClaim 强制将这些未处理的消息重新分配给其他消费者。
  4. 消费者组管理:使用 XGroupCreateXGroupDestroy 等命令管理消费者组,确保消息的可靠处理和消费者的动态调整。

如何使用 Redis Stream 实现消费者的广播模式?

在 Redis Streams 中,默认的消费模式是每条消息只能由一个消费者处理,这样可以避免重复处理。然而,如果需要实现新消息的广播模式(即每个消费者都能接收到每条新消息),可以使用使用多个消费者组的方式实现。

Redis Streams 支持在一个流上创建多个消费者组。每个消费者组可以独立处理流中的消息。这种方法可以模拟广播模式,每个消费者组都可以有自己的消费者,所有消费者组都能接收到流中的所有消息。

步骤:

  1. 创建多个消费者组:每个消费者组都将接收到所有流中的消息。
  2. 为每个消费者组分配消费者:每个消费者组中的消费者将接收到该组分配的消息。

示例代码:

package main

import (
	"context"
	"fmt"
	"github.com/redis/go-redis/v9"
	"time"
)

var ctx = context.Background()

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})

	stream := "mystream"

	// 创建两个消费者组
	rdb.XGroupCreateMkStream(ctx, stream, "group1", "$")
	rdb.XGroupCreateMkStream(ctx, stream, "group2", "$")

	// 向流中添加消息
	for i := 0; i < 5; i++ {
		rdb.XAdd(ctx, &redis.XAddArgs{
			Stream: stream,
			Values: map[string]interface{}{"message": fmt.Sprintf("message %d", i)},
		})
	}

	// 消费者组 1 的消费者
	go consumeGroup(rdb, stream, "group1", "consumer1")

	// 消费者组 2 的消费者
	go consumeGroup(rdb, stream, "group2", "consumer2")

	// 等待一段时间,以便观察结果
	time.Sleep(10 * time.Second)
}

func consumeGroup(rdb *redis.Client, stream, group, consumer string) {
	for {
		// 使用 XReadGroup 从流中读取消息
		messages, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Block:    0,
			Count:    10,
		}).Result()
		if err != nil {
			fmt.Println("Error reading from stream:", err)
			return
		}

		// 处理消息
		for _, message := range messages {
			for _, xMessage := range message.Messages {
				fmt.Printf("Consumer %s received: %v\n", consumer, xMessage.Values)
				// 确认消息处理
				rdb.XAck(ctx, stream, group, xMessage.ID)
			}
		}
	}
}

运行代码输出:

Consumer consumer2 received: map[message:message 0]
Consumer consumer2 received: map[message:message 1]
Consumer consumer2 received: map[message:message 2]
Consumer consumer2 received: map[message:message 3]
Consumer consumer2 received: map[message:message 4]
Consumer consumer1 received: map[message:message 0]
Consumer consumer1 received: map[message:message 1]
Consumer consumer1 received: map[message:message 2]
Consumer consumer1 received: map[message:message 3]
Consumer consumer1 received: map[message:message 4]

结语

在这篇文章中,我们深入探讨了 Redis Stream 的基本操作,学习了如何在 Golang 中使用 go-redis 实现一个简单的消息队列。Stream 作为 Redis 的一种强大数据结构,适用于多种实时数据处理场景,希望本文的示例能够帮助你更好地理解和应用它。点击 go-redis 使用指南 可查看更多相关教程!


也可以看看