Issue 1103: isolate polling of nodes into its own class

This commit is contained in:
Adrian Cole 2012-10-20 11:04:49 -07:00
parent 3cd57073e0
commit bc82296702
5 changed files with 294 additions and 186 deletions

View File

@ -32,6 +32,7 @@ import javax.inject.Singleton;
import org.jclouds.compute.domain.Image;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.functions.PollNodeRunning;
import org.jclouds.compute.predicates.AtomicImageAvailable;
import org.jclouds.compute.predicates.AtomicImageDeleted;
import org.jclouds.compute.predicates.AtomicNodeRunning;
@ -43,9 +44,12 @@ import org.jclouds.compute.reference.ComputeServiceConstants.PollPeriod;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.predicates.RetryablePredicate;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
/**
*
@ -120,7 +124,8 @@ public class ComputeServiceTimeoutsModule extends AbstractModule {
@Override
protected void configure() {
bind(new TypeLiteral<Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>>>() {
}).annotatedWith(Names.named(TIMEOUT_NODE_RUNNING)).to(PollNodeRunning.class);
}
/**

View File

@ -0,0 +1,87 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.compute.functions;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.propagate;
import static java.lang.String.format;
import static org.jclouds.compute.config.ComputeServiceProperties.TIMEOUT_NODE_RUNNING;
import static org.jclouds.compute.util.ComputeServiceUtils.formatStatus;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadata.Status;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
/**
* Polls until the node is running or throws {@link IllegalStateException} if
* this cannot be achieved within the timeout.
*
* @author Adrian Cole
*
*/
@Named(TIMEOUT_NODE_RUNNING)
public class PollNodeRunning implements Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> {
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
@Inject
public PollNodeRunning(@Named(TIMEOUT_NODE_RUNNING) Predicate<AtomicReference<NodeMetadata>> nodeRunning) {
this.nodeRunning = checkNotNull(nodeRunning, "nodeRunning");
}
/**
* @param node
* will be updated with the node which is running
* @throws {@link IllegalStateException} if this cannot be achieved within
* the timeout.
*/
@Override
public AtomicReference<NodeMetadata> apply(AtomicReference<NodeMetadata> node) throws IllegalStateException {
String originalId = node.get().getId();
NodeMetadata originalNode = node.get();
try {
Stopwatch stopwatch = new Stopwatch().start();
if (!nodeRunning.apply(node)) {
long timeWaited = stopwatch.elapsedMillis();
if (node.get() == null) {
node.set(originalNode);
throw new IllegalStateException(format("api response for node(%s) was null", originalId));
} else {
throw new IllegalStateException(format(
"node(%s) didn't achieve the status running; aborting after %d seconds with final status: %s",
originalId, timeWaited / 1000, formatStatus(node.get())));
}
}
} catch (IllegalStateException e) {
if (node.get().getStatus() == Status.TERMINATED) {
throw new IllegalStateException(format("node(%s) terminated", originalId));
} else {
throw propagate(e);
}
}
return node;
}
}

View File

@ -17,12 +17,11 @@
* under the License.
*/
package org.jclouds.compute.strategy;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.getRootCause;
import static java.lang.String.format;
import static org.jclouds.compute.config.ComputeServiceProperties.TIMEOUT_NODE_RUNNING;
import static org.jclouds.compute.util.ComputeServiceUtils.formatStatus;
import java.util.Map;
import java.util.Set;
@ -37,18 +36,14 @@ import org.jclouds.compute.callables.RunScriptOnNode;
import org.jclouds.compute.config.CustomizationResponse;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadata.Status;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.util.OpenSocketFinder;
import org.jclouds.javax.annotation.Nullable;
import org.jclouds.logging.Logger;
import org.jclouds.scriptbuilder.domain.Statement;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Multimap;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@ -59,6 +54,7 @@ import com.google.inject.assistedinject.AssistedInject;
*/
public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Callable<Void>,
Function<AtomicReference<NodeMetadata>, Void> {
public static interface Factory {
Callable<Void> create(TemplateOptions options, AtomicReference<NodeMetadata> node, Set<NodeMetadata> goodNodes,
Map<NodeMetadata, Exception> badNodes, Multimap<NodeMetadata, CustomizationResponse> customizationResponses);
@ -71,9 +67,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
protected Logger logger = Logger.NULL;
private final Predicate<AtomicReference<NodeMetadata>> nodeRunning;
private final Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> pollNodeRunning;
private final InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory;
private final Timeouts timeouts;
private final OpenSocketFinder openSocketFinder;
@Nullable
@ -88,19 +83,17 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named(TIMEOUT_NODE_RUNNING) Predicate<AtomicReference<NodeMetadata>> nodeRunning,
OpenSocketFinder openSocketFinder, Timeouts timeouts,
Function<TemplateOptions, Statement> templateOptionsToStatement,
@Named(TIMEOUT_NODE_RUNNING) Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> pollNodeRunning,
OpenSocketFinder openSocketFinder, Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted AtomicReference<NodeMetadata> node, @Assisted Set<NodeMetadata> goodNodes,
@Assisted Map<NodeMetadata, Exception> badNodes,
@Assisted Multimap<NodeMetadata, CustomizationResponse> customizationResponses) {
this.statement = checkNotNull(templateOptionsToStatement, "templateOptionsToStatement").apply(
checkNotNull(options, "options"));
this.nodeRunning = checkNotNull(nodeRunning, "nodeRunning");
this.pollNodeRunning = checkNotNull(pollNodeRunning, "pollNodeRunning");
this.initScriptRunnerFactory = checkNotNull(initScriptRunnerFactory, "initScriptRunnerFactory");
this.openSocketFinder = checkNotNull(openSocketFinder, "openSocketFinder");
this.timeouts = checkNotNull(timeouts, "timeouts");
this.node = node;
this.options = checkNotNull(options, "options");
this.goodNodes = checkNotNull(goodNodes, "goodNodes");
@ -110,13 +103,13 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
@AssistedInject
public CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(
@Named(TIMEOUT_NODE_RUNNING) Predicate<AtomicReference<NodeMetadata>> nodeRunning, GetNodeMetadataStrategy getNode,
OpenSocketFinder openSocketFinder, Timeouts timeouts,
@Named(TIMEOUT_NODE_RUNNING) Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> pollNodeRunning,
GetNodeMetadataStrategy getNode, OpenSocketFinder openSocketFinder,
Function<TemplateOptions, Statement> templateOptionsToStatement,
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory, @Assisted TemplateOptions options,
@Assisted Set<NodeMetadata> goodNodes, @Assisted Map<NodeMetadata, Exception> badNodes,
@Assisted Multimap<NodeMetadata, CustomizationResponse> customizationResponses) {
this(nodeRunning, openSocketFinder, timeouts, templateOptionsToStatement, initScriptRunnerFactory, options,
this(pollNodeRunning, openSocketFinder, templateOptionsToStatement, initScriptRunnerFactory, options,
new AtomicReference<NodeMetadata>(null), goodNodes, badNodes, customizationResponses);
}
@ -125,38 +118,9 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
checkState(!tainted, "this object is not designed to be reused: %s", toString());
tainted = true;
String originalId = node.get().getId();
NodeMetadata originalNode = node.get();
try {
if (options.shouldBlockUntilRunning()) {
try {
Stopwatch stopwatch = new Stopwatch().start();
if (!nodeRunning.apply(node)) {
long timeWaited = stopwatch.elapsedMillis();
long earlyReturnGrace = 10; // sleeps can sometimes return milliseconds early
if (node.get() == null) {
node.set(originalNode);
throw new IllegalStateException(format("api response for node(%s) was null, so we can't customize",
originalId));
} else if (timeWaited < (timeouts.nodeRunning - earlyReturnGrace)) {
throw new IllegalStateException(
format(
"node(%s) didn't achieve the status running, so we couldn't customize; aborting prematurely after %d seconds with final status: %s",
originalId, timeWaited / 1000, formatStatus(node.get())));
} else {
throw new IllegalStateException(
format(
"node(%s) didn't achieve the status running within %d seconds, so we couldn't customize; final status: %s",
originalId, timeouts.nodeRunning / 1000, formatStatus(node.get())));
}
}
} catch (IllegalStateException e) {
if (node.get().getStatus() == Status.TERMINATED) {
throw new IllegalStateException(format("node(%s) terminated before we could customize", originalId));
} else {
throw e;
}
}
pollNodeRunning.apply(node);
if (statement != null) {
RunScriptOnNode runner = initScriptRunnerFactory.create(node.get(), statement, options, badNodes).call();
if (runner != null) {
@ -165,8 +129,8 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap implements Cal
}
}
if (options.getPort() > 0) {
openSocketFinder.findOpenSocketOnNode(node.get(), options.getPort(),
options.getSeconds(), TimeUnit.SECONDS);
openSocketFinder.findOpenSocketOnNode(node.get(), options.getPort(), options.getSeconds(),
TimeUnit.SECONDS);
}
}
logger.debug("<< customized node(%s)", originalId);

View File

@ -0,0 +1,163 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.compute.functions;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicReference;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jclouds.compute.config.ComputeServiceTimeoutsModule;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadata.Status;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.predicates.AtomicNodeRunning;
import org.jclouds.compute.reference.ComputeServiceConstants.PollPeriod;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.strategy.GetNodeMetadataStrategy;
import org.testng.annotations.Test;
import com.google.common.base.Predicate;
/**
* @author Adrian Cole
*/
@Test
public class PollNodeRunningTest {
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "node\\(id\\) didn't achieve the status running; aborting after 0 seconds with final status: PENDING")
public void testIllegalStateExceptionWhenNodeStillPending() {
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
// node always stays pending
Predicate<AtomicReference<NodeMetadata>> nodeRunning = new Predicate<AtomicReference<NodeMetadata>>() {
@Override
public boolean apply(AtomicReference<NodeMetadata> input) {
assertEquals(input.get(), pendingNode);
return false;
}
};
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
try {
new PollNodeRunning(nodeRunning).apply(atomicNode);
} finally {
assertEquals(atomicNode.get(), pendingNode);
}
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "node\\(id\\) terminated")
public void testIllegalStateExceptionWhenNodeDied() {
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
final NodeMetadata deadNode = new NodeMetadataBuilder().ids("id").status(Status.TERMINATED).build();
Predicate<AtomicReference<NodeMetadata>> nodeRunning = new Predicate<AtomicReference<NodeMetadata>>() {
@Override
public boolean apply(AtomicReference<NodeMetadata> input) {
assertEquals(input.get(), pendingNode);
input.set(deadNode);
return false;
}
};
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
try {
new PollNodeRunning(nodeRunning).apply(atomicNode);
} finally {
assertEquals(atomicNode.get(), deadNode);
}
}
@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "api response for node\\(id\\) was null")
public void testIllegalStateExceptionAndNodeResetWhenRefSetToNull() {
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
Predicate<AtomicReference<NodeMetadata>> nodeRunning = new Predicate<AtomicReference<NodeMetadata>>() {
@Override
public boolean apply(AtomicReference<NodeMetadata> input) {
assertEquals(input.get(), pendingNode);
input.set(null);
return false;
}
};
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
try {
new PollNodeRunning(nodeRunning).apply(atomicNode);
} finally {
assertEquals(atomicNode.get(), pendingNode);
}
}
public void testRecoversWhenTemporarilyNodeNotFound() {
String nodeId = "myid";
Timeouts timeouts = new Timeouts();
PollPeriod period = new PollPeriod();
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids(nodeId).status(Status.PENDING).build();
final NodeMetadata runningNode = new NodeMetadataBuilder().ids(nodeId).status(Status.RUNNING).build();
GetNodeMetadataStrategy nodeClient = createMock(GetNodeMetadataStrategy.class);
AtomicNodeRunning nodeRunning = new AtomicNodeRunning(nodeClient);
Predicate<AtomicReference<NodeMetadata>> retryableNodeRunning = new ComputeServiceTimeoutsModule() {
public Predicate<AtomicReference<NodeMetadata>> nodeRunning(AtomicNodeRunning statusRunning,
Timeouts timeouts, PollPeriod period) {
return super.nodeRunning(statusRunning, timeouts, period);
}
}.nodeRunning(nodeRunning, timeouts, period);
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
// Simulate transient error: first call returns null; subsequent calls
// return the running node
EasyMock.expect(nodeClient.getNode(nodeId)).andAnswer(new IAnswer<NodeMetadata>() {
private int count = 0;
@Override
public NodeMetadata answer() throws Throwable {
count++;
if (count <= 1) {
return null;
} else {
return runningNode;
}
}
}).anyTimes();
// replay mocks
replay(nodeClient);
// run
new PollNodeRunning(retryableNodeRunning).apply(atomicNode);
assertEquals(atomicNode.get().getStatus(), Status.RUNNING);
// verify mocks
verify(nodeClient);
}
}

View File

@ -23,7 +23,6 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Map;
import java.util.NoSuchElementException;
@ -31,27 +30,19 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jclouds.compute.config.ComputeServiceTimeoutsModule;
import org.jclouds.compute.config.CustomizationResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadata.Status;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.functions.TemplateOptionsToStatement;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.AtomicNodeRunning;
import org.jclouds.compute.reference.ComputeServiceConstants.PollPeriod;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.compute.util.OpenSocketFinder;
import org.jclouds.scriptbuilder.domain.Statement;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
@ -63,10 +54,9 @@ import com.google.common.collect.Sets;
@Test(groups = "unit", testName = "CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest")
public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
public void testBreakWhenNodeStillPending() {
public void testBreakOnIllegalStateExceptionDuringPollNode() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")
Statement statement = null;
@ -75,87 +65,40 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
Map<NodeMetadata, Exception> badNodes = Maps.newLinkedHashMap();
Multimap<NodeMetadata, CustomizationResponse> customizationResponses = LinkedHashMultimap.create();
final NodeMetadata node = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
// node always stays pending
GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){
Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> pollNodeRunning = new Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>>() {
@Override
public NodeMetadata getNode(String input) {
Assert.assertEquals(input, node.getId());
return node;
public AtomicReference<NodeMetadata> apply(AtomicReference<NodeMetadata> node) {
Assert.assertEquals(node.get(), pendingNode);
throw new IllegalStateException("bad state!");
}
};
// replay mocks
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(pollNodeRunning, openSocketFinder,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
assertEquals(goodNodes.size(), 0);
assertEquals(badNodes.keySet(), ImmutableSet.of(node));
assertTrue(badNodes.get(node).getMessage() != null && badNodes.get(node).getMessage().matches(
"node\\(id\\) didn't achieve the status running, so we couldn't customize; aborting prematurely after .* seconds with final status: PENDING"),
badNodes.get(node).getMessage());
assertEquals(badNodes.keySet(), ImmutableSet.of(pendingNode));
assertEquals(badNodes.get(pendingNode).getMessage(), "bad state!");
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, openSocketFinder);
}
public void testBreakGraceFullyWhenNodeDied() {
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
@SuppressWarnings("unused")
Statement statement = null;
TemplateOptions options = new TemplateOptions();
Set<NodeMetadata> goodNodes = Sets.newLinkedHashSet();
Map<NodeMetadata, Exception> badNodes = Maps.newLinkedHashMap();
Multimap<NodeMetadata, CustomizationResponse> customizationResponses = LinkedHashMultimap.create();
final NodeMetadata node = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
final NodeMetadata deadNnode = new NodeMetadataBuilder().ids("id").status(Status.TERMINATED).build();
// node dies
GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){
@Override
public NodeMetadata getNode(String input) {
Assert.assertEquals(input, node.getId());
return deadNnode;
}
};
// replay mocks
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(node);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
assertEquals(goodNodes.size(), 0);
assertEquals(badNodes.keySet(), ImmutableSet.of(node));
badNodes.get(node).printStackTrace();
assertEquals(badNodes.get(node).getMessage(), "node(id) terminated before we could customize");
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, openSocketFinder);
}
public void testBreakGraceWhenNodeSocketFailsToOpen() {
int portTimeoutSecs = 2;
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
TemplateOptions options = new TemplateOptions().blockOnPort(22, portTimeoutSecs);
Set<NodeMetadata> goodNodes = Sets.newLinkedHashSet();
@ -165,27 +108,28 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids("id").status(Status.PENDING).build();
final NodeMetadata runningNode = new NodeMetadataBuilder().ids("id").status(Status.RUNNING).build();
expect(openSocketFinder.findOpenSocketOnNode(runningNode, 22, portTimeoutSecs, TimeUnit.SECONDS))
.andThrow(new NoSuchElementException("could not connect to any ip address port")).once();
expect(openSocketFinder.findOpenSocketOnNode(runningNode, 22, portTimeoutSecs, TimeUnit.SECONDS)).andThrow(
new NoSuchElementException("could not connect to any ip address port")).once();
GetNodeMetadataStrategy nodeRunning = new GetNodeMetadataStrategy(){
Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>> pollNodeRunning = new Function<AtomicReference<NodeMetadata>, AtomicReference<NodeMetadata>>() {
@Override
public NodeMetadata getNode(String input) {
Assert.assertEquals(input, pendingNode.getId());
return runningNode;
public AtomicReference<NodeMetadata> apply(AtomicReference<NodeMetadata> node) {
Assert.assertEquals(node.get(), pendingNode);
node.set(runningNode);
return node;
}
};
// replay mocks
replay(initScriptRunnerFactory, openSocketFinder);
// run
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap( new AtomicNodeRunning(nodeRunning), openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(pollNodeRunning, openSocketFinder,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
assertEquals(goodNodes.size(), 0);
assertEquals(badNodes.keySet(), ImmutableSet.of(pendingNode));
@ -196,59 +140,4 @@ public class CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMapTest {
// verify mocks
verify(initScriptRunnerFactory, openSocketFinder);
}
public void testRecoversWhenTemporarilyNodeNotFound() {
String nodeId = "myid";
InitializeRunScriptOnNodeOrPlaceInBadMap.Factory initScriptRunnerFactory = createMock(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class);
OpenSocketFinder openSocketFinder = createMock(OpenSocketFinder.class);
Timeouts timeouts = new Timeouts();
PollPeriod period = new PollPeriod();
Function<TemplateOptions, Statement> templateOptionsToStatement = new TemplateOptionsToStatement();
Set<NodeMetadata> goodNodes = Sets.newLinkedHashSet();
Map<NodeMetadata, Exception> badNodes = Maps.newLinkedHashMap();
Multimap<NodeMetadata, CustomizationResponse> customizationResponses = LinkedHashMultimap.create();
TemplateOptions options = new TemplateOptions();
final NodeMetadata pendingNode = new NodeMetadataBuilder().ids(nodeId).status(Status.PENDING).build();
final NodeMetadata runningNode = new NodeMetadataBuilder().ids(nodeId).status(Status.RUNNING).build();
GetNodeMetadataStrategy nodeClient = createMock(GetNodeMetadataStrategy.class);
AtomicNodeRunning nodeRunning = new AtomicNodeRunning(nodeClient);
Predicate<AtomicReference<NodeMetadata>> retryableNodeRunning = new ComputeServiceTimeoutsModule() {
public Predicate<AtomicReference<NodeMetadata>> nodeRunning(AtomicNodeRunning statusRunning, Timeouts timeouts, PollPeriod period) {
return super.nodeRunning(statusRunning, timeouts, period);
}
}.nodeRunning(nodeRunning, timeouts, period);
AtomicReference<NodeMetadata> atomicNode = new AtomicReference<NodeMetadata>(pendingNode);
// Simulate transient error: first call returns null; subsequent calls return the running node
EasyMock.expect(nodeClient.getNode(nodeId)).andAnswer(new IAnswer<NodeMetadata>() {
private int count = 0;
@Override
public NodeMetadata answer() throws Throwable {
count++;
if (count <= 1) {
return null;
} else {
return runningNode;
}
}
}).anyTimes();
// replay mocks
replay(initScriptRunnerFactory, openSocketFinder, nodeClient);
// run
new CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap(retryableNodeRunning, openSocketFinder, timeouts,
templateOptionsToStatement, initScriptRunnerFactory, options, atomicNode, goodNodes, badNodes,
customizationResponses).apply(atomicNode);
if (badNodes.size() > 0) Iterables.get(badNodes.values(), 0).printStackTrace();
assertEquals(badNodes.size(), 0);
assertEquals(goodNodes, ImmutableSet.of(runningNode));
assertEquals(customizationResponses.size(), 0);
// verify mocks
verify(initScriptRunnerFactory, openSocketFinder, nodeClient);
}
}