Golang 操作 Redis:PubSub发布订阅用法 - go-redis 使用指南

文章目录

在现代应用开发中,实时通信是许多系统的核心需求之一。Redis 的 PubSub(发布/订阅)机制为这种需求提供了强大的支持,允许系统组件之间进行高效的消息传递。上一篇文章中,我们深入探讨了 Redis 的 Pipeline 操作,而本篇文章将带您深入了解 Redis 的 PubSub 机制,特别是如何在 Golang 中利用 go-redis 库实现这一功能。

👉 点击查看《go-redis使用指南》系列文章目录

在《go-redis使用指南》系列文章中,我们将详细介绍如何在 Golang 项目中使用 redis/go-redis 库与 Redis 进行交互。以下是该系列文章的全部内容:

  1. Golang 操作 Redis:快速上手 - go-redis 使用指南
  2. Golang 操作 Redis:连接设置与参数详解 - go-redis 使用指南
  3. Golang 操作 Redis:基础的字符串键值操作 - go-redis 使用指南
  4. Golang 操作 Redis:如何设置 key 的过期时间 - go-redis 使用指南
  5. Golang 操作 Redis:Hash 哈希数据类型操作用法 - go-redis 使用指南
  6. Golang 操作 Redis:Set 集合数据类型操作用法 - go-redis 使用指南
  7. Golang 操作 Redis:为 Hash 中的字段设置过期时间 - go-redis 使用指南
  8. Golang 操作 Redis:List 列表数据类型操作用法 - go-redis 使用指南
  9. Golang 操作 Redis:SortedSet 有序集合数据类型操作用法 - go-redis 使用指南
  10. Golang 操作 Redis:bitmap 数据类型操作用法 - go-redis 使用指南
  11. Golang 操作 Redis:事务处理操作用法 - go-redis 使用指南
  12. Golang 操作 Redis:地理空间数据类型操作用法 - go-redis 使用指南
  13. Golang 操作 Redis:HyperLogLog 操作用法 - go-redis 使用指南
  14. Golang 操作 Redis:Pipeline 操作用法 - go-redis 使用指南
  15. Golang 操作 Redis:PubSub发布订阅用法 - go-redis 使用指南
  16. Golang 操作 Redis:布隆过滤器(Bloom Filter)操作用法 - go-redis 使用指南
  17. Golang 操作 Redis:Cuckoo Filter操作用法 - go-redis 使用指南
  18. Golang 操作 Redis:Stream操作用法 - go-redis 使用指南
golang redis go-redis

Redis PubSub 发布订阅简介

Redis PubSub 是一种发布/订阅模式,允许发布者将消息发送到频道,订阅者可以订阅这些频道并接收消息。这种机制适用于构建实时消息系统、通知系统、事件广播等场景。

常见的使用场景:

  • 实时通知系统:在订单系统中,当订单状态更新时,可以通过 PubSub 通知相关服务。
  • 事件广播:分布式系统中使用 PubSub 广播事件通知。
  • 聊天系统:通过 PubSub 实现实时多人聊天。

需要注意的是,PubSub 的一个关键优势在于其实时性高,非常适合广播消息的场景。然而,它的缺点是消息不能持久化,如果消费者未连接或暂时离线,未接收到的消息将会丢失。因此,PubSub 更适用于需要即时消息传递的场景,而不适合那些需要确保消息可靠到达的场合。

go-redis 中 PubSub 发布订阅操作的方法

  • Subscribe - 订阅一个或多个频道,返回一个 PubSub 对象。
  • PSubscribe - 订阅一个或多个模式匹配的频道。
  • Unsubscribe - 取消订阅一个或多个频道。
  • PUnsubscribe - 取消订阅一个或多个模式匹配的频道。
  • Publish - 向一个频道发布消息。
  • ReceiveMessage - 接收订阅频道的消息。
  • Close - 关闭 PubSub 对象,取消所有订阅。
  • PubSubChannels - 查询活跃的频道。
  • PubSubNumSub - 查询指定频道有多少个订阅者。
  • ReceiveTimeout - 在指定时间内接收消息,超时则返回错误。
  • Receive - 接收消息或返回其他类型的信息,如 Subscription、Message、Pong 等。
  • Channel - 返回一个 Go channel,用于并发接收消息。
  • ChannelWithSubscriptions - 返回一个 Go channel,消息类型包括*Subscription*Message,用于检测重新连接。

go-redis PubSub 发布订阅操作方法详细讲解及示例代码

基本方法示例

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379", // Redis 服务器地址
	})

	// 启动订阅者
	go subscriber(ctx, rdb)

	// 启动发布者
	go publisher(ctx, rdb)

	// 等待发布者和订阅者完成
	time.Sleep(10 * time.Second)
}

// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
	pubsub := rdb.Subscribe(ctx, "mychannel")
	defer pubsub.Close()

	ch := pubsub.Channel()

	fmt.Println("Subscriber is waiting for messages...")
	for msg := range ch {
		fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
	}
}

// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
	messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}

	for _, msg := range messages {
		fmt.Printf("Publishing message: %s\n", msg)
		err := rdb.Publish(ctx, "mychannel", msg).Err()
		if err != nil {
			log.Fatalf("Failed to publish message: %v", err)
		}
		time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
	}
	fmt.Println("All messages published.")
}

输出结果:

Publishing message: Hello
Subscriber is waiting for messages...
Received message from channel mychannel: Hello
Publishing message: World
Received message from channel mychannel: World
Publishing message: Redis
Received message from channel mychannel: Redis
Publishing message: PubSub
Received message from channel mychannel: PubSub
Publishing message: Example
Received message from channel mychannel: Example
All messages published.

go-redis PubSub 接收发布订阅消息的不同方式与区别

在使用 Redis PubSub 时,你可以通过不同的方法来接收消息。这些方法包括 ReceiveReceiveTimeoutReceiveMessage,以及使用 Channel。

  1. Receive:用于接收所有类型的消息(*redis.Message*redis.Subscription*redis.Pong)。
  2. ReceiveMessage:用于接收 *redis.Message 类型的消息,专注于 PubSub 消息。
  3. ReceiveTimeout:类似于 Receive,但支持超时控制,适用于需要等待消息但希望避免长时间阻塞的场景。
  4. Channel:提供了异步和并发接收消息的机制,适合需要处理消息流的场景。

下面是对这些方法及其区别的详细介绍:

1. Receive

  • 功能Receive 是一个通用的方法,用于接收任何类型的消息。它可以返回 *redis.Message(消息类型)、*redis.Subscription(订阅类型)或 *redis.Pong(Pong 类型)。
  • 适用场景:当你需要处理多种类型的 PubSub 消息时使用,特别是当你需要从 PubSub 订阅中接收连接检测响应(*redis.Pong)时。
  • 示例
    msg, err := pubsub.Receive(ctx)
    if err != nil {
        log.Fatalf("Failed to receive message: %v", err)
    }
    switch v := msg.(type) {
    case *redis.Message:
        fmt.Printf("Received message: %s\n", v.Payload)
    case *redis.Subscription:
        fmt.Printf("Received subscription update: %s\n", v.Kind)
    case *redis.Pong:
        fmt.Println("Received Pong")
    }
    

2. ReceiveMessage

  • 功能ReceiveMessage 方法专门用于接收 *redis.Message 类型的消息。如果接收到其他类型的消息(如 *redis.Subscription*redis.Pong),它将返回错误。
  • 适用场景:当你只关心 PubSub 消息,而不需要处理其他类型的消息时使用。
  • 示例
    msg, err := pubsub.ReceiveMessage(ctx)
    if err != nil {
        log.Fatalf("Failed to receive message: %v", err)
    }
    fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
    

3. ReceiveTimeout

  • 功能ReceiveTimeout 方法类似于 Receive,但它支持超时控制。如果在指定的时间内未收到任何消息,将返回 context.DeadlineExceeded 错误。它可以接收 *redis.Message*redis.Subscription*redis.Pong 类型的消息。
  • 适用场景:当你希望在指定时间内等待消息时使用,适用于高延迟或网络不稳定的环境,能够防止因长时间等待而阻塞程序。
  • 示例
    msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second)
    if err != nil {
        if err == context.DeadlineExceeded {
            fmt.Println("Receive timeout")
            return
        }
        log.Fatalf("Failed to receive message with timeout: %v", err)
    }
    switch v := msg.(type) {
    case *redis.Message:
        fmt.Printf("Received message: %s\n", v.Payload)
    case *redis.Subscription:
        fmt.Printf("Received subscription update: %s\n", v.Kind)
    case *redis.Pong:
        fmt.Println("Received Pong")
    }
    

4. 使用 Channel

  • 功能:通过 Channel 方法,你可以获取一个 Go channel 用于并发接收 PubSub 消息。该 channel 会持续接收 *redis.Message*redis.Subscription 类型的消息。
  • 适用场景:当你需要并发处理消息,或者希望以异步方式处理消息流时使用。使用 channel 还可以让你更方便地处理消息,并在处理过程中实现更好的消息同步。
  • 示例
    ch := pubsub.Channel()
    for msg := range ch {
        fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
    }
    

根据你的具体需求,可以选择最合适的方法来处理 PubSub 消息。

go-redis PubSub 发布订阅方法综合示例

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379", // Redis 服务器地址
	})

	// 启动普通订阅者
	go subscriber(ctx, rdb)

	// 启动模式匹配订阅者
	go patternSubscriber(ctx, rdb)

	// 启动使用 ReceiveTimeout 的订阅者
	go timeoutSubscriber(ctx, rdb)

	// 启动发布者
	go publisher(ctx, rdb)

	// 等待所有 goroutine 完成
	time.Sleep(10 * time.Second)
}

// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
	pubsub := rdb.Subscribe(ctx, "channel1")
	defer pubsub.Close()

	// 使用 Channel 接收消息
	ch := pubsub.Channel()
	fmt.Printf("Subscriber channel1:%v is waiting for messages on channel1...\n", pubsub)
	checkPubSubInfo(ctx, rdb)

	for msg := range ch {
		fmt.Printf("Subscriber channel1:%v received message from channel %s: %s\n", pubsub, msg.Channel, msg.Payload)
	}

	// 取消订阅并关闭 PubSub
	if err := pubsub.Unsubscribe(ctx, "channel1"); err != nil {
		log.Fatalf("Failed to Unsubscribe: %v", err)
	}
}

// patternSubscriber 使用模式匹配订阅频道并接收消息
func patternSubscriber(ctx context.Context, rdb *redis.Client) {
	pubsub := rdb.PSubscribe(ctx, "channel*")
	defer pubsub.Close()

	// 使用 ChannelWithSubscriptions 接收消息
	ch := pubsub.ChannelWithSubscriptions()
	fmt.Println("Pattern subscriber is waiting for messages on channel* ...")
	checkPubSubInfo(ctx, rdb)

	for msg := range ch {
		switch v := msg.(type) {
		case *redis.Message:
			fmt.Printf("Pattern subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
		case *redis.Subscription:
			fmt.Printf("Pattern subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
		}
	}
	if err := pubsub.PUnsubscribe(ctx, "channel*"); err != nil {
		log.Fatalf("Failed to PUnsubscribe: %v", err)
	}
}

// timeoutSubscriber 使用 ReceiveTimeout 接收消息
func timeoutSubscriber(ctx context.Context, rdb *redis.Client) {
	pubsub := rdb.Subscribe(ctx, "channel1")
	defer pubsub.Close()

	fmt.Println("Timeout subscriber is waiting for messages on channel1 with timeout...")
	checkPubSubInfo(ctx, rdb)

	for {
		msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second)
		if err != nil {
			if err == context.DeadlineExceeded {
				fmt.Println("Receive timeout.")
				continue
			}
			log.Fatalf("Failed to receive message with timeout: %v", err)
		}
		switch v := msg.(type) {
		case *redis.Message:
			fmt.Printf("Timeout subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
		case *redis.Subscription:
			fmt.Printf("Timeout subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
		}
	}
}

// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
	messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}

	for _, msg := range messages {
		fmt.Printf("Publishing message: %s\n", msg)
		if err := rdb.Publish(ctx, "channel1", msg).Err(); err != nil {
			log.Fatalf("Failed to publish message: %v", err)
		}
		time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
	}
	fmt.Println("All messages published.")
}

// checkPubSubInfo 查询活跃频道和订阅者数量
func checkPubSubInfo(ctx context.Context, rdb *redis.Client) {
	// 查询活跃的频道
	activeChannels := rdb.PubSubChannels(ctx, "channel*").Val()
	fmt.Println("Active channels:", activeChannels)

	// 查询频道的订阅者数量
	numSub := rdb.PubSubNumSub(ctx, "channel1", "channel2").Val()
	fmt.Println("Subscriber count:", numSub)
}

输出结果:

Publishing message: Hello
Subscriber channel1:PubSub(channel1) is waiting for messages on channel1...
Pattern subscriber is waiting for messages on channel* ...
Active channels: [channel1]
Timeout subscriber is waiting for messages on channel1 with timeout...
Subscriber count: map[channel1:1 channel2:0]
Pattern subscriber Received Subscription message: psubscribe channel* 1
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Subscriber channel1:PubSub(channel1) received message from channel channel1: Hello
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Timeout subscriber Received Subscription message: subscribe channel1 1
Publishing message: World
Pattern subscriber Received message from channel channel1: World
Timeout subscriber Received message from channel channel1: World
Subscriber channel1:PubSub(channel1) received message from channel channel1: World
Publishing message: Redis
Subscriber channel1:PubSub(channel1) received message from channel channel1: Redis
Timeout subscriber Received message from channel channel1: Redis
Pattern subscriber Received message from channel channel1: Redis
Publishing message: PubSub
Timeout subscriber Received message from channel channel1: PubSub
Pattern subscriber Received message from channel channel1: PubSub
Subscriber channel1:PubSub(channel1) received message from channel channel1: PubSub
Publishing message: Example
Pattern subscriber Received message from channel channel1: Example
Timeout subscriber Received message from channel channel1: Example
Subscriber channel1:PubSub(channel1) received message from channel channel1: Example
All messages published.
2024/08/20 23:24:05 Failed to receive message with timeout: read tcp 127.0.0.1:58755->127.0.0.1:6379: i/o timeout
exit status 1

结语

通过本篇文章,我们详细探讨了如何在 Golang 中使用 go-redis 库实现 Redis 的 PubSub 发布/订阅机制,涵盖了从基础的订阅发布操作到高级的消息处理方法。使用这些工具,您可以构建高效、可靠的实时消息系统,满足各种复杂的业务需求。

希望这篇文章能帮助你更好地理解和使用 go-redis,点击 go-redis 使用指南 可查看更多相关教程!


也可以看看