在分布式系统中,消息队列是一种非常重要的组件。它们帮助我们解耦服务、平滑流量尖峰并确保数据的可靠传递。消息队列在电子商务、金融交易、实时分析和物联网等多个领域都有广泛的应用。Redis Streams 是 Redis 5.0 版本引入的新数据类型,非常适合用于消息队列。本文将介绍如何在 Golang 中使用 Redis Streams 实现消息队列功能,并讨论相关的适用性和对比分析。

我为什么使用 Redis 作为消息队列

  1. 节省成本,目前是在服务器上自建有 redis,可以直接复用,无需在购买或搭建其他消息队列中间件
  2. 使用到消息队列的业务不重要
  3. 业务体量很小,场景也很简单

使用 Redis 作为消息队列的适用性

首先,把 Redis 当作消息队列来使用时,会存在以下问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

如果业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,是完全可以使用 Redis 作为消息队列来使用的。

如果业务对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么还是建议使用专业的消息队列中间件,如 Kafka、RabbitMQ 等。

使用 Redis 作为消息队列有以下几种方式:

  1. 列表 (List)

    • 方法:使用 LPUSHRPOP/BRPOP 实现生产和消费。
    • 优点:实现简单,性能较好。
    • 缺点:不支持消息确认、持久化和消费组。
  2. 发布/订阅 (Pub/Sub)

    • 方法:使用 PUBLISHSUBSCRIBE 实现实时消息分发。
    • 优点:实时性高,适用于广播消息。
    • 缺点:消息不能持久化,消费者未连接时消息会丢失。
  3. Stream

    • 方法:使用 XADD, XREAD, XGROUP 等命令实现。
    • 优点:支持持久化、消费组、消息确认、历史消息查询。
    • 缺点:相对复杂,需要管理消费组和消息确认。

Redis Streams 命令的使用方法

Redis Streams 是 Redis 5.0.0 版本新增的一种日志数据结构,它记录了一个有序的、不可变的、可扩展的时间序列。每个条目包含一个唯一的 ID 和一个与之关联的字段值对。

Redis Streams 提供了一系列强大的命令,用于管理和操作流数据,这些命令让 Redis Streams 能够有效地管理和处理消息数据,支持复杂的消息队列需求。

创建和添加数据到 Stream

使用 XADD 命令向 Stream 中添加数据:

XADD mystream * field1 value1 field2 value2
  • mystream 是 Stream 的名字。
  • * 表示自动生成条目 ID。
  • field1 value1 field2 value2 是字段和值对。

读取数据

XRANGE 命令

读取一定范围的条目:

XRANGE mystream - +
  • - 表示从最早的条目开始。
  • + 表示读取到最新的条目。

XREVRANGE 命令

按逆序读取范围内的条目:

XREVRANGE mystream + -

组和消费者

创建消费者组

使用 XGROUP 创建一个消费者组:

XGROUP CREATE mystream mygroup $ MKSTREAM
  • mystream 是 Stream 的名字。
  • mygroup 是消费者组的名字。
  • $ 表示从最后一个条目开始消费。

消费者读取数据

使用 XREADGROUP 从消费者组中读取数据:

XREADGROUP GROUP mygroup consumer COUNT 1 STREAMS mystream >
  • mygroup 是消费者组的名字。
  • consumer 是消费者的名字。
  • COUNT 1 表示读取 1 条消息。
  • mystream 是 Stream 的名字。
  • > 表示从未处理的消息中读取。

确认和处理消息

确认消息

消费者处理完消息后,需要确认:

XACK mystream mygroup message-id
  • message-id 是消息的 ID。

检查挂起的消息

使用 XPENDING 命令检查未处理的消息:

XPENDING mystream mygroup

Stream 的高级用法

使用 XTRIM 修剪 Stream

XTRIM mystream MAXLEN 1000

保持 Stream 中最多 1000 个条目。

与专业消息队列的比较

Redis Streams 通过如下特性满足了作为一个消息队列的基本需求:

  • 持久化:消息存储在磁盘上,可以恢复。
  • 消费组:支持多个消费者共同处理一个 Stream。
  • 消息确认:确保消息被成功处理,防止丢失。
  • 阻塞读取:消费者可以阻塞等待新消息的到来。

与专业的消息队列(如 Kafka, RabbitMQ)相比,Redis Streams 有以下优缺点:

  • 优点

    • 简单易用,与现有 Redis 集成良好。
    • 性能高,适合低延迟场景。
    • 功能丰富,支持消费组和消息确认。
  • 缺点

    • 不如 Kafka 在高吞吐、大规模分布式场景下表现好。
    • 缺乏一些高级功能,如复杂的路由、消息重试策略等。
    • 持久化机制相对简单,不适合非常大的消息量。

Golang 中使用 Redis Streams

在开始之前,请确保你已经安装了 Redis 和 Golang,并且安装了 Redis 的 Golang 客户端库 github.com/go-redis/redis/v8

go get github.com/go-redis/redis/v8

1. 创建消息队列生产者

生产者负责将消息推送到 Redis Streams 中。以下是一个简单的生产者实现:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    for i := 0; i < 10; i++ {
        err := rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: "mystream",
            Values: map[string]interface{}{
                "message": fmt.Sprintf("Hello %d", i),
            },
        }).Err()
        if err != nil {
            log.Fatalf("Could not add message to stream: %v", err)
        }
    }

    fmt.Println("Messages added to stream")
}

2. 创建消息队列消费者

消费者负责从 Redis Streams 中读取消息并进行处理。以下是一个简单的消费者实现:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
    "time"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    for {
        streams, err := rdb.XRead(ctx, &redis.XReadArgs{
            Streams: []string{"mystream", "$"},
            Count:   1,
            Block:   0,
        }).Result()
        if err != nil {
            log.Fatalf("Could not read from stream: %v", err)
        }

        for _, stream := range streams {
            for _, message := range stream.Messages {
                fmt.Printf("Received message: %s\n", message.Values["message"])
                // 在这里处理消息
                time.Sleep(1 * time.Second)  // 模拟处理时间
            }
        }
    }
}

3. 处理消息确认

在实际使用中,消息处理完后我们通常需要确认消息已被处理,以防止消息重复消费。以下是一个带有消息确认的消费者示例:

package main

import (
    "context"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
)

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })

    groupName := "mygroup"
    consumerName := "consumer1"

    // 创建消费组
    err := rdb.XGroupCreateMkStream(ctx, "mystream", groupName, "0").Err()
    if err != nil && err != redis.ErrGroupExists {
        log.Fatalf("Could not create group: %v", err)
    }

    for {
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{"mystream", ">"},
            Count:    1,
            Block:    0,
        }).Result()
        if err != nil {
            log.Fatalf("Could not read from stream: %v", err)
        }

        for _, stream := range streams {
            for _, message := range stream.Messages {
                fmt.Printf("Received message: %s\n", message.Values["message"])
                // 在这里处理消息
                err = rdb.XAck(ctx, "mystream", groupName, message.ID).Err()
                if err != nil {
                    log.Fatalf("Could not acknowledge message: %v", err)
                }
            }
        }
    }
}

相关阅读推荐:Golang 操作 Redis:Stream 操作用法 - go-redis 使用指南

总结

通过上述步骤,我们实现了一个简单的消息队列系统。生产者将消息推送到 Redis Streams 中,消费者从 Streams 中读取并处理消息。Redis Streams 提供了强大的消息队列功能,并且与 Golang 的结合使得实现这一功能变得非常简单和高效。希望这篇文章能帮助你更好地理解和使用 Redis Streams 来构建你的消息队列系统。


希望这篇文章对你有所帮助。如果你有任何问题或建议,欢迎在评论区留言讨论!


也可以看看