You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gophi/core/conn.go

134 lines
3.2 KiB
Go

package core
import (
"bufio"
"io"
"net"
"time"
)
var (
// connReadDeadline specifies the connection read deadline
connReadDeadline time.Duration
// connWriteDeadline specifies the connection write deadline
connWriteDeadline time.Duration
// connReadMax specifies the connection read max (in bytes)
connReadMax int
)
// deadlineConn wraps net.Conn to set the read / write deadlines on each access
type deadlineConn struct {
conn net.Conn
}
// Read wraps the underlying net.Conn read function, setting read deadline on each access
func (c *deadlineConn) Read(b []byte) (int, error) {
c.conn.SetReadDeadline(time.Now().Add(connReadDeadline))
return c.conn.Read(b)
}
// Read wraps the underlying net.Conn write function, setting write deadline on each access
func (c *deadlineConn) Write(b []byte) (int, error) {
c.conn.SetWriteDeadline(time.Now().Add(connWriteDeadline))
return c.conn.Write(b)
}
// Close directly wraps underlying net.Conn close function
func (c *deadlineConn) Close() error {
return c.conn.Close()
}
// Conn wraps a DeadlineConn with a buffer
type conn struct {
br *bufio.Reader
bw *bufio.Writer
cl io.Closer
}
// wrapConn wraps a net.Conn in deadlineConn, then within conn and returns the result
func wrapConn(c net.Conn) *conn {
deadlineConn := &deadlineConn{c}
return &conn{
br: connBufferedReaderPool.Get(deadlineConn),
bw: connBufferedWriterPool.Get(deadlineConn),
cl: deadlineConn,
}
}
// ReadLine reads a single line and returns the result, or nil and error
func (c *conn) ReadLine() ([]byte, Error) {
// return slice
var b []byte
// Read! Use this method so we can
// ensure we don't perform some insanely
// long read
for len(b) < connReadMax {
// read the line
line, isPrefix, err := c.br.ReadLine()
if err != nil {
return nil, WrapError(ConnReadErr, err)
}
// append line contents to return slice
b = append(b, line...)
// if finished reading, break out
if !isPrefix {
break
}
}
return b, nil
}
// WriteBytes writes a byte slice to the buffer and returns error status
func (c *conn) Write(b []byte) Error {
_, err := c.bw.Write(b)
if err != nil {
return WrapError(ConnWriteErr, err)
}
return nil
}
// ReadFrom writes to the buffer from a reader and returns error status
func (c *conn) ReadFrom(r io.Reader) Error {
// Since this buffer wraps deadlineConn, which DOES NOT have
// a ReadFrom method implemented, it will force the buffer to
// use it's own internal byte buffer along with the deadlineConn's
// Write implementation (forcing the deadline to be regularly updated)
_, err := c.bw.ReadFrom(r)
if err != nil {
return WrapError(ConnWriteErr, err)
}
return nil
}
// Writer returns the underlying buffer wrapped conn writer
func (c *conn) Writer() io.Writer {
return c.bw
}
// Close flushes the underlying buffer, closes the conn then puts
// the sync.Pool conn buffers back
func (c *conn) Close() Error {
// Flush + close
err1 := c.bw.Flush()
err2 := c.cl.Close()
// Put buffers back
connBufferedReaderPool.Put(c.br)
connBufferedWriterPool.Put(c.bw)
// If either errors, wrap. Else return none
if err2 != nil {
return WrapError(ConnCloseErr, err2)
}
if err1 != nil {
return WrapError(ConnWriteErr, err1)
}
return nil
}