基本实现
- 实现带缓存的I/O模块
- 对io.Reader和io.Writer进行封装
- 实现带缓存
- 对字符的I/O操作友好
Reader(缓存输入)
实现了一个带缓存的io.Reader
数据结构
1
2
3
4
5
6
7
8
|
type Reader struct {
buf []byte // 缓存
rd io.Reader // 被包装的io.Reader对象
r, w int // buf切片上的索引,r代表读取的索引,w代表写入的索引
err error // 用来存储该Reader在执行中遇到的error
lastByte int // 已读取的最后一个字节; -1 代表无效
lastRuneSize int // 已读取的最后一个rune; -1 代表无效
}
|
默认信息
1
2
|
const minReadBufferSize = 16 // buf最小长度为16字节
const maxConsecutiveEmptyReads = 100 // 一次读取,在连续100次没读取到会认为失败
|
关键代码逻辑
func (b *Reader) fill()
读取新的一个数据块,尽可能读满buf;只会读取成功读取一次,允许maxConsecutiveEmptyReads
次读取失败,但是第一次读取成功后就不再读取,返回;
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
// fill reads a new chunk into the buffer.
func (b *Reader) fill() {
// 把有效的数据迁移到buf的头部
if b.r > 0 {
copy(b.buf, b.buf[b.r:b.w])
b.w -= b.r
b.r = 0
}
if b.w >= len(b.buf) {
panic("bufio: tried to fill full buffer")
}
// 读取一块新的数据,但是限制读取失败的次数
for i := maxConsecutiveEmptyReads; i > 0; i-- {
n, err := b.rd.Read(b.buf[b.w:])
if n < 0 {
panic(errNegativeRead)
}
b.w += n
if err != nil {
b.err = err
return
}
if n > 0 {
return
}
}
b.err = io.ErrNoProgress
}
|
func (b *Reader) Peek(n int) ([]byte, error)
返回未读取数据的前n个字节,并且不会去改变buf的读取状态,单可能会改变buf的写入状态。
写入状态(b.w)和buf被更改的情况:
- n > b.w - b.r, 不断循环调用
fill
操作,要么buf读满,要么b.w - b.r >= n,要么报错(可能是对应的reader数据读完了)
- n < b.w - b.r, 直接返回对应数据
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func (b *Reader) Peek(n int) ([]byte, error) {
if n < 0 {
return nil, ErrNegativeCount
}
b.lastByte = -1
b.lastRuneSize = -1
for b.w-b.r < n && b.w-b.r < len(b.buf) && b.err == nil {
b.fill() // 要么读满buf,要么buf的数据数量比n大
}
if n > len(b.buf) {
return b.buf[b.r:b.w], ErrBufferFull
}
// 0 <= n <= len(b.buf)
var err error
if avail := b.w - b.r; avail < n {
// 最终读取的数据不够n个,有可能是EOF造成的,也可能是其他报错造成的
n = avail
err = b.readErr()
if err == nil {
err = ErrBufferFull
}
}
return b.buf[b.r : b.r+n], err
}
|
func (b *Reader) Discard(n int) (discarded int, err error)
直接丢弃未读取数据的前n个字节,并且返回实际丢弃的字节数量,有点类似于Peek
函数,如果返回的discarded != n,可以通过err知道为什么不相等
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func (b *Reader) Discard(n int) (discarded int, err error) {
if n < 0 {
return 0, ErrNegativeCount
}
if n == 0 {
return
}
// 循环去丢弃数据,如果buf里面不够,那么就去fill buf,直到满足remain == 0,或者中途遇见err
remain := n
for {
skip := b.Buffered()
if skip == 0 {
b.fill()
skip = b.Buffered()
}
if skip > remain {
skip = remain
}
b.r += skip
remain -= skip
if remain == 0 {
return n, nil
}
if b.err != nil {
return n - remain, b.readErr()
}
}
}
|
func (b *Reader) Read(p []byte) (n int, err error)
将Reader包装的io.Reader的数据迁移到p中,迁移的数据量不会大于len(p)
如果buf为空,只会向io.Reader读取一次,无论有没有读取到,都不会进行第二次,这个是Read与fill的区别
如果buf本身不为空,那么p只能读取buf里面的数据,无论buf的长度是不是小于p的容量
n < len(p)的情况:
- buf为空,本身快要到达EOF
- buf不为空,buf的数据长度小于p的容量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
func (b *Reader) Read(p []byte) (n int, err error) {
n = len(p)
if n == 0 {
if b.Buffered() > 0 {
return 0, nil
}
return 0, b.readErr()
}
// buf为空的情况下
if b.r == b.w {
if b.err != nil {
return 0, b.readErr()
}
// p 的长度大于 buf 的长度,直接让把rd的数据读取到p里面,避免中间层buf的消耗
if len(p) >= len(b.buf) {
// Large read, empty buffer.
// Read directly into p to avoid copy.
n, b.err = b.rd.Read(p)
if n < 0 {
panic(errNegativeRead)
}
if n > 0 {
b.lastByte = int(p[n-1])
b.lastRuneSize = -1
}
return n, b.readErr()
}
// One read.
// Do not use b.fill, which will loop.
// 只读取一次,不会去使用b.fill,这个函数会造成循环,读取多次
b.r = 0
b.w = 0
n, b.err = b.rd.Read(b.buf)
if n < 0 {
panic(errNegativeRead)
}
if n == 0 {
return 0, b.readErr()
}
b.w += n
}
// copy as much as we can
// 数据读取完毕或者本身不需要读取数据,将buf里面的数据转移到p中
n = copy(p, b.buf[b.r:b.w])
b.r += n
b.lastByte = int(b.buf[b.r-1])
b.lastRuneSize = -1
return n, nil
}
|
func (b *Reader) ReadSlice(delim byte) (line []byte, err error)
顾名思义哈,根据delim(切割符)来读取数据,该函数会读取rd的数据;
以下情况会返回:
- 直到读取到第一个切割符
- 读取到EOF
- 本身是有err的,buf是多少就读多少,可能是应对,本身在fill时读取到EOF,但是buf中还有剩余
- buf是满的,但是找不到分隔符,会直接将整个buf导出,并且带有
ErrBufferFull
,这里需要客户程序自己去做拼接,但是,这个函数算是比较底层的实现,可以参考ReadLine()
,collectFragments()
等函数,他们对ErrBufferFull
做了处理
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
func (b *Reader) ReadSlice(delim byte) (line []byte, err error) {
s := 0 // search start index
for {
// Search buffer.
if i := bytes.IndexByte(b.buf[b.r+s:b.w], delim); i >= 0 {
i += s
line = b.buf[b.r : b.r+i+1]
b.r += i + 1
break
}
// Pending error?
if b.err != nil {
line = b.buf[b.r:b.w]
b.r = b.w
err = b.readErr()
break
}
// Buffer full?
if b.Buffered() >= len(b.buf) {
b.r = b.w
line = b.buf
err = ErrBufferFull
break
}
s = b.w - b.r // do not rescan area we scanned before
b.fill() // buffer is not full
}
// Handle last byte, if any.
if i := len(line) - 1; i >= 0 {
b.lastByte = int(line[i])
b.lastRuneSize = -1
}
return
}
|
Writer (缓存输出)
数据结构
1
2
3
4
5
6
|
type Writer struct {
err error // 用来存储该Writer在执行中遇到的error
buf []byte // 缓存
n int // 当前缓存使用了的空间数量,那么说明buf的存储一定是从0开始的
wr io.Writer
}
|
关键代码逻辑
func (b *Writer) Flush() error
将buf中的数据写入到对应的wr
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (b *Writer) Flush() error {
if b.err != nil {
return b.err
}
if b.n == 0 {
return nil
}
// 将buf中的数据写入
// 如果没有完全写入,会将未写入的数据迁移到buf的头部
n, err := b.wr.Write(b.buf[0:b.n])
if n < b.n && err == nil {
err = io.ErrShortWrite
}
if err != nil {
if n > 0 && n < b.n {
copy(b.buf[0:b.n-n], b.buf[n:b.n])
}
b.n -= n
b.err = err
return err
}
b.n = 0
return nil
}
|