golang实现Pub/Sub模式
一、什么是Pub/Sub模式
Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。
Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。
通过 Pub/Sub,可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。
发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。
二、用Go实现简单的Pub/Sub服务
hub.go 实现Pub/Sub 服务,类似于中心枢纽,用于接受和分发消息
package main
import (
"context"
"sync"
)
type PubSubHub struct {
sync.Mutex
subs map[*subscriber]struct{}
}
func (h *PubSubHub) publish(ctx context.Context, msg *message) {
for s := range h.subs {
s.publish(ctx, msg)
}
}
func (h *PubSubHub) subscribe(ctx context.Context, s *subscriber) {
h.subs[s] = struct{}{}
go func() {
select {
case <-s.quit:
case <-ctx.Done():
delete(h.subs, s)
}
}()
go s.run(ctx)
}
func (h *PubSubHub) unsubscribe(ctx context.Context, s *subscriber) {
delete(h.subs, s)
close(s.quit)
}
func (h *PubSubHub) subscribers() int {
return len(h.subs)
}
func newPubSubHub() *PubSubHub {
return &PubSubHub{
subs: map[*subscriber]struct{}{},
}
}
subscriber.go 实现消息订阅和消费
package main
import (
"context"
"fmt"
"sync"
)
type message struct {
data []byte
}
type subscriber struct {
sync.Mutex
name string
handler chan *message
quit chan struct{}
}
func (s *subscriber) run(ctx context.Context) {
for {
select {
case msg:=<-s.handler:
fmt.Println(s.name, string(msg.data))
case <-s.quit:
return
case <-ctx.Done():
return
}
}
}
func (s *subscriber) publish(ctx context.Context, msg *message) {
select {
case <-ctx.Done():
return
case s.handler <- msg:
default:
}
}
func newSubscriber(name string) *subscriber {
return &subscriber{
name: name,
handler: make(chan *message, 100),
quit: make(chan struct{}),
}
}
三、验证
main.go
package main
import (
"context"
"time"
)
func main() {
ctx := context.Background()
h := newPubSubHub()
sub01 := newSubscriber("sub01")
sub02 := newSubscriber("sub02")
sub03 := newSubscriber("sub03")
h.subscribe(ctx, sub01)
h.subscribe(ctx, sub02)
h.subscribe(ctx, sub03)
h.publish(ctx, &message{
data: []byte("test01"),
})
time.Sleep(1*time.Second)
h.unsubscribe(ctx, sub03)
h.publish(ctx, &message{
data: []byte("test02"),
})
time.Sleep(1*time.Second)
}
运行main.go并且打印结果:
sub03 test01
sub01 test01
sub02 test01
sub02 test02
sub01 test02
打印的结果是对的,sub1-3都订阅了Pub/Sub服务,发送消息test01,三者都收到了。紧接着sub03取消订阅,然后再次发送消息test02,只有sub1-2收到了。
四、流程图
原文始发于微信公众号(堆栈future):golang实现Pub/Sub模式
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103451.html