Issue 858: extracted OpenSocketFinder from ComputeServiceUtils

This commit is contained in:
Aled Sage 2012-05-14 12:43:20 +01:00
parent 83c9ecc3d8
commit b3a027f065
8 changed files with 466 additions and 206 deletions

View File

@ -30,9 +30,8 @@ import javax.inject.Singleton;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.util.ComputeServiceUtils;
import org.jclouds.compute.util.OpenSocketFinder;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.ssh.SshClient;
import com.google.common.base.Function;
@ -52,13 +51,14 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMe
@Inject(optional = true)
SshClient.Factory sshFactory;
private final SocketOpen socketTester;
private final OpenSocketFinder openSocketFinder;
private final long timeoutMs;
@Inject
public CreateSshClientOncePortIsListeningOnNode(SocketOpen socketTester, Timeouts timeouts) {
this.socketTester = socketTester;
public CreateSshClientOncePortIsListeningOnNode(OpenSocketFinder openSocketFinder, Timeouts timeouts) {
this.openSocketFinder = openSocketFinder;
this.timeoutMs = timeouts.portOpen;
}
@ -69,8 +69,8 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function<NodeMe
checkNotNull(node.getCredentials().identity, "no login identity found for node %s", node.getId());
checkNotNull(node.getCredentials().credential, "no credential found for %s on node %s", node
.getCredentials().identity, node.getId());
HostAndPort socket = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, node.getLoginPort(),
timeoutMs, TimeUnit.MILLISECONDS, logger);
HostAndPort socket = openSocketFinder.findOpenSocketOnNode(node, node.getLoginPort(),
timeoutMs, TimeUnit.MILLISECONDS);
return sshFactory.create(socket, node.getCredentials());
}
}

View File

@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.getRootCause;
import static java.lang.String.format;
import static org.jclouds.compute.util.ComputeServiceUtils.findReachableSocketOnNode;
import java.util.Map;
import java.util.Set;
@ -41,9 +40,9 @@ import org.jclouds.compute.domain.NodeState;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.util.OpenSocketFinder;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.scriptbuilder.domain.Statement;
import com.google.common.base.Function;
@ -72,8 +71,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory;
private final SocketOpen socketTester;
private final Timeouts timeouts;
private final OpenSocketFinder openSocketFinder;
@Nullable
private final Statement statement;
@ -88,7 +87,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning,
SocketOpen socketTester, Timeouts timeouts,
OpenSocketFinder openSocketFinder, Timeouts timeouts,
Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted AtomicReference<NodeMetadata> node, @Assisted Set<NodeMetadata> goodNodes,
@ -98,7 +97,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
checkNotNull(options, "options"));
this.nodeRunning = checkNotNull(nodeRunning, "nodeRunning");
this.initScriptRunnerFactory = checkNotNull(initScriptRunnerFactory, "initScriptRunnerFactory");
this.socketTester = checkNotNull(socketTester, "socketTester");
this.openSocketFinder = checkNotNull(openSocketFinder, "openSocketFinder");
this.timeouts = checkNotNull(timeouts, "timeouts");
this.node = node;
this.options = checkNotNull(options, "options");
@ -110,12 +109,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named("NODE_RUNNING") Predicate<AtomicReference<NodeMetadata>> nodeRunning, GetNodeMetadataStrategy getNode,
SocketOpen socketTester, Timeouts timeouts,
OpenSocketFinder openSocketFinder, Timeouts timeouts,
Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted Set<NodeMetadata> goodNodes, @Assisted Map<NodeMetadata, Exception> badNodes,
@Assisted Multimap<NodeMetadata, CustomizationResponse> customizationResponses) {
this(nodeRunning, socketTester, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options,
this(nodeRunning, openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options,
new AtomicReference<NodeMetadata>(null), goodNodes, badNodes, customizationResponses);
}
@ -154,8 +153,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
}
}
if (options.getPort() > 0) {
findReachableSocketOnNode(socketTester, nodeRunning, node.get(), options.getPort(),
options.getSeconds(), TimeUnit.SECONDS, logger);
openSocketFinder.findOpenSocketOnNode(node.get(), options.getPort(),
options.getSeconds(), TimeUnit.SECONDS);
}
}
logger.debug("<< customized node(%s)", originalId);

View File

@ -18,12 +18,8 @@
*/
package org.jclouds.compute.util;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.size;
import static com.google.common.collect.Iterables.transform;
import static org.jclouds.scriptbuilder.domain.Statements.pipeHttpResponseToBash;
import java.net.URI;
@ -31,12 +27,8 @@ import java.util.Formatter;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.Hardware;
@ -45,16 +37,11 @@ import org.jclouds.compute.domain.OsFamily;
import org.jclouds.compute.domain.Processor;
import org.jclouds.compute.domain.Volume;
import org.jclouds.http.HttpRequest;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.RetryablePredicate;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.Statements;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
import com.google.common.reflect.TypeToken;
/**
@ -170,72 +157,6 @@ public class ComputeServiceUtils {
return org.jclouds.rest.Providers.getSupportedProvidersOfType(TypeToken.of(ComputeServiceContext.class));
}
public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, final NodeMetadata node,
final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) {
return findReachableSocketOnNode(socketTester, null, node, port, timeoutValue, timeUnits, logger);
}
public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester,
@Nullable final Predicate<AtomicReference<NodeMetadata>> nodeRunning, final NodeMetadata node,
final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) {
checkNodeHasIps(node);
Iterable<HostAndPort> sockets = transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()),
new Function<String, HostAndPort>() {
@Override
public HostAndPort apply(String from) {
return HostAndPort.fromParts(from, port);
}
});
// Specify a retry period of 1s, expressed in the same time units.
long period = timeUnits.convert(1, TimeUnit.SECONDS);
// For storing the result, as predicate will just tell us true/false
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
Predicate<Iterable<HostAndPort>> multiIpSocketTester = new Predicate<Iterable<HostAndPort>>() {
@Override
public boolean apply(Iterable<HostAndPort> sockets) {
for (HostAndPort socket : sockets) {
if (socketTester.apply(socket)) {
result.set(socket);
return true;
}
}
if (nodeRunning != null && !nodeRunning.apply(new AtomicReference<NodeMetadata>(node))) {
throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId()));
}
return false;
}
};
RetryablePredicate<Iterable<HostAndPort>> tester = new RetryablePredicate<Iterable<HostAndPort>>(
multiIpSocketTester, timeoutValue, period, timeUnits);
logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits);
boolean passed = tester.apply(sockets);
if (passed) {
logger.debug("<< socket %s opened", result);
assert result.get() != null;
return result.get();
} else {
logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits);
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s",
port, node));
}
}
public static void checkNodeHasIps(NodeMetadata node) {
checkState(size(concat(node.getPublicAddresses(), node.getPrivateAddresses())) > 0,
"node does not have IP addresses configured: " + node);
}
public static String parseVersionOrReturnEmptyString(org.jclouds.compute.domain.OsFamily family, String in,
Map<OsFamily, Map<String, String>> osVersionMap) {
if (osVersionMap.containsKey(family)) {

View File

@ -0,0 +1,170 @@
package org.jclouds.compute.util;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.size;
import static com.google.common.collect.Iterables.transform;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Resource;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.RetryablePredicate;
import org.jclouds.predicates.SocketOpen;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
public class ConcurrentOpenSocketFinder implements OpenSocketFinder {
@Resource
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
private Logger logger = Logger.NULL;
private final SocketOpen socketTester;
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private final ListeningExecutorService executor;
@Inject
public ConcurrentOpenSocketFinder(SocketOpen socketTester,
@Named("NODE_RUNNING") final Predicate<AtomicReference<NodeMetadata>> nodeRunning,
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads) {
this.socketTester = socketTester;
this.nodeRunning = nodeRunning;
this.executor = MoreExecutors.listeningDecorator(userThreads);
}
public HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port,
long timeoutValue, TimeUnit timeUnits) {
Iterable<String> hosts = checkNodeHasIps(node);
Set<HostAndPort> sockets = toHostAndPorts(hosts, port);
// Specify a retry period of 1s, expressed in the same time units.
long period = timeUnits.convert(1, TimeUnit.SECONDS);
// For storing the result; needed because predicate will just tell us true/false
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
Predicate<Collection<HostAndPort>> concurrentOpenSocketFinder = new Predicate<Collection<HostAndPort>>() {
@Override
public boolean apply(Collection<HostAndPort> input) {
HostAndPort reachableSocket = findOpenSocket(input);
if (reachableSocket != null) {
result.set(reachableSocket);
return true;
} else {
if (nodeRunning != null && !nodeRunning.apply(new AtomicReference<NodeMetadata>(node))) {
throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId()));
}
return false;
}
}
};
RetryablePredicate<Collection<HostAndPort>> retryingOpenSocketFinder = new RetryablePredicate<Collection<HostAndPort>>(
concurrentOpenSocketFinder, timeoutValue, period, timeUnits);
logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits);
boolean passed = retryingOpenSocketFinder.apply(sockets);
if (passed) {
logger.debug("<< socket %s opened", result);
assert result.get() != null;
return result.get();
} else {
logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits);
throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s",
port, node));
}
}
/**
* Checks if any any of the given HostAndPorts are reachable. It checks them all concurrently,
* and returns the first one found or null if none are reachable.
*
* @return A reachable HostAndPort, or null.
* @throws InterruptedException
*/
private HostAndPort findOpenSocket(final Collection<HostAndPort> sockets) {
final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger completeCount = new AtomicInteger();
for (final HostAndPort socket : sockets) {
final ListenableFuture<?> future = executor.submit(new Runnable() {
@Override
public void run() {
try {
if (socketTester.apply(socket)) {
result.compareAndSet(null, socket);
latch.countDown();
}
} catch (RuntimeException e) {
logger.warn(e, "Error checking reachability of ip:port %s", socket);
}
}
});
future.addListener(new Runnable() {
@Override
public void run() {
if (completeCount.incrementAndGet() >= sockets.size()) {
latch.countDown(); // Tried all; mark as done
}
}
}, MoreExecutors.sameThreadExecutor());
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
return result.get();
}
private Iterable<String> checkNodeHasIps(NodeMetadata node) {
Iterable<String> ips = concat(node.getPublicAddresses(), node.getPrivateAddresses());
checkState(size(ips) > 0, "node does not have IP addresses configured: " + node);
return ips;
}
private Set<HostAndPort> toHostAndPorts(Iterable<String> hosts, final int port) {
return ImmutableSet.copyOf(transform(hosts,
new Function<String, HostAndPort>() {
@Override
public HostAndPort apply(String from) {
return HostAndPort.fromParts(from, port);
}
}));
}
}

View File

@ -0,0 +1,30 @@
package org.jclouds.compute.util;
import java.util.concurrent.TimeUnit;
import org.jclouds.compute.domain.NodeMetadata;
import com.google.common.net.HostAndPort;
import com.google.inject.ImplementedBy;
/**
* For finding an open/reachable ip:port for a node.
*
* @author aled
*/
@ImplementedBy(ConcurrentOpenSocketFinder.class)
public interface OpenSocketFinder {
/**
*
* @param node The node (checking its public and private addresses)
* @param port The port to try to connect to
* @param timeoutValue Max time to try to connect to the ip:port
* @param timeUnits
*
* @return The reachable ip:port
* @throws NoSuchElementException If no ports accessible within the given time
* @throws IllegalStateException If the given node has no public or private addresses
*/
HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port, long timeoutValue, TimeUnit timeUnits);
}

View File

@ -19,12 +19,15 @@
package org.jclouds.compute.strategy;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.jclouds.compute.config.CustomizationResponse;
@ -35,7 +38,7 @@ import org.jclouds.compute.functions.TemplateOptionsToStatement;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.AtomicNodeRunning;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.predicates.SocketOpen;
import org.jclouds.compute.util.OpenSocketFinder;
import org.jclouds.scriptbuilder.domain.Statement;
import org.testng.Assert;
import org.testng.annotations.Test;
@ -55,7 +58,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
public void testBreakWhenNodeStillPending() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
SocketOpen socketTester = createMock(SocketOpen.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")
@ -79,10 +82,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
};
// replay mocks
replay(initScriptRunnerFactory, socketTester);
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts,
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
@ -93,12 +96,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, socketTester);
verify(initScriptRunnerFactory, openSocketFinder);
}
public void testBreakGraceFullyWhenNodeDied() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
SocketOpen socketTester = createMock(SocketOpen.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")
@ -123,10 +126,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
};
// replay mocks
replay(initScriptRunnerFactory, socketTester);
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts,
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
@ -137,6 +140,52 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, socketTester);
verify(initScriptRunnerFactory, openSocketFinder);
}
public void testBreakGraceWhenNodeSocketFailsToOpen() {
int portTimeoutSecs = 2;
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
TemplateOptions options = new TemplateOptions().blockOnPort(22, portTimeoutSecs);
Set<NodeMetadata> goodNodes = Sets.newLinkedHashSet();
Map<NodeMetadata, Exception> badNodes = Maps.newLinkedHashMap();
Multimap<NodeMetadata, CustomizationResponse> customizationResponses = LinkedHashMultimap.create();
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").state(NodeState.PENDING).build();
final NodeMetadata runningNode = new NodeMetadataBuilder().ids("id").state(NodeState.RUNNING).build();
expect(openSocketFinder.findOpenSocketOnNode(runningNode, 22, portTimeoutSecs, TimeUnit.SECONDS))
.andThrow(new NoSuchElementException("could not connect to any ip address port")).once();
GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){
@Override
public NodeMetadata getNode(String input) {
Assert.assertEquals(input, pendingNode.getId());
return runningNode;
}
};
// replay mocks
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
assertEquals(goodNodes.size(), 0);
assertEquals(badNodes.keySet(), ImmutableSet.of(pendingNode));
badNodes.get(pendingNode).printStackTrace();
assertEquals(badNodes.get(pendingNode).getMessage(), "could not connect to any ip address port");
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, openSocketFinder);
}
}

View File

@ -18,39 +18,21 @@
*/
package org.jclouds.compute.util;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.jclouds.compute.util.ComputeServiceUtils.parseVersionOrReturnEmptyString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.URI;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.easymock.EasyMock;
import org.jclouds.compute.config.BaseComputeServiceContextModule;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.OsFamily;
import org.jclouds.compute.predicates.SocketOpenPredicates;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.http.HttpRequest;
import org.jclouds.json.Json;
import org.jclouds.json.config.GsonModule;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.SocketOpen;
import org.testng.annotations.Test;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
import com.google.inject.Guice;
/**
@ -98,88 +80,4 @@ public class ComputeServiceUtilsTest {
org.jclouds.scriptbuilder.domain.OsFamily.UNIX),
"curl -q -s -S -L --connect-timeout 10 --max-time 600 --retry 20 -X GET -H \"Host: adriancolehappy.s3.amazonaws.com\" -H \"Date: Sun, 12 Sep 2010 08:25:19 GMT\" -H \"Authorization: AWS 0ASHDJAS82:JASHFDA=\" https://adriancolehappy.s3.amazonaws.com/java/install |(mkdir -p /stage/ &&cd /stage/ &&tar -xpzf -)\n");
}
@Test
public void testFindReachableSocketOnNodeTimesOut() throws Exception {
final long timeoutSecs = 2;
final long timeoutMs = timeoutSecs * 1000;
final long SLOW_GRACE = 500;
final long EARLY_GRACE = 10;
SocketOpen socketTester = SocketOpenPredicates.alwaysFail;
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
replay(node);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 1234, timeoutMs, TimeUnit.MILLISECONDS, Logger.CONSOLE);
fail();
} catch (NoSuchElementException success) {
// expected
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken);
verify(node);
}
@Test
public void testFindReachableSocketOnNodeReturnsAvailable() throws Exception {
SocketOpen socketTester = createMock(SocketOpen.class);
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false);
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true);
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
replay(socketTester);
replay(node);
HostAndPort result = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 22, 2000, TimeUnit.MILLISECONDS, Logger.CONSOLE);
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
verify(socketTester);
verify(node);
}
@Test
public void testFindReachableSocketOnNodeAbortsWhenNodeNotRunning() throws Exception {
final long SLOW_GRACE = 500;
SocketOpen socketTester = SocketOpenPredicates.alwaysFail;
NodeMetadata node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
expect(node.getId()).andReturn("myid").atLeastOnce();
Predicate<AtomicReference<NodeMetadata>> nodeRunning = createMock(Predicate.class);
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(false);
replay(node);
replay(nodeRunning);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
ComputeServiceUtils.findReachableSocketOnNode(socketTester, nodeRunning,
node, 22, 2000000, TimeUnit.MILLISECONDS, Logger.CONSOLE);
fail();
} catch (RuntimeException e) {
if (!e.getMessage().contains("no longer running")) {
throw e;
}
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken);
verify(node);
verify(nodeRunning);
}
}

View File

@ -0,0 +1,193 @@
package org.jclouds.compute.util;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.easymock.EasyMock;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.concurrent.MoreExecutors;
import org.jclouds.predicates.SocketOpen;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.HostAndPort;
@Test(singleThreaded=true)
public class ConcurrentOpenSocketFinderTest {
private static final long SLOW_GRACE = 500;
private static final long EARLY_GRACE = 10;
private NodeMetadata node;
private SocketOpen socketTester;
private Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private ExecutorService threadPool;
@SuppressWarnings("unchecked")
@BeforeMethod
public void setUp() {
node = createMock(NodeMetadata.class);
expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce();
expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce();
expect(node.getId()).andReturn("myid").anyTimes();
socketTester = createMock(SocketOpen.class);
nodeRunning = createMock(Predicate.class);
replay(node);
threadPool = Executors.newCachedThreadPool();
}
@AfterMethod(alwaysRun=true)
public void tearDown() {
if (threadPool != null) threadPool.shutdownNow();
}
@Test
public void testRespectsTimeout() throws Exception {
final long timeoutMs = 1000;
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).times(2, Integer.MAX_VALUE);
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(false).times(2, Integer.MAX_VALUE);
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
replay(socketTester);
replay(nodeRunning);
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor());
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
finder.findOpenSocketOnNode(node, 22, timeoutMs, TimeUnit.MILLISECONDS);
fail();
} catch (NoSuchElementException success) {
// expected
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken);
verify(node);
verify(socketTester);
}
@Test
public void testReturnsReachable() throws Exception {
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).once();
expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true).once();
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
replay(socketTester);
replay(nodeRunning);
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor());
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
verify(node);
verify(socketTester);
}
@Test
public void testChecksSocketsConcurrently() throws Exception {
long delayForReachableMs = 25;
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(true);
replay(nodeRunning);
// Can't use mock+answer for concurrency tests; EasyMock uses lock in ReplayState
ControllableSocketOpen socketTester = new ControllableSocketOpen(ImmutableMap.of(
HostAndPort.fromParts("1.2.3.4", 22), new SlowCallable<Boolean>(false, 1000),
HostAndPort.fromParts("1.2.3.5", 22), new SlowCallable<Boolean>(true, delayForReachableMs)));
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, threadPool);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
long timetaken = stopwatch.elapsedMillis();
assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22));
assertTrue(timetaken >= delayForReachableMs-EARLY_GRACE && timetaken <= delayForReachableMs+SLOW_GRACE, "timetaken="+timetaken);
verify(node);
}
@Test
public void testAbortsWhenNodeNotRunning() throws Exception {
expect(socketTester.apply(EasyMock.<HostAndPort>anyObject())).andReturn(false);
expect(nodeRunning.apply(EasyMock.<AtomicReference<NodeMetadata>>anyObject())).andReturn(false);
replay(socketTester);
replay(nodeRunning);
OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, nodeRunning, MoreExecutors.sameThreadExecutor());
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
try {
finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS);
fail();
} catch (NoSuchElementException e) {
// success
// Note: don't get the "no longer running" message, because logged+swallowed by RetryablePredicate
}
long timetaken = stopwatch.elapsedMillis();
assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken);
verify(node);
verify(socketTester);
verify(nodeRunning);
}
private static class SlowCallable<T> implements Callable<T> {
private final T result;
private final long delay;
SlowCallable(T result, long delay) {
this.result = result;
this.delay = delay;
}
@Override
public T call() throws Exception {
Thread.sleep(delay);
return result;
}
};
private static class ControllableSocketOpen implements SocketOpen {
private final Map<HostAndPort, ? extends Callable<Boolean>> answers;
ControllableSocketOpen(Map<HostAndPort, ? extends Callable<Boolean>> answers) {
this.answers = answers;
}
@Override
public boolean apply(HostAndPort input) {
try {
return answers.get(input).call();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
};
}