mirror of https://github.com/apache/jclouds.git
Merge pull request #626 from aledsage/Issue-858-LaunchNodeHangs
Issue-858: fix timeout waiting for port connection, and abort when node!=running
This commit is contained in:
commit
70cf2bb6a9
|
@ -21,11 +21,17 @@ package org.jclouds.compute.functions;
|
|||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
import javax.inject.Singleton;
|
||||
|
||||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.compute.util.ComputeServiceUtils;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
|
||||
import org.jclouds.compute.util.OpenSocketFinder;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.ssh.SshClient;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
|
@ -39,13 +45,21 @@ import com.google.inject.Inject;
|
|||
*/
|
||||
@Singleton
|
||||
public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMetadata, SshClient> {
|
||||
@Resource
|
||||
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
|
||||
protected Logger logger = Logger.NULL;
|
||||
|
||||
@Inject(optional = true)
|
||||
SshClient.Factory sshFactory;
|
||||
private final RetryIfSocketNotYetOpen socketTester;
|
||||
|
||||
private final OpenSocketFinder openSocketFinder;
|
||||
|
||||
private final long timeoutMs;
|
||||
|
||||
@Inject
|
||||
public CreateSshClientOncePortIsListeningOnNode(RetryIfSocketNotYetOpen socketTester) {
|
||||
this.socketTester = socketTester;
|
||||
public CreateSshClientOncePortIsListeningOnNode(OpenSocketFinder openSocketFinder, Timeouts timeouts) {
|
||||
this.openSocketFinder = openSocketFinder;
|
||||
this.timeoutMs = timeouts.portOpen;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -55,7 +69,8 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMe
|
|||
checkNotNull(node.getCredentials().identity, "no login identity found for node %s", node.getId());
|
||||
checkNotNull(node.getCredentials().credential, "no credential found for %s on node %s", node
|
||||
.getCredentials().identity, node.getId());
|
||||
HostAndPort socket = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, node.getLoginPort());
|
||||
HostAndPort socket = openSocketFinder.findOpenSocketOnNode(node, node.getLoginPort(),
|
||||
timeoutMs, TimeUnit.MILLISECONDS);
|
||||
return sshFactory.create(socket, node.getCredentials());
|
||||
}
|
||||
}
|
|
@ -22,11 +22,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.base.Throwables.getRootCause;
|
||||
import static java.lang.String.format;
|
||||
import static org.jclouds.compute.util.ComputeServiceUtils.findReachableSocketOnNode;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
@ -38,9 +38,9 @@ import org.jclouds.compute.domain.ExecResponse;
|
|||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
import org.jclouds.compute.domain.NodeState;
|
||||
import org.jclouds.compute.options.TemplateOptions;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
|
||||
import org.jclouds.compute.util.OpenSocketFinder;
|
||||
import org.jclouds.javax.annotation.Nullable;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.scriptbuilder.domain.Statement;
|
||||
|
@ -71,8 +71,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
|
|||
|
||||
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
|
||||
private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory;
|
||||
private final RetryIfSocketNotYetOpen socketTester;
|
||||
private final Timeouts timeouts;
|
||||
private final OpenSocketFinder openSocketFinder;
|
||||
|
||||
@Nullable
|
||||
private final Statement statement;
|
||||
|
@ -87,7 +87,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
|
|||
@AssistedInject
|
||||
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
|
||||
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning,
|
||||
RetryIfSocketNotYetOpen socketTester, Timeouts timeouts,
|
||||
OpenSocketFinder openSocketFinder, Timeouts timeouts,
|
||||
Function<TemplateOptions, Statement> templateOptionsToStatement,
|
||||
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
|
||||
@Assisted AtomicReference<NodeMetadata> node, @Assisted Set<NodeMetadata> goodNodes,
|
||||
|
@ -97,7 +97,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
|
|||
checkNotNull(options, "options"));
|
||||
this.nodeRunning = checkNotNull(nodeRunning, "nodeRunning");
|
||||
this.initScriptRunnerFactory = checkNotNull(initScriptRunnerFactory, "initScriptRunnerFactory");
|
||||
this.socketTester = checkNotNull(socketTester, "socketTester");
|
||||
this.openSocketFinder = checkNotNull(openSocketFinder, "openSocketFinder");
|
||||
this.timeouts = checkNotNull(timeouts, "timeouts");
|
||||
this.node = node;
|
||||
this.options = checkNotNull(options, "options");
|
||||
|
@ -109,12 +109,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
|
|||
@AssistedInject
|
||||
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
|
||||
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning, GetNodeMetadataStrategy getNode,
|
||||
RetryIfSocketNotYetOpen socketTester, Timeouts timeouts,
|
||||
OpenSocketFinder openSocketFinder, Timeouts timeouts,
|
||||
Function<TemplateOptions, Statement> templateOptionsToStatement,
|
||||
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
|
||||
@Assisted Set<NodeMetadata> goodNodes, @Assisted Map<NodeMetadata, Exception> badNodes,
|
||||
@Assisted Multimap<NodeMetadata, CustomizationResponse> customizationResponses) {
|
||||
this(nodeRunning, socketTester, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options,
|
||||
this(nodeRunning, openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options,
|
||||
new AtomicReference<NodeMetadata>(null), goodNodes, badNodes, customizationResponses);
|
||||
}
|
||||
|
||||
|
@ -153,7 +153,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
|
|||
}
|
||||
}
|
||||
if (options.getPort() > 0) {
|
||||
findReachableSocketOnNode(socketTester.seconds(options.getSeconds()), node.get(), options.getPort());
|
||||
openSocketFinder.findOpenSocketOnNode(node.get(), options.getPort(),
|
||||
options.getSeconds(), TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
logger.debug("<< customized node(%s)", originalId);
|
||||
|
|
|
@ -18,13 +18,8 @@
|
|||
*/
|
||||
package org.jclouds.compute.util;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.base.Throwables.getStackTraceAsString;
|
||||
import static com.google.common.collect.Iterables.concat;
|
||||
import static com.google.common.collect.Iterables.filter;
|
||||
import static com.google.common.collect.Iterables.find;
|
||||
import static com.google.common.collect.Iterables.size;
|
||||
import static com.google.common.collect.Iterables.transform;
|
||||
import static org.jclouds.scriptbuilder.domain.Statements.pipeHttpResponseToBash;
|
||||
|
||||
import java.net.URI;
|
||||
|
@ -41,15 +36,12 @@ import org.jclouds.compute.domain.NodeMetadata;
|
|||
import org.jclouds.compute.domain.OsFamily;
|
||||
import org.jclouds.compute.domain.Processor;
|
||||
import org.jclouds.compute.domain.Volume;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.http.HttpRequest;
|
||||
import org.jclouds.scriptbuilder.domain.Statement;
|
||||
import org.jclouds.scriptbuilder.domain.Statements;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.reflect.TypeToken;
|
||||
|
||||
/**
|
||||
|
@ -165,31 +157,6 @@ public class ComputeServiceUtils {
|
|||
return org.jclouds.rest.Providers.getSupportedProvidersOfType(TypeToken.of(ComputeServiceContext.class));
|
||||
}
|
||||
|
||||
public static HostAndPort findReachableSocketOnNode(RetryIfSocketNotYetOpen socketTester, final NodeMetadata node,
|
||||
final int port) {
|
||||
checkNodeHasIps(node);
|
||||
HostAndPort socket = null;
|
||||
try {
|
||||
socket = find(transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()),
|
||||
new Function<String, HostAndPort>() {
|
||||
|
||||
@Override
|
||||
public HostAndPort apply(String from) {
|
||||
return HostAndPort.fromParts(from, port);
|
||||
}
|
||||
}), socketTester);
|
||||
} catch (NoSuchElementException e) {
|
||||
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", port,
|
||||
node));
|
||||
}
|
||||
return socket;
|
||||
}
|
||||
|
||||
public static void checkNodeHasIps(NodeMetadata node) {
|
||||
checkState(size(concat(node.getPublicAddresses(), node.getPrivateAddresses())) > 0,
|
||||
"node does not have IP addresses configured: " + node);
|
||||
}
|
||||
|
||||
public static String parseVersionOrReturnEmptyString(org.jclouds.compute.domain.OsFamily family, String in,
|
||||
Map<OsFamily, Map<String, String>> osVersionMap) {
|
||||
if (osVersionMap.containsKey(family)) {
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
package org.jclouds.compute.util;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.common.collect.Iterables.concat;
|
||||
import static com.google.common.collect.Iterables.size;
|
||||
import static com.google.common.collect.Iterables.transform;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.inject.Named;
|
||||
|
||||
import org.jclouds.Constants;
|
||||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.predicates.RetryablePredicate;
|
||||
import org.jclouds.predicates.SocketOpen;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
|
||||
|
||||
@Resource
|
||||
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
|
||||
private Logger logger = Logger.NULL;
|
||||
|
||||
private final SocketOpen socketTester;
|
||||
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
|
||||
private final ListeningExecutorService executor;
|
||||
|
||||
@Inject
|
||||
public ConcurrentOpenSocketFinder(SocketOpen socketTester,
|
||||
@Named("NODE_RUNNING") final Predicate<AtomicReference<NodeMetadata>> nodeRunning,
|
||||
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads) {
|
||||
this.socketTester = socketTester;
|
||||
this.nodeRunning = nodeRunning;
|
||||
this.executor = MoreExecutors.listeningDecorator(userThreads);
|
||||
}
|
||||
|
||||
public HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port,
|
||||
long timeoutValue, TimeUnit timeUnits) {
|
||||
Iterable<String> hosts = checkNodeHasIps(node);
|
||||
Set<HostAndPort> sockets = toHostAndPorts(hosts, port);
|
||||
|
||||
// Specify a retry period of 1s, expressed in the same time units.
|
||||
long period = timeUnits.convert(1, TimeUnit.SECONDS);
|
||||
|
||||
// For storing the result; needed because predicate will just tell us true/false
|
||||
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
|
||||
|
||||
Predicate<Collection<HostAndPort>> concurrentOpenSocketFinder = new Predicate<Collection<HostAndPort>>() {
|
||||
|
||||
@Override
|
||||
public boolean apply(Collection<HostAndPort> input) {
|
||||
HostAndPort reachableSocket = findOpenSocket(input);
|
||||
if (reachableSocket != null) {
|
||||
result.set(reachableSocket);
|
||||
return true;
|
||||
} else {
|
||||
if (nodeRunning != null && !nodeRunning.apply(new AtomicReference<NodeMetadata>(node))) {
|
||||
throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId()));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
RetryablePredicate<Collection<HostAndPort>> retryingOpenSocketFinder = new RetryablePredicate<Collection<HostAndPort>>(
|
||||
concurrentOpenSocketFinder, timeoutValue, period, timeUnits);
|
||||
|
||||
logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits);
|
||||
|
||||
boolean passed = retryingOpenSocketFinder.apply(sockets);
|
||||
|
||||
if (passed) {
|
||||
logger.debug("<< socket %s opened", result);
|
||||
assert result.get() != null;
|
||||
return result.get();
|
||||
} else {
|
||||
logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits);
|
||||
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s",
|
||||
port, node));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if any any of the given HostAndPorts are reachable. It checks them all concurrently,
|
||||
* and returns the first one found or null if none are reachable.
|
||||
*
|
||||
* @return A reachable HostAndPort, or null.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private HostAndPort findOpenSocket(final Collection<HostAndPort> sockets) {
|
||||
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicInteger completeCount = new AtomicInteger();
|
||||
|
||||
for (final HostAndPort socket : sockets) {
|
||||
final ListenableFuture<?> future = executor.submit(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (socketTester.apply(socket)) {
|
||||
result.compareAndSet(null, socket);
|
||||
latch.countDown();
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
logger.warn(e, "Error checking reachability of ip:port %s", socket);
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
|
||||
future.addListener(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (completeCount.incrementAndGet() >= sockets.size()) {
|
||||
latch.countDown(); // Tried all; mark as done
|
||||
}
|
||||
}
|
||||
|
||||
}, MoreExecutors.sameThreadExecutor());
|
||||
}
|
||||
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return result.get();
|
||||
}
|
||||
|
||||
private Iterable<String> checkNodeHasIps(NodeMetadata node) {
|
||||
Iterable<String> ips = concat(node.getPublicAddresses(), node.getPrivateAddresses());
|
||||
checkState(size(ips) > 0, "node does not have IP addresses configured: " + node);
|
||||
return ips;
|
||||
}
|
||||
|
||||
private Set<HostAndPort> toHostAndPorts(Iterable<String> hosts, final int port) {
|
||||
return ImmutableSet.copyOf(transform(hosts,
|
||||
new Function<String, HostAndPort>() {
|
||||
|
||||
@Override
|
||||
public HostAndPort apply(String from) {
|
||||
return HostAndPort.fromParts(from, port);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package org.jclouds.compute.util;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.inject.ImplementedBy;
|
||||
|
||||
/**
|
||||
* For finding an open/reachable ip:port for a node.
|
||||
*
|
||||
* @author aled
|
||||
*/
|
||||
@ImplementedBy(ConcurrentOpenSocketFinder.class)
|
||||
public interface OpenSocketFinder {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param node The node (checking its public and private addresses)
|
||||
* @param port The port to try to connect to
|
||||
* @param timeoutValue Max time to try to connect to the ip:port
|
||||
* @param timeUnits
|
||||
*
|
||||
* @return The reachable ip:port
|
||||
* @throws NoSuchElementException If no ports accessible within the given time
|
||||
* @throws IllegalStateException If the given node has no public or private addresses
|
||||
*/
|
||||
HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port, long timeoutValue, TimeUnit timeUnits);
|
||||
}
|
|
@ -24,7 +24,7 @@ import com.google.common.net.HostAndPort;
|
|||
|
||||
|
||||
/**
|
||||
* For use in unit tests, e.g. {@link RetryIfSocketNotYetOpenTest}.
|
||||
* For use in unit tests.
|
||||
*/
|
||||
public class SocketOpenPredicates {
|
||||
|
||||
|
|
|
@ -19,12 +19,15 @@
|
|||
package org.jclouds.compute.strategy;
|
||||
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.jclouds.compute.config.CustomizationResponse;
|
||||
|
@ -34,8 +37,8 @@ import org.jclouds.compute.domain.NodeState;
|
|||
import org.jclouds.compute.functions.TemplateOptionsToStatement;
|
||||
import org.jclouds.compute.options.TemplateOptions;
|
||||
import org.jclouds.compute.predicates.AtomicNodeRunning;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
|
||||
import org.jclouds.compute.util.OpenSocketFinder;
|
||||
import org.jclouds.scriptbuilder.domain.Statement;
|
||||
import org.testng.Assert;
|
||||
import org.testng.annotations.Test;
|
||||
|
@ -55,7 +58,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
|
|||
|
||||
public void testBreakWhenNodeStillPending() {
|
||||
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
|
||||
RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class);
|
||||
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
|
||||
Timeouts timeouts = new Timeouts();
|
||||
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -79,10 +82,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
|
|||
};
|
||||
|
||||
// replay mocks
|
||||
replay(initScriptRunnerFactory, socketTester);
|
||||
replay(initScriptRunnerFactory, openSocketFinder);
|
||||
// run
|
||||
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
|
||||
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts,
|
||||
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
|
||||
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
|
||||
customizationResponses).apply(atomicNode);
|
||||
|
||||
|
@ -93,12 +96,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
|
|||
assertEquals(customizationResponses.size(), 0);
|
||||
|
||||
// verify mocks
|
||||
verify(initScriptRunnerFactory, socketTester);
|
||||
verify(initScriptRunnerFactory, openSocketFinder);
|
||||
}
|
||||
|
||||
public void testBreakGraceFullyWhenNodeDied() {
|
||||
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
|
||||
RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class);
|
||||
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
|
||||
Timeouts timeouts = new Timeouts();
|
||||
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
|
||||
@SuppressWarnings("unused")
|
||||
|
@ -123,10 +126,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
|
|||
};
|
||||
|
||||
// replay mocks
|
||||
replay(initScriptRunnerFactory, socketTester);
|
||||
replay(initScriptRunnerFactory, openSocketFinder);
|
||||
// run
|
||||
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
|
||||
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts,
|
||||
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
|
||||
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
|
||||
customizationResponses).apply(atomicNode);
|
||||
|
||||
|
@ -137,6 +140,52 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
|
|||
assertEquals(customizationResponses.size(), 0);
|
||||
|
||||
// verify mocks
|
||||
verify(initScriptRunnerFactory, socketTester);
|
||||
verify(initScriptRunnerFactory, openSocketFinder);
|
||||
}
|
||||
|
||||
public void testBreakGraceWhenNodeSocketFailsToOpen() {
|
||||
int portTimeoutSecs = 2;
|
||||
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
|
||||
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
|
||||
Timeouts timeouts = new Timeouts();
|
||||
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
|
||||
TemplateOptions options = new TemplateOptions().blockOnPort(22, portTimeoutSecs);
|
||||
Set<NodeMetadata> goodNodes = Sets.newLinkedHashSet();
|
||||
Map<NodeMetadata, Exception> badNodes = Maps.newLinkedHashMap();
|
||||
Multimap<NodeMetadata, CustomizationResponse> customizationResponses = LinkedHashMultimap.create();
|
||||
|
||||
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").state(NodeState.PENDING).build();
|
||||
final NodeMetadata runningNode = new NodeMetadataBuilder().ids("id").state(NodeState.RUNNING).build();
|
||||
|
||||
expect(openSocketFinder.findOpenSocketOnNode(runningNode, 22, portTimeoutSecs, TimeUnit.SECONDS))
|
||||
.andThrow(new NoSuchElementException("could not connect to any ip address port")).once();
|
||||
|
||||
GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){
|
||||
|
||||
@Override
|
||||
public NodeMetadata getNode(String input) {
|
||||
Assert.assertEquals(input, pendingNode.getId());
|
||||
return runningNode;
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
// replay mocks
|
||||
replay(initScriptRunnerFactory, openSocketFinder);
|
||||
|
||||
// run
|
||||
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
|
||||
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
|
||||
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
|
||||
customizationResponses).apply(atomicNode);
|
||||
|
||||
assertEquals(goodNodes.size(), 0);
|
||||
assertEquals(badNodes.keySet(), ImmutableSet.of(pendingNode));
|
||||
badNodes.get(pendingNode).printStackTrace();
|
||||
assertEquals(badNodes.get(pendingNode).getMessage(), "could not connect to any ip address port");
|
||||
assertEquals(customizationResponses.size(), 0);
|
||||
|
||||
// verify mocks
|
||||
verify(initScriptRunnerFactory, openSocketFinder);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,6 +79,5 @@ public class ComputeServiceUtilsTest {
|
|||
ComputeServiceUtils.extractTargzIntoDirectory(request, "/stage/").render(
|
||||
org.jclouds.scriptbuilder.domain.OsFamily.UNIX),
|
||||
"curl -q -s -S -L --connect-timeout 10 --max-time 600 --retry 20 -X GET -H \"Host: adriancolehappy.s3.amazonaws.com\" -H \"Date: Sun, 12 Sep 2010 08:25:19 GMT\" -H \"Authorization: AWS 0ASHDJAS82:JASHFDA=\" https://adriancolehappy.s3.amazonaws.com/java/install |(mkdir -p /stage/ &&cd /stage/ &&tar -xpzf -)\n");
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
package org.jclouds.compute.util;
|
||||
|
||||
import static org.easymock.EasyMock.createMock;
|
||||
import static org.easymock.EasyMock.expect;
|
||||
import static org.easymock.EasyMock.replay;
|
||||
import static org.easymock.EasyMock.verify;
|
||||
import static org.testng.Assert.assertEquals;
|
||||
import static org.testng.Assert.assertTrue;
|
||||
import static org.testng.Assert.fail;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.easymock.EasyMock;
|
||||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
import org.jclouds.concurrent.MoreExecutors;
|
||||
import org.jclouds.predicates.SocketOpen;
|
||||
import org.testng.annotations.AfterMethod;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.net.HostAndPort;
|
||||
|
||||
@Test(singleThreaded=true)
|
||||
public class ConcurrentOpenSocketFinderTest {
|
||||
|
||||
private static final long SLOW_GRACE = 500;
|
||||
private static final long EARLY_GRACE = 10;
|
||||
|
||||
private NodeMetadata node;
|
||||
private SocketOpen socketTester;
|
||||
private Predicate<AtomicReference<NodeMetadata>> nodeRunning;
|
||||
private ExecutorService threadPool;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@BeforeMethod
|
||||
public void setUp() {
|
||||
node = createMock(NodeMetadata.class);
|
||||
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
|
||||
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
|
||||
expect(node.getId()).andReturn("myid").anyTimes();
|
||||
|
||||
socketTester = createMock(SocketOpen.class);
|
||||
|
||||
nodeRunning = createMock(Predicate.class);
|
||||
|
||||
replay(node);
|
||||
|
||||
threadPool = Executors.newCachedThreadPool();
|
||||
}
|
||||
|
||||
@AfterMethod(alwaysRun=true)
|
||||
public void tearDown() {
|
||||
if (threadPool != null) threadPool.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRespectsTimeout() throws Exception {
|
||||
final long timeoutMs = 1000;
|
||||
|
||||
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).times(2, Integer.MAX_VALUE);
|
||||
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(false).times(2, Integer.MAX_VALUE);
|
||||
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
|
||||
replay(socketTester);
|
||||
replay(nodeRunning);
|
||||
|
||||
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor());
|
||||
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
stopwatch.start();
|
||||
try {
|
||||
finder.findOpenSocketOnNode(node, 22, timeoutMs, TimeUnit.MILLISECONDS);
|
||||
fail();
|
||||
} catch (NoSuchElementException success) {
|
||||
// expected
|
||||
}
|
||||
long timetaken = stopwatch.elapsedMillis();
|
||||
|
||||
assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken);
|
||||
|
||||
verify(node);
|
||||
verify(socketTester);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReturnsReachable() throws Exception {
|
||||
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).once();
|
||||
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true).once();
|
||||
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
|
||||
replay(socketTester);
|
||||
replay(nodeRunning);
|
||||
|
||||
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor());
|
||||
|
||||
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
|
||||
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
|
||||
|
||||
verify(node);
|
||||
verify(socketTester);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testChecksSocketsConcurrently() throws Exception {
|
||||
long delayForReachableMs = 25;
|
||||
|
||||
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
|
||||
replay(nodeRunning);
|
||||
|
||||
// Can't use mock+answer for concurrency tests; EasyMock uses lock in ReplayState
|
||||
ControllableSocketOpen socketTester = new ControllableSocketOpen(ImmutableMap.of(
|
||||
HostAndPort.fromParts("1.2.3.4", 22), new SlowCallable<Boolean>(false, 1000),
|
||||
HostAndPort.fromParts("1.2.3.5", 22), new SlowCallable<Boolean>(true, delayForReachableMs)));
|
||||
|
||||
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, threadPool);
|
||||
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
stopwatch.start();
|
||||
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
|
||||
long timetaken = stopwatch.elapsedMillis();
|
||||
|
||||
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
|
||||
assertTrue(timetaken >= delayForReachableMs-EARLY_GRACE && timetaken <= delayForReachableMs+SLOW_GRACE, "timetaken="+timetaken);
|
||||
verify(node);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbortsWhenNodeNotRunning() throws Exception {
|
||||
expect(socketTester.apply(EasyMock.<HostAndPort>anyObject())).andReturn(false);
|
||||
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(false);
|
||||
replay(socketTester);
|
||||
replay(nodeRunning);
|
||||
|
||||
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, nodeRunning, MoreExecutors.sameThreadExecutor());
|
||||
|
||||
Stopwatch stopwatch = new Stopwatch();
|
||||
stopwatch.start();
|
||||
try {
|
||||
finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
|
||||
fail();
|
||||
} catch (NoSuchElementException e) {
|
||||
// success
|
||||
// Note: don't get the "no longer running" message, because logged+swallowed by RetryablePredicate
|
||||
}
|
||||
long timetaken = stopwatch.elapsedMillis();
|
||||
|
||||
assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken);
|
||||
|
||||
verify(node);
|
||||
verify(socketTester);
|
||||
verify(nodeRunning);
|
||||
}
|
||||
|
||||
private static class SlowCallable<T> implements Callable<T> {
|
||||
private final T result;
|
||||
private final long delay;
|
||||
|
||||
SlowCallable(T result, long delay) {
|
||||
this.result = result;
|
||||
this.delay = delay;
|
||||
}
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
Thread.sleep(delay);
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
private static class ControllableSocketOpen implements SocketOpen {
|
||||
private final Map<HostAndPort, ? extends Callable<Boolean>> answers;
|
||||
|
||||
ControllableSocketOpen(Map<HostAndPort, ? extends Callable<Boolean>> answers) {
|
||||
this.answers = answers;
|
||||
}
|
||||
@Override
|
||||
public boolean apply(HostAndPort input) {
|
||||
try {
|
||||
return answers.get(input).call();
|
||||
} catch (Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
|
@ -33,13 +33,13 @@ import javax.inject.Singleton;
|
|||
|
||||
import org.jclouds.compute.callables.RunScriptOnNode.Factory;
|
||||
import org.jclouds.compute.domain.NodeMetadata;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.compute.reference.ComputeServiceConstants;
|
||||
import org.jclouds.location.Provider;
|
||||
import org.jclouds.logging.Logger;
|
||||
import org.jclouds.rest.annotations.Credential;
|
||||
import org.jclouds.rest.annotations.Identity;
|
||||
import org.jclouds.scriptbuilder.domain.Statements;
|
||||
import org.jclouds.virtualbox.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.virtualbox_4_1.SessionState;
|
||||
import org.virtualbox_4_1.VirtualBoxManager;
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.compute.predicates;
|
||||
package org.jclouds.virtualbox.predicates;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
|
@ -35,8 +35,8 @@ import org.jclouds.compute.domain.NodeMetadata;
|
|||
import org.jclouds.compute.domain.NodeMetadataBuilder;
|
||||
import org.jclouds.compute.domain.NodeState;
|
||||
import org.jclouds.compute.domain.OperatingSystem;
|
||||
import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.jclouds.scriptbuilder.domain.Statements;
|
||||
import org.jclouds.virtualbox.predicates.RetryIfSocketNotYetOpen;
|
||||
import org.testng.annotations.Test;
|
||||
import org.virtualbox_4_1.VirtualBoxManager;
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jclouds.compute.predicates;
|
||||
package org.jclouds.virtualbox.predicates;
|
||||
|
||||
import static org.jclouds.compute.predicates.SocketOpenPredicates.alwaysFail;
|
||||
import static org.testng.Assert.assertEquals;
|
Loading…
Reference in New Issue