Golang实现自己的Redis (pipeline客户端)

用11篇文章实现一个可用的Redis服务,姑且叫EasyRedis吧,希望通过文章将Redis掰开撕碎了呈现给大家,而不是仅仅停留在八股文的层面,并且有非常爽的感觉,欢迎持续关注学习。

项目代码地址: https://github.com/gofish2020/easyredis 欢迎Fork & Star

  • [x] easyredis之TCP服务
  • [x] easyredis之网络请求序列化协议(RESP)
  • [x] easyredis之内存数据库
  • [x] easyredis之过期时间 (时间轮实现)
  • [x] easyredis之持久化 (AOF实现)
  • [x] easyredis之发布订阅功能
  • [x] easyredis之有序集合(跳表实现)
  • [x] easyredis之 pipeline 客户端实现
  • [ ] easyredis之事务(原子性/回滚)
  • [ ] easyredis之连接池
  • [ ] easyredis之分布式集群存储

【第八篇】EasyRedis之pipeline客户端

网络编程的一个基础知识:用同一个sokcet连接发送多个数据包的时候,我们一般的做法是,发送并立刻接收结果,在没有接收到,是不会继续发送数据包。这种方法简单,但是效率太低。时间都浪费在等待上了…

socket的【发送缓冲区和接收缓冲区】是分离的,也就是发送不用等待接收,接收也不用等待发送。

所以我们可以把我们要发送的多个数据包【数据包1/数据包2…数据包N】复用同一个连接,通过发送缓冲区按顺序都发送给服务端。服务端处理请求的顺序,也是按照【数据包1/数据包2…数据包N】这个顺序处理的。当处理完以后,处理结果将按照【数据包结果1/数据包结果2…数据包结果N】顺序发送给客户端的接收缓冲区。客户端只需要从接收缓冲区中读取数据,并保存到请求数据包上,即可。这样我们就可以将发送和接收分离开来。一个协程只负责发送,一个协程只负责接收,互相不用等待。关键在于保证发送和接收的顺序是相同的设计逻辑图如下:Golang实现自己的Redis (pipeline客户端)

代码路径redis/client/client.go整个代码也就是200多行,结合上图非常容易理解

创建客户端

type RedisClent struct {
// socket连接
conn net.Conn

addr string
// 客户端当前状态
connStatus atomic.Int32

// heartbeat
ticker time.Ticker

// buffer cache
waitSend chan *request
waitResult chan *request

// 有请求正在处理中...
working sync.WaitGroup
}

// 创建redis客户端socket
func NewRedisClient(addr string) (*RedisClent, error) {
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}

rc := RedisClent{}
rc.conn = conn
rc.waitSend = make(chan *request, maxChanSize)
rc.waitResult = make(chan *request, maxChanSize)
rc.addr = addr
return &rc, nil
}

// 启动
func (rc *RedisClent) Start() error {
rc.ticker = *time.NewTicker(heartBeatInterval)
// 将waitSend缓冲区进行发送
go rc.execSend()
// 获取服务端结果
go rc.execReceive()
// 定时发送心跳
go rc.execHeardBeat()
rc.connStatus.Store(connRunning) // 启动状态
return nil
}

发送Redis命令

command [][]byte保存到缓冲区 rc.waitSend

// 将redis命令保存到 waitSend 中
func (rc *RedisClent) Send(command [][]byte) (protocol.Reply, error) {

// 已关闭
if rc.connStatus.Load() == connClosed {
return nil, errors.New("client closed")
}

req := &request{
command: command,
wait: wait.Wait{},
}
// 单个请求
req.wait.Add(1)

// 所有请求
rc.working.Add(1)
defer rc.working.Done()

// 将数据保存到缓冲中
rc.waitSend <- req

// 等待处理结束
if req.wait.WaitWithTimeOut(maxWait) {
return nil, errors.New("time out")
}
// 出错
if req.err != nil {
err := req.err
return nil, err
}
// 正常
return req.reply, nil
}

发送到服务端

// 将waitSend缓冲区进行发送
func (rc *RedisClent) execSend() {
for req := range rc.waitSend {
rc.sendReq(req)
}
}

func (rc *RedisClent) sendReq(req *request) {
// 无效请求
if req == nil || len(req.command) == 0 {
return
}

var err error
// 网络请求(重试3次)
for i := 0; i < 3; i++ {
_, err = rc.conn.Write(req.Bytes())
// 发送成功 or 发送错误(除了超时错误和deadline错误)跳出
if err == nil ||
(!strings.Contains(err.Error(), "timeout") && // only retry timeout
!strings.Contains(err.Error(), "deadline exceeded")) {
break
}
}

if err == nil { // 发送成功,异步等待结果
rc.waitResult <- req
} else { // 发送失败,请求直接失败
req.err = err
req.wait.Done()
}
}


从服务端读取数据

func (rc *RedisClent) execReceive() {

ch := parser.ParseStream(rc.conn)

for payload := range ch {

if payload.Err != nil {
if rc.connStatus.Load() == connClosed { // 连接已关闭
return
}

// 否则,重新连接(可能因为网络抖动临时断开了)

rc.reconnect()
return
}

// 说明一切正常

rc.handleResult(payload.Reply)
}
}

func (rc *RedisClent) handleResult(reply protocol.Reply) {
// 从rc.waitResult 获取一个等待中的请求,将结果保存进去
req := <-rc.waitResult
if req == nil {
return
}
req.reply = reply
req.wait.Done() // 通知已经获取到结果
}

断线重连

因为网络抖动可能存在连接断开的情况,所以需要有重连的功能

func (rc *RedisClent) reconnect() {
logger.Info("redis client reconnect...")
rc.conn.Close()

var conn net.Conn
// 重连(重试3次)
for i := 0; i < 3; i++ {
var err error
conn, err = net.Dial("tcp", rc.addr)
if err != nil {
logger.Error("reconnect error: " + err.Error())
time.Sleep(time.Second)
continue
} else {
break
}
}
// 服务端连不上,说明服务可能挂了(or 网络问题 and so on...)
if conn == nil {
rc.Stop()
return
}

// 这里关闭没问题,因为rc.conn.Close已经关闭,函数Send中保存的请求因为发送不成功,不会写入到waitResult
close(rc.waitResult)
// 清理 waitResult(因为连接重置,新连接上只能处理新请求,老的请求的数据结果在老连接上,老连接已经关了,新连接上肯定是没有结果的)
for req := range rc.waitResult {
req.err = errors.New("connect reset")
req.wait.Done()
}

// 新连接(新气象)
rc.waitResult = make(chan *request, maxWait)
rc.conn = conn

// 重新启动接收协程
go rc.execReceive()
}


原文始发于微信公众号(nullbody笔记):Golang实现自己的Redis (pipeline客户端)

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/223648.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!