From 612f8f2bba84ea3214acc1cc491a5ec0e3448d13 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Fri, 11 May 2012 10:09:55 +0100 Subject: [PATCH 1/3] Issue 858: fix timeout waiting for port connection, and abort when node!=running --- ...ateSshClientOncePortIsListeningOnNode.java | 25 ++++- ...dAddToGoodMapOrPutExceptionIntoBadMap.java | 12 ++- .../compute/util/ComputeServiceUtils.java | 80 +++++++++++--- ...ToGoodMapOrPutExceptionIntoBadMapTest.java | 6 +- .../compute/util/ComputeServiceUtilsTest.java | 101 ++++++++++++++++++ 5 files changed, 194 insertions(+), 30 deletions(-) diff --git a/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java b/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java index 90a5b23c8e..7dac74005b 100644 --- a/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java +++ b/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java @@ -21,11 +21,18 @@ package org.jclouds.compute.functions; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Resource; +import javax.inject.Named; import javax.inject.Singleton; import org.jclouds.compute.domain.NodeMetadata; -import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen; +import org.jclouds.compute.reference.ComputeServiceConstants; +import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts; import org.jclouds.compute.util.ComputeServiceUtils; +import org.jclouds.logging.Logger; +import org.jclouds.predicates.SocketOpen; import org.jclouds.ssh.SshClient; import com.google.common.base.Function; @@ -39,13 +46,20 @@ import com.google.inject.Inject; */ @Singleton public class CreateSshClientOncePortIsListeningOnNode implements Function { + @Resource + @Named(ComputeServiceConstants.COMPUTE_LOGGER) + protected Logger logger = Logger.NULL; + @Inject(optional = true) SshClient.Factory sshFactory; - private final RetryIfSocketNotYetOpen socketTester; - + private final SocketOpen socketTester; + + private final long timeoutMs; + @Inject - public CreateSshClientOncePortIsListeningOnNode(RetryIfSocketNotYetOpen socketTester) { + public CreateSshClientOncePortIsListeningOnNode(SocketOpen socketTester, Timeouts timeouts) { this.socketTester = socketTester; + this.timeoutMs = timeouts.portOpen; } @Override @@ -55,7 +69,8 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function> nodeRunning; private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory; - private final RetryIfSocketNotYetOpen socketTester; + private final SocketOpen socketTester; private final Timeouts timeouts; @Nullable @@ -87,7 +88,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal @AssistedInject public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( @Named("NODE_RUNNING") Predicate> nodeRunning, - RetryIfSocketNotYetOpen socketTester, Timeouts timeouts, + SocketOpen socketTester, Timeouts timeouts, Function templateOptionsToStatement, InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options, @Assisted AtomicReference node, @Assisted Set goodNodes, @@ -109,7 +110,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal @AssistedInject public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( @Named("NODE_RUNNING") Predicate> nodeRunning, GetNodeMetadataStrategy getNode, - RetryIfSocketNotYetOpen socketTester, Timeouts timeouts, + SocketOpen socketTester, Timeouts timeouts, Function templateOptionsToStatement, InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options, @Assisted Set goodNodes, @Assisted Map badNodes, @@ -153,7 +154,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal } } if (options.getPort() > 0) { - findReachableSocketOnNode(socketTester.seconds(options.getSeconds()), node.get(), options.getPort()); + findReachableSocketOnNode(socketTester, nodeRunning, node.get(), options.getPort(), + options.getSeconds(), TimeUnit.SECONDS, logger); } } logger.debug("<< customized node(%s)", originalId); diff --git a/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java b/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java index ff149ab778..8b47b0bc71 100644 --- a/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java +++ b/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.getStackTraceAsString; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.find; import static com.google.common.collect.Iterables.size; import static com.google.common.collect.Iterables.transform; import static org.jclouds.scriptbuilder.domain.Statements.pipeHttpResponseToBash; @@ -32,8 +31,12 @@ import java.util.Formatter; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import javax.annotation.Nullable; + import org.jclouds.compute.ComputeServiceContext; import org.jclouds.compute.domain.ComputeMetadata; import org.jclouds.compute.domain.Hardware; @@ -41,8 +44,10 @@ import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.OsFamily; import org.jclouds.compute.domain.Processor; import org.jclouds.compute.domain.Volume; -import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen; import org.jclouds.http.HttpRequest; +import org.jclouds.logging.Logger; +import org.jclouds.predicates.RetryablePredicate; +import org.jclouds.predicates.SocketOpen; import org.jclouds.scriptbuilder.domain.Statement; import org.jclouds.scriptbuilder.domain.Statements; @@ -165,24 +170,65 @@ public class ComputeServiceUtils { return org.jclouds.rest.Providers.getSupportedProvidersOfType(TypeToken.of(ComputeServiceContext.class)); } - public static HostAndPort findReachableSocketOnNode(RetryIfSocketNotYetOpen socketTester, final NodeMetadata node, - final int port) { + public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, final NodeMetadata node, + final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) { + return findReachableSocketOnNode(socketTester, null, node, port, timeoutValue, timeUnits, logger); + } + + public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, + @Nullable final Predicate> nodeRunning, final NodeMetadata node, + final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) { checkNodeHasIps(node); - HostAndPort socket = null; - try { - socket = find(transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()), - new Function() { - @Override - public HostAndPort apply(String from) { - return HostAndPort.fromParts(from, port); - } - }), socketTester); - } catch (NoSuchElementException e) { - throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", port, - node)); + Iterable sockets = transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()), + new Function() { + + @Override + public HostAndPort apply(String from) { + return HostAndPort.fromParts(from, port); + } + }); + + // Specify a retry period of 1s, expressed in the same time units. + long period = timeUnits.convert(1, TimeUnit.SECONDS); + + // For storing the result, as predicate will just tell us true/false + final AtomicReference result = new AtomicReference(); + + Predicate> multiIpSocketTester = new Predicate>() { + + @Override + public boolean apply(Iterable sockets) { + for (HostAndPort socket : sockets) { + if (socketTester.apply(socket)) { + result.set(socket); + return true; + } + } + if (nodeRunning != null && !nodeRunning.apply(new AtomicReference(node))) { + throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId())); + } + return false; + } + + }; + + RetryablePredicate> tester = new RetryablePredicate>( + multiIpSocketTester, timeoutValue, period, timeUnits); + + logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits); + + boolean passed = tester.apply(sockets); + + if (passed) { + logger.debug("<< socket %s opened", result); + assert result.get() != null; + return result.get(); + } else { + logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits); + throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", + port, node)); } - return socket; } public static void checkNodeHasIps(NodeMetadata node) { diff --git a/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java b/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java index 85ae78f04d..11585e053a 100644 --- a/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java +++ b/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java @@ -34,8 +34,8 @@ import org.jclouds.compute.domain.NodeState; import org.jclouds.compute.functions.TemplateOptionsToStatement; import org.jclouds.compute.options.TemplateOptions; import org.jclouds.compute.predicates.AtomicNodeRunning; -import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen; import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts; +import org.jclouds.predicates.SocketOpen; import org.jclouds.scriptbuilder.domain.Statement; import org.testng.Assert; import org.testng.annotations.Test; @@ -55,7 +55,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { public void testBreakWhenNodeStillPending() { InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class); - RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class); + SocketOpen socketTester = createMock(SocketOpen.class); Timeouts timeouts = new Timeouts(); Function templateOptionsToStatement = new TemplateOptionsToStatement(); @SuppressWarnings("unused") @@ -98,7 +98,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { public void testBreakGraceFullyWhenNodeDied() { InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class); - RetryIfSocketNotYetOpen socketTester = createMock(RetryIfSocketNotYetOpen.class); + SocketOpen socketTester = createMock(SocketOpen.class); Timeouts timeouts = new Timeouts(); Function templateOptionsToStatement = new TemplateOptionsToStatement(); @SuppressWarnings("unused") diff --git a/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java b/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java index 6bca217d1c..b606e1f4f0 100644 --- a/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java +++ b/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java @@ -18,21 +18,39 @@ */ package org.jclouds.compute.util; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; import static org.jclouds.compute.util.ComputeServiceUtils.parseVersionOrReturnEmptyString; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import java.net.URI; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.easymock.EasyMock; import org.jclouds.compute.config.BaseComputeServiceContextModule; +import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.OsFamily; +import org.jclouds.compute.predicates.SocketOpenPredicates; import org.jclouds.compute.reference.ComputeServiceConstants; import org.jclouds.http.HttpRequest; import org.jclouds.json.Json; import org.jclouds.json.config.GsonModule; +import org.jclouds.logging.Logger; +import org.jclouds.predicates.SocketOpen; import org.testng.annotations.Test; +import com.google.common.base.Predicate; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.net.HostAndPort; import com.google.inject.Guice; /** @@ -79,6 +97,89 @@ public class ComputeServiceUtilsTest { ComputeServiceUtils.extractTargzIntoDirectory(request, "/stage/").render( org.jclouds.scriptbuilder.domain.OsFamily.UNIX), "curl -q -s -S -L --connect-timeout 10 --max-time 600 --retry 20 -X GET -H \"Host: adriancolehappy.s3.amazonaws.com\" -H \"Date: Sun, 12 Sep 2010 08:25:19 GMT\" -H \"Authorization: AWS 0ASHDJAS82:JASHFDA=\" https://adriancolehappy.s3.amazonaws.com/java/install |(mkdir -p /stage/ &&cd /stage/ &&tar -xpzf -)\n"); + } + + @Test + public void testFindReachableSocketOnNodeTimesOut() throws Exception { + final long timeoutSecs = 2; + final long timeoutMs = timeoutSecs * 1000; + final long SLOW_GRACE = 500; + final long EARLY_GRACE = 10; + + SocketOpen socketTester = SocketOpenPredicates.alwaysFail; + NodeMetadata node = createMock(NodeMetadata.class); + expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); + expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); + + replay(node); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + try { + ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 1234, timeoutMs, TimeUnit.MILLISECONDS, Logger.CONSOLE); + fail(); + } catch (NoSuchElementException success) { + // expected + } + long timetaken = stopwatch.elapsedMillis(); + + assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken); + verify(node); + } + + @Test + public void testFindReachableSocketOnNodeReturnsAvailable() throws Exception { + SocketOpen socketTester = createMock(SocketOpen.class); + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false); + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true); + + NodeMetadata node = createMock(NodeMetadata.class); + expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); + expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); + + replay(socketTester); + replay(node); + + HostAndPort result = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 22, 2000, TimeUnit.MILLISECONDS, Logger.CONSOLE); + assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22)); + verify(socketTester); + verify(node); + } + + @Test + public void testFindReachableSocketOnNodeAbortsWhenNodeNotRunning() throws Exception { + final long SLOW_GRACE = 500; + + SocketOpen socketTester = SocketOpenPredicates.alwaysFail; + + NodeMetadata node = createMock(NodeMetadata.class); + expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); + expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); + expect(node.getId()).andReturn("myid").atLeastOnce(); + + Predicate> nodeRunning = createMock(Predicate.class); + expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(false); + + replay(node); + replay(nodeRunning); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + try { + ComputeServiceUtils.findReachableSocketOnNode(socketTester, nodeRunning, + node, 22, 2000000, TimeUnit.MILLISECONDS, Logger.CONSOLE); + fail(); + } catch (RuntimeException e) { + if (!e.getMessage().contains("no longer running")) { + throw e; + } + } + long timetaken = stopwatch.elapsedMillis(); + + assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken); + + verify(node); + verify(nodeRunning); } } From 83c9ecc3d813dbf250481bc164f59dc7168d67c5 Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Mon, 14 May 2012 10:38:10 +0100 Subject: [PATCH 2/3] Issue 858: moved RetryIfSocketNotYetOpen from compute/ to labs/virtualbox/ --- .../org/jclouds/compute/predicates/SocketOpenPredicates.java | 2 +- .../functions/admin/StartVBoxIfNotAlreadyRunning.java | 2 +- .../jclouds/virtualbox}/predicates/RetryIfSocketNotYetOpen.java | 2 +- .../functions/admin/StartVBoxIfNotAlreadyRunningLiveTest.java | 2 +- .../virtualbox}/predicates/RetryIfSocketNotYetOpenTest.java | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename {compute/src/main/java/org/jclouds/compute => labs/virtualbox/src/main/java/org/jclouds/virtualbox}/predicates/RetryIfSocketNotYetOpen.java (98%) rename {compute/src/test/java/org/jclouds/compute => labs/virtualbox/src/test/java/org/jclouds/virtualbox}/predicates/RetryIfSocketNotYetOpenTest.java (98%) diff --git a/compute/src/test/java/org/jclouds/compute/predicates/SocketOpenPredicates.java b/compute/src/test/java/org/jclouds/compute/predicates/SocketOpenPredicates.java index 86ddbef963..eb8aab1ce7 100644 --- a/compute/src/test/java/org/jclouds/compute/predicates/SocketOpenPredicates.java +++ b/compute/src/test/java/org/jclouds/compute/predicates/SocketOpenPredicates.java @@ -24,7 +24,7 @@ import com.google.common.net.HostAndPort; /** - * For use in unit tests, e.g. {@link RetryIfSocketNotYetOpenTest}. + * For use in unit tests. */ public class SocketOpenPredicates { diff --git a/labs/virtualbox/src/main/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunning.java b/labs/virtualbox/src/main/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunning.java index 6c9d561c42..d75b83d927 100644 --- a/labs/virtualbox/src/main/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunning.java +++ b/labs/virtualbox/src/main/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunning.java @@ -33,13 +33,13 @@ import javax.inject.Singleton; import org.jclouds.compute.callables.RunScriptOnNode.Factory; import org.jclouds.compute.domain.NodeMetadata; -import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen; import org.jclouds.compute.reference.ComputeServiceConstants; import org.jclouds.location.Provider; import org.jclouds.logging.Logger; import org.jclouds.rest.annotations.Credential; import org.jclouds.rest.annotations.Identity; import org.jclouds.scriptbuilder.domain.Statements; +import org.jclouds.virtualbox.predicates.RetryIfSocketNotYetOpen; import org.virtualbox_4_1.SessionState; import org.virtualbox_4_1.VirtualBoxManager; diff --git a/compute/src/main/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpen.java b/labs/virtualbox/src/main/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpen.java similarity index 98% rename from compute/src/main/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpen.java rename to labs/virtualbox/src/main/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpen.java index 50c0dea589..b1d506b88e 100644 --- a/compute/src/main/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpen.java +++ b/labs/virtualbox/src/main/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpen.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.jclouds.compute.predicates; +package org.jclouds.virtualbox.predicates; import java.util.concurrent.TimeUnit; diff --git a/labs/virtualbox/src/test/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunningLiveTest.java b/labs/virtualbox/src/test/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunningLiveTest.java index 45958fe7f4..783081c01b 100644 --- a/labs/virtualbox/src/test/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunningLiveTest.java +++ b/labs/virtualbox/src/test/java/org/jclouds/virtualbox/functions/admin/StartVBoxIfNotAlreadyRunningLiveTest.java @@ -35,8 +35,8 @@ import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.NodeMetadataBuilder; import org.jclouds.compute.domain.NodeState; import org.jclouds.compute.domain.OperatingSystem; -import org.jclouds.compute.predicates.RetryIfSocketNotYetOpen; import org.jclouds.scriptbuilder.domain.Statements; +import org.jclouds.virtualbox.predicates.RetryIfSocketNotYetOpen; import org.testng.annotations.Test; import org.virtualbox_4_1.VirtualBoxManager; diff --git a/compute/src/test/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpenTest.java b/labs/virtualbox/src/test/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpenTest.java similarity index 98% rename from compute/src/test/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpenTest.java rename to labs/virtualbox/src/test/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpenTest.java index c3a24721f0..782bc77968 100644 --- a/compute/src/test/java/org/jclouds/compute/predicates/RetryIfSocketNotYetOpenTest.java +++ b/labs/virtualbox/src/test/java/org/jclouds/virtualbox/predicates/RetryIfSocketNotYetOpenTest.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.jclouds.compute.predicates; +package org.jclouds.virtualbox.predicates; import static org.jclouds.compute.predicates.SocketOpenPredicates.alwaysFail; import static org.testng.Assert.assertEquals; From b3a027f06525b608d6d7763cad9991fe5fa2c89d Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Mon, 14 May 2012 12:43:20 +0100 Subject: [PATCH 3/3] Issue 858: extracted OpenSocketFinder from ComputeServiceUtils --- ...ateSshClientOncePortIsListeningOnNode.java | 14 +- ...dAddToGoodMapOrPutExceptionIntoBadMap.java | 17 +- .../compute/util/ComputeServiceUtils.java | 79 ------- .../util/ConcurrentOpenSocketFinder.java | 170 +++++++++++++++ .../compute/util/OpenSocketFinder.java | 30 +++ ...ToGoodMapOrPutExceptionIntoBadMapTest.java | 67 +++++- .../compute/util/ComputeServiceUtilsTest.java | 102 --------- .../util/ConcurrentOpenSocketFinderTest.java | 193 ++++++++++++++++++ 8 files changed, 466 insertions(+), 206 deletions(-) create mode 100644 compute/src/main/java/org/jclouds/compute/util/ConcurrentOpenSocketFinder.java create mode 100644 compute/src/main/java/org/jclouds/compute/util/OpenSocketFinder.java create mode 100644 compute/src/test/java/org/jclouds/compute/util/ConcurrentOpenSocketFinderTest.java diff --git a/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java b/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java index 7dac74005b..efa16b7016 100644 --- a/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java +++ b/compute/src/main/java/org/jclouds/compute/functions/CreateSshClientOncePortIsListeningOnNode.java @@ -30,9 +30,8 @@ import javax.inject.Singleton; import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.reference.ComputeServiceConstants; import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts; -import org.jclouds.compute.util.ComputeServiceUtils; +import org.jclouds.compute.util.OpenSocketFinder; import org.jclouds.logging.Logger; -import org.jclouds.predicates.SocketOpen; import org.jclouds.ssh.SshClient; import com.google.common.base.Function; @@ -52,13 +51,14 @@ public class CreateSshClientOncePortIsListeningOnNode implements Function> nodeRunning; private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory; - private final SocketOpen socketTester; private final Timeouts timeouts; + private final OpenSocketFinder openSocketFinder; @Nullable private final Statement statement; @@ -88,7 +87,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal @AssistedInject public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( @Named("NODE_RUNNING") Predicate> nodeRunning, - SocketOpen socketTester, Timeouts timeouts, + OpenSocketFinder openSocketFinder, Timeouts timeouts, Function templateOptionsToStatement, InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options, @Assisted AtomicReference node, @Assisted Set goodNodes, @@ -98,7 +97,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal checkNotNull(options, "options")); this.nodeRunning = checkNotNull(nodeRunning, "nodeRunning"); this.initScriptRunnerFactory = checkNotNull(initScriptRunnerFactory, "initScriptRunnerFactory"); - this.socketTester = checkNotNull(socketTester, "socketTester"); + this.openSocketFinder = checkNotNull(openSocketFinder, "openSocketFinder"); this.timeouts = checkNotNull(timeouts, "timeouts"); this.node = node; this.options = checkNotNull(options, "options"); @@ -110,12 +109,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal @AssistedInject public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( @Named("NODE_RUNNING") Predicate> nodeRunning, GetNodeMetadataStrategy getNode, - SocketOpen socketTester, Timeouts timeouts, + OpenSocketFinder openSocketFinder, Timeouts timeouts, Function templateOptionsToStatement, InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options, @Assisted Set goodNodes, @Assisted Map badNodes, @Assisted Multimap customizationResponses) { - this(nodeRunning, socketTester, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options, + this(nodeRunning, openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options, new AtomicReference(null), goodNodes, badNodes, customizationResponses); } @@ -154,8 +153,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal } } if (options.getPort() > 0) { - findReachableSocketOnNode(socketTester, nodeRunning, node.get(), options.getPort(), - options.getSeconds(), TimeUnit.SECONDS, logger); + openSocketFinder.findOpenSocketOnNode(node.get(), options.getPort(), + options.getSeconds(), TimeUnit.SECONDS); } } logger.debug("<< customized node(%s)", originalId); diff --git a/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java b/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java index 8b47b0bc71..cf3e18f27d 100644 --- a/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java +++ b/compute/src/main/java/org/jclouds/compute/util/ComputeServiceUtils.java @@ -18,12 +18,8 @@ */ package org.jclouds.compute.util; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.getStackTraceAsString; -import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; -import static com.google.common.collect.Iterables.size; -import static com.google.common.collect.Iterables.transform; import static org.jclouds.scriptbuilder.domain.Statements.pipeHttpResponseToBash; import java.net.URI; @@ -31,12 +27,8 @@ import java.util.Formatter; import java.util.Map; import java.util.Map.Entry; import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import javax.annotation.Nullable; - import org.jclouds.compute.ComputeServiceContext; import org.jclouds.compute.domain.ComputeMetadata; import org.jclouds.compute.domain.Hardware; @@ -45,16 +37,11 @@ import org.jclouds.compute.domain.OsFamily; import org.jclouds.compute.domain.Processor; import org.jclouds.compute.domain.Volume; import org.jclouds.http.HttpRequest; -import org.jclouds.logging.Logger; -import org.jclouds.predicates.RetryablePredicate; -import org.jclouds.predicates.SocketOpen; import org.jclouds.scriptbuilder.domain.Statement; import org.jclouds.scriptbuilder.domain.Statements; -import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; -import com.google.common.net.HostAndPort; import com.google.common.reflect.TypeToken; /** @@ -170,72 +157,6 @@ public class ComputeServiceUtils { return org.jclouds.rest.Providers.getSupportedProvidersOfType(TypeToken.of(ComputeServiceContext.class)); } - public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, final NodeMetadata node, - final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) { - return findReachableSocketOnNode(socketTester, null, node, port, timeoutValue, timeUnits, logger); - } - - public static HostAndPort findReachableSocketOnNode(final SocketOpen socketTester, - @Nullable final Predicate> nodeRunning, final NodeMetadata node, - final int port, long timeoutValue, TimeUnit timeUnits, Logger logger) { - checkNodeHasIps(node); - - Iterable sockets = transform(concat(node.getPublicAddresses(), node.getPrivateAddresses()), - new Function() { - - @Override - public HostAndPort apply(String from) { - return HostAndPort.fromParts(from, port); - } - }); - - // Specify a retry period of 1s, expressed in the same time units. - long period = timeUnits.convert(1, TimeUnit.SECONDS); - - // For storing the result, as predicate will just tell us true/false - final AtomicReference result = new AtomicReference(); - - Predicate> multiIpSocketTester = new Predicate>() { - - @Override - public boolean apply(Iterable sockets) { - for (HostAndPort socket : sockets) { - if (socketTester.apply(socket)) { - result.set(socket); - return true; - } - } - if (nodeRunning != null && !nodeRunning.apply(new AtomicReference(node))) { - throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId())); - } - return false; - } - - }; - - RetryablePredicate> tester = new RetryablePredicate>( - multiIpSocketTester, timeoutValue, period, timeUnits); - - logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits); - - boolean passed = tester.apply(sockets); - - if (passed) { - logger.debug("<< socket %s opened", result); - assert result.get() != null; - return result.get(); - } else { - logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits); - throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", - port, node)); - } - } - - public static void checkNodeHasIps(NodeMetadata node) { - checkState(size(concat(node.getPublicAddresses(), node.getPrivateAddresses())) > 0, - "node does not have IP addresses configured: " + node); - } - public static String parseVersionOrReturnEmptyString(org.jclouds.compute.domain.OsFamily family, String in, Map> osVersionMap) { if (osVersionMap.containsKey(family)) { diff --git a/compute/src/main/java/org/jclouds/compute/util/ConcurrentOpenSocketFinder.java b/compute/src/main/java/org/jclouds/compute/util/ConcurrentOpenSocketFinder.java new file mode 100644 index 0000000000..efe3b84bee --- /dev/null +++ b/compute/src/main/java/org/jclouds/compute/util/ConcurrentOpenSocketFinder.java @@ -0,0 +1,170 @@ +package org.jclouds.compute.util; + +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterables.concat; +import static com.google.common.collect.Iterables.size; +import static com.google.common.collect.Iterables.transform; + +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.Resource; +import javax.inject.Named; + +import org.jclouds.Constants; +import org.jclouds.compute.domain.NodeMetadata; +import org.jclouds.compute.reference.ComputeServiceConstants; +import org.jclouds.logging.Logger; +import org.jclouds.predicates.RetryablePredicate; +import org.jclouds.predicates.SocketOpen; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableSet; +import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Inject; + +public class ConcurrentOpenSocketFinder implements OpenSocketFinder { + + @Resource + @Named(ComputeServiceConstants.COMPUTE_LOGGER) + private Logger logger = Logger.NULL; + + private final SocketOpen socketTester; + private final Predicate> nodeRunning; + private final ListeningExecutorService executor; + + @Inject + public ConcurrentOpenSocketFinder(SocketOpen socketTester, + @Named("NODE_RUNNING") final Predicate> nodeRunning, + @Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads) { + this.socketTester = socketTester; + this.nodeRunning = nodeRunning; + this.executor = MoreExecutors.listeningDecorator(userThreads); + } + + public HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port, + long timeoutValue, TimeUnit timeUnits) { + Iterable hosts = checkNodeHasIps(node); + Set sockets = toHostAndPorts(hosts, port); + + // Specify a retry period of 1s, expressed in the same time units. + long period = timeUnits.convert(1, TimeUnit.SECONDS); + + // For storing the result; needed because predicate will just tell us true/false + final AtomicReference result = new AtomicReference(); + + Predicate> concurrentOpenSocketFinder = new Predicate>() { + + @Override + public boolean apply(Collection input) { + HostAndPort reachableSocket = findOpenSocket(input); + if (reachableSocket != null) { + result.set(reachableSocket); + return true; + } else { + if (nodeRunning != null && !nodeRunning.apply(new AtomicReference(node))) { + throw new IllegalStateException(String.format("Node %s is no longer running; aborting waiting for ip:port connection", node.getId())); + } + return false; + } + } + + }; + + RetryablePredicate> retryingOpenSocketFinder = new RetryablePredicate>( + concurrentOpenSocketFinder, timeoutValue, period, timeUnits); + + logger.debug(">> blocking on sockets %s for %d %s", sockets, timeoutValue, timeUnits); + + boolean passed = retryingOpenSocketFinder.apply(sockets); + + if (passed) { + logger.debug("<< socket %s opened", result); + assert result.get() != null; + return result.get(); + } else { + logger.warn("<< sockets %s didn't open after %d %s", sockets, timeoutValue, timeUnits); + throw new NoSuchElementException(String.format("could not connect to any ip address port %d on node %s", + port, node)); + } + + } + + /** + * Checks if any any of the given HostAndPorts are reachable. It checks them all concurrently, + * and returns the first one found or null if none are reachable. + * + * @return A reachable HostAndPort, or null. + * @throws InterruptedException + */ + private HostAndPort findOpenSocket(final Collection sockets) { + final AtomicReference result = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger completeCount = new AtomicInteger(); + + for (final HostAndPort socket : sockets) { + final ListenableFuture future = executor.submit(new Runnable() { + + @Override + public void run() { + try { + if (socketTester.apply(socket)) { + result.compareAndSet(null, socket); + latch.countDown(); + } + } catch (RuntimeException e) { + logger.warn(e, "Error checking reachability of ip:port %s", socket); + } + } + + }); + + future.addListener(new Runnable() { + + @Override + public void run() { + if (completeCount.incrementAndGet() >= sockets.size()) { + latch.countDown(); // Tried all; mark as done + } + } + + }, MoreExecutors.sameThreadExecutor()); + } + + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw Throwables.propagate(e); + } + return result.get(); + } + + private Iterable checkNodeHasIps(NodeMetadata node) { + Iterable ips = concat(node.getPublicAddresses(), node.getPrivateAddresses()); + checkState(size(ips) > 0, "node does not have IP addresses configured: " + node); + return ips; + } + + private Set toHostAndPorts(Iterable hosts, final int port) { + return ImmutableSet.copyOf(transform(hosts, + new Function() { + + @Override + public HostAndPort apply(String from) { + return HostAndPort.fromParts(from, port); + } + })); + } +} diff --git a/compute/src/main/java/org/jclouds/compute/util/OpenSocketFinder.java b/compute/src/main/java/org/jclouds/compute/util/OpenSocketFinder.java new file mode 100644 index 0000000000..ce6d85c7e6 --- /dev/null +++ b/compute/src/main/java/org/jclouds/compute/util/OpenSocketFinder.java @@ -0,0 +1,30 @@ +package org.jclouds.compute.util; + +import java.util.concurrent.TimeUnit; + +import org.jclouds.compute.domain.NodeMetadata; + +import com.google.common.net.HostAndPort; +import com.google.inject.ImplementedBy; + +/** + * For finding an open/reachable ip:port for a node. + * + * @author aled + */ +@ImplementedBy(ConcurrentOpenSocketFinder.class) +public interface OpenSocketFinder { + + /** + * + * @param node The node (checking its public and private addresses) + * @param port The port to try to connect to + * @param timeoutValue Max time to try to connect to the ip:port + * @param timeUnits + * + * @return The reachable ip:port + * @throws NoSuchElementException If no ports accessible within the given time + * @throws IllegalStateException If the given node has no public or private addresses + */ + HostAndPort findOpenSocketOnNode(final NodeMetadata node, final int port, long timeoutValue, TimeUnit timeUnits); +} diff --git a/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java b/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java index 11585e053a..9e3db80062 100644 --- a/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java +++ b/compute/src/test/java/org/jclouds/compute/strategy/CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest.java @@ -19,12 +19,15 @@ package org.jclouds.compute.strategy; import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.testng.Assert.assertEquals; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.jclouds.compute.config.CustomizationResponse; @@ -35,7 +38,7 @@ import org.jclouds.compute.functions.TemplateOptionsToStatement; import org.jclouds.compute.options.TemplateOptions; import org.jclouds.compute.predicates.AtomicNodeRunning; import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts; -import org.jclouds.predicates.SocketOpen; +import org.jclouds.compute.util.OpenSocketFinder; import org.jclouds.scriptbuilder.domain.Statement; import org.testng.Assert; import org.testng.annotations.Test; @@ -55,7 +58,7 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { public void testBreakWhenNodeStillPending() { InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class); - SocketOpen socketTester = createMock(SocketOpen.class); + OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class); Timeouts timeouts = new Timeouts(); Function templateOptionsToStatement = new TemplateOptionsToStatement(); @SuppressWarnings("unused") @@ -79,10 +82,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { }; // replay mocks - replay(initScriptRunnerFactory, socketTester); + replay(initScriptRunnerFactory, openSocketFinder); // run AtomicReference atomicNode = new AtomicReference(node); - new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts, + new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes, customizationResponses).apply(atomicNode); @@ -93,12 +96,12 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { assertEquals(customizationResponses.size(), 0); // verify mocks - verify(initScriptRunnerFactory, socketTester); + verify(initScriptRunnerFactory, openSocketFinder); } public void testBreakGraceFullyWhenNodeDied() { InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class); - SocketOpen socketTester = createMock(SocketOpen.class); + OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class); Timeouts timeouts = new Timeouts(); Function templateOptionsToStatement = new TemplateOptionsToStatement(); @SuppressWarnings("unused") @@ -123,10 +126,10 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { }; // replay mocks - replay(initScriptRunnerFactory, socketTester); + replay(initScriptRunnerFactory, openSocketFinder); // run AtomicReference atomicNode = new AtomicReference(node); - new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), socketTester, timeouts, + new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes, customizationResponses).apply(atomicNode); @@ -137,6 +140,52 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest { assertEquals(customizationResponses.size(), 0); // verify mocks - verify(initScriptRunnerFactory, socketTester); + verify(initScriptRunnerFactory, openSocketFinder); + } + + public void testBreakGraceWhenNodeSocketFailsToOpen() { + int portTimeoutSecs = 2; + InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class); + OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class); + Timeouts timeouts = new Timeouts(); + Function templateOptionsToStatement = new TemplateOptionsToStatement(); + TemplateOptions options = new TemplateOptions().blockOnPort(22, portTimeoutSecs); + Set goodNodes = Sets.newLinkedHashSet(); + Map badNodes = Maps.newLinkedHashMap(); + Multimap customizationResponses = LinkedHashMultimap.create(); + + final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").state(NodeState.PENDING).build(); + final NodeMetadata runningNode = new NodeMetadataBuilder().ids("id").state(NodeState.RUNNING).build(); + + expect(openSocketFinder.findOpenSocketOnNode(runningNode, 22, portTimeoutSecs, TimeUnit.SECONDS)) + .andThrow(new NoSuchElementException("could not connect to any ip address port")).once(); + + GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){ + + @Override + public NodeMetadata getNode(String input) { + Assert.assertEquals(input, pendingNode.getId()); + return runningNode; + } + + }; + + // replay mocks + replay(initScriptRunnerFactory, openSocketFinder); + + // run + AtomicReference atomicNode = new AtomicReference(pendingNode); + new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts, + templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes, + customizationResponses).apply(atomicNode); + + assertEquals(goodNodes.size(), 0); + assertEquals(badNodes.keySet(), ImmutableSet.of(pendingNode)); + badNodes.get(pendingNode).printStackTrace(); + assertEquals(badNodes.get(pendingNode).getMessage(), "could not connect to any ip address port"); + assertEquals(customizationResponses.size(), 0); + + // verify mocks + verify(initScriptRunnerFactory, openSocketFinder); } } diff --git a/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java b/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java index b606e1f4f0..6a5d101b07 100644 --- a/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java +++ b/compute/src/test/java/org/jclouds/compute/util/ComputeServiceUtilsTest.java @@ -18,39 +18,21 @@ */ package org.jclouds.compute.util; -import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; import static org.jclouds.compute.util.ComputeServiceUtils.parseVersionOrReturnEmptyString; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; import java.net.URI; import java.util.Map; -import java.util.NoSuchElementException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import org.easymock.EasyMock; import org.jclouds.compute.config.BaseComputeServiceContextModule; -import org.jclouds.compute.domain.NodeMetadata; import org.jclouds.compute.domain.OsFamily; -import org.jclouds.compute.predicates.SocketOpenPredicates; import org.jclouds.compute.reference.ComputeServiceConstants; import org.jclouds.http.HttpRequest; import org.jclouds.json.Json; import org.jclouds.json.config.GsonModule; -import org.jclouds.logging.Logger; -import org.jclouds.predicates.SocketOpen; import org.testng.annotations.Test; -import com.google.common.base.Predicate; -import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.ImmutableSet; -import com.google.common.net.HostAndPort; import com.google.inject.Guice; /** @@ -98,88 +80,4 @@ public class ComputeServiceUtilsTest { org.jclouds.scriptbuilder.domain.OsFamily.UNIX), "curl -q -s -S -L --connect-timeout 10 --max-time 600 --retry 20 -X GET -H \"Host: adriancolehappy.s3.amazonaws.com\" -H \"Date: Sun, 12 Sep 2010 08:25:19 GMT\" -H \"Authorization: AWS 0ASHDJAS82:JASHFDA=\" https://adriancolehappy.s3.amazonaws.com/java/install |(mkdir -p /stage/ &&cd /stage/ &&tar -xpzf -)\n"); } - - @Test - public void testFindReachableSocketOnNodeTimesOut() throws Exception { - final long timeoutSecs = 2; - final long timeoutMs = timeoutSecs * 1000; - final long SLOW_GRACE = 500; - final long EARLY_GRACE = 10; - - SocketOpen socketTester = SocketOpenPredicates.alwaysFail; - NodeMetadata node = createMock(NodeMetadata.class); - expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); - expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); - - replay(node); - - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - try { - ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 1234, timeoutMs, TimeUnit.MILLISECONDS, Logger.CONSOLE); - fail(); - } catch (NoSuchElementException success) { - // expected - } - long timetaken = stopwatch.elapsedMillis(); - - assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken); - verify(node); - } - - @Test - public void testFindReachableSocketOnNodeReturnsAvailable() throws Exception { - SocketOpen socketTester = createMock(SocketOpen.class); - expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false); - expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true); - - NodeMetadata node = createMock(NodeMetadata.class); - expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); - expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); - - replay(socketTester); - replay(node); - - HostAndPort result = ComputeServiceUtils.findReachableSocketOnNode(socketTester, node, 22, 2000, TimeUnit.MILLISECONDS, Logger.CONSOLE); - assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22)); - - verify(socketTester); - verify(node); - } - - @Test - public void testFindReachableSocketOnNodeAbortsWhenNodeNotRunning() throws Exception { - final long SLOW_GRACE = 500; - - SocketOpen socketTester = SocketOpenPredicates.alwaysFail; - - NodeMetadata node = createMock(NodeMetadata.class); - expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); - expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); - expect(node.getId()).andReturn("myid").atLeastOnce(); - - Predicate> nodeRunning = createMock(Predicate.class); - expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(false); - - replay(node); - replay(nodeRunning); - - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - try { - ComputeServiceUtils.findReachableSocketOnNode(socketTester, nodeRunning, - node, 22, 2000000, TimeUnit.MILLISECONDS, Logger.CONSOLE); - fail(); - } catch (RuntimeException e) { - if (!e.getMessage().contains("no longer running")) { - throw e; - } - } - long timetaken = stopwatch.elapsedMillis(); - - assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken); - - verify(node); - verify(nodeRunning); - } } diff --git a/compute/src/test/java/org/jclouds/compute/util/ConcurrentOpenSocketFinderTest.java b/compute/src/test/java/org/jclouds/compute/util/ConcurrentOpenSocketFinderTest.java new file mode 100644 index 0000000000..5421f4cf67 --- /dev/null +++ b/compute/src/test/java/org/jclouds/compute/util/ConcurrentOpenSocketFinderTest.java @@ -0,0 +1,193 @@ +package org.jclouds.compute.util; + +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.easymock.EasyMock; +import org.jclouds.compute.domain.NodeMetadata; +import org.jclouds.concurrent.MoreExecutors; +import org.jclouds.predicates.SocketOpen; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.base.Predicate; +import com.google.common.base.Stopwatch; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.net.HostAndPort; + +@Test(singleThreaded=true) +public class ConcurrentOpenSocketFinderTest { + + private static final long SLOW_GRACE = 500; + private static final long EARLY_GRACE = 10; + + private NodeMetadata node; + private SocketOpen socketTester; + private Predicate> nodeRunning; + private ExecutorService threadPool; + + @SuppressWarnings("unchecked") + @BeforeMethod + public void setUp() { + node = createMock(NodeMetadata.class); + expect(node.getPublicAddresses()).andReturn(ImmutableSet.of("1.2.3.4")).atLeastOnce(); + expect(node.getPrivateAddresses()).andReturn(ImmutableSet.of("1.2.3.5")).atLeastOnce(); + expect(node.getId()).andReturn("myid").anyTimes(); + + socketTester = createMock(SocketOpen.class); + + nodeRunning = createMock(Predicate.class); + + replay(node); + + threadPool = Executors.newCachedThreadPool(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() { + if (threadPool != null) threadPool.shutdownNow(); + } + + @Test + public void testRespectsTimeout() throws Exception { + final long timeoutMs = 1000; + + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).times(2, Integer.MAX_VALUE); + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(false).times(2, Integer.MAX_VALUE); + expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(true); + replay(socketTester); + replay(nodeRunning); + + OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor()); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + try { + finder.findOpenSocketOnNode(node, 22, timeoutMs, TimeUnit.MILLISECONDS); + fail(); + } catch (NoSuchElementException success) { + // expected + } + long timetaken = stopwatch.elapsedMillis(); + + assertTrue(timetaken >= timeoutMs-EARLY_GRACE && timetaken <= timeoutMs+SLOW_GRACE, "timetaken="+timetaken); + + verify(node); + verify(socketTester); + } + + @Test + public void testReturnsReachable() throws Exception { + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.4", 22))).andReturn(false).once(); + expect(socketTester.apply(HostAndPort.fromParts("1.2.3.5", 22))).andReturn(true).once(); + expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(true); + replay(socketTester); + replay(nodeRunning); + + OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, MoreExecutors.sameThreadExecutor()); + + HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS); + assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22)); + + verify(node); + verify(socketTester); + } + + @Test + public void testChecksSocketsConcurrently() throws Exception { + long delayForReachableMs = 25; + + expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(true); + replay(nodeRunning); + + // Can't use mock+answer for concurrency tests; EasyMock uses lock in ReplayState + ControllableSocketOpen socketTester = new ControllableSocketOpen(ImmutableMap.of( + HostAndPort.fromParts("1.2.3.4", 22), new SlowCallable(false, 1000), + HostAndPort.fromParts("1.2.3.5", 22), new SlowCallable(true, delayForReachableMs))); + + OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, null, threadPool); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + HostAndPort result = finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS); + long timetaken = stopwatch.elapsedMillis(); + + assertEquals(result, HostAndPort.fromParts("1.2.3.5", 22)); + assertTrue(timetaken >= delayForReachableMs-EARLY_GRACE && timetaken <= delayForReachableMs+SLOW_GRACE, "timetaken="+timetaken); + verify(node); + } + + @Test + public void testAbortsWhenNodeNotRunning() throws Exception { + expect(socketTester.apply(EasyMock.anyObject())).andReturn(false); + expect(nodeRunning.apply(EasyMock.>anyObject())).andReturn(false); + replay(socketTester); + replay(nodeRunning); + + OpenSocketFinder finder = new ConcurrentOpenSocketFinder(socketTester, nodeRunning, MoreExecutors.sameThreadExecutor()); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + try { + finder.findOpenSocketOnNode(node, 22, 2000, TimeUnit.MILLISECONDS); + fail(); + } catch (NoSuchElementException e) { + // success + // Note: don't get the "no longer running" message, because logged+swallowed by RetryablePredicate + } + long timetaken = stopwatch.elapsedMillis(); + + assertTrue(timetaken <= SLOW_GRACE, "timetaken="+timetaken); + + verify(node); + verify(socketTester); + verify(nodeRunning); + } + + private static class SlowCallable implements Callable { + private final T result; + private final long delay; + + SlowCallable(T result, long delay) { + this.result = result; + this.delay = delay; + } + @Override + public T call() throws Exception { + Thread.sleep(delay); + return result; + } + }; + + private static class ControllableSocketOpen implements SocketOpen { + private final Map> answers; + + ControllableSocketOpen(Map> answers) { + this.answers = answers; + } + @Override + public boolean apply(HostAndPort input) { + try { + return answers.get(input).call(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + }; +}