Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion memcache/memcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"fmt"
"io"
"net"

"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -129,6 +129,14 @@ func NewFromSelector(ss ServerSelector) *Client {
return &Client{selector: ss}
}

// Stats contains statistics about connections being used by a client.
type Stats struct {
// The current number of active connections in use.
ActiveConns int
// The current number of idle connections in the pool.
IdleConns int
}

// Client is a memcache client.
// It is safe for unlocked use by multiple concurrent goroutines.
type Client struct {
Expand All @@ -144,6 +152,11 @@ type Client struct {
// be set to a number higher than your peak parallel requests.
MaxIdleConns int

// number of currently used connections (must use atomic)
activeConns int32
// number of currently idle connections (must use atomic)
idleConns int32

selector ServerSelector

lk sync.Mutex
Expand Down Expand Up @@ -193,6 +206,7 @@ func (cn *conn) extendDeadline() {
// cache miss). The purpose is to not recycle TCP connections that
// are bad.
func (cn *conn) condRelease(err *error) {
atomic.AddInt32(&cn.c.activeConns, -1)
if *err == nil || resumableError(*err) {
cn.release()
} else {
Expand All @@ -212,6 +226,7 @@ func (c *Client) putFreeConn(addr net.Addr, cn *conn) {
return
}
c.freeconn[addr.String()] = append(freelist, cn)
atomic.AddInt32(&c.idleConns, 1)
}

func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
Expand All @@ -226,6 +241,7 @@ func (c *Client) getFreeConn(addr net.Addr) (cn *conn, ok bool) {
}
cn = freelist[len(freelist)-1]
c.freeconn[addr.String()] = freelist[:len(freelist)-1]
atomic.AddInt32(&c.idleConns, -1)
return cn, true
}

Expand Down Expand Up @@ -276,6 +292,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
cn, ok := c.getFreeConn(addr)
if ok {
cn.extendDeadline()
atomic.AddInt32(&c.activeConns, 1)
return cn, nil
}
nc, err := c.dial(addr)
Expand All @@ -289,6 +306,7 @@ func (c *Client) getConn(addr net.Addr) (*conn, error) {
c: c,
}
cn.extendDeadline()
atomic.AddInt32(&c.activeConns, 1)
return cn, nil
}

Expand Down Expand Up @@ -490,6 +508,14 @@ func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
return m, err
}

// Stats returns this client's current statistics.
func (c *Client) Stats() Stats {
return Stats{
ActiveConns: int(atomic.LoadInt32(&c.activeConns)),
IdleConns: int(atomic.LoadInt32(&c.idleConns)),
}
}

// parseGetResponse reads a GET response from r and calls cb for each
// read and allocated Item
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
Expand Down
11 changes: 10 additions & 1 deletion memcache/memcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"time"
)

const testServer = "localhost:11211"
const testServer = "127.0.0.1:11211"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NB: I had to use this to connect to my loopback-only memcached (we don't typically want local services to bind to external interfaces). I can revert this line if you want, but thought it might actually be nice to include.


func setup(t *testing.T) bool {
c, err := net.Dial("tcp", testServer)
Expand Down Expand Up @@ -212,6 +212,15 @@ func testWithClient(t *testing.T, c *Client) {
// Test Ping
err = c.Ping()
checkErr(err, "error ping: %s", err)

// Stats
statz := c.Stats()
if statz.ActiveConns > 0 {
t.Errorf("expected 0 active conns, got %d", statz.ActiveConns)
}
if statz.IdleConns < 1 {
t.Errorf("expected at least 1 idle conns, got %d", statz.IdleConns)
}
}

func testTouchWithClient(t *testing.T, c *Client) {
Expand Down