Issue 723:Allow for asynchronous script execution and client script execution handling

This commit is contained in:
Adrian Cole 2011-10-14 22:34:57 -07:00
parent 80e37b5a1b
commit 9e0aaa949b
13 changed files with 608 additions and 190 deletions

View File

@ -22,6 +22,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.jclouds.compute.callables.ScriptStillRunningException;
import org.jclouds.compute.domain.ComputeMetadata;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.Hardware;
@ -32,11 +33,14 @@ import org.jclouds.compute.domain.TemplateBuilder;
import org.jclouds.compute.internal.BaseComputeService;
import org.jclouds.compute.options.RunScriptOptions;
import org.jclouds.compute.options.TemplateOptions;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.domain.Location;
import org.jclouds.io.Payload;
import org.jclouds.scriptbuilder.domain.Statement;
import com.google.common.annotations.Beta;
import com.google.common.base.Predicate;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.ImplementedBy;
/**
@ -333,23 +337,46 @@ public interface ComputeService {
* if the node is not found
* @throws IllegalStateException
* if the node is not in running state
* @throws ScriptStillRunningException
* if the script was still running after {@link Timeouts#scriptComplete}
*
* @see org.jclouds.compute.predicates.NodePredicates#runningWithTag(String)
* @see org.jclouds.scriptbuilder.domain.Statements
*/
ExecResponse runScriptOnNode(String id, Statement runScript, RunScriptOptions options);
/**
* Run the script on a specific node in the background, typically as {@code nohup}
*
* @param id
* node the script is to be executed on
* @param runScript
* statement containing the script to run
* @param options
* nullable options to how to run the script, whether to override credentials
* @return map with node identifiers and corresponding responses
* @throws NoSuchElementException
* if the node is not found
* @throws IllegalStateException
* if the node is not in running state
*
* @see org.jclouds.compute.predicates.NodePredicates#runningWithTag(String)
* @see org.jclouds.scriptbuilder.domain.Statements
*/
@Beta
ListenableFuture<ExecResponse> submitScriptOnNode(String id, Statement runScript, RunScriptOptions options);
/**
* @see #runScriptOnNode(String, Statement, RunScriptOptions)
*/
ExecResponse runScriptOnNode(String id, Statement runScript);
/**
* @see #runScriptOnNode(String, Statement, RunScriptOptions)
* @see org.jclouds.scriptbuilder.domain.Statements#exec
*/
ExecResponse runScriptOnNode(String id, String runScript, RunScriptOptions options);
/**
* @see #runScriptOnNode(String, String, RunScriptOptions)
*/

View File

@ -0,0 +1,197 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.compute.callables;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.Constants;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.predicates.ScriptStatusReturnsZero;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.logging.Logger;
import org.jclouds.predicates.RetryablePredicate;
import org.jclouds.scriptbuilder.InitBuilder;
import org.jclouds.ssh.SshClient;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.inject.assistedinject.Assisted;
/**
* A future that works in tandem with a task that was invoked by {@link InitBuilder}
*
* @author Adrian Cole
*/
public class BlockUntilInitScriptStatusIsZeroThenReturnOutput extends AbstractFuture<ExecResponse> {
public static interface Factory {
BlockUntilInitScriptStatusIsZeroThenReturnOutput create(SudoAwareInitManager commandRunner);
}
@Resource
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
protected Logger logger = Logger.NULL;
private final ExecutorService userThreads;
private final SudoAwareInitManager commandRunner;
private final RetryablePredicate<String> notRunningAnymore;
private boolean shouldCancel;
@Inject
public BlockUntilInitScriptStatusIsZeroThenReturnOutput(
@Named(Constants.PROPERTY_USER_THREADS) ExecutorService userThreads,
final ScriptStatusReturnsZero stateRunning, @Assisted final SudoAwareInitManager commandRunner) {
this.commandRunner = checkNotNull(commandRunner, "commandRunner");
this.userThreads = checkNotNull(userThreads, "userThreads");
// arbitrarily high value, but Long.MAX_VALUE doesn't work!
this.notRunningAnymore = new RetryablePredicate<String>(new Predicate<String>() {
@Override
public boolean apply(String arg0) {
return commandRunner.runAction(arg0).getOutput().trim().equals("");
}
}, TimeUnit.DAYS.toMillis(1)) {
/**
* make sure we stop the retry loop if someone cancelled the future, this keeps threads
* from being consumed on dead tasks
*/
@Override
protected boolean atOrAfter(Date end) {
if (shouldCancel)
Throwables.propagate(new TimeoutException("cancelled"));
return super.atOrAfter(end);
}
};
}
/**
* in case login credentials or user changes at runtime.
*/
public void setSshClient(SshClient client) {
}
/**
* Submits a thread that will either set the result of the future or the exception that took
* place
*/
@PostConstruct
BlockUntilInitScriptStatusIsZeroThenReturnOutput init() {
userThreads.submit(new Runnable() {
@Override
public void run() {
try {
boolean complete = notRunningAnymore.apply("status");
String stdout = commandRunner.runAction("tail").getOutput();
String stderr = commandRunner.runAction("tailerr").getOutput();
// TODO make ScriptBuilder save exit status on nuhup
logger.debug("<< complete(%s) status(%s)", commandRunner.getStatement().getInstanceName(), complete);
set(new ExecResponse(stdout, stderr, complete && !shouldCancel ? 0 : -1));
} catch (Exception e) {
setException(e);
}
}
});
return this;
}
@Override
protected void interruptTask() {
logger.debug("<< cancelled(%s)", commandRunner.getStatement().getInstanceName());
commandRunner.runAction("stop");
shouldCancel = true;
super.interruptTask();
}
@Override
public String toString() {
return String.format("running task[%s]", commandRunner);
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((commandRunner == null) ? 0 : commandRunner.hashCode());
result = prime * result + ((logger == null) ? 0 : logger.hashCode());
result = prime * result + ((notRunningAnymore == null) ? 0 : notRunningAnymore.hashCode());
result = prime * result + (shouldCancel ? 1231 : 1237);
result = prime * result + ((userThreads == null) ? 0 : userThreads.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
BlockUntilInitScriptStatusIsZeroThenReturnOutput other = (BlockUntilInitScriptStatusIsZeroThenReturnOutput) obj;
if (commandRunner == null) {
if (other.commandRunner != null)
return false;
} else if (!commandRunner.equals(other.commandRunner))
return false;
if (logger == null) {
if (other.logger != null)
return false;
} else if (!logger.equals(other.logger))
return false;
if (notRunningAnymore == null) {
if (other.notRunningAnymore != null)
return false;
} else if (!notRunningAnymore.equals(other.notRunningAnymore))
return false;
if (shouldCancel != other.shouldCancel)
return false;
if (userThreads == null) {
if (other.userThreads != null)
return false;
} else if (!userThreads.equals(other.userThreads))
return false;
return true;
}
@Override
public ExecResponse get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
ExecutionException {
try {
return super.get(timeout, unit);
} catch (TimeoutException e) {
throw new ScriptStillRunningException(e.getMessage(), this);
}
}
}

View File

@ -26,6 +26,7 @@ import org.jclouds.compute.options.RunScriptOptions;
import org.jclouds.scriptbuilder.domain.Statement;
import com.google.common.annotations.Beta;
import com.google.common.util.concurrent.ListenableFuture;
/**
* Separates out how one implements the ability to run a script on a node.
@ -36,11 +37,10 @@ import com.google.common.annotations.Beta;
public interface RunScriptOnNode extends Callable<ExecResponse> {
public interface Factory {
RunScriptOnNode create(NodeMetadata node, String script);
RunScriptOnNode create(NodeMetadata node, Statement script);
RunScriptOnNode create(NodeMetadata node, Statement script, RunScriptOptions options);
ListenableFuture<ExecResponse> submit(NodeMetadata node, Statement script, RunScriptOptions options);
}
/**

View File

@ -42,9 +42,7 @@ import org.jclouds.scriptbuilder.statements.login.AdminAccess;
import org.jclouds.ssh.SshClient;
import org.jclouds.ssh.SshException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@ -54,21 +52,15 @@ import com.google.inject.assistedinject.AssistedInject;
*
* @author Adrian Cole
*/
public class RunScriptOnNodeAsInitScriptUsingSsh implements RunScriptOnNode {
public class RunScriptOnNodeAsInitScriptUsingSsh extends SudoAwareInitManager implements RunScriptOnNode {
public static final String PROPERTY_INIT_SCRIPT_PATTERN = "jclouds.compute.init-script-pattern";
@Resource
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
protected Logger logger = Logger.NULL;
protected final Function<NodeMetadata, SshClient> sshFactory;
protected NodeMetadata node;
protected final InitBuilder init;
protected final boolean runAsRoot;
protected final String initFile;
protected SshClient ssh;
/**
*
* determines the naming convention of init scripts.
*
* ex. {@code /tmp/init-%s}
@ -80,24 +72,25 @@ public class RunScriptOnNodeAsInitScriptUsingSsh implements RunScriptOnNode {
@AssistedInject
public RunScriptOnNodeAsInitScriptUsingSsh(Function<NodeMetadata, SshClient> sshFactory,
@Assisted NodeMetadata node, @Assisted Statement script, @Assisted RunScriptOptions options) {
this.sshFactory = checkNotNull(sshFactory, "sshFactory");
this.node = checkNotNull(node, "node");
String name = options.getTaskName();
super(sshFactory, options.shouldRunAsRoot(), checkNotNull(node, "node"), init(script, options.getTaskName()));
this.initFile = String.format(initScriptPattern, options.getTaskName());
}
private static InitBuilder init(Statement script, String name) {
if (name == null) {
if (checkNotNull(script, "script") instanceof InitBuilder)
name = InitBuilder.class.cast(script).getInstanceName();
else
name = "jclouds-script-" + System.currentTimeMillis();
}
this.init = checkNotNull(script, "script") instanceof InitBuilder ? InitBuilder.class.cast(script)
: createInitScript(name, script);
this.initFile = String.format(initScriptPattern, name);
this.runAsRoot = options.shouldRunAsRoot();
return checkNotNull(script, "script") instanceof InitBuilder ? InitBuilder.class.cast(script) : createInitScript(
name, script);
}
public static InitBuilder createInitScript(String name, Statement script) {
String path = "/tmp/" + name;
return new InitBuilder(name, path, path, Collections.<String, String> emptyMap(), Collections.singleton(script));
@Override
public RunScriptOnNodeAsInitScriptUsingSsh init() {
super.init();
return this;
}
@Override
@ -112,10 +105,9 @@ public class RunScriptOnNodeAsInitScriptUsingSsh implements RunScriptOnNode {
}
}
@Override
public RunScriptOnNode init() {
ssh = sshFactory.apply(node);
return this;
public static InitBuilder createInitScript(String name, Statement script) {
String path = "/tmp/" + name;
return new InitBuilder(name, path, path, Collections.<String, String> emptyMap(), Collections.singleton(script));
}
public void refreshSshIfNewAdminCredentialsConfigured(AdminAccess input) {
@ -167,55 +159,4 @@ public class RunScriptOnNodeAsInitScriptUsingSsh implements RunScriptOnNode {
ssh.exec(String.format("ln -fs %s %s", initFile, init.getInstanceName()));
}
protected ExecResponse runAction(String action) {
ExecResponse returnVal;
String command = (runAsRoot) ? execScriptAsRoot(action) : execScriptAsDefaultUser(action);
returnVal = runCommand(command);
if (logger.isTraceEnabled())
logger.trace("<< %s[%s]", action, returnVal);
else
logger.debug("<< %s(%d)", action, returnVal.getExitCode());
return returnVal;
}
protected ExecResponse runCommand(String command) {
ExecResponse returnVal;
logger.debug(">> running [%s] as %s@%s", command.replace(node.getAdminPassword() != null ? node
.getAdminPassword() : "XXXXX", "XXXXX"), ssh.getUsername(), ssh.getHostAddress());
returnVal = ssh.exec(command);
return returnVal;
}
@VisibleForTesting
public String execScriptAsRoot(String action) {
String command;
if (node.getCredentials().identity.equals("root")) {
command = "./" + init.getInstanceName() + " " + action;
} else if (node.getAdminPassword() != null) {
command = String.format("echo '%s'|sudo -S ./%s %s", node.getAdminPassword(), init.getInstanceName(), action);
} else {
command = "sudo ./" + init.getInstanceName() + " " + action;
}
return command;
}
protected String execScriptAsDefaultUser(String action) {
return "./" + init.getInstanceName() + " " + action;
}
public NodeMetadata getNode() {
return node;
}
@Override
public String toString() {
return Objects.toStringHelper(this).add("node", node).add("name", init.getInstanceName())
.add("runAsRoot", runAsRoot).toString();
}
@Override
public Statement getStatement() {
return init;
}
}

View File

@ -20,18 +20,19 @@ package org.jclouds.compute.callables;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Named;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.options.RunScriptOptions;
import org.jclouds.compute.predicates.ScriptStatusReturnsZero.CommandUsingClient;
import org.jclouds.compute.reference.ComputeServiceConstants.Timeouts;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.ssh.SshClient;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.inject.assistedinject.Assisted;
/**
@ -39,29 +40,38 @@ import com.google.inject.assistedinject.Assisted;
* @author Adrian Cole
*/
public class RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete extends RunScriptOnNodeAsInitScriptUsingSsh {
protected final Predicate<CommandUsingClient> runScriptNotRunning;
protected final Timeouts timeouts;
protected final BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory statusFactory;
@Inject
public RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete(
@Named("SCRIPT_COMPLETE") Predicate<CommandUsingClient> runScriptNotRunning,
BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory statusFactory, Timeouts timeouts,
Function<NodeMetadata, SshClient> sshFactory, @Assisted NodeMetadata node, @Assisted Statement script,
@Assisted RunScriptOptions options) {
super(sshFactory, node, script, options);
this.runScriptNotRunning = checkNotNull(runScriptNotRunning, "runScriptNotRunning");
this.statusFactory = checkNotNull(statusFactory, "statusFactory");
this.timeouts = checkNotNull(timeouts, "timeouts");
}
@Override
public ExecResponse doCall() {
ExecResponse returnVal = super.doCall();
boolean complete = runScriptNotRunning.apply(new CommandUsingClient("./" + init.getInstanceName() + " status",
ssh));
logger.debug("<< complete(%s)", complete);
if (logger.isDebugEnabled() || returnVal.getExitCode() != 0) {
logger.debug("<< stdout from %s as %s@%s\n%s", init.getInstanceName(), ssh.getUsername(),
ssh.getHostAddress(), ssh.exec("./" + init.getInstanceName() + " tail").getOutput());
logger.debug("<< stderr from %s as %s@%s\n%s", init.getInstanceName(), ssh.getUsername(),
ssh.getHostAddress(), ssh.exec("./" + init.getInstanceName() + " tailerr").getOutput());
try {
return future().get(timeouts.scriptComplete, TimeUnit.MILLISECONDS);
} catch (Exception e) {
Throwables.propagate(e);
return null;
}
return returnVal;
}
public BlockUntilInitScriptStatusIsZeroThenReturnOutput future() {
ExecResponse returnVal = super.doCall();
if (returnVal.getExitCode() != 0)
logger.warn("task: %s had non-zero exit status: %s", init.getInstanceName(), returnVal);
return statusFactory.create(this).init();
}
@Override
public RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete init() {
return RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete.class.cast(super.init());
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.compute.callables;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.concurrent.TimeoutException;
import com.google.common.base.Supplier;
/**
*
* @author Adrian Cole
*/
public class ScriptStillRunningException extends TimeoutException implements
Supplier<BlockUntilInitScriptStatusIsZeroThenReturnOutput> {
/** The serialVersionUID */
private static final long serialVersionUID = -7265376839848564663L;
private final BlockUntilInitScriptStatusIsZeroThenReturnOutput delegate;
public ScriptStillRunningException(String message, BlockUntilInitScriptStatusIsZeroThenReturnOutput delegate) {
super(checkNotNull(message, "message"));
this.delegate = checkNotNull(delegate, "delegate");
}
@Override
public BlockUntilInitScriptStatusIsZeroThenReturnOutput get() {
return delegate;
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.compute.callables;
import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.inject.Named;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.compute.domain.NodeMetadataBuilder;
import org.jclouds.compute.reference.ComputeServiceConstants;
import org.jclouds.logging.Logger;
import org.jclouds.scriptbuilder.InitBuilder;
import org.jclouds.scriptbuilder.statements.login.AdminAccess;
import org.jclouds.ssh.SshClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
/**
*
* @author Adrian Cole
*/
public class SudoAwareInitManager {
@Resource
@Named(ComputeServiceConstants.COMPUTE_LOGGER)
protected Logger logger = Logger.NULL;
protected NodeMetadata node;
protected final InitBuilder init;
protected final boolean runAsRoot;
protected final Function<NodeMetadata, SshClient> sshFactory;
protected SshClient ssh;
public SudoAwareInitManager(Function<NodeMetadata, SshClient> sshFactory, boolean runAsRoot, NodeMetadata node,
InitBuilder init) {
this.sshFactory = checkNotNull(sshFactory, "sshFactory");
this.runAsRoot = runAsRoot;
this.node = node;
this.init = init;
}
@PostConstruct
public SudoAwareInitManager init() {
ssh = sshFactory.apply(node);
return this;
}
public void refreshSshIfNewAdminCredentialsConfigured(AdminAccess input) {
if (input.getAdminCredentials() != null && input.shouldGrantSudoToAdminUser()) {
ssh.disconnect();
logger.debug(">> reconnecting as %s@%s", input.getAdminCredentials().identity, ssh.getHostAddress());
ssh = sshFactory.apply(node = NodeMetadataBuilder.fromNodeMetadata(node).adminPassword(null).credentials(
input.getAdminCredentials()).build());
ssh.connect();
}
}
public ExecResponse runAction(String action) {
ExecResponse returnVal;
String command = (runAsRoot) ? execScriptAsRoot(action) : execScriptAsDefaultUser(action);
returnVal = runCommand(command);
if (logger.isTraceEnabled())
logger.trace("<< %s[%s]", action, returnVal);
else
logger.debug("<< %s(%d)", action, returnVal.getExitCode());
return returnVal;
}
public ExecResponse runCommand(String command) {
ExecResponse returnVal;
logger.debug(">> running [%s] as %s@%s", command.replace(node.getAdminPassword() != null ? node
.getAdminPassword() : "XXXXX", "XXXXX"), ssh.getUsername(), ssh.getHostAddress());
returnVal = ssh.exec(command);
return returnVal;
}
@VisibleForTesting
public String execScriptAsRoot(String action) {
String command;
if (node.getCredentials().identity.equals("root")) {
command = "./" + init.getInstanceName() + " " + action;
} else if (node.getAdminPassword() != null) {
command = String.format("echo '%s'|sudo -S ./%s %s", node.getAdminPassword(), init.getInstanceName(), action);
} else {
command = "sudo ./" + init.getInstanceName() + " " + action;
}
return command;
}
protected String execScriptAsDefaultUser(String action) {
return "./" + init.getInstanceName() + " " + action;
}
public NodeMetadata getNode() {
return node;
}
@Override
public String toString() {
return Objects.toStringHelper(this).add("node", node).add("name", init.getInstanceName()).add("runAsRoot",
runAsRoot).toString();
}
public InitBuilder getStatement() {
return init;
}
}

View File

@ -31,6 +31,7 @@ import javax.inject.Named;
import javax.inject.Singleton;
import org.jclouds.collect.Memoized;
import org.jclouds.compute.callables.BlockUntilInitScriptStatusIsZeroThenReturnOutput;
import org.jclouds.compute.callables.RunScriptOnNode;
import org.jclouds.compute.callables.RunScriptOnNodeAsInitScriptUsingSsh;
import org.jclouds.compute.callables.RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete;
@ -53,7 +54,6 @@ import org.jclouds.location.config.LocationModule;
import org.jclouds.rest.AuthorizationException;
import org.jclouds.rest.suppliers.MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.Statements;
import org.jclouds.ssh.SshClient;
import com.google.common.base.Function;
@ -83,25 +83,26 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
bind(new TypeLiteral<Function<TemplateOptions, Statement>>() {
}).to(TemplateOptionsToStatement.class);
install(new FactoryModuleBuilder()
.implement(RunScriptOnNode.class, Names.named("direct"), RunScriptOnNodeUsingSsh.class)
.implement(RunScriptOnNode.class, Names.named("blocking"),
RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete.class)
.implement(RunScriptOnNode.class, Names.named("nonblocking"), RunScriptOnNodeAsInitScriptUsingSsh.class)
.build(RunScriptOnNodeFactoryImpl.Factory.class));
install(new FactoryModuleBuilder().implement(RunScriptOnNodeUsingSsh.class, Names.named("direct"),
RunScriptOnNodeUsingSsh.class).implement(RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete.class,
Names.named("blocking"), RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete.class).implement(
RunScriptOnNodeAsInitScriptUsingSsh.class, Names.named("nonblocking"),
RunScriptOnNodeAsInitScriptUsingSsh.class).build(RunScriptOnNodeFactoryImpl.Factory.class));
install(new PersistNodeCredentialsModule());
bind(RunScriptOnNode.Factory.class).to(RunScriptOnNodeFactoryImpl.class);
install(new FactoryModuleBuilder().implement(new TypeLiteral<Callable<Void>>() {
}, CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.class)
.implement(new TypeLiteral<Function<NodeMetadata, Void>>() {
}, CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.class)
.build(CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.Factory.class));
}, CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.class).implement(
new TypeLiteral<Function<NodeMetadata, Void>>() {
}, CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.class).build(
CustomizeNodeAndAddToGoodMapOrPutExceptionIntoBadMap.Factory.class));
install(new FactoryModuleBuilder().implement(new TypeLiteral<Callable<RunScriptOnNode>>() {
}, InitializeRunScriptOnNodeOrPlaceInBadMap.class).build(InitializeRunScriptOnNodeOrPlaceInBadMap.Factory.class));
install(new FactoryModuleBuilder().build(BlockUntilInitScriptStatusIsZeroThenReturnOutput.Factory.class));
}
@Singleton
@ -110,13 +111,14 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
static interface Factory {
@Named("direct")
RunScriptOnNode exec(NodeMetadata node, Statement script, RunScriptOptions options);
RunScriptOnNodeUsingSsh exec(NodeMetadata node, Statement script, RunScriptOptions options);
@Named("blocking")
RunScriptOnNode backgroundAndBlockOnComplete(NodeMetadata node, Statement script, RunScriptOptions options);
RunScriptOnNodeAsInitScriptUsingSshAndBlockUntilComplete backgroundAndBlockOnComplete(NodeMetadata node,
Statement script, RunScriptOptions options);
@Named("nonblocking")
RunScriptOnNode background(NodeMetadata node, Statement script, RunScriptOptions options);
RunScriptOnNodeAsInitScriptUsingSsh background(NodeMetadata node, Statement script, RunScriptOptions options);
}
private final Factory factory;
@ -132,18 +134,18 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
checkNotNull(runScript, "runScript");
checkNotNull(options, "options");
return !options.shouldWrapInInitScript() ? factory.exec(node, runScript, options) : (options
.shouldBlockOnComplete() ? factory.backgroundAndBlockOnComplete(node, runScript, options) : factory
.background(node, runScript, options));
.shouldBlockOnComplete() ? factory.backgroundAndBlockOnComplete(node, runScript, options) : factory
.background(node, runScript, options));
}
@Override
public RunScriptOnNode create(NodeMetadata node, String script) {
return create(node, Statements.exec(checkNotNull(script, "script")));
}
@Override
public RunScriptOnNode create(NodeMetadata node, Statement script) {
return create(node, script, RunScriptOptions.NONE);
public BlockUntilInitScriptStatusIsZeroThenReturnOutput submit(NodeMetadata node, Statement script,
RunScriptOptions options) {
checkNotNull(node, "node");
checkNotNull(script, "script");
checkNotNull(options, "options");
options.shouldWrapInInitScript();
return factory.backgroundAndBlockOnComplete(node, script, options).init().future();
}
}
@ -173,8 +175,8 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
}
/**
* supplies how the tag is encoded into the name. A string of hex characters
* is the last argument and tag is the first
* supplies how the tag is encoded into the name. A string of hex characters is the last argument
* and tag is the first
*/
@Provides
@Named("NAMING_CONVENTION")
@ -209,14 +211,14 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
@Singleton
@Memoized
protected Supplier<Set<? extends Image>> supplyImageCache(@Named(PROPERTY_SESSION_INTERVAL) long seconds,
final Supplier<Set<? extends Image>> imageSupplier) {
final Supplier<Set<? extends Image>> imageSupplier) {
return new MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier<Set<? extends Image>>(authException,
seconds, new Supplier<Set<? extends Image>>() {
@Override
public Set<? extends Image> get() {
return imageSupplier.get();
}
});
seconds, new Supplier<Set<? extends Image>>() {
@Override
public Set<? extends Image> get() {
return imageSupplier.get();
}
});
}
@Provides
@ -243,14 +245,14 @@ public abstract class BaseComputeServiceContextModule extends AbstractModule {
@Singleton
@Memoized
protected Supplier<Set<? extends Hardware>> supplySizeCache(@Named(PROPERTY_SESSION_INTERVAL) long seconds,
final Supplier<Set<? extends Hardware>> hardwareSupplier) {
final Supplier<Set<? extends Hardware>> hardwareSupplier) {
return new MemoizedRetryOnTimeOutButNotOnAuthorizationExceptionSupplier<Set<? extends Hardware>>(authException,
seconds, new Supplier<Set<? extends Hardware>>() {
@Override
public Set<? extends Hardware> get() {
return hardwareSupplier.get();
}
});
seconds, new Supplier<Set<? extends Hardware>>() {
@Override
public Set<? extends Hardware> get() {
return hardwareSupplier.get();
}
});
}
@Provides

View File

@ -99,6 +99,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
/**
*
@ -570,7 +571,7 @@ public class BaseComputeService implements ComputeService {
}
return goodNodes;
}
/**
* {@inheritDoc}
*/
@ -578,7 +579,7 @@ public class BaseComputeService implements ComputeService {
public ExecResponse runScriptOnNode(String id, String runScript) {
return runScriptOnNode(id, runScript, RunScriptOptions.NONE);
}
/**
* {@inheritDoc}
*/
@ -586,7 +587,7 @@ public class BaseComputeService implements ComputeService {
public ExecResponse runScriptOnNode(String id, String runScript, RunScriptOptions options) {
return runScriptOnNode(id, Statements.exec(checkNotNull(runScript, "runScript")), options);
}
/**
* {@inheritDoc}
*/
@ -594,7 +595,7 @@ public class BaseComputeService implements ComputeService {
public ExecResponse runScriptOnNode(String id, Statement runScript) {
return runScriptOnNode(id, runScript, RunScriptOptions.NONE);
}
/**
* {@inheritDoc}
*/
@ -613,6 +614,32 @@ public class BaseComputeService implements ComputeService {
return response;
}
/**
* {@inheritDoc}
*/
@Override
public ListenableFuture<ExecResponse> submitScriptOnNode(String id, final Statement runScript,
RunScriptOptions options) {
NodeMetadata node = this.getNodeMetadata(id);
if (node == null)
throw new NoSuchElementException(id);
if (node.getState() != NodeState.RUNNING)
throw new IllegalStateException("node " + id
+ " needs to be running before executing a script on it. current state: " + node.getState());
initAdminAccess.visit(runScript);
final NodeMetadata node1 = updateNodeWithCredentialsIfPresent(node, options);
ListenableFuture<ExecResponse> response = runScriptOnNodeFactory.submit(node, runScript, options);
response.addListener(new Runnable() {
@Override
public void run() {
persistNodeCredentials.ifAdminAccess(runScript).apply(node1);
}
}, executor);
return response;
}
private Iterable<? extends RunScriptOnNode> transformNodesIntoInitializedScriptRunners(
Iterable<? extends NodeMetadata> nodes, Statement script, RunScriptOptions options,
Map<NodeMetadata, Exception> badNodes) {
@ -642,7 +669,7 @@ public class BaseComputeService implements ComputeService {
}
return node;
}
private final class TransformNodesIntoInitializedScriptRunners implements
Function<NodeMetadata, Future<RunScriptOnNode>> {
private final Map<NodeMetadata, Exception> badNodes;

View File

@ -34,21 +34,20 @@ import com.google.common.base.Predicate;
* @author Adrian Cole
*/
@Singleton
public class ScriptStatusReturnsZero implements
Predicate<ScriptStatusReturnsZero.CommandUsingClient> {
public class ScriptStatusReturnsZero implements Predicate<ScriptStatusReturnsZero.CommandUsingClient> {
@Resource
protected Logger logger = Logger.NULL;
@Override
public boolean apply(CommandUsingClient commandUsingClient) {
logger.trace("looking for [%s] state on %s@%s", commandUsingClient.command,
commandUsingClient.client.getUsername(), commandUsingClient.client.getHostAddress());
logger.trace("looking for [%s] state on %s@%s", commandUsingClient.command, commandUsingClient.client
.getUsername(), commandUsingClient.client.getHostAddress());
ExecResponse response = refresh(commandUsingClient);
while (response.getExitCode() == -1)
response = refresh(commandUsingClient);
logger.trace("%s@%s: looking for exit code 0: currently: %s", commandUsingClient.client
.getUsername(), commandUsingClient.client.getHostAddress(), response.getExitCode());
logger.trace("%s@%s: looking for exit code 0: currently: %s", commandUsingClient.client.getUsername(),
commandUsingClient.client.getHostAddress(), response.getExitCode());
return 0 == response.getExitCode();
}

View File

@ -17,6 +17,7 @@
* under the License.
*/
package org.jclouds.compute;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.and;
import static com.google.common.base.Predicates.not;
@ -50,11 +51,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -99,6 +101,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Guice;
import com.google.inject.Module;
@ -126,7 +129,6 @@ public abstract class BaseComputeServiceLiveTest {
protected String endpoint;
protected String apiversion;
protected Properties setupProperties() {
Properties overrides = new Properties();
overrides.setProperty(Constants.PROPERTY_TRUST_ALL_CERTS, "true");
@ -251,16 +253,16 @@ public abstract class BaseComputeServiceLiveTest {
for (Entry<? extends NodeMetadata, ExecResponse> response : client.runScriptOnNodesMatching(
runningInGroup(group), Statements.exec("hostname"),
wrapInInitScript(false).runAsRoot(false).overrideCredentialsWith(good)).entrySet()){
wrapInInitScript(false).runAsRoot(false).overrideCredentialsWith(good)).entrySet()) {
checkResponseEqualsHostname(response.getValue(), response.getKey());
}
// test single-node execution
ExecResponse response = client.runScriptOnNode(node.getId(), "hostname", wrapInInitScript(false)
.runAsRoot(false));
ExecResponse response = client.runScriptOnNode(node.getId(), "hostname", wrapInInitScript(false).runAsRoot(
false));
checkResponseEqualsHostname(response, node);
OperatingSystem os = node.getOperatingSystem();
// test bad password
try {
Map<? extends NodeMetadata, ExecResponse> responses = client.runScriptOnNodesMatching(
@ -279,18 +281,52 @@ public abstract class BaseComputeServiceLiveTest {
checkNodes(nodes, group);
// test adding AdminAccess later changes the default boot user, in this case to foo
response = client.runScriptOnNode(node.getId(), AdminAccess.builder().adminUsername("foo").build(), nameTask("adminUpdate"));
response = client.runScriptOnNode(node.getId(), "echo $USER", wrapInInitScript(false)
.runAsRoot(false));
ListenableFuture<ExecResponse> future = client.submitScriptOnNode(node.getId(), AdminAccess.builder()
.adminUsername("foo").build(), nameTask("adminUpdate"));
response = future.get(3, TimeUnit.MINUTES);
assert response.getExitCode() == 0 : node.getId() + ": " + response;
weCanCancelTasks(node);
assert response.getExitCode() == 0 : node.getId() + ": " + response;
response = client.runScriptOnNode(node.getId(), "echo $USER", wrapInInitScript(false).runAsRoot(false));
assert response.getOutput().trim().equals("foo") : node.getId() + ": " + response;
} finally {
client.destroyNodesMatching(inGroup(group));
}
}
@Test(enabled = false)
public void weCanCancelTasks(NodeMetadata node) throws InterruptedException, ExecutionException {
ListenableFuture<ExecResponse> future;
future = client.submitScriptOnNode(node.getId(), Statements.exec("sleep 300"), nameTask("sleeper"));
ExecResponse response = null;
try {
response = future.get(1, TimeUnit.MILLISECONDS);
assert false : node.getId() + ": " + response;
} catch (TimeoutException e) {
assert !future.isDone();
response = client.runScriptOnNode(node.getId(), Statements.exec("./sleeper status"), wrapInInitScript(false)
.runAsRoot(false));
assert !response.getOutput().trim().equals("") : node.getId() + ": " + response;
future.cancel(true);
response = client.runScriptOnNode(node.getId(), Statements.exec("./sleeper status"), wrapInInitScript(false)
.runAsRoot(false));
assert response.getOutput().trim().equals("") : node.getId() + ": " + response;
try {
future.get();
assert false : future;
} catch (CancellationException e1) {
}
}
}
protected void checkResponseEqualsHostname(ExecResponse execResponse, NodeMetadata node1) {
assert execResponse.getOutput().trim().equals(node1.getHostname()) : node1 + ": " + execResponse;
}
@ -407,7 +443,7 @@ public abstract class BaseComputeServiceLiveTest {
protected Map<? extends NodeMetadata, ExecResponse> runScriptWithCreds(final String group, OperatingSystem os,
Credentials creds) throws RunScriptOnNodesException {
return client.runScriptOnNodesMatching(runningInGroup(group), buildScript(os), overrideCredentialsWith(creds)
.nameTask("runScriptWithCreds"));
.nameTask("runScriptWithCreds"));
}
protected void checkNodes(Iterable<? extends NodeMetadata> nodes, String group) throws IOException {
@ -461,8 +497,8 @@ public abstract class BaseComputeServiceLiveTest {
}
protected void assertNodeZero(Collection<? extends NodeMetadata> metadataSet) {
assert metadataSet.size() == 0 : format("nodes left in set: [%s] which didn't match set: [%s]",
metadataSet, nodes);
assert metadataSet.size() == 0 : format("nodes left in set: [%s] which didn't match set: [%s]", metadataSet,
nodes);
}
@Test(enabled = true, dependsOnMethods = "testGet")
@ -484,8 +520,7 @@ public abstract class BaseComputeServiceLiveTest {
public boolean apply(NodeMetadata input) {
boolean returnVal = input.getState() == NodeState.SUSPENDED;
if (!returnVal)
getAnonymousLogger().warning(
format("node %s in state %s%n", input.getId(), input.getState()));
getAnonymousLogger().warning(format("node %s in state %s%n", input.getId(), input.getState()));
return returnVal;
}
@ -533,7 +568,7 @@ public abstract class BaseComputeServiceLiveTest {
for (NodeMetadata node : filter(client.listNodesDetailsMatching(all()), inGroup(group))) {
assert node.getState() == NodeState.TERMINATED : node;
assert context.getCredentialStore().get("node#" + node.getId()) == null : "credential should have been null for "
+ "node#" + node.getId();
+ "node#" + node.getId();
}
}
@ -599,21 +634,22 @@ public abstract class BaseComputeServiceLiveTest {
try {
ImmutableMap<String, String> userMetadata = ImmutableMap.<String, String> of("Name", group);
long startSeconds = currentTimeMillis();
NodeMetadata node = getOnlyElement(client.createNodesInGroup(group, 1,
inboundPorts(22, 8080).blockOnPort(22, 300)
.userMetadata(userMetadata)));
NodeMetadata node = getOnlyElement(client.createNodesInGroup(group, 1, inboundPorts(22, 8080).blockOnPort(22,
300).userMetadata(userMetadata)));
final String nodeId = node.getId();
long createSeconds = (currentTimeMillis() - startSeconds) / 1000;
checkUserMetadataInNodeEquals(node, userMetadata);
getAnonymousLogger().info(
format("<< available node(%s) os(%s) in %ss", node.getId(), node.getOperatingSystem(), createSeconds));
getAnonymousLogger()
.info(
format("<< available node(%s) os(%s) in %ss", node.getId(), node.getOperatingSystem(),
createSeconds));
startSeconds = currentTimeMillis();
// note this is a dependency on the template resolution so we have the right process per
// operating system. moreover, we wish this to run as root, so that it can change ip
// operating system. moreover, we wish this to run as root, so that it can change ip
// tables rules and setup our admin user
client.runScriptOnNode(nodeId, installAdminUserJBossAndOpenPorts(node.getOperatingSystem()),
nameTask("configure-jboss"));
@ -621,9 +657,9 @@ public abstract class BaseComputeServiceLiveTest {
long configureSeconds = (currentTimeMillis() - startSeconds) / 1000;
getAnonymousLogger().info(
format("<< configured node(%s) with %s in %ss", nodeId,
client.runScriptOnNode(nodeId, "java -fullversion", runAsRoot(false).wrapInInitScript(false)).getOutput().trim(),
configureSeconds));
format("<< configured node(%s) with %s in %ss", nodeId, client.runScriptOnNode(nodeId,
"java -fullversion", runAsRoot(false).wrapInInitScript(false)).getOutput().trim(),
configureSeconds));
trackAvailabilityOfJBossProcessOnNode(new Supplier<ExecResponse>() {
@ -664,7 +700,7 @@ public abstract class BaseComputeServiceLiveTest {
protected void checkUserMetadataInNodeEquals(NodeMetadata node, ImmutableMap<String, String> userMetadata) {
assert node.getUserMetadata().equals(userMetadata) : String.format("node userMetadata did not match %s %s",
userMetadata, node);
userMetadata, node);
}
public void testListImages() throws Exception {

View File

@ -85,7 +85,6 @@ public class StubComputeServiceIntegrationTest extends BaseComputeServiceLiveTes
// restart of jboss
expect(socketOpen.apply(new IPSocket("144.175.1.1", 8080))).andReturn(true).times(2);
replay(socketOpen);
preciseSocketTester = socketTester = new RetryablePredicate<IPSocket>(socketOpen, 1, 1, TimeUnit.MILLISECONDS);
@ -178,22 +177,20 @@ public class StubComputeServiceIntegrationTest extends BaseComputeServiceLiveTes
client2New.connect();
try {
runScript(client2New, "adminUpdate",
Strings2.toStringAndClose(StubComputeServiceIntegrationTest.class
.getResourceAsStream("/runscript_adminUpdate.sh")), 2);
runScript(client2New, "adminUpdate", Strings2.toStringAndClose(StubComputeServiceIntegrationTest.class
.getResourceAsStream("/runscript_adminUpdate.sh")), 2);
} catch (IOException e) {
Throwables.propagate(e);
}
client2New.disconnect();
// check id
// check id
client2Foo.connect();
expect(client2Foo.getUsername()).andReturn("foo").atLeastOnce();
expect(client2Foo.getHostAddress()).andReturn("foo").atLeastOnce();
expect(client2Foo.exec("echo $USER\n")).andReturn(new ExecResponse("foo\n", "", 0));
client2Foo.disconnect();
expect(factory.create(new IPSocket("144.175.1.3", 22), new Credentials("root", "password3"))).andReturn(
client3).times(2);
expect(factory.create(new IPSocket("144.175.1.4", 22), new Credentials("root", "password4"))).andReturn(
@ -263,7 +260,7 @@ public class StubComputeServiceIntegrationTest extends BaseComputeServiceLiveTes
clientNew.connect();
expect(clientNew.exec("java -fullversion\n")).andReturn(EXEC_GOOD);
clientNew.disconnect();
clientNew.connect();
scriptName = "jboss";
clientNew.put("/tmp/init-" + scriptName, Strings2
@ -283,11 +280,11 @@ public class StubComputeServiceIntegrationTest extends BaseComputeServiceLiveTes
clientNew.connect();
expect(clientNew.exec("./" + scriptName + " stop\n")).andReturn(EXEC_GOOD);
clientNew.disconnect();
clientNew.connect();
expect(clientNew.exec("./" + scriptName + " start\n")).andReturn(EXEC_GOOD);
clientNew.disconnect();
clientNew.connect();
expect(clientNew.exec("./" + scriptName + " tail\n")).andReturn(EXEC_GOOD);
clientNew.disconnect();
@ -436,6 +433,12 @@ public class StubComputeServiceIntegrationTest extends BaseComputeServiceLiveTes
public void testAScriptExecutionAfterBootWithBasicTemplate() throws Exception {
super.testAScriptExecutionAfterBootWithBasicTemplate();
}
@Test(enabled = false)
@Override
public void weCanCancelTasks(NodeMetadata node) throws InterruptedException, ExecutionException {
// not sure how to do multithreading in a mock so that tests can work
}
@Test(enabled = true, dependsOnMethods = { "testCompareSizes" })
public void testCreateAndRunAService() throws Exception {

View File

@ -91,18 +91,18 @@ public class RetryablePredicate<T> implements Predicate<T> {
return false;
}
long nextMaxInterval(long attempt, Date end) {
protected long nextMaxInterval(long attempt, Date end) {
long interval = (period * (long) Math.pow(attempt, 1.5));
interval = interval > maxPeriod ? maxPeriod : interval;
long max = end.getTime() - System.currentTimeMillis();
return (interval > max) ? max : interval;
}
boolean before(Date end) {
protected boolean before(Date end) {
return new Date().compareTo(end) <= 1;
}
boolean atOrAfter(Date end) {
protected boolean atOrAfter(Date end) {
return new Date().compareTo(end) >= 0;
}
}