在现代应用开发中,实时通信是许多系统的核心需求之一。Redis 的 PubSub(发布/订阅)机制为这种需求提供了强大的支持,允许系统组件之间进行高效的消息传递。上一篇文章中,我们深入探讨了 Redis 的 Pipeline 操作,而本篇文章将带您深入了解 Redis 的 PubSub 机制,特别是如何在 Golang 中利用 go-redis 库实现这一功能。
👉 点击查看《go-redis使用指南》系列文章目录
在《go-redis使用指南》系列文章中,我们将详细介绍如何在 Golang 项目中使用 redis/go-redis 库与 Redis 进行交互。以下是该系列文章的全部内容:
- Golang 操作 Redis:快速上手 - go-redis 使用指南
- Golang 操作 Redis:连接设置与参数详解 - go-redis 使用指南
- Golang 操作 Redis:基础的字符串键值操作 - go-redis 使用指南
- Golang 操作 Redis:如何设置 key 的过期时间 - go-redis 使用指南
- Golang 操作 Redis:Hash 哈希数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:Set 集合数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:为 Hash 中的字段设置过期时间 - go-redis 使用指南
- Golang 操作 Redis:List 列表数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:SortedSet 有序集合数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:bitmap 数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:事务处理操作用法 - go-redis 使用指南
- Golang 操作 Redis:地理空间数据类型操作用法 - go-redis 使用指南
- Golang 操作 Redis:HyperLogLog 操作用法 - go-redis 使用指南
- Golang 操作 Redis:Pipeline 操作用法 - go-redis 使用指南
- Golang 操作 Redis:PubSub发布订阅用法 - go-redis 使用指南
- Golang 操作 Redis:布隆过滤器(Bloom Filter)操作用法 - go-redis 使用指南
- Golang 操作 Redis:Cuckoo Filter操作用法 - go-redis 使用指南
- Golang 操作 Redis:Stream操作用法 - go-redis 使用指南
Redis PubSub 发布订阅简介
Redis PubSub 是一种发布/订阅模式,允许发布者将消息发送到频道,订阅者可以订阅这些频道并接收消息。这种机制适用于构建实时消息系统、通知系统、事件广播等场景。
常见的使用场景:
- 实时通知系统:在订单系统中,当订单状态更新时,可以通过 PubSub 通知相关服务。
- 事件广播:分布式系统中使用 PubSub 广播事件通知。
- 聊天系统:通过 PubSub 实现实时多人聊天。
需要注意的是,PubSub 的一个关键优势在于其实时性高,非常适合广播消息的场景。然而,它的缺点是消息不能持久化,如果消费者未连接或暂时离线,未接收到的消息将会丢失。因此,PubSub 更适用于需要即时消息传递的场景,而不适合那些需要确保消息可靠到达的场合。
go-redis 中 PubSub 发布订阅操作的方法
Subscribe
- 订阅一个或多个频道,返回一个 PubSub 对象。PSubscribe
- 订阅一个或多个模式匹配的频道。Unsubscribe
- 取消订阅一个或多个频道。PUnsubscribe
- 取消订阅一个或多个模式匹配的频道。Publish
- 向一个频道发布消息。ReceiveMessage
- 接收订阅频道的消息。Close
- 关闭 PubSub 对象,取消所有订阅。PubSubChannels
- 查询活跃的频道。PubSubNumSub
- 查询指定频道有多少个订阅者。ReceiveTimeout
- 在指定时间内接收消息,超时则返回错误。Receive
- 接收消息或返回其他类型的信息,如 Subscription、Message、Pong 等。Channel
- 返回一个 Go channel,用于并发接收消息。ChannelWithSubscriptions
- 返回一个 Go channel,消息类型包括*Subscription
和*Message
,用于检测重新连接。
go-redis PubSub 发布订阅操作方法详细讲解及示例代码
基本方法示例
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
})
// 启动订阅者
go subscriber(ctx, rdb)
// 启动发布者
go publisher(ctx, rdb)
// 等待发布者和订阅者完成
time.Sleep(10 * time.Second)
}
// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "mychannel")
defer pubsub.Close()
ch := pubsub.Channel()
fmt.Println("Subscriber is waiting for messages...")
for msg := range ch {
fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
}
}
// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}
for _, msg := range messages {
fmt.Printf("Publishing message: %s\n", msg)
err := rdb.Publish(ctx, "mychannel", msg).Err()
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
}
fmt.Println("All messages published.")
}
输出结果:
Publishing message: Hello
Subscriber is waiting for messages...
Received message from channel mychannel: Hello
Publishing message: World
Received message from channel mychannel: World
Publishing message: Redis
Received message from channel mychannel: Redis
Publishing message: PubSub
Received message from channel mychannel: PubSub
Publishing message: Example
Received message from channel mychannel: Example
All messages published.
go-redis PubSub 接收发布订阅消息的不同方式与区别
在使用 Redis PubSub 时,你可以通过不同的方法来接收消息。这些方法包括 Receive
、ReceiveTimeout
、ReceiveMessage
,以及使用 Channel。
Receive
:用于接收所有类型的消息(*redis.Message
、*redis.Subscription
、*redis.Pong
)。ReceiveMessage
:用于接收*redis.Message
类型的消息,专注于 PubSub 消息。ReceiveTimeout
:类似于Receive
,但支持超时控制,适用于需要等待消息但希望避免长时间阻塞的场景。- Channel:提供了异步和并发接收消息的机制,适合需要处理消息流的场景。
下面是对这些方法及其区别的详细介绍:
1. Receive
- 功能:
Receive
是一个通用的方法,用于接收任何类型的消息。它可以返回*redis.Message
(消息类型)、*redis.Subscription
(订阅类型)或*redis.Pong
(Pong 类型)。 - 适用场景:当你需要处理多种类型的 PubSub 消息时使用,特别是当你需要从 PubSub 订阅中接收连接检测响应(
*redis.Pong
)时。 - 示例:
msg, err := pubsub.Receive(ctx) if err != nil { log.Fatalf("Failed to receive message: %v", err) } switch v := msg.(type) { case *redis.Message: fmt.Printf("Received message: %s\n", v.Payload) case *redis.Subscription: fmt.Printf("Received subscription update: %s\n", v.Kind) case *redis.Pong: fmt.Println("Received Pong") }
2. ReceiveMessage
- 功能:
ReceiveMessage
方法专门用于接收*redis.Message
类型的消息。如果接收到其他类型的消息(如*redis.Subscription
或*redis.Pong
),它将返回错误。 - 适用场景:当你只关心 PubSub 消息,而不需要处理其他类型的消息时使用。
- 示例:
msg, err := pubsub.ReceiveMessage(ctx) if err != nil { log.Fatalf("Failed to receive message: %v", err) } fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload)
3. ReceiveTimeout
- 功能:
ReceiveTimeout
方法类似于Receive
,但它支持超时控制。如果在指定的时间内未收到任何消息,将返回context.DeadlineExceeded
错误。它可以接收*redis.Message
、*redis.Subscription
和*redis.Pong
类型的消息。 - 适用场景:当你希望在指定时间内等待消息时使用,适用于高延迟或网络不稳定的环境,能够防止因长时间等待而阻塞程序。
- 示例:
msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second) if err != nil { if err == context.DeadlineExceeded { fmt.Println("Receive timeout") return } log.Fatalf("Failed to receive message with timeout: %v", err) } switch v := msg.(type) { case *redis.Message: fmt.Printf("Received message: %s\n", v.Payload) case *redis.Subscription: fmt.Printf("Received subscription update: %s\n", v.Kind) case *redis.Pong: fmt.Println("Received Pong") }
4. 使用 Channel
- 功能:通过
Channel
方法,你可以获取一个 Go channel 用于并发接收 PubSub 消息。该 channel 会持续接收*redis.Message
和*redis.Subscription
类型的消息。 - 适用场景:当你需要并发处理消息,或者希望以异步方式处理消息流时使用。使用 channel 还可以让你更方便地处理消息,并在处理过程中实现更好的消息同步。
- 示例:
ch := pubsub.Channel() for msg := range ch { fmt.Printf("Received message from channel %s: %s\n", msg.Channel, msg.Payload) }
根据你的具体需求,可以选择最合适的方法来处理 PubSub 消息。
go-redis PubSub 发布订阅方法综合示例
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis 服务器地址
})
// 启动普通订阅者
go subscriber(ctx, rdb)
// 启动模式匹配订阅者
go patternSubscriber(ctx, rdb)
// 启动使用 ReceiveTimeout 的订阅者
go timeoutSubscriber(ctx, rdb)
// 启动发布者
go publisher(ctx, rdb)
// 等待所有 goroutine 完成
time.Sleep(10 * time.Second)
}
// subscriber 订阅频道并接收消息
func subscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "channel1")
defer pubsub.Close()
// 使用 Channel 接收消息
ch := pubsub.Channel()
fmt.Printf("Subscriber channel1:%v is waiting for messages on channel1...\n", pubsub)
checkPubSubInfo(ctx, rdb)
for msg := range ch {
fmt.Printf("Subscriber channel1:%v received message from channel %s: %s\n", pubsub, msg.Channel, msg.Payload)
}
// 取消订阅并关闭 PubSub
if err := pubsub.Unsubscribe(ctx, "channel1"); err != nil {
log.Fatalf("Failed to Unsubscribe: %v", err)
}
}
// patternSubscriber 使用模式匹配订阅频道并接收消息
func patternSubscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.PSubscribe(ctx, "channel*")
defer pubsub.Close()
// 使用 ChannelWithSubscriptions 接收消息
ch := pubsub.ChannelWithSubscriptions()
fmt.Println("Pattern subscriber is waiting for messages on channel* ...")
checkPubSubInfo(ctx, rdb)
for msg := range ch {
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Pattern subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
case *redis.Subscription:
fmt.Printf("Pattern subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
}
}
if err := pubsub.PUnsubscribe(ctx, "channel*"); err != nil {
log.Fatalf("Failed to PUnsubscribe: %v", err)
}
}
// timeoutSubscriber 使用 ReceiveTimeout 接收消息
func timeoutSubscriber(ctx context.Context, rdb *redis.Client) {
pubsub := rdb.Subscribe(ctx, "channel1")
defer pubsub.Close()
fmt.Println("Timeout subscriber is waiting for messages on channel1 with timeout...")
checkPubSubInfo(ctx, rdb)
for {
msg, err := pubsub.ReceiveTimeout(ctx, 2*time.Second)
if err != nil {
if err == context.DeadlineExceeded {
fmt.Println("Receive timeout.")
continue
}
log.Fatalf("Failed to receive message with timeout: %v", err)
}
switch v := msg.(type) {
case *redis.Message:
fmt.Printf("Timeout subscriber Received message from channel %s: %s\n", v.Channel, v.Payload)
case *redis.Subscription:
fmt.Printf("Timeout subscriber Received Subscription message: %s %s %d\n", v.Kind, v.Channel, v.Count)
}
}
}
// publisher 发布消息到频道
func publisher(ctx context.Context, rdb *redis.Client) {
messages := []string{"Hello", "World", "Redis", "PubSub", "Example"}
for _, msg := range messages {
fmt.Printf("Publishing message: %s\n", msg)
if err := rdb.Publish(ctx, "channel1", msg).Err(); err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1 * time.Second) // 等待1秒钟再发布下一个消息
}
fmt.Println("All messages published.")
}
// checkPubSubInfo 查询活跃频道和订阅者数量
func checkPubSubInfo(ctx context.Context, rdb *redis.Client) {
// 查询活跃的频道
activeChannels := rdb.PubSubChannels(ctx, "channel*").Val()
fmt.Println("Active channels:", activeChannels)
// 查询频道的订阅者数量
numSub := rdb.PubSubNumSub(ctx, "channel1", "channel2").Val()
fmt.Println("Subscriber count:", numSub)
}
输出结果:
Publishing message: Hello
Subscriber channel1:PubSub(channel1) is waiting for messages on channel1...
Pattern subscriber is waiting for messages on channel* ...
Active channels: [channel1]
Timeout subscriber is waiting for messages on channel1 with timeout...
Subscriber count: map[channel1:1 channel2:0]
Pattern subscriber Received Subscription message: psubscribe channel* 1
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Subscriber channel1:PubSub(channel1) received message from channel channel1: Hello
Active channels: [channel1]
Subscriber count: map[channel1:2 channel2:0]
Timeout subscriber Received Subscription message: subscribe channel1 1
Publishing message: World
Pattern subscriber Received message from channel channel1: World
Timeout subscriber Received message from channel channel1: World
Subscriber channel1:PubSub(channel1) received message from channel channel1: World
Publishing message: Redis
Subscriber channel1:PubSub(channel1) received message from channel channel1: Redis
Timeout subscriber Received message from channel channel1: Redis
Pattern subscriber Received message from channel channel1: Redis
Publishing message: PubSub
Timeout subscriber Received message from channel channel1: PubSub
Pattern subscriber Received message from channel channel1: PubSub
Subscriber channel1:PubSub(channel1) received message from channel channel1: PubSub
Publishing message: Example
Pattern subscriber Received message from channel channel1: Example
Timeout subscriber Received message from channel channel1: Example
Subscriber channel1:PubSub(channel1) received message from channel channel1: Example
All messages published.
2024/08/20 23:24:05 Failed to receive message with timeout: read tcp 127.0.0.1:58755->127.0.0.1:6379: i/o timeout
exit status 1
结语
通过本篇文章,我们详细探讨了如何在 Golang 中使用 go-redis 库实现 Redis 的 PubSub 发布/订阅机制,涵盖了从基础的订阅发布操作到高级的消息处理方法。使用这些工具,您可以构建高效、可靠的实时消息系统,满足各种复杂的业务需求。
希望这篇文章能帮助你更好地理解和使用 go-redis,点击 go-redis 使用指南 可查看更多相关教程!