Issue 858: fix timeout waiting for port connection, and abort when node!=running

This commit is contained in:
Aled Sage 2012-05-11 10:09:55 +01:00
parent a2c8993592
commit 612f8f2bba
5 changed files with 194 additions and 30 deletions

View File

@ -21,11 +21,18 @@ package org.jclouds.compute.functions;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.util.ComputeServiceUtils;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.ssh.SshClient;
import com.google.common.base.Function;
@ -39,13 +46,20 @@ import com.google.inject.Inject;
*/
@Singleton
public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMetadata, SshClient> {
@Resource
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
protected Logger logger = Logger.NULL;
@Inject(optional = true)
SshClient.Factory sshFactory;
private final RetryIfSocketNotYetOpen socketTester;
private final SocketOpen socketTester;
private final long timeoutMs;
@Inject
public CreateSshClientOncePortIsListeningOnNode(RetryIfSocketNotYetOpen socketTester) {
public CreateSshClientOncePortIsListeningOnNode(SocketOpen socketTester, Timeouts timeouts) {
this.socketTester = socketTester;
this.timeoutMs = timeouts.portOpen;
}
@Override
@ -55,7 +69,8 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMe
checkNotNull(node.getCredentials().identity, "no login identity found for node %s", node.getId());
checkNotNull(node.getCredentials().credential, "no credential found for %s on node %s", node
.getCredentials().identity, node.getId());
HostAndPort socket = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, node.getLoginPort());
HostAndPort socket = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, node.getLoginPort(),
timeoutMs, TimeUnit.MILLISECONDS, logger);
return sshFactory.create(socket, node.getCredentials());
}
}

View File

@ -27,6 +27,7 @@ import static org.jclouds.compute.util.ComputeServiceUtils.findReachableSocketOn
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Resource;
@ -38,11 +39,11 @@ import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeState;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.scriptbuilder.domain.Statement;
import com.google.common.base.Function;
@ -71,7 +72,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory;
private final RetryIfSocketNotYetOpen socketTester;
private final SocketOpen socketTester;
private final Timeouts timeouts;
@Nullable
@ -87,7 +88,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning,
RetryIfSocketNotYetOpen socketTester, Timeouts timeouts,
SocketOpen socketTester, Timeouts timeouts,
Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted AtomicReference<NodeMetadata> node, @Assisted Set<NodeMetadata> goodNodes,
@ -109,7 +110,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning, GetNodeMetadataStrategy getNode,
RetryIfSocketNotYetOpen socketTester, Timeouts timeouts,
SocketOpen socketTester, Timeouts timeouts,
Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted Set<NodeMetadata> goodNodes, @Assisted Map<NodeMetadata, Exception> badNodes,
@ -153,7 +154,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
}
}
if (options.getPort() > 0) {
findReachableSocketOnNode(socketTester.seconds(options.getSeconds()), node.get(), options.getPort());
findReachableSocketOnNode(socketTester, nodeRunning, node.get(), options.getPort(),
options.getSeconds(), TimeUnit.SECONDS, logger);
}
}
logger.debug("<< customized node(%s)", originalId);

View File

@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.find;
import static com.google.common.collect.Iterables.size;
import static com.google.common.collect.Iterables.transform;
import static org.jclouds.scriptbuilder.domain.Statements.pipeHttpResponseToBash;
@ -32,8 +31,12 @@ import java.util.Formatter;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.Hardware;
@ -41,8 +44,10 @@ import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.OsFamily;
import org.jclouds.compute.domain.Processor;
import org.jclouds.compute.domain.Volume;
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
import org.jclouds.http.HttpRequest;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.RetryablePredicate;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.Statements;
@ -165,24 +170,65 @@ public class ComputeServiceUtils {
return org.jclouds.rest.Providers.getSupportedProvidersOfType(TypeToken.of(ComputeServiceContext.class));
}
public static HostAndPort findReachableSocketOnNode(RetryIfSocketNotYetOpen socketTester, final NodeMetadata node,
final int port) {
checkNodeHasIps(node);
HostAndPort socket = null;
try {
socket = find(transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()),
new Function<String, HostAndPort>() {
public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, final NodeMetadata node,
final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) {
return findReachableSocketOnNode(socketTester, null, node, port, timeoutValue, timeUnits, logger);
}
@Override
public HostAndPort apply(String from) {
return HostAndPort.fromParts(from, port);
}
}), socketTester);
} catch (NoSuchElementException e) {
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", port,
node));
public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester,
@Nullable final Predicate<AtomicReference<NodeMetadata>> nodeRunning, final NodeMetadata node,
final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) {
checkNodeHasIps(node);
Iterable<HostAndPort> sockets = transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()),
new Function<String, HostAndPort>() {
@Override
public HostAndPort apply(String from) {
return HostAndPort.fromParts(from, port);
}
});
// Specify a retry period of 1s, expressed in the same time units.
long period = timeUnits.convert(1, TimeUnit.SECONDS);
// For storing the result, as predicate will just tell us true/false
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
Predicate<Iterable<HostAndPort>> multiIpSocketTester = new Predicate<Iterable<HostAndPort>>() {
@Override
public boolean apply(Iterable<HostAndPort> sockets) {
for (HostAndPort socket : sockets) {
if (socketTester.apply(socket)) {
result.set(socket);
return true;
}
}
if (nodeRunning != null && !nodeRunning.apply(new AtomicReference<NodeMetadata>(node))) {
throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId()));
}
return false;
}
};
RetryablePredicate<Iterable<HostAndPort>> tester = new RetryablePredicate<Iterable<HostAndPort>>(
multiIpSocketTester, timeoutValue, period, timeUnits);
logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits);
boolean passed = tester.apply(sockets);
if (passed) {
logger.debug("<< socket %s opened", result);
assert result.get() != null;
return result.get();
} else {
logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits);
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s",
port, node));
}
return socket;
}
public static void checkNodeHasIps(NodeMetadata node) {

View File

@ -34,8 +34,8 @@ import org.jclouds.compute.domain.NodeState;
import org.jclouds.compute.functions.TemplateOptionsToStatement;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.AtomicNodeRunning;
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.scriptbuilder.domain.Statement;
import org.testng.Assert;
import org.testng.annotations.Test;
@ -55,7 +55,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
public void testBreakWhenNodeStillPending() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class);
SocketOpen socketTester = createMock(SocketOpen.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")
@ -98,7 +98,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
public void testBreakGraceFullyWhenNodeDied() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class);
SocketOpen socketTester = createMock(SocketOpen.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")

View File

@ -18,21 +18,39 @@
*/
package org.jclouds.compute.util;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.compute.util.ComputeServiceUtils.parseVersionOrReturnEmptyString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.URI;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.easymock.EasyMock;
import org.jclouds.compute.config.BaseComputeServiceContextModule;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.OsFamily;
import org.jclouds.compute.predicates.SocketOpenPredicates;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.http.HttpRequest;
import org.jclouds.json.Json;
import org.jclouds.json.config.GsonModule;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.testng.annotations.Test;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.inject.Guice;
/**
@ -79,6 +97,89 @@ public class ComputeServiceUtilsTest {
ComputeServiceUtils.extractTargzIntoDirectory(request, "/stage/").render(
org.jclouds.scriptbuilder.domain.OsFamily.UNIX),
"curl -q -s -S -L --connect-timeout 10 --max-time 600 --retry 20 -X GET -H \"Host: adriancolehappy.s3.amazonaws.com\" -H \"Date: Sun, 12 Sep 2010 08:25:19 GMT\" -H \"Authorization: AWS 0ASHDJAS82:JASHFDA=\" https://adriancolehappy.s3.amazonaws.com/java/install |(mkdir -p /stage/ &&cd /stage/ &&tar -xpzf -)\n");
}
@Test
public void testFindReachableSocketOnNodeTimesOut() throws Exception {
final long timeoutSecs = 2;
final long timeoutMs = timeoutSecs * 1000;
final long SLOW_GRACE = 500;
final long EARLY_GRACE = 10;
SocketOpen socketTester = SocketOpenPredicates.alwaysFail;
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
replay(node);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 1234, timeoutMs, TimeUnit.MILLISECONDS, Logger.CONSOLE);
fail();
} catch (NoSuchElementException success) {
// expected
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken);
verify(node);
}
@Test
public void testFindReachableSocketOnNodeReturnsAvailable() throws Exception {
SocketOpen socketTester = createMock(SocketOpen.class);
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false);
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true);
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
replay(socketTester);
replay(node);
HostAndPort result = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 22, 2000, TimeUnit.MILLISECONDS, Logger.CONSOLE);
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
verify(socketTester);
verify(node);
}
@Test
public void testFindReachableSocketOnNodeAbortsWhenNodeNotRunning() throws Exception {
final long SLOW_GRACE = 500;
SocketOpen socketTester = SocketOpenPredicates.alwaysFail;
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
expect(node.getId()).andReturn("myid").atLeastOnce();
Predicate<AtomicReference<NodeMetadata>> nodeRunning = createMock(Predicate.class);
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(false);
replay(node);
replay(nodeRunning);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
ComputeServiceUtils.findReachableSocketOnNode(socketTester, nodeRunning,
node, 22, 2000000, TimeUnit.MILLISECONDS, Logger.CONSOLE);
fail();
} catch (RuntimeException e) {
if (!e.getMessage().contains("no longer running")) {
throw e;
}
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken);
verify(node);
verify(nodeRunning);
}
}