基本实现

  1. 实现带缓存的I/O模块
  2. 对io.Reader和io.Writer进行封装
    1. 实现带缓存
    2. 对字符的I/O操作友好

Reader(缓存输入)

实现了一个带缓存的io.Reader

数据结构

1
2
3
4
5
6
7
8
type Reader struct {
	buf          []byte    // 缓存
	rd           io.Reader // 被包装的io.Reader对象
	r, w         int       // buf切片上的索引,r代表读取的索引,w代表写入的索引
	err          error     // 用来存储该Reader在执行中遇到的error
	lastByte     int       // 已读取的最后一个字节; -1 代表无效
	lastRuneSize int       // 已读取的最后一个rune; -1 代表无效
}

默认信息

1
2
const minReadBufferSize = 16 // buf最小长度为16字节
const maxConsecutiveEmptyReads = 100 // 一次读取,在连续100次没读取到会认为失败

关键代码逻辑

func (b *Reader) fill()

读取新的一个数据块,尽可能读满buf;只会读取成功读取一次,允许maxConsecutiveEmptyReads次读取失败,但是第一次读取成功后就不再读取,返回;

代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// fill reads a new chunk into the buffer.
func (b *Reader) fill() {
	// 把有效的数据迁移到buf的头部
	if b.r > 0 {
		copy(b.buf, b.buf[b.r:b.w])
		b.w -= b.r
		b.r = 0
	}

	if b.w >= len(b.buf) {
		panic("bufio: tried to fill full buffer")
	}

	// 读取一块新的数据,但是限制读取失败的次数
	for i := maxConsecutiveEmptyReads; i > 0; i-- {
		n, err := b.rd.Read(b.buf[b.w:])
		if n < 0 {
			panic(errNegativeRead)
		}
		b.w += n
		if err != nil {
			b.err = err
			return
		}
		if n > 0 {
			return
		}
	}
	b.err = io.ErrNoProgress
}

func (b *Reader) Peek(n int) ([]byte, error)

返回未读取数据的前n个字节,并且不会去改变buf的读取状态,单可能会改变buf的写入状态。 写入状态(b.w)和buf被更改的情况:

  1. n > b.w - b.r, 不断循环调用fill操作,要么buf读满,要么b.w - b.r >= n,要么报错(可能是对应的reader数据读完了)
  2. n < b.w - b.r, 直接返回对应数据
代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (b *Reader) Peek(n int) ([]byte, error) {
	if n < 0 {
		return nil, ErrNegativeCount
	}

	b.lastByte = -1
	b.lastRuneSize = -1

	for b.w-b.r < n && b.w-b.r < len(b.buf) && b.err == nil {
		b.fill() // 要么读满buf,要么buf的数据数量比n大
	}

	if n > len(b.buf) {
		return b.buf[b.r:b.w], ErrBufferFull
	}

	// 0 <= n <= len(b.buf)
	var err error
	if avail := b.w - b.r; avail < n {
		// 最终读取的数据不够n个,有可能是EOF造成的,也可能是其他报错造成的
		n = avail
		err = b.readErr()
		if err == nil {
			err = ErrBufferFull
		}
	}
	return b.buf[b.r : b.r+n], err
}

func (b *Reader) Discard(n int) (discarded int, err error)

直接丢弃未读取数据的前n个字节,并且返回实际丢弃的字节数量,有点类似于Peek函数,如果返回的discarded != n,可以通过err知道为什么不相等

代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (b *Reader) Discard(n int) (discarded int, err error) {
	if n < 0 {
		return 0, ErrNegativeCount
	}
	if n == 0 {
		return
	}
    // 循环去丢弃数据,如果buf里面不够,那么就去fill buf,直到满足remain == 0,或者中途遇见err
	remain := n
	for {
		skip := b.Buffered()
		if skip == 0 {
			b.fill()
			skip = b.Buffered()
		}
		if skip > remain {
			skip = remain
		}
		b.r += skip
		remain -= skip
		if remain == 0 {
			return n, nil
		}
		if b.err != nil {
			return n - remain, b.readErr()
		}
	}
}

func (b *Reader) Read(p []byte) (n int, err error)

将Reader包装的io.Reader的数据迁移到p中,迁移的数据量不会大于len(p)

如果buf为空,只会向io.Reader读取一次,无论有没有读取到,都不会进行第二次,这个是Read与fill的区别

如果buf本身不为空,那么p只能读取buf里面的数据,无论buf的长度是不是小于p的容量

n < len(p)的情况:

  1. buf为空,本身快要到达EOF
  2. buf不为空,buf的数据长度小于p的容量
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (b *Reader) Read(p []byte) (n int, err error) {
	n = len(p)
	if n == 0 {
		if b.Buffered() > 0 {
			return 0, nil
		}
		return 0, b.readErr()
	}
    // buf为空的情况下
	if b.r == b.w {
		if b.err != nil {
			return 0, b.readErr()
		}
        // p 的长度大于 buf 的长度,直接让把rd的数据读取到p里面,避免中间层buf的消耗
		if len(p) >= len(b.buf) {
			// Large read, empty buffer.
			// Read directly into p to avoid copy.
			n, b.err = b.rd.Read(p)
			if n < 0 {
				panic(errNegativeRead)
			}
			if n > 0 {
				b.lastByte = int(p[n-1])
				b.lastRuneSize = -1
			}
			return n, b.readErr()
		}
		// One read.
		// Do not use b.fill, which will loop.
        // 只读取一次,不会去使用b.fill,这个函数会造成循环,读取多次
		b.r = 0
		b.w = 0
		n, b.err = b.rd.Read(b.buf)
		if n < 0 {
			panic(errNegativeRead)
		}
		if n == 0 {
			return 0, b.readErr()
		}
		b.w += n
	}

	// copy as much as we can
    // 数据读取完毕或者本身不需要读取数据,将buf里面的数据转移到p中
	n = copy(p, b.buf[b.r:b.w])
	b.r += n
	b.lastByte = int(b.buf[b.r-1])
	b.lastRuneSize = -1
	return n, nil
}

func (b *Reader) ReadSlice(delim byte) (line []byte, err error)

顾名思义哈,根据delim(切割符)来读取数据,该函数会读取rd的数据;

以下情况会返回:

  1. 直到读取到第一个切割符
  2. 读取到EOF
  3. 本身是有err的,buf是多少就读多少,可能是应对,本身在fill时读取到EOF,但是buf中还有剩余
  4. buf是满的,但是找不到分隔符,会直接将整个buf导出,并且带有ErrBufferFull,这里需要客户程序自己去做拼接,但是,这个函数算是比较底层的实现,可以参考ReadLine()collectFragments()等函数,他们对ErrBufferFull做了处理
代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
	s := 0 // search start index
	for {
		// Search buffer.
		if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
			i += s
			line = b.buf[b.r : b.r+i+1]
			b.r += i + 1
			break
		}

		// Pending error?
		if b.err != nil {
			line = b.buf[b.r:b.w]
			b.r = b.w
			err = b.readErr()
			break
		}

		// Buffer full?
		if b.Buffered() >= len(b.buf) {
			b.r = b.w
			line = b.buf
			err = ErrBufferFull
			break
		}

		s = b.w - b.r // do not rescan area we scanned before

		b.fill() // buffer is not full
	}

	// Handle last byte, if any.
	if i := len(line) - 1; i >= 0 {
		b.lastByte = int(line[i])
		b.lastRuneSize = -1
	}

	return
}

Writer (缓存输出)

数据结构

1
2
3
4
5
6
type Writer struct {
	err error  // 用来存储该Writer在执行中遇到的error
	buf []byte // 缓存
	n   int    // 当前缓存使用了的空间数量,那么说明buf的存储一定是从0开始的
	wr  io.Writer
}

关键代码逻辑

func (b *Writer) Flush() error

将buf中的数据写入到对应的wr

代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (b *Writer) Flush() error {
	if b.err != nil {
		return b.err
	}
	if b.n == 0 {
		return nil
	}
    // 将buf中的数据写入
    // 如果没有完全写入,会将未写入的数据迁移到buf的头部
	n, err := b.wr.Write(b.buf[0:b.n])
	if n < b.n && err == nil {
		err = io.ErrShortWrite
	}
	if err != nil {
		if n > 0 && n < b.n {
			copy(b.buf[0:b.n-n], b.buf[n:b.n])
		}
		b.n -= n
		b.err = err
		return err
	}
	b.n = 0
	return nil
}