refactored nodepool to avoid having internal state and to survive restarts

This commit is contained in:
David Ribeiro Alves 2012-06-25 03:34:55 +01:00
parent e9ec08e6a8
commit a8ea32e29d
12 changed files with 516 additions and 321 deletions

View File

@ -27,12 +27,23 @@
<artifactId>jclouds-compute</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-blobstore</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.jclouds.api</groupId>
<artifactId>filesystem</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>
<artifactId>jclouds-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jclouds</groupId>

View File

@ -28,6 +28,7 @@ import java.util.Properties;
import org.jclouds.apis.internal.BaseApiMetadata;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.nodepool.config.NodePoolComputServiceContextModule;
import org.jclouds.rest.internal.BaseRestApiMetadata;
public class NodePoolApiMetadata extends BaseApiMetadata {
@ -64,9 +65,10 @@ public class NodePoolApiMetadata extends BaseApiMetadata {
public static class Builder extends BaseApiMetadata.Builder {
protected Builder() {
id("nodepool").name("node pool provider wrapper").identityName("Unused").defaultIdentity("nodepool")
.defaultEndpoint("nodepool").documentation(
URI.create("http://www.jclouds.org/documentation/userguide/compute")).view(
ComputeServiceContext.class).defaultProperties(NodePoolApiMetadata.defaultProperties());
.defaultEndpoint("nodepool")
.documentation(URI.create("http://www.jclouds.org/documentation/userguide/compute"))
.view(ComputeServiceContext.class).defaultModule(NodePoolComputServiceContextModule.class)
.defaultProperties(NodePoolApiMetadata.defaultProperties());
}
@Override

View File

@ -18,19 +18,23 @@
*/
package org.jclouds.nodepool;
import javax.inject.Singleton;
import org.jclouds.Context;
import org.jclouds.compute.ComputeService;
import org.jclouds.compute.Utils;
import org.jclouds.compute.internal.ComputeServiceContextImpl;
import org.jclouds.location.Provider;
import org.jclouds.nodepool.internal.BaseNodePoolComputeService;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
@Singleton
public class NodePoolComputeServiceContext extends ComputeServiceContextImpl {
@Inject
public NodePoolComputeServiceContext(Context backend, TypeToken<? extends Context> backendType,
public NodePoolComputeServiceContext(@Provider Context backend, @Provider TypeToken<? extends Context> backendType,
ComputeService computeService, Utils utils) {
super(backend, backendType, computeService, utils);
}
@ -42,7 +46,6 @@ public class NodePoolComputeServiceContext extends ComputeServiceContextImpl {
public NodePoolStats getPoolStats() {
return new NodePoolStats(getComputeService().currentSize(), getComputeService().idleNodes(), getComputeService()
.usedNodes(), getComputeService().allocationInProgressNodes(), getComputeService().maxNodes(),
getComputeService().minNodes());
.usedNodes(), getComputeService().maxNodes(), getComputeService().minNodes());
}
}

View File

@ -30,16 +30,13 @@ public class NodePoolStats {
private final int currentSize;
private final int idleNodes;
private final int usedNodes;
private final int allocationInProgressNodes;
private final int maxNodes;
private final int minNodes;
NodePoolStats(int currentSize, int idleNodes, int usedNodes, int allocationInProgressNodes, int maxNodes,
int minNodes) {
NodePoolStats(int currentSize, int idleNodes, int usedNodes, int maxNodes, int minNodes) {
this.currentSize = currentSize;
this.idleNodes = idleNodes;
this.usedNodes = usedNodes;
this.allocationInProgressNodes = allocationInProgressNodes;
this.maxNodes = maxNodes;
this.minNodes = minNodes;
}
@ -79,12 +76,4 @@ public class NodePoolStats {
return minNodes;
}
/**
* The number of nodes that are currently being allocated in the backend provider but are not yet
* in the pool.
*/
public int allocationInProgressNodes() {
return allocationInProgressNodes;
}
}

View File

@ -0,0 +1,39 @@
package org.jclouds.nodepool.config;
import java.io.InputStream;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Map;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.compute.config.BaseComputeServiceContextModule;
import org.jclouds.nodepool.internal.JsonNodeMetadataStore;
import org.jclouds.nodepool.internal.NodeMetadataStore;
import com.google.inject.BindingAnnotation;
import com.google.inject.Provides;
public class NodePoolComputServiceContextModule extends BaseComputeServiceContextModule {
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
public @interface Internal {
}
@Override
protected void configure() {
super.configure();
bind(NodeMetadataStore.class).annotatedWith(Internal.class).to(JsonNodeMetadataStore.class);
}
@Provides
@Singleton
public Map<String, InputStream> provideInputStreamMapFromBlobStore(BlobStoreContext in,
@Named(NodeMetadataStore.CONTAINER) String container) {
return in.createInputStreamMap(container);
}
}

View File

@ -18,12 +18,10 @@
*/
package org.jclouds.nodepool.internal;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.find;
import static com.google.common.base.Preconditions.checkState;
import java.io.Closeable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
@ -36,24 +34,22 @@ import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.Hardware;
import org.jclouds.compute.domain.Image;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.domain.Template;
import org.jclouds.compute.domain.TemplateBuilder;
import org.jclouds.compute.extensions.ImageExtension;
import org.jclouds.compute.options.RunScriptOptions;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.NodePredicates;
import org.jclouds.domain.Location;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.util.Maps2;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
@ -64,6 +60,7 @@ import com.google.common.util.concurrent.ListenableFuture;
* @author David Alves
*
*/
public abstract class BaseNodePoolComputeService implements ComputeService, Closeable {
protected final ComputeService backingComputeService;
@ -72,18 +69,17 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
protected final Image image;
protected final Hardware hardware;
protected final Location location;
// assignments of nodes to group names
protected final Multimap<String, NodeMetadata> assignments = HashMultimap.create();
protected final NodeMetadataStore metadataStore;
public BaseNodePoolComputeService(ComputeServiceContext backingComputeServiceContext, String poolGroupNamePrefix,
Template backingTemplate) {
Template backingTemplate, NodeMetadataStore metadataStore) {
this.backingComputeService = backingComputeServiceContext.getComputeService();
this.poolGroupName = poolGroupNamePrefix;
this.template = backingTemplate == null ? this.backingComputeService.templateBuilder().build() : backingTemplate;
this.image = this.template.getImage();
this.hardware = this.template.getHardware();
this.location = this.template.getLocation();
this.metadataStore = metadataStore;
}
/**
@ -93,64 +89,47 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
* @param filter
* @return
*/
private Predicate<NodeMetadata> transformUserPredicateSpecificIdPredicate(Predicate<NodeMetadata> filter) {
Iterable<Map.Entry<String, NodeMetadata>> relevantAssginemnts = filterAssignmentsBasedOnUserPredicate(filter);
final Set<String> ids = Sets.newHashSet();
for (Map.Entry<String, NodeMetadata> assignment : relevantAssginemnts) {
ids.add(assignment.getValue().getId());
}
protected Predicate<NodeMetadata> transformUserPredicateInSpecificIdPredicate(Predicate<NodeMetadata> filter) {
final Set<NodeMetadata> filteredNodes = filterFrontendNodesBasedOnUserPredicate(filter);
return new Predicate<NodeMetadata>() {
@Override
public boolean apply(NodeMetadata input) {
return ids.contains(input.getId());
return filteredNodes.contains(input);
}
};
}// TODO this is n^2 expensive. s
private Map<? extends NodeMetadata, ExecResponse> transformBackendExecutionMapIntoFrontend(
Map<? extends NodeMetadata, ExecResponse> backendMap) {
Map<NodeMetadata, ExecResponse> frontendMap = Maps.newHashMapWithExpectedSize(backendMap.size());
for (Map.Entry<? extends NodeMetadata, ExecResponse> entry : backendMap.entrySet()) {
Map.Entry<String, NodeMetadata> assignmentEntry = findAssigmentEntry(entry.getKey().getId());
frontendMap
.put(toFrontendNodemetadata(assignmentEntry.getValue(), assignmentEntry.getKey()), entry.getValue());
}
return frontendMap;
}
protected Map.Entry<String, NodeMetadata> findAssigmentEntry(final String id) {
// TODO reverse lookup data structure would be faster but will pools be that big ?
return find(assignments.entries(), new Predicate<Map.Entry<String, NodeMetadata>>() {
private <T extends NodeMetadata> Map<T, ExecResponse> transformBackendExecutionMapIntoFrontend(
Map<T, ExecResponse> backendMap) {
return Maps2.transformKeys(backendMap, new Function<T, T>() {
@SuppressWarnings("unchecked")
@Override
public boolean apply(Entry<String, NodeMetadata> entry) {
return entry.getValue().getId().equals(id);
public T apply(T input) {
return (T) metadataStore.load(input);
}
});
}
protected NodeMetadata toFrontendNodemetadata(NodeMetadata backendNodeMetadata, String group) {
return NodeMetadataBuilder.fromNodeMetadata(backendNodeMetadata).group(group).build();
}
/**
* Because a lot of predicates are based on group info we need that to check wether the predicate
* matches.
*/
protected Iterable<Map.Entry<String, NodeMetadata>> filterAssignmentsBasedOnUserPredicate(
final Predicate<NodeMetadata> userFilter) {
return filter(assignments.entries(), new Predicate<Map.Entry<String, NodeMetadata>>() {
protected Set<NodeMetadata> filterFrontendNodesBasedOnUserPredicate(final Predicate<NodeMetadata> userFilter) {
return Sets.filter(metadataStore.loadAll(getBackendNodes()), new Predicate<NodeMetadata>() {
@Override
public boolean apply(Entry<String, NodeMetadata> input) {
return userFilter.apply(toFrontendNodemetadata(input.getValue(), input.getKey()));
public boolean apply(NodeMetadata input) {
return userFilter.apply(input);
}
});
}
@Override
public NodeMetadata getNodeMetadata(String id) {
Map.Entry<String, NodeMetadata> assigmentEntry = findAssigmentEntry(id);
return toFrontendNodemetadata(assigmentEntry.getValue(), assigmentEntry.getKey());
NodeMetadata backendMetadata = backingComputeService.getNodeMetadata(id);
checkState(backendMetadata.getGroup().equals(backendMetadata));
return metadataStore.load(backendMetadata);
}
@Override
@ -169,14 +148,14 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
public Map<? extends NodeMetadata, ExecResponse> runScriptOnNodesMatching(Predicate<NodeMetadata> filter,
String runScript, RunScriptOptions options) throws RunScriptOnNodesException {
return transformBackendExecutionMapIntoFrontend(backingComputeService.runScriptOnNodesMatching(
transformUserPredicateSpecificIdPredicate(filter), runScript, options));
transformUserPredicateInSpecificIdPredicate(filter), runScript, options));
}
@Override
public Map<? extends NodeMetadata, ExecResponse> runScriptOnNodesMatching(Predicate<NodeMetadata> filter,
Statement runScript, RunScriptOptions options) throws RunScriptOnNodesException {
return transformBackendExecutionMapIntoFrontend(backingComputeService.runScriptOnNodesMatching(
transformUserPredicateSpecificIdPredicate(filter), runScript, options));
transformUserPredicateInSpecificIdPredicate(filter), runScript, options));
}
@Override
@ -187,28 +166,22 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public Set<? extends NodeMetadata> listNodesDetailsMatching(Predicate filter) {
return FluentIterable.from(filterAssignmentsBasedOnUserPredicate(filter))
.transform(new Function<Map.Entry<String, NodeMetadata>, NodeMetadata>() {
@Override
public NodeMetadata apply(Entry<String, NodeMetadata> input) {
return toFrontendNodemetadata(input.getValue(), input.getKey());
}
}).toImmutableSet();
return filterFrontendNodesBasedOnUserPredicate(filter);
}
@Override
public void rebootNodesMatching(final Predicate<NodeMetadata> filter) {
backingComputeService.rebootNodesMatching(transformUserPredicateSpecificIdPredicate(filter));
backingComputeService.rebootNodesMatching(transformUserPredicateInSpecificIdPredicate(filter));
}
@Override
public void resumeNodesMatching(Predicate<NodeMetadata> filter) {
backingComputeService.resumeNodesMatching(transformUserPredicateSpecificIdPredicate(filter));
backingComputeService.resumeNodesMatching(transformUserPredicateInSpecificIdPredicate(filter));
}
@Override
public void suspendNodesMatching(Predicate<NodeMetadata> filter) {
backingComputeService.suspendNodesMatching(transformUserPredicateSpecificIdPredicate(filter));
backingComputeService.suspendNodesMatching(transformUserPredicateInSpecificIdPredicate(filter));
}
@Override
@ -222,13 +195,12 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public Set<? extends NodeMetadata> createNodesInGroup(String group, int count, Template template)
throws RunNodesException {
return createNodesInGroup(group, count);
return createNodesInGroup(group, count, template.getOptions());
}
@Override
public Set<? extends NodeMetadata> createNodesInGroup(String group, int count, TemplateOptions templateOptions)
throws RunNodesException {
return createNodesInGroup(group, count);
public Set<? extends NodeMetadata> createNodesInGroup(String group, int count) throws RunNodesException {
return createNodesInGroup(group, count, template.getOptions());
}
@Override
@ -264,7 +236,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public void suspendNode(String id) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
backingComputeService.suspendNode(id);
}
throw new NoSuchElementException(id);
@ -272,7 +244,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public void resumeNode(String id) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
backingComputeService.resumeNode(id);
}
throw new NoSuchElementException(id);
@ -280,7 +252,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public void rebootNode(String id) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
backingComputeService.rebootNode(id);
}
throw new NoSuchElementException(id);
@ -288,7 +260,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public ExecResponse runScriptOnNode(String id, Statement runScript) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
return runScriptOnNode(id, runScript, new RunScriptOptions());
}
throw new NoSuchElementException(id);
@ -296,7 +268,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public ExecResponse runScriptOnNode(String id, String runScript) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
return runScriptOnNode(id, runScript, new RunScriptOptions());
}
throw new NoSuchElementException(id);
@ -305,7 +277,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public ExecResponse runScriptOnNode(String id, Statement runScript, RunScriptOptions options) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
return backingComputeService.runScriptOnNode(id, runScript, options);
}
throw new NoSuchElementException(id);
@ -313,7 +285,7 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public ListenableFuture<ExecResponse> submitScriptOnNode(String id, Statement runScript, RunScriptOptions options) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
return backingComputeService.submitScriptOnNode(id, runScript, options);
}
throw new NoSuchElementException(id);
@ -321,20 +293,32 @@ public abstract class BaseNodePoolComputeService implements ComputeService, Clos
@Override
public ExecResponse runScriptOnNode(String id, String runScript, RunScriptOptions options) {
if (findAssigmentEntry(id) != null) {
if (getNodeMetadata(id) != null) {
return backingComputeService.runScriptOnNode(id, runScript, options);
}
throw new NoSuchElementException(id);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public Set<NodeMetadata> getBackendNodes() {
return (Set<NodeMetadata>) backingComputeService.listNodesDetailsMatching((Predicate) NodePredicates
.inGroup(poolGroupName));
}
protected void addToPool(int number) {
try {
backingComputeService.createNodesInGroup(poolGroupName, number, template);
} catch (RunNodesException e) {
throw Throwables.propagate(e);
}
}
public abstract int idleNodes();
public abstract int maxNodes();
public abstract int minNodes();
public abstract int allocationInProgressNodes();
public abstract int usedNodes();
public abstract int currentSize();

View File

@ -19,43 +19,36 @@
package org.jclouds.nodepool.internal;
import static com.google.common.collect.Iterables.removeIf;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.base.Preconditions.checkState;
import static org.jclouds.nodepool.config.NodePoolComputeServiceProperties.BACKING_GROUP_PROPERTY;
import static org.jclouds.nodepool.config.NodePoolComputeServiceProperties.BACKING_TEMPLATE_PROPERTY;
import static org.jclouds.nodepool.config.NodePoolComputeServiceProperties.MAX_SIZE_PROPERTY;
import static org.jclouds.nodepool.config.NodePoolComputeServiceProperties.MIN_SIZE_PROPERTY;
import static org.jclouds.nodepool.config.NodePoolComputeServiceProperties.REMOVE_DESTROYED_PROPERTY;
import java.util.Collections;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.RunNodesException;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadata.Status;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.domain.Template;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.predicates.NodePredicates;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.logging.Logger;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
/**
* An eager {@link NodePoolComputeService}. Eagerly builds and maintains a pool of nodes. It's only
@ -64,6 +57,7 @@ import com.google.common.collect.Sets;
* @author David Alves
*
*/
@Singleton
public class EagerNodePoolComputeService extends BaseNodePoolComputeService {
@Resource
@ -71,189 +65,97 @@ public class EagerNodePoolComputeService extends BaseNodePoolComputeService {
protected Logger logger = Logger.NULL;
private final int maxSize;
private final boolean reuseDestroyed;
private final int minSize;
// set of available nodes
private Set<NodeMetadata> available = Sets.newHashSet();
// lock associated with changes to the pool since they happen asynchronously
private final Lock lock = new ReentrantLock();
// all the nodes in the pool (associated or not)
private final Set<NodeMetadata> poolNodes = Sets.newLinkedHashSet();
private final boolean reuseDestroyed;
@Inject
public EagerNodePoolComputeService(ComputeServiceContext backingComputeServiceContext,
@Named(BACKING_GROUP_PROPERTY) String poolGroupPrefix, @Named(MAX_SIZE_PROPERTY) int maxSize,
@Named(MIN_SIZE_PROPERTY) int minSize, @Named(REMOVE_DESTROYED_PROPERTY) boolean readdDestroyed,
@Nullable @Named(BACKING_TEMPLATE_PROPERTY) Template backingTemplate) {
super(backingComputeServiceContext, poolGroupPrefix, backingTemplate);
@Nullable @Named(BACKING_TEMPLATE_PROPERTY) Template backingTemplate, NodeMetadataStore storage) {
super(backingComputeServiceContext, poolGroupPrefix, backingTemplate, storage);
this.maxSize = maxSize;
this.minSize = minSize;
this.reuseDestroyed = readdDestroyed;
}
@PostConstruct
public void startPool() throws RunNodesException {
increasePoolSize(minSize);
public void startEagerPool() {
Set<NodeMetadata> backendNodes = getBackendNodes();
if (backendNodes.size() < minSize) {
addToPool(backendNodes.size() - minSize);
}
}
@Override
public synchronized Set<? extends NodeMetadata> createNodesInGroup(String group, int count) throws RunNodesException {
try {
return assignPoolNodes(group, count);
} catch (Exception e) {
Set<NodeMetadata> nodes = Collections.emptySet();
Map<String, Exception> executionExceptions = ImmutableMap.of("poolnode", e);
Map<NodeMetadata, Exception> failedNodes = ImmutableMap.of(
new NodeMetadataBuilder().id("poolnode").status(Status.ERROR).build(), e);
throw new RunNodesException(group, count, template, nodes, executionExceptions, failedNodes);
public synchronized Set<? extends NodeMetadata> createNodesInGroup(String group, int count,
TemplateOptions templateOptions) throws RunNodesException {
Set<NodeMetadata> backendNodes = getBackendNodes();
Set<NodeMetadata> frontendNodes = metadataStore.loadAll(backendNodes);
checkState(frontendNodes.size() + count < maxSize,
"cannot add more nodes to pool [requested: %s, current: %s, max: %s]", count, frontendNodes.size(),
maxSize);
SetView<NodeMetadata> availableNodes = Sets.difference(backendNodes, frontendNodes);
Set<NodeMetadata> newFrontEndAssignments = Sets.newHashSet();
int i = 0;
for (Iterator<NodeMetadata> iter = availableNodes.iterator(); iter.hasNext() && i < count; i++) {
// TODO here we should run stuff on the nodes, like the initial scripts and
// Credentials handling.
newFrontEndAssignments.add(metadataStore.store(iter.next(), templateOptions, group));
}
return newFrontEndAssignments;
}
@Override
public synchronized void destroyNode(String id) {
unassignNode(id);
checkState(getNodeMetadata(id) != null);
metadataStore.deleteMapping(id);
if (!reuseDestroyed) {
backingComputeService.destroyNode(id);
addToPool(1);
}
}
@Override
public synchronized Set<? extends NodeMetadata> destroyNodesMatching(Predicate<NodeMetadata> filter) {
// copy the set of nodes to unassign because we'll be altering the assignments map.
Set<Map.Entry<String, NodeMetadata>> poolNodesToUnassign = Sets
.newHashSet(filterAssignmentsBasedOnUserPredicate(filter));
// TODO this should be done in parallel since it can take quite a while, moreover the contract
// for any destroy node action should probably be that the pool has at least minSize nodes
// before it returns.
for (Map.Entry<String, NodeMetadata> poolNode : poolNodesToUnassign) {
unassignNode(poolNode.getValue().getId());
Set<NodeMetadata> frontendNodes = Sets.filter(metadataStore.loadAll(getBackendNodes()), filter);
for (NodeMetadata node : frontendNodes) {
metadataStore.deleteMapping(node.getId());
}
return Sets.newHashSet(transform(poolNodesToUnassign,
new Function<Map.Entry<String, NodeMetadata>, NodeMetadata>() {
@Override
public NodeMetadata apply(final Map.Entry<String, NodeMetadata> input) {
assignments.remove(input.getKey(), input.getValue());
return toFrontendNodemetadata(input.getValue(), input.getKey());
}
}));
}
/**
* Adds nodes to the pool, using the pool's group name. Lock the pool so that no-one tries to
* increase/decrease until we're finished but we'll return from the method well before the pool
* as enough nodes.
*
* @throws RunNodesException
*/
private void increasePoolSize(final int size) throws RunNodesException {
lock.lock();
logger.debug(">> increasing pool size, available: %s total: %s min; %s max: %s increasing to: %s",
available.size(), poolNodes.size(), minSize, maxSize, size);
try {
Set<? extends NodeMetadata> original = backingComputeService.createNodesInGroup(poolGroupName, size, template);
poolNodes.addAll(original);
available.addAll(original);
logger.debug("<< pool size increased, available: %s total: %s min; %s max: %s increasing to: %s",
available.size(), poolNodes.size(), minSize, maxSize, size);
logger.info("pool started, status: %s min; %s max: %s", available.size(), minSize, maxSize);
} finally {
lock.unlock();
if (!reuseDestroyed) {
backingComputeService.destroyNodesMatching(transformUserPredicateInSpecificIdPredicate(filter));
addToPool(frontendNodes.size());
}
}
/**
* Unassigns the node with the provided id. If the we're set to reuse the nodes it adds it to the
* available pool, if not is destroys the backing node, removes if from the poll and increases
* the pool size by one.
*/
private NodeMetadata unassignNode(final String nodeId) {
Map.Entry<String, NodeMetadata> entry = findAssigmentEntry(nodeId);
assignments.remove(entry.getKey(), entry.getValue());
// if we're reusing destroyed simply add to the available nodes
if (reuseDestroyed) {
available.add(entry.getValue());
return entry.getValue();
}
// if not we need to destroy the backing node
lock.lock();
try {
backingComputeService.destroyNode(nodeId);
removeIf(poolNodes, new Predicate<NodeMetadata>() {
@Override
public boolean apply(NodeMetadata input) {
return input.getId().equals(nodeId);
}
});
if (poolNodes.size() < minSize) {
try {
increasePoolSize(1);
} catch (RunNodesException e) {
throw Throwables.propagate(e);
}
}
} finally {
lock.unlock();
}
return entry.getValue();
}
/**
* Used to assign size pool nodes to a group. If not enough nodes are available we check if we
* can increase the pool if that is enough, otherwise we complain.
*
* @throws RunNodesException
*/
private Set<? extends NodeMetadata> assignPoolNodes(String groupName, int size) throws InterruptedException,
ExecutionException, RunNodesException {
if (available.size() < size) {
if (poolNodes.size() + size > maxSize) {
// TODO think of a better exception
throw new IllegalStateException(
"not enough nodes available and cannot add enough nodes to pool [available: " + available.size()
+ " total: " + poolNodes.size() + " min: " + minSize + " max: " + maxSize
+ " requested: " + size + "]");
}
increasePoolSize(size - available.size());
}
Set<NodeMetadata> groupNodes = Sets.newHashSet();
Iterator<NodeMetadata> iter = available.iterator();
for (int i = 0; i < size && iter.hasNext(); i++) {
NodeMetadata node = iter.next();
assignments.put(groupName, node);
iter.remove();
groupNodes.add(toFrontendNodemetadata(node, groupName));
}
return groupNodes;
return frontendNodes;
}
@Override
public void close() {
// lock just to make sure we have the correct pool size
lock.lock();
try {
logger.info("Closing pooled compute service with {} nodes", currentSize());
available.clear();
assignments.clear();
poolNodes.clear();
backingComputeService.destroyNodesMatching(NodePredicates.inGroup(poolGroupName));
} catch (Exception e) {
lock.unlock();
}
public synchronized void close() throws IOException {
metadataStore.deleteAllMappings();
backingComputeService.destroyNodesMatching(NodePredicates.inGroup(poolGroupName));
}
@Override
public int allocationInProgressNodes() {
// TODO Auto-generated method stub
return 0;
public int currentSize() {
return getBackendNodes().size();
}
@Override
public int idleNodes() {
return available.size();
Set<NodeMetadata> backendNodes = getBackendNodes();
Set<NodeMetadata> frontendNodes = metadataStore.loadAll(backendNodes);
return backendNodes.size() - frontendNodes.size();
}
@Override
public int maxNodes() {
return maxSize;
}
@ -265,12 +167,7 @@ public class EagerNodePoolComputeService extends BaseNodePoolComputeService {
@Override
public int usedNodes() {
return currentSize() - idleNodes();
}
@Override
public int currentSize() {
return poolNodes.size();
return metadataStore.loadAll(getBackendNodes()).size();
}
}

View File

@ -0,0 +1,115 @@
package org.jclouds.nodepool.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Set;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.domain.LoginCredentials;
import org.jclouds.json.Json;
import org.jclouds.util.Strings2;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.google.inject.Singleton;
/**
* An implementation of {@link NodeMetadataStore} that stores all that is needed by building a json
* string.
*
* @author David Alves
*
*/
@Singleton
public class JsonNodeMetadataStore implements NodeMetadataStore {
private Map<String, InputStream> storage;
private final Json json;
private static class JsonUserNodeMetadata {
private String userGroup;
private Set<String> userTags;
private Map<String, String> userMetadata;
private String user;
private String password;
private String privateKey;
private Boolean authenticateSudo;
}
@Inject
public JsonNodeMetadataStore(Map<String, InputStream> storage, Json json) {
this.storage = storage;
this.json = json;
}
@Override
public NodeMetadata store(NodeMetadata backendNodeMetadata, TemplateOptions userOptions, String userGroup) {
checkNotNull(backendNodeMetadata);
checkNotNull(userGroup);
checkNotNull(userOptions);
JsonUserNodeMetadata jsonMetadata = new JsonUserNodeMetadata();
jsonMetadata.user = userOptions.getLoginUser();
jsonMetadata.password = userOptions.getLoginPassword();
jsonMetadata.privateKey = userOptions.getLoginPrivateKey();
jsonMetadata.authenticateSudo = userOptions.shouldAuthenticateSudo();
jsonMetadata.userMetadata = userOptions.getUserMetadata();
jsonMetadata.userTags = userOptions.getTags();
jsonMetadata.userGroup = userGroup;
storage.put(backendNodeMetadata.getId(), Strings2.toInputStream(json.toJson(jsonMetadata)));
return buildFromJsonAndBackendMetadata(backendNodeMetadata, jsonMetadata);
}
@Override
public NodeMetadata load(NodeMetadata backendNodeMetadata) {
try {
InputStream storedMetadata = storage.get(checkNotNull(backendNodeMetadata).getId());
if (storedMetadata == null) {
return null;
}
String jsonMetadataAsString = Strings2.toStringAndClose(storedMetadata);
JsonUserNodeMetadata jsonMetadata = json.fromJson(jsonMetadataAsString, JsonUserNodeMetadata.class);
return buildFromJsonAndBackendMetadata(backendNodeMetadata, jsonMetadata);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
private NodeMetadata buildFromJsonAndBackendMetadata(NodeMetadata backendNodeMetadata,
JsonUserNodeMetadata jsonMetadata) {
return NodeMetadataBuilder
.fromNodeMetadata(backendNodeMetadata)
.tags(jsonMetadata.userTags)
.group(jsonMetadata.userGroup)
.userMetadata(jsonMetadata.userMetadata)
.credentials(
new LoginCredentials(jsonMetadata.user, jsonMetadata.password, jsonMetadata.privateKey,
jsonMetadata.authenticateSudo)).build();
}
@Override
public void deleteAllMappings() {
storage.clear();
}
@Override
public void deleteMapping(String backendNodeId) {
storage.remove(backendNodeId);
}
public Set<NodeMetadata> loadAll(Set<NodeMetadata> backendNodes) {
return ImmutableSet.copyOf(Iterables.transform(backendNodes, new Function<NodeMetadata, NodeMetadata>() {
@Override
public NodeMetadata apply(NodeMetadata input) {
return load(input);
}
}));
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.nodepool.internal;
import java.util.Set;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.options.TemplateOptions;
/**
* Stores/Loads frontend {@link NodeMetadata} mappings.
*
* @author David Alves
*
*/
public interface NodeMetadataStore {
public static final String CONTAINER = "jclouds.nodepool.metadatastore.container";
/**
* Associates the provided user options and group with the provided backend {@link NodeMetadata},
* then build a frontend version of node metadata that has some fields from the backend node such
* as id, name or location, and some fields from the provided userOptions, such as userMetadata
* or tags.
*
* @param backendNode
* the backend node's {@link NodeMetadata}
* @param userOptions
* the user provided options
* @param userGroup
* the user selected group
* @return a version of NodeMetadata that includes information from the backend node and form the
* user provided options and group.
*/
public NodeMetadata store(NodeMetadata backendNode, TemplateOptions userOptions, String userGroup);
/**
* Removes the mapping from storage.
*
* @param backendNodeId
*/
public void deleteMapping(String backendNodeId);
/**
* Clears all mappings.
*/
public void deleteAllMappings();
/**
* Loads the previously stored user {@link NodeMetadata} corresponding to the provided backend
* {@link NodeMetadata}.
*
* @param backendNode
*
* @return the frontend {@link NodeMetadata} or null of this backend node has no mapping
*/
public NodeMetadata load(NodeMetadata backendNode);
/**
* Loads frontend {@link NodeMetadata} for all provided backend nodes.
*
* @param backendNodes
* @return
*/
public Set<NodeMetadata> loadAll(Set<NodeMetadata> backendNodes);
}

View File

@ -0,0 +1,71 @@
package org.jclouds.nodepool.internal;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.nodepool.config.NodePoolComputServiceContextModule.Internal;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@Singleton
public class NodeMetadataStoreCache implements NodeMetadataStore {
private Map<String, NodeMetadata> frontendMetadataCache = new HashMap<String, NodeMetadata>();
private NodeMetadataStore backing;
@Inject
public NodeMetadataStoreCache(@Internal NodeMetadataStore backing) {
this.backing = backing;
}
@Override
public synchronized NodeMetadata store(NodeMetadata backendNode, TemplateOptions userOptions, String userGroup) {
NodeMetadata frontEndNode = backing.store(backendNode, userOptions, userGroup);
frontendMetadataCache.put(backendNode.getId(), frontEndNode);
return frontEndNode;
}
@Override
public synchronized void deleteMapping(String backendNodeId) {
frontendMetadataCache.remove(backendNodeId);
backing.deleteMapping(backendNodeId);
}
@Override
public synchronized void deleteAllMappings() {
frontendMetadataCache.clear();
backing.deleteAllMappings();
}
@Override
public synchronized NodeMetadata load(NodeMetadata backendNode) {
NodeMetadata frontendNode = frontendMetadataCache.get(backendNode.getId());
if (frontendNode == null) {
frontendNode = backing.load(backendNode);
if (frontendNode != null) {
frontendMetadataCache.put(backendNode.getId(), frontendNode);
}
}
return frontendNode;
}
@Override
public synchronized Set<NodeMetadata> loadAll(Set<NodeMetadata> backendNodes) {
return ImmutableSet.copyOf(Iterables.transform(backendNodes, new Function<NodeMetadata, NodeMetadata>() {
@Override
public NodeMetadata apply(NodeMetadata input) {
return load(input);
}
}));
}
}

View File

@ -0,0 +1,71 @@
package org.jclouds.nodepool;
import org.jclouds.compute.StubComputeServiceIntegrationTest;
import org.jclouds.compute.stub.config.StubComputeServiceContextModule;
import org.jclouds.filesystem.config.FilesystemBlobStoreContextModule;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Module;
@Test(groups = "unit", testName = "NodePoolComputeServiceTest")
public class NodePoolComputeServiceStubTest extends StubComputeServiceIntegrationTest {
public NodePoolComputeServiceStubTest() {
provider = "nodepool";
}
@Override
protected Iterable<Module> setupModules() {
return ImmutableSet.<Module> of(new StubComputeServiceContextModule(), new FilesystemBlobStoreContextModule());
}
// public void testStartPool() throws InterruptedException, ExecutionException, RunNodesException
// {
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
//
// assertEquals(stats.idleNodes(), 5);
// assertEquals(stats.currentSize(), 5);
// assertEquals(stats.maxNodes(), 10);
// }
//
// @Test(dependsOnMethods = "testStartPool", groups = { "unit", "poolStarted" })
// public void testAllocateMinNodes() throws RunNodesException {
// this.nodePoolComputeService.createNodesInGroup("1", 5);
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
// // this pool is not supposed to add nodes past min until we request them
// assertEquals(stats.idleNodes(), 0);
// assertEquals(stats.currentSize(), 5);
// }
//
// @Test(dependsOnMethods = "testAllocateMinNodes", groups = { "unit", "poolStarted" })
// public void testAllocateUpToMaxNodes() throws RunNodesException {
// this.nodePoolComputeService.createNodesInGroup("2", 5);
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
// assertEquals(stats.idleNodes(), 0);
// assertEquals(stats.currentSize(), 10);
// }
//
// @Test(dependsOnMethods = "testAllocateUpToMaxNodes", groups = { "unit", "poolStarted" },
// expectedExceptions = RunNodesException.class)
// public void testAllocateMoreNodesFails() throws RunNodesException {
// this.nodePoolComputeService.createNodesInGroup("3", 5);
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
// }
//
// @Test(dependsOnMethods = "testAllocateUpToMaxNodes", groups = { "unit", "poolStarted" })
// public void testDeallocatingNodesAndReallocating() throws RunNodesException {
// this.nodePoolComputeService.destroyNodesMatching(NodePredicates.inGroup("2"));
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
// assertEquals(stats.idleNodes(), 5);
// this.nodePoolComputeService.createNodesInGroup("2", 5);
// }
//
// @Test(dependsOnGroups = "poolStarted")
// public void testClose() throws IOException {
// NodePoolStats stats = nodePoolComputeServiceContext.getPoolStats();
// ((Closeable) this.nodePoolComputeService).close();
// assertEquals(stats.currentSize(), 0);
// }
}

View File

@ -1,69 +0,0 @@
package org.jclouds.nodepool;
import static org.testng.Assert.assertEquals;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.jclouds.ContextBuilder;
import org.jclouds.compute.ComputeServiceContext;
import org.jclouds.compute.RunNodesException;
import org.jclouds.compute.predicates.NodePredicates;
import org.jclouds.nodepool.internal.EagerNodePoolComputeService;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "unit", testName = "NodePoolComputeServiceTest")
public class NodePoolComputeServiceTest {
private EagerNodePoolComputeService pooledComputeService;
@BeforeClass
public void setUp() {
ComputeServiceContext stubCtx = ContextBuilder.newBuilder("stub").buildView(ComputeServiceContext.class);
this.pooledComputeService = new EagerNodePoolComputeService(stubCtx, "pool", 10, 5, true, stubCtx
.getComputeService().templateBuilder().build());
}
public void testStartPool() throws InterruptedException, ExecutionException, RunNodesException {
this.pooledComputeService.startPool();
assertEquals(pooledComputeService.idleNodes(), 5);
assertEquals(pooledComputeService.currentSize(), 5);
assertEquals(pooledComputeService.maxNodes(), 10);
}
@Test(dependsOnMethods = "testStartPool", groups = { "unit", "poolStarted" })
public void testAllocateMinNodes() throws RunNodesException {
this.pooledComputeService.createNodesInGroup("1", 5);
// this pool is not supposed to add nodes past min until we request them
assertEquals(pooledComputeService.idleNodes(), 0);
assertEquals(pooledComputeService.currentSize(), 5);
}
@Test(dependsOnMethods = "testAllocateMinNodes", groups = { "unit", "poolStarted" })
public void testAllocateUpToMaxNodes() throws RunNodesException {
this.pooledComputeService.createNodesInGroup("2", 5);
assertEquals(pooledComputeService.idleNodes(), 0);
assertEquals(pooledComputeService.currentSize(), 10);
}
@Test(dependsOnMethods = "testAllocateUpToMaxNodes", groups = { "unit", "poolStarted" }, expectedExceptions = RunNodesException.class)
public void testAllocateMoreNodesFails() throws RunNodesException {
this.pooledComputeService.createNodesInGroup("3", 5);
System.out.println(this.pooledComputeService.currentSize());
}
@Test(dependsOnMethods = "testAllocateUpToMaxNodes", groups = { "unit", "poolStarted" })
public void testDeallocatingNodesAndReallocating() throws RunNodesException {
this.pooledComputeService.destroyNodesMatching(NodePredicates.inGroup("2"));
assertEquals(pooledComputeService.idleNodes(), 5);
this.pooledComputeService.createNodesInGroup("2", 5);
}
@Test(dependsOnGroups = "poolStarted")
public void testClose() throws IOException {
this.pooledComputeService.close();
assertEquals(pooledComputeService.currentSize(), 0);
}
}