You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gophi/core/conn.go

153 lines
3.5 KiB
Go

package core
import (
"bufio"
"bytes"
"io"
"net"
"time"
"github.com/grufwub/go-errors"
)
// deadlineConn wraps a net.Conn so that its Read and Write methods refresh
// the matching deadline before each call to the underlying connection.
type deadlineConn struct {
// conn is the wrapped connection that every call is forwarded to
conn net.Conn
// rd points to the duration added to time.Now() to form each read deadline
rd *time.Duration
// wd points to the duration added to time.Now() to form each write deadline
wd *time.Duration
}
// Read pushes the read deadline of the underlying net.Conn forward to
// "now + *c.rd" and then reads into b from that connection.
//
// The value returned by SetReadDeadline is not inspected; only the result
// of the read itself is reported to the caller.
func (c *deadlineConn) Read(b []byte) (int, error) {
	deadline := time.Now().Add(*c.rd)
	_ = c.conn.SetReadDeadline(deadline)
	return c.conn.Read(b)
}
// Write pushes the write deadline of the underlying net.Conn forward to
// "now + *c.wd" and then writes b to that connection.
//
// The value returned by SetWriteDeadline is not inspected; only the result
// of the write itself is reported to the caller.
func (c *deadlineConn) Write(b []byte) (int, error) {
	deadline := time.Now().Add(*c.wd)
	_ = c.conn.SetWriteDeadline(deadline)
	return c.conn.Write(b)
}
// Close closes the underlying net.Conn and returns its result unchanged.
// No deadline is touched here.
func (c *deadlineConn) Close() error {
	err := c.conn.Close()
	return err
}
// conn bundles a network connection with a request read buffer and a
// buffered writer for responses.
type conn struct {
// b is the buffer that ReadLine reads request data into
b []byte
// bw is the buffered writer used by Write, ReadFrom and Writer
bw *bufio.Writer
// c is the underlying connection; ReadLine reads from it and Close closes it
c net.Conn
}
// wrapConn builds a *conn around c.
//
// A deadlineConn is created over c using pointers to the package-level
// connReadDeadline and connWriteDeadline values, and is handed to
// connBufferedWriterPool.Get to obtain the buffered writer (presumably a
// writer that writes through the deadlineConn; verify against the pool).
// The request buffer comes from connRequestBufferPool.
//
// NOTE(review): the conn's c field is set to the raw net.Conn rather than
// the deadlineConn, so reads made through it do not refresh a read deadline.
func wrapConn(c net.Conn) *conn {
	dc := &deadlineConn{conn: c, rd: &connReadDeadline, wd: &connWriteDeadline}

	wrapped := new(conn)
	wrapped.b = connRequestBufferPool.Get()
	wrapped.bw = connBufferedWriterPool.Get(dc)
	wrapped.c = c
	return wrapped
}
// Conn exposes the net.Conn stored in this conn.
func (c *conn) Conn() net.Conn {
	underlying := c.c
	return underlying
}
// ReadLine reads from the connection into the request buffer until a '\n'
// is seen, and returns the bytes received before it with a single trailing
// '\r' (if any) removed. Any bytes that arrived after the '\n' in the same
// read are not returned.
//
// The returned slice aliases the conn's internal buffer (which Close hands
// back to connRequestBufferPool), so it must not be used after Close.
//
// Errors:
//   - a read error from the connection, wrapped within ErrConnRead
//   - io.ErrNoProgress wrapped within ErrConnRead, once more than 100 reads
//     have returned no data and no error
//   - ErrInvalidRequest if the buffer fills up before a '\n' is found
//
// NOTE(review): reads go through c.c, which wrapConn sets to the raw
// net.Conn, so no read deadline is refreshed here. Confirm this is intended.
func (c *conn) ReadLine() ([]byte, error) {
	total, emptyReads := 0, 0
	for {
		// Perform a single read into the unused tail of the buffer
		n, err := c.c.Read(c.b[total:])
		if err != nil {
			return nil, errors.With(err).WrapWithin(ErrConnRead)
		}

		// Tolerate a bounded number of empty reads before giving up
		if n < 1 {
			if emptyReads >= 100 {
				return nil, errors.With(io.ErrNoProgress).WrapWithin(ErrConnRead)
			}
			emptyReads++
			continue
		}

		// Only accept up to the new-line char
		if idx := bytes.IndexByte(c.b[total:total+n], '\n'); idx != -1 {
			// idx is relative to this read; convert it to an absolute
			// buffer offset before looking at the preceding byte, so the
			// '\r' is found even when the line spans several reads (or
			// when the '\r' ended the previous read).
			end := total + idx
			if end > 0 && c.b[end-1] == '\r' {
				end--
			}
			return c.b[:end], nil
		}

		// No new-line yet: keep what we have and check for a full buffer
		total += n
		if total >= len(c.b) {
			return nil, ErrInvalidRequest
		}
	}
}
// Write writes b to the conn's buffered writer. The byte count is discarded;
// a failure is returned wrapped within ErrConnWrite, otherwise nil.
func (c *conn) Write(b []byte) error {
	if _, err := c.bw.Write(b); err != nil {
		return errors.With(err).WrapWithin(ErrConnWrite)
	}
	return nil
}
// ReadFrom copies everything readable from r into the conn's buffered
// writer. The byte count is discarded; a failure is returned wrapped within
// ErrConnWrite, otherwise nil.
func (c *conn) ReadFrom(r io.Reader) error {
	// deadlineConn DOES NOT implement ReadFrom. With the buffered writer
	// sitting on top of a deadlineConn (see wrapConn), bufio.Writer has to
	// use its own internal byte buffer together with deadlineConn's Write
	// implementation, which forces the write deadline to be regularly
	// updated during long copies.
	if _, err := c.bw.ReadFrom(r); err != nil {
		return errors.With(err).WrapWithin(ErrConnWrite)
	}
	return nil
}
// Writer exposes the conn's buffered writer as an io.Writer.
func (c *conn) Writer() io.Writer {
	var w io.Writer = c.bw
	return w
}
// Close flushes the buffered writer, closes the underlying connection and
// then hands the writer and the request buffer back to their pools.
//
// The pools are always replenished, whatever the outcome. If closing the
// connection fails, that error is returned wrapped within ErrConnClose.
// Otherwise, if the final flush failed (meaning buffered data may not have
// been written), the flush error is returned wrapped within ErrConnClose
// instead of being silently dropped. Returns nil when both steps succeed.
func (c *conn) Close() error {
	// Flush before closing so buffered data gets a chance to be written;
	// keep the result so a failed flush is not lost.
	flushErr := c.bw.Flush()
	closeErr := c.c.Close()

	// Put buffers back
	connBufferedWriterPool.Put(c.bw)
	connRequestBufferPool.Put(c.b)

	// The close error takes priority, so every case that reported an error
	// before still reports the same one.
	if closeErr != nil {
		return errors.With(closeErr).WrapWithin(ErrConnClose)
	}
	if flushErr != nil {
		return errors.With(flushErr).WrapWithin(ErrConnClose)
	}
	return nil
}