gopacket tcpassembly源码分析

生活中,最使人疲惫的往往不是道路的遥远,而是心中的郁闷;最使人痛苦的往往不是生活的不幸,而是希望的破灭;最使人颓废的往往不是前途的坎坷,而是自信的丧失;最使人绝望的往往不是挫折的打击,而是心灵的死亡。所以我们要有自己的梦想,让梦想的星光指引着我们走出落漠,走出惆怅,带着我们走进自己的理想。

导读:本篇文章讲解 gopacket tcpassembly源码分析,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com,来源:原文

gopacket tcpassembly源码分析

调用

参考示例example/httpassembly

  1. 自定义一个factory,实现New接口
type httpStream struct {
	net, transport gopacket.Flow
	r              tcpreader.ReaderStream
}
func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
}
  1. New接口保存一个tcpreader.NewReaderStream()流,启动处理流的协程,然后返回这个流

func (h *httpStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
	hstream := &httpStream{
		net:       net,
		transport: transport,
		r:         tcpreader.NewReaderStream(),
	}
	go hstream.run() // Important... we must guarantee that data from the reader stream is read.

	// ReaderStream implements tcpassembly.Stream, so we can return a pointer to it.
	return &hstream.r
}
  1. 流处理协程,建一个buf,从这个buf中读取数据,然后重组解析
func (h *httpStream) run() {
	buf := bufio.NewReader(&h.r)
	for {
		req, err := http.ReadRequest(buf)
		if err == io.EOF {
			// We must read until we see an EOF... very important!
			return
		} else if err != nil {
			log.Println("Error reading stream", h.net, h.transport, ":", err)
		} else {
			bodyBytes := tcpreader.DiscardBytesToEOF(req.Body)
			req.Body.Close()
			log.Println("Received request from stream", h.net, h.transport, ":", req, "with", bodyBytes, "bytes in request body")
		}
	}
}
  1. 使用,和reassembly一样
func main() {
	defer util.Run()()
	// 1. 打开设备
	var handle *pcap.Handle
	var err error
	handle, err = pcap.OpenLive(*iface, int32(*snaplen), true, pcap.BlockForever)
	if err != nil {
		log.Fatal(err)
	}
	// 设置BPF
	if err := handle.SetBPFFilter(*filter); err != nil {
		log.Fatal(err)
	}

	// 2. 初始化assembly
	streamFactory := &httpStreamFactory{}
	streamPool := tcpassembly.NewStreamPool(streamFactory)
	assembler := tcpassembly.NewAssembler(streamPool)

	log.Println("reading in packets")
	// 3.初始化packetSource
	packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
	packets := packetSource.Packets()
	ticker := time.Tick(time.Second)
	for {
		select {
		// 4. 读取包
		case packet := <-packets:
			// A nil packet indicates the end of a pcap file.
			if packet == nil {
				return
			}
			if *logAllPackets {
				log.Println(packet)
			}
			if packet.NetworkLayer() == nil || packet.TransportLayer() == nil || packet.TransportLayer().LayerType() != layers.LayerTypeTCP {
				log.Println("Unusable packet")
				continue
			}
			tcp := packet.TransportLayer().(*layers.TCP)
			// 5. tcp直接丢进去
			assembler.AssembleWithTimestamp(packet.NetworkLayer().NetworkFlow(), tcp, packet.Metadata().Timestamp)

		case <-ticker:
			// 6. 定时书信连接
			assembler.FlushOlderThan(time.Now().Add(time.Minute * -2))
		}
	}
}

Assembler

type AssemblerOptions struct {
	// 等待无序包时要缓冲的page总数最大值
	// 一旦达到这个上限值, Assembler将会降级刷新每个连接的,如果<=0将被忽略。
	MaxBufferedPagesTotal int
	// 单个连接缓冲的page最大值
	// 如果达到上限,则将刷新最小序列号以及任何连续数据。如果<= 0,这将被忽略。
	MaxBufferedPagesPerConnection int
}

type Assembler struct {
	AssemblerOptions	// 选项
	ret      []Reassembly	// 数据包
	pc       *pageCache
	connPool *StreamPool
}

// 创建一个Assember
// pool: StreamPool,来Assember共享
// DefaultAssemblerOptions
// 改造建议 - 选项模式与默认值
func NewAssembler(pool *StreamPool) *Assembler {
	pool.mu.Lock()
	pool.users++
	pool.mu.Unlock()
	return &Assembler{
		ret:              make([]Reassembly, assemblerReturnValueInitialSize),
		pc:               newPageCache(),
		connPool:         pool,
		AssemblerOptions: DefaultAssemblerOptions, //默认值,无限制
	}
}

AssemblyTimestamp

func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
	// 忽略空的数据包,比如keepalived
	// tcp握手时 t.SYN = 1 t.FIN = 0 t.RST = 0 len(t.LayerPayload()) == 0 
	// 即false && true && true && true
	// tcp挥手时 t.SYN = 0 t.FIN = 1 t.RST = 0 len(t.LayerPayload()) == 0 
	// 即true && false && true && true
	if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
		if *debugLog {
			log.Println("ignoring useless packet")
		}
		return
	}

	a.ret = a.ret[:0]
	// 4元组组成的key
	key := key{netFlow, t.TransportFlow()}

	var conn *connection
	// This for loop handles a race condition where a connection will close, lock
	// the connection pool, and remove itself, but before it locked the connection
	// pool it's returned to another Assemble statement.  This should loop 0-1
	// times for the VAST majority of cases.
	// 创建conn
	for {
		// tcp keepalive syn=0 payload=0
		// 即 true && true end 为true?
		conn = a.connPool.getConnection(key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
		if conn == nil {
			if *debugLog {
				log.Printf("%v got empty packet on otherwise empty connection", key)
			}
			return
		}
		conn.mu.Lock()
		if !conn.closed {
			break
		}
		conn.mu.Unlock()
	}
	if conn.lastSeen.Before(timestamp) {
		conn.lastSeen = timestamp
	}
	//type Sequence int64 提供Difference和Add函数
	seq, bytes := Sequence(t.Seq), t.Payload // seq:当前序号  bytes:tcp负载的数据
	// 校验序号
	if conn.nextSeq == invalidSequence {
		if t.SYN {
			if *debugLog {
				log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
			}
			// 添加 Reassembly重组后的对象
			a.ret = append(a.ret, Reassembly{
				Bytes: bytes,
				Skip:  0,
				Start: true,
				Seen:  timestamp,
			})
			// 下一个包的序号 = 当前的序号 + 字节数 + 1
			conn.nextSeq = seq.Add(len(bytes) + 1)
		} else {
			if *debugLog {
				log.Printf("%v waiting for start, storing into connection", key)
			}
			// 插入到数据到connection中
			a.insertIntoConn(t, conn, timestamp)
		}
	} else if diff := conn.nextSeq.Difference(seq); diff > 0 {
		if *debugLog {
			log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
		}
		// 插入到数据到connection中
		a.insertIntoConn(t, conn, timestamp)
	} else {=<0
		// 字节校准
		bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
		if *debugLog {
			log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
		}
		a.ret = append(a.ret, Reassembly{
			Bytes: bytes,
			Skip:  0,
			End:   t.RST || t.FIN,
			Seen:  timestamp,
		})
	}
	if len(a.ret) > 0 {
		a.sendToConnection(conn)
	}
	conn.mu.Unlock()
}

insertIntoConn

func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
	if conn.first != nil && conn.first.seq == conn.nextSeq {
		panic("wtf")
	}
	// p:第一页 p2:最后一页 numPages:页数
	p, p2, numPages := a.pagesFromTCP(t, ts)

	//遍历双向链接page列表获取正确的放置给定序号的位置
	// 直接插入不好吗?
	prev, current := conn.traverseConn(Sequence(t.Seq))
	conn.pushBetween(prev, current, p, p2)
	conn.pages += numPages

	// 校验最大缓冲page数
	if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) ||
		(a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) {
		if *debugLog {
			log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
		}
		// 弹出
		a.addNextFromConn(conn)
	}
}

pagesFromTCP

从TCP数据包创建一个page(或设置一个pages)。

注意此函数不应该接受SYN包,因为它不能正确处理seq。

返回双连接的page列表中的第一个和最后一个页面。

func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
	first := a.pc.next(ts)
	current := first
	numPages++
	seq, bytes := Sequence(t.Seq), t.Payload
	for {
		length := min(len(bytes), pageBytes)
		// 拷贝负载数据
		current.Bytes = current.buf[:length]
		copy(current.Bytes, bytes)
		// 设置seq
		current.seq = seq
		// 处理剩余数据>1900,一般不会进入到这里,实际场景下MTU会将TCP切段
		bytes = bytes[length:]
		if len(bytes) == 0 {
			break
		}
		seq = seq.Add(length)
		// 创建下一页
		current.next = a.pc.next(ts)
		// 设置下一个的prev为current
		current.next.prev = current
		// 设置下一页
		current = current.next
		numPages++
	}
	current.End = t.RST || t.FIN // 设置end
	return first, current, numPages
}

addNextFromConn

弹出第一页

func (a *Assembler) addNextFromConn(conn *connection) {
	if conn.nextSeq == invalidSequence {
		conn.first.Skip = -1
	} else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
		conn.first.Skip = int(diff)
	}
	conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
	if *debugLog {
		log.Printf("%v   adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
	}
	a.ret = append(a.ret, conn.first.Reassembly)
	a.pc.replace(conn.first)
	if conn.first == conn.last {
		conn.first = nil
		conn.last = nil
	} else {
		conn.first = conn.first.next
		conn.first.prev = nil
	}
	conn.pages--
}

sendToConnection

func (a *Assembler) sendToConnection(conn *connection) {
	// 组数据
	a.addContiguous(conn)
	if conn.stream == nil {
		panic("why?")
	}
	conn.stream.Reassembled(a.ret)
	if a.ret[len(a.ret)-1].End {
		a.closeConnection(conn)
	}
}

addContiguous

func (a *Assembler) addContiguous(conn *connection) {
	for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
		a.addNextFromConn(conn)
	}
}

addNextFromConn

弹出第一页添加到数组中

func (a *Assembler) addNextFromConn(conn *connection) {
	if conn.nextSeq == invalidSequence {
		conn.first.Skip = -1
	} else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
		conn.first.Skip = int(diff)
	}
	conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
	if *debugLog {
		log.Printf("%v   adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
	}
	a.ret = append(a.ret, conn.first.Reassembly)
	a.pc.replace(conn.first)
	if conn.first == conn.last {
		conn.first = nil
		conn.last = nil
	} else {
		conn.first = conn.first.next
		conn.first.prev = nil
	}
	conn.pages--
}

closeConnection

func (a *Assembler) closeConnection(conn *connection) {
	if *debugLog {
		log.Printf("%v closing", conn.key)
	}
	conn.stream.ReassemblyComplete()
	conn.closed = true
	a.connPool.remove(conn)
	for p := conn.first; p != nil; p = p.next {
		a.pc.replace(p)
	}
}

StreamPool

管理流的连接池,初始连接池分配1024个

type StreamPool struct {
	conns              map[key]*connection
	users              int
	mu                 sync.RWMutex
	factory            StreamFactory
	free               []*connection
	all                [][]connection
	nextAlloc          int
	newConnectionCount int64
}

func NewStreamPool(factory StreamFactory) *StreamPool {
	return &StreamPool{
		conns:     make(map[key]*connection, initialAllocSize),
		free:      make([]*connection, 0, initialAllocSize),
		factory:   factory,
		nextAlloc: initialAllocSize,
	}
}

grow

分配连接

func (p *StreamPool) grow() {
	conns := make([]connection, p.nextAlloc)
	p.all = append(p.all, conns)
	for i := range conns {
		p.free = append(p.free, &conns[i])
	}
	if *memLog {
		log.Println("StreamPool: created", p.nextAlloc, "new connections")
	}
	p.nextAlloc *= 2
}

newConnection

创建连接

func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
	if *memLog {
		p.newConnectionCount++
		if p.newConnectionCount&0x7FFF == 0 {
			log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
		}
	}
	if len(p.free) == 0 {
		p.grow()
	}
	index := len(p.free) - 1
	c, p.free = p.free[index], p.free[:index]
	c.reset(k, s, ts)
	return c
}

getConnection

// 返回一个连接,如果连接已经被关闭或者连接不存在,返回nil
func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
	p.mu.RLock()
	conn := p.conns[k]
	p.mu.RUnlock()
	if end || conn != nil {
		return conn
	}
	s := p.factory.New(k[0], k[1])
	p.mu.Lock()
	conn = p.newConnection(k, s, ts)
	if conn2 := p.conns[k]; conn2 != nil {
		p.mu.Unlock()
		return conn2
	}
	p.conns[k] = conn
	p.mu.Unlock()
	return conn
}

remove

删除某个个连接

func (p *StreamPool) remove(conn *connection) {
	p.mu.Lock()
	delete(p.conns, conn.key)
	p.free = append(p.free, conn)
	p.mu.Unlock()
}

connection

返回所有的连接

func (p *StreamPool) connections() []*connection {
	p.mu.RLock()
	conns := make([]*connection, 0, len(p.conns))
	for _, conn := range p.conns {
		conns = append(conns, conn)
	}
	p.mu.RUnlock()
	return conns
}

connection

type connection struct {
	key               key
	pages             int
	first, last       *page
	nextSeq           Sequence
	created, lastSeen time.Time
	stream            Stream
	closed            bool
	mu                sync.Mutex
}

reset

因为连接是预先分配的,所以需要重置,相当于初始化

func (c *connection) reset(k key, s Stream, ts time.Time) {
	c.key = k
	c.pages = 0
	c.first, c.last = nil, nil
	c.nextSeq = invalidSequence
	c.created = ts
	c.stream = s
	c.closed = false
}

traverseConn

遍历双向链接page列表获取正确的放置给定序号的位置。

注意它是向后遍历的,从最大的序号开始,一直往下,因为我们假设常见的情况是TCP数据包的流是顺序,与最小损失或数据包重新排序。

遍历双向链接的page列表,以找到放置给定序号的正确位置。

注意,它是向后遍历的,从最高的序号开始,一直向下,因为我们假设通常的情况是,流的TCP包将按顺序出现,损失或包重排序最小。

func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
	prev = c.last
	for prev != nil && prev.seq.Difference(seq) < 0 {
		current = prev
		prev = current.prev
	}
	return
}

pushbettwen

首先插入双向链接列表first-…-last在另一个双链接列表中的节点prevnext之间。如果prev为nil,则首先成为连接列表中的新第一页。如果next为nil,则使last成为列表中新的最后一页。第一个/最后一个可能指向相同的页面。

func (c *connection) pushBetween(prev, next, first, last *page) {
	// Maintain our doubly linked list
	if next == nil || c.last == nil {
		c.last = last
	} else {
		last.next = next
		next.prev = last
	}
	if prev == nil || c.first == nil {
		c.first = first
	} else {
		first.prev = prev
		prev.next = first
	}
}

Reassembly

Reassembly由assembler传入stream中。

type Reassembly struct {
	// TCP流的负载,可能为空
	Bytes []byte
	// 如果在此和最后一个Reassembly之间跳过字节,则Skip设置为非零。
	// 如果这是连接中的第一个包,我们没有看到开始,我们不知道我们跳过了多少字节,所以我们将它设为-1。
	// 否则,它被设置为跳过的字节数。
	Skip int
	// 如果这组字节带有TCP SYN,则设置Start。
	Start bool
	// 如果这组字节带有TCP FIN或RST,则设置End。
	End bool
	// 看到的是这组字节从网络中取出的时间戳。
	Seen time.Time
}

pageCache

pageCache是一个并发不安全的page对象,使用此应该避免内存分配。它会增长但是不会收缩。

type pageCache struct {
	free         []*page
	pcSize       int
	size, used   int
	pages        [][]page
	pageRequests int64
}
func newPageCache() *pageCache {
	pc := &pageCache{
		free:   make([]*page, 0, initialAllocSize),
		pcSize: initialAllocSize,
	}
	pc.grow()
	return pc
}

grow

分配空间

func (c *pageCache) grow() {
	pages := make([]page, c.pcSize)
	c.pages = append(c.pages, pages)
	c.size += c.pcSize
	for i := range pages {
		c.free = append(c.free, &pages[i])
	}
	if *memLog {
		log.Println("PageCache: created", c.pcSize, "new pages")
	}
	c.pcSize *= 2
}

next

返回一个感觉的page对象

func (c *pageCache) next(ts time.Time) (p *page) {
	if *memLog {
		c.pageRequests++
		if c.pageRequests&0xFFFF == 0 {
			log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
		}
	}
	if len(c.free) == 0 {
		c.grow()
	}
	i := len(c.free) - 1
	p, c.free = c.free[i], c.free[:i]
	p.prev = nil
	p.next = nil
	p.Reassembly = Reassembly{Bytes: p.buf[:0], Seen: ts}
	c.used++
	return p
}

replace

替换一个page

func (c *pageCache) replace(p *page) {
	c.used--
	c.free = append(c.free, p)
}

page

用来存储未处理的TCP数据包(无序包)。

未使用的page存储在页面缓存中,并从pageCache中返回,避免内存分配。

使用的page存储在connection中的双向链表中。

type page struct {
	Reassembly
	seq        Sequence
	index      int
	prev, next *page
	buf        [pageBytes]byte
}

byteSpan

func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
	if expected == invalidSequence {
		return bytes, received.Add(len(bytes))
	}
	span := int(received.Difference(expected))
	if span <= 0 {
		return bytes, received.Add(len(bytes))
	} else if len(bytes) < span {
		return nil, expected
	}
	return bytes[span:], expected.Add(len(bytes) - span)
}

推荐一个零声学院免费教程,个人觉得老师讲得不错,
分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,
TCP/IP,协程,DPDK等技术内容,点击立即学习

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/137616.html

(0)
飞熊的头像飞熊bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!