redigo 对redis的订阅机制放在pubsub.go里面
PubSubConn封装Conn以实现订阅者提供简便方法。Subscribe,PSubscribe,Unsubscribe和PUnsubscribe方法发送和刷新订阅。receive方法将推送的消息转换对应的类型
// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example
func (c PubSubConn) Receive() interface{
} {
return c.receiveInternal(c.Conn.Receive())
}
返回的是一个空接口类型 interface{},由于空接口没有方法,因此所有类型都实现了空接口,也就是说可以返回任意类型。
返回类型
具体会返回哪些类型在receiveInternal()里面可以看到,
目前返回的三种Message、Subscription、Pong都定义在了pubsub.go 里面。
func (c PubSubConn) receiveInternal(replyArg interface{ }, errArg error) interface{ } { reply, err := Values(replyArg, errArg) if err != nil { return err } var kind string reply, err = Scan(reply, &kind) if err != nil { return err } switch kind { case "message": var m Message if _, err := Scan(reply, &m.Channel, &m.Data); err != nil { return err } return m case "pmessage": var m Message if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil { return err } return m case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": s := Subscription{ Kind: kind} if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { return err } return s case "pong": var p Pong if _, err := Scan(reply, &p.Data); err != nil { return err } return p } r
订阅示例
package main import( //"github.com/go-redis/redis" "fmt" "time" //"reflect" "unsafe" "github.com/gomodule/redigo/redis" log "github.com/astaxie/beego/logs" ) type SubscribeCallback func (channel, message string) type Subscriber struct { client redis.PubSubConn cbMap map[string]SubscribeCallback } func (c *Subscriber) Connect(ip string, port uint16) { conn, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Critical("redis dial failed.") } c.client = redis.PubSubConn{ conn} c.cbMap = make(map[string]SubscribeCallback) go func() { for { log.Debug("wait...") //读取channel消息 并检测收到的消息类型 switch res := c.client.Receive().(type) { case redis.Message: channel := (*string)(unsafe.Pointer(&res.Channel)) message := (*string)(unsafe.Pointer(&res.Data)) c.cbMap[*channel](*channel, *message) case redis.Subscription: //订阅成功,同时订阅多个时,这地方会收到多次 fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) case error: log.Error("error handle...") //如果是长连接项目,这里最好return,否则客户端断开时,这里可能会造成死循环 return } } }() } func (c *Subscriber) Close() { err := c.client.Close() if err != nil{ log.Error("redis close error.") } } func (c *Subscriber) Subscribe(channel interface{ }, cb SubscribeCallback) { err := c.client.Subscribe(channel) if err != nil{ log.Critical("redis Subscribe error.") } c.cbMap[channel.(string)] = cb } func TestCallback1(chann, msg string){ log.Debug("TestCallback1 channel : ", chann, " message : ", msg) } func TestCallback2(chann, msg string){ log.Debug("TestCallback2 channel : ", chann, " message : ", msg) } func TestCallback3(chann, msg string){ log.Debug("TestCallback3 channel : ", chann, " message : ", msg) } func main() { log.Info("===========main start============") var sub Subscriber sub.Connect("127.0.0.1", 6397) sub.Subscribe("test_chan1", TestCallback1) sub.Subscribe("test_chan2", TestCallback2) sub.Subscribe("test_chan3", TestCallback3) for{ time.Sleep(1 * time.Second) } }
常见问题
这地方如果用redis的连接池的话,要注意不要设置读取数据的超时时间,否则到了超时时间,就会断开连接了,报错如下:
2022/07/12 15:43:14 wait...
test1: subscribe 1
2022/07/12 15:43:14 wait...
[GIN] 2022/07/12 - 15:43:26 | 200 | 1.129741ms | 222.128.58.255 | GET "
/gotest/pub"
2022/07/12 15:43:26 {test1 [104 101 108 108 111]}
2022/07/12 15:43:26 0xc00009e540 0xc00009e560
2022/07/12 15:43:26 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: i/o timeout
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
2022/07/12 15:43:46 error handle... read tcp 10.10.2.8:57724->123.207.190.86:637
9: use of closed network connection
2022/07/12 15:43:46 wait...
如果是长连接项目,case error 的情况下这里最好return,否则客户端断开时,这里可能会造成死循环(break只会退出switch,不会退出外边的for)
发布示例
发布直接使用默认的Conn来Send “Publish“ 命令即可.
redigo的管道的使用方法设计到三个函数,Do函数也是下面这三个函数的合并:
c.Send("SUBSCRIBE", "example") c.Flush() for { reply, err := c.Receive() if err != nil { return err } // process pushed message }
send()方法把命令写到输出缓冲区,Flush()把缓冲区的命令刷新到redis服务器,Receive()函数接收redis给予的响应,三个操作共同完成一套命令流程。
package main import( //"github.com/go-redis/redis" "github.com/gomodule/redigo/redis" log "github.com/astaxie/beego/logs" ) func main() { client, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { log.Critical("redis dial failed.") } defer client.Close() _, err = client.Do("Publish", "test_chan1", "hello") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan2", "hello") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan3", "hello") if err != nil { log.Critical("redis Publish failed.") } }
PubSubConn
定义
type PubSubConn struct { Conn Conn }
提供的方法:
1.Close 关闭连接
func (c PubSubConn) Close() error
2.PSubscribe 订阅channel支持通配符匹配
func (c PubSubConn) PSubscribe(channel ...interface{}) error
3.PUnsubscribe 取消发布, 如果没有给定, 则取消所有
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error
4.Ping 指定的数据向服务器发送PING 调用此方法时,连接必须至少订阅一个通道或模式
func (c PubSubConn) Ping(data string) error
5.Receive 获取消息
func (c PubSubConn) Receive() interface{}
6.ReceiveWithTimeout 带有超时时间的获取消息函数
func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{}
7.Subscribe 订阅
func (c PubSubConn) Subscribe(channel ...interface{}) error
8.Unsubscribe 取消订阅
func (c PubSubConn) Unsubscribe(channel ...interface{}) error
go redis发布订阅常用函数
Subscribe - 订阅channel
PSubscribe - 订阅channel支持通配符匹配
Publish - 将信息发送到指定的channel。
PubSub Channels - 查询活跃的channel
127.0.0.1:6379> pubsub channels 1) "forum_38" 2) "forum_99" 3) "forum_79"
PubSub NumSub - 查询指定的channel有多少个订阅者
127.0.0.1:6379> pubsub numsub forum_38 forum_10 1) "forum_38" 2) (integer) 1 3) "forum_10" 4) (integer) 1
《本文》有 0 条评论