golang实现Pub/Sub模式

golang实现Pub/Sub模式

一、什么是Pub/Sub模式

Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。

Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。

Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。

通过 Pub/Sub,可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。

发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。

二、用Go实现简单的Pub/Sub服务

hub.go 实现Pub/Sub 服务,类似于中心枢纽,用于接受和分发消息

package main

import (
 "context"
 "sync"
)

type PubSubHub struct {
 sync.Mutex
 subs map[*subscriber]struct{}
}

func (h *PubSubHub) publish(ctx context.Context, msg *message) {
 for s := range h.subs {
  s.publish(ctx, msg)
 }
}

func (h *PubSubHub) subscribe(ctx context.Context, s *subscriber) {
 h.subs[s] = struct{}{}

 go func() {
  select {
  case <-s.quit:
  case <-ctx.Done():
   delete(h.subs, s)
  }
 }()

 go s.run(ctx)
}

func (h *PubSubHub) unsubscribe(ctx context.Context, s *subscriber) {
 delete(h.subs, s)
 close(s.quit)
}

func (h *PubSubHub) subscribers() int {
 return len(h.subs)
}

func newPubSubHub() *PubSubHub {
 return &PubSubHub{
  subs: map[*subscriber]struct{}{},
 }
}

subscriber.go 实现消息订阅和消费

package main

import (
 "context"
 "fmt"
 "sync"
)

type message struct {
 data []byte
}

type subscriber struct {
 sync.Mutex
 name string
 handler chan *message
 quit chan struct{}
}

func (s *subscriber) run(ctx context.Context) {
 for {
  select {
  case msg:=<-s.handler:
   fmt.Println(s.name, string(msg.data))
  case <-s.quit:
   return
  case <-ctx.Done():
   return
  }
 }
}

func (s *subscriber) publish(ctx context.Context, msg *message) {
 select {
 case <-ctx.Done():
  return
 case s.handler <- msg:
 default:
 }
}

func newSubscriber(name string) *subscriber {
 return &subscriber{
  name: name,
  handler: make(chan *message, 100),
  quit: make(chan struct{}),
 }
}

三、验证

main.go

package main

import (
 "context"
 "time"
)

func main() {
 ctx := context.Background()
 h := newPubSubHub()
 sub01 := newSubscriber("sub01")
 sub02 := newSubscriber("sub02")
 sub03 := newSubscriber("sub03")

 h.subscribe(ctx, sub01)
 h.subscribe(ctx, sub02)
 h.subscribe(ctx, sub03)

 h.publish(ctx, &message{
  data: []byte("test01"),
 })
 time.Sleep(1*time.Second)

 h.unsubscribe(ctx, sub03)

 h.publish(ctx, &message{
  data: []byte("test02"),
 })
 time.Sleep(1*time.Second)
}

运行main.go并且打印结果:

sub03 test01
sub01 test01
sub02 test01

sub02 test02
sub01 test02

打印的结果是对的,sub1-3都订阅了Pub/Sub服务,发送消息test01,三者都收到了。紧接着sub03取消订阅,然后再次发送消息test02,只有sub1-2收到了。

四、流程图

1、Subscriber 订阅消息golang实现Pub/Sub模式

2、Publisher 发送消息golang实现Pub/Sub模式

3、Unsubscribe 取消订阅golang实现Pub/Sub模式

原文始发于微信公众号(堆栈future):golang实现Pub/Sub模式

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/103451.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!