Skip to content

Commit

Permalink
Refactor client has function.
Browse files Browse the repository at this point in the history
  • Loading branch information
afshin committed Dec 28, 2016
1 parent 57a1ca8 commit 980406d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 19 deletions.
32 changes: 17 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,17 +63,17 @@ func (c *Client) add(group, name, node, service, version string) error {

// Blocks until the required services are available to the client.
// Returns true if it had to block and false if it returns immediately.
func (c *Client) block(services ...string) bool {
func (c *Client) block(required map[string]struct{}, services []string) bool {
// Even though the client may have just checked to see if services exist,
// the check is performed here in case there was a delay waiting for the
// additions mutex to become available.
if c.has(services...) {
if c.has(required) {
return false
}
c.log.Blocked("sleuth: waiting for client to find %s", services)
c.additions.activate()
for range c.additions.stream {
if c.has(services...) {
if c.has(required) {
break
}
}
Expand Down Expand Up @@ -161,21 +161,15 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
return nil, newError(errTimeout, "%s {%s}%s timed out", req.Method, to, url)
}

func (c *Client) has(services ...string) bool {
// Check to see if required services are already registered.
verified := make(map[string]bool)
func (c *Client) has(required map[string]struct{}) bool {
// Check to see if required services already exist locally.
available := 0
for _, service := range services {
verified[service] = false
}
total := len(verified)
for service := range verified {
for service := range required {
if peers, ok := c.services.get(service); ok && peers.available() {
verified[service] = true
available += 1
}
}
return available == total
return available == len(required)
}

func (c *Client) listen(handle string, listener chan *http.Response) {
Expand Down Expand Up @@ -236,8 +230,16 @@ func (c *Client) WaitFor(services ...string) error {
if c.closed {
return newError(errClosed, "client is closed").escalate(errWait)
}
if !c.has(services...) {
c.block(services...)
// Collapse services and make sure all values are unique.
required := make(map[string]struct{})
for _, service := range services {
required[service] = struct{}{}
}
if len(required) != len(services) {
c.log.Warn("sleuth: %v contains duplicates [%d]", services, warnDuplicate)
}
if !c.has(required) {
c.block(required, services)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
// Warnings are in the 801-899 range.
warnInterface = 801
warnClose = 802
warnDuplicate = 803
// Errors are in the 901-999 range.
errNew = 901
errDispatch = 902
Expand Down
10 changes: 6 additions & 4 deletions sleuth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func TestZipUnzip(t *testing.T) {

func TestIntegratedCycle(t *testing.T) {
addr := "sleuth-test-server-one"
client, err := New(&Config{group: GROUP})
client, err := New(&Config{group: GROUP, LogLevel: "warn"})
if err != nil {
t.Errorf("client instantiation failed: %s", err.Error())
return
Expand All @@ -450,11 +450,13 @@ func TestIntegratedCycle(t *testing.T) {
t.Errorf("server close failed: %s", err.Error())
}
}(server, t)
// Wait until the server becomes available.
client.WaitFor(addr)
// Wait until the server becomes available. Confirm dupicate warning.
client.WaitFor(addr, addr)
// Set timeout to 10 seconds to accommodate slow test spin-up.
client.Timeout = time.Second * 10
if client.block(addr) {
required := make(map[string]struct{})
required[addr] = struct{}{}
if client.block(required, []string{addr}) {
t.Errorf("call to block should have returned immediately")
}
body := "foo bar baz"
Expand Down

0 comments on commit 980406d

Please sign in to comment.