// Listing: the RStream type and its copy and Read methods.
// RStream pairs an embedded *Stream with a leftover-byte buffer and a mutex.
// Its Read method pulls messages from the stream and hands their bodies to
// the caller.
type RStream struct {
// Embedded stream; Read calls its Pull method to fetch messages.
*Stream
// buffered holds the part of a message body that did not fit into the
// caller's buffer on an earlier copy; Read serves it before pulling again.
buffered []byte
// locker is held for the whole duration of Read.
// NOTE(review): it is a pointer, so it must be set before Read is called —
// confirm the constructor does this.
locker *sync.Mutex
}
// copy moves as many bytes as fit from from into to and remembers the rest.
//
// It returns the number of bytes actually written into to, which is never
// more than len(to), so the result can be returned directly from Read. Any
// bytes of from that did not fit are kept in rst.buffered for the next call;
// when everything fit, rst.buffered is reset to nil so already-delivered data
// is not served again. The returned error is always nil.
//
// The leftover is a sub-slice of from, not a private copy.
func (rst *RStream) copy(from, to []byte) (n int, err error) {
	// Inside the method body the bare identifier still refers to the builtin.
	n = copy(to, from)

	if n < len(from) {
		rst.buffered = from[n:]
	} else {
		// Fully drained: drop the reference so Read pulls a fresh message.
		rst.buffered = nil
	}
	return n, nil
}
// Read fills p with response data from the underlying stream.
//
// The call holds rst.locker from start to finish. If bytes are left over in
// rst.buffered they are handed out first and no message is pulled. Otherwise
// Read keeps pulling messages from the embedded Stream, skipping every
// message whose type is not protocol.FRAME_TYPE_RESPONSE, and passes the body
// of the first response message to rst.copy. A failure reported by Pull is
// logged and returned together with a zero byte count.
func (rst *RStream) Read(p []byte) (n int, err error) {
	rst.locker.Lock()
	defer rst.locker.Unlock()

	// Leftovers from an earlier, larger payload take priority.
	if len(rst.buffered) != 0 {
		return rst.copy(rst.buffered, p)
	}

	// Explicit interface-typed variables keep the nil comparison below
	// independent of the concrete types Pull returns.
	var (
		frame   goream.Message
		pullErr goream.Fail
	)
	for {
		frame, pullErr = rst.Stream.Pull()
		if pullErr != nil {
			logrus.WithField("place", "RSTream.Read").Error(pullErr)
			return 0, pullErr
		}
		if frame.GetType() != protocol.FRAME_TYPE_RESPONSE {
			// Not a response frame; keep pulling.
			continue
		}
		return rst.copy(frame.GetBody(), p)
	}
}