Issue 586:retry ssh on channel failures

This commit is contained in:
Adrian Cole 2011-06-01 12:00:22 -07:00
parent b1984f8600
commit ac698c33ec
1 changed files with 231 additions and 111 deletions

View File

@ -58,9 +58,7 @@ import com.google.inject.Inject;
import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session; import com.jcraft.jsch.Session;
import com.jcraft.jsch.SftpException;
/** /**
* This class needs refactoring. It is not thread safe. * This class needs refactoring. It is not thread safe.
@ -99,7 +97,7 @@ public class JschSshClient implements SshClient {
@Inject(optional = true) @Inject(optional = true)
@Named("jclouds.ssh.retryable_messages") @Named("jclouds.ssh.retryable_messages")
@VisibleForTesting @VisibleForTesting
String retryableMessages = "invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established"; String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established";
@Inject(optional = true) @Inject(optional = true)
@Named("jclouds.ssh.retry_predicate") @Named("jclouds.ssh.retry_predicate")
@ -128,81 +126,182 @@ public class JschSshClient implements SshClient {
this.privateKey = privateKey; this.privateKey = privateKey;
} }
public Payload get(String path) {
checkNotNull(path, "path");
ChannelSftp sftp = getSftp();
try {
return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
} catch (SftpException e) {
throw new SshException(String.format("%s@%s:%d: Error getting path: %s", username, host, port, path), e);
}
}
@Override
public void put(String path, Payload contents) {
checkNotNull(path, "path");
checkNotNull(contents, "contents");
ChannelSftp sftp = getSftp();
try {
sftp.put(contents.getInput(), path);
} catch (SftpException e) {
throw new SshException(String.format("%s@%s:%d: Error putting path: %s", username, host, port, path), e);
} finally {
Closeables.closeQuietly(contents);
}
}
@Override @Override
public void put(String path, String contents) { public void put(String path, String contents) {
put(path, Payloads.newStringPayload(checkNotNull(contents, "contents"))); put(path, Payloads.newStringPayload(checkNotNull(contents, "contents")));
} }
private ChannelSftp getSftp() { private void checkConnected() {
checkConnected(); checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString()));
logger.debug("%s@%s:%d: Opening sftp Channel.", username, host, port);
ChannelSftp sftp = null;
try {
sftp = (ChannelSftp) session.openChannel("sftp");
sftp.connect();
} catch (JSchException e) {
throw new SshException(String.format("%s@%s:%d: Error connecting to sftp.", username, host, port), e);
}
return sftp;
} }
private void checkConnected() { public static interface Connection<T> {
checkState(session != null && session.isConnected(), String.format("%s@%s:%d: SFTP not connected!", username, void clear();
host, port));
T create() throws Exception;
}
Connection<Session> sessionConnection = new Connection<Session>() {
@Override
public void clear() {
if (session != null && session.isConnected()) {
session.disconnect();
session = null;
}
}
@Override
public Session create() throws Exception {
JSch jsch = new JSch();
session = jsch.getSession(username, host, port);
if (timeout != 0)
session.setTimeout(timeout);
if (password != null) {
session.setPassword(password);
} else {
// jsch wipes out your private key
jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
}
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
session.connect();
return session;
}
@Override
public String toString() {
return String.format("Session(%s)", JschSshClient.this.toString());
}
};
protected <T, C extends Connection<T>> T acquire(C connection) {
connection.clear();
Exception e = null;
String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
for (int i = 0; i < sshRetries; i++) {
try {
logger.debug(">> (%s) acquiring %s", toString(), connection);
T returnVal = connection.create();
logger.debug("<< (%s) acquired %s", toString(), returnVal);
return returnVal;
} catch (Exception from) {
e = from;
connection.clear();
if (i == sshRetries)
throw propagate(from, errorMessage);
if (shouldRetry(from)) {
logger.warn("<< " + errorMessage + ": " + from.getMessage());
backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage());
continue;
}
throw propagate(from, errorMessage);
}
}
if (e != null)
throw propagate(e, errorMessage);
return null;
} }
@PostConstruct @PostConstruct
public void connect() { public void connect() {
disconnect(); acquire(sessionConnection);
Exception e = null; }
RETRY_LOOP: for (int i = 0; i < sshRetries; i++) {
Connection<ChannelSftp> sftpConnection = new Connection<ChannelSftp>() {
private ChannelSftp sftp;
@Override
public void clear() {
if (sftp != null)
sftp.disconnect();
}
@Override
public ChannelSftp create() throws Exception {
checkConnected();
sftp = (ChannelSftp) session.openChannel("sftp");
sftp.connect();
return sftp;
}
@Override
public String toString() {
return "ChannelSftp(" + JschSshClient.this.toString() + ")";
}
};
class GetConnection implements Connection<Payload> {
private final String path;
private ChannelSftp sftp;
GetConnection(String path) {
this.path = checkNotNull(path, "path");
}
@Override
public void clear() {
if (sftp != null)
sftp.disconnect();
}
@Override
public Payload create() throws Exception {
sftp = acquire(sftpConnection);
return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp));
}
@Override
public String toString() {
return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]";
}
};
public Payload get(String path) {
return acquire(new GetConnection(path));
}
class PutConnection implements Connection<Void> {
private final String path;
private final Payload contents;
private ChannelSftp sftp;
PutConnection(String path, Payload contents) {
this.path = checkNotNull(path, "path");
this.contents = checkNotNull(contents, "contents");
}
@Override
public void clear() {
if (sftp != null)
sftp.disconnect();
}
@Override
public Void create() throws Exception {
sftp = acquire(sftpConnection);
try { try {
newSession(); sftp.put(contents.getInput(), path);
e = null; } finally {
break RETRY_LOOP; Closeables.closeQuietly(contents);
} catch (Exception from) { clear();
e = from; }
disconnect(); return null;
if (i == sshRetries)
throw propagate(from);
if (shouldRetry(from)) {
backoffForAttempt(i + 1, String.format("%s@%s:%d: connection error: %s", username, host, port, from
.getMessage()));
continue;
} }
throw propagate(from); @Override
public String toString() {
return "Put(" + JschSshClient.this.toString() + ")[" + path + "]";
} }
} };
if (e != null)
throw propagate(e); @Override
public void put(String path, Payload contents) {
acquire(new PutConnection(path, contents));
} }
@VisibleForTesting @VisibleForTesting
@ -223,52 +322,64 @@ public class JschSshClient implements SshClient {
backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message); backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message);
} }
private void newSession() throws JSchException { private SshException propagate(Exception e, String message) {
JSch jsch = new JSch(); message += ": " + e.getMessage();
session = null; logger.error(e, "<< " + message);
try { throw new SshException(message, e);
session = jsch.getSession(username, host, port);
if (timeout != 0)
session.setTimeout(timeout);
logger.debug("%s@%s:%d: Session created.", username, host, port);
if (password != null) {
session.setPassword(password);
} else {
// jsch wipes out your private key
jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase);
}
} catch (JSchException e) {
throw new SshException(String.format("%s@%s:%d: Error creating session.", username, host, port), e);
}
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
session.connect();
logger.debug("%s@%s:%d: Session connected.", username, host, port);
} }
private SshException propagate(Exception e) { @Override
throw new SshException(String.format("%s@%s:%d: Error connecting to session.", username, host, port), e); public String toString() {
return String.format("%s@%s:%d", username, host, port);
} }
@PreDestroy @PreDestroy
public void disconnect() { public void disconnect() {
if (session != null && session.isConnected()) { sessionConnection.clear();
session.disconnect();
session = null;
}
} }
public ExecResponse exec(String command) { Connection<ChannelExec> execConnection = new Connection<ChannelExec>() {
private ChannelExec executor = null;
@Override
public void clear() {
if (executor != null)
executor.disconnect();
}
@Override
public ChannelExec create() throws Exception {
checkConnected(); checkConnected();
ChannelExec executor = null;
try {
try {
executor = (ChannelExec) session.openChannel("exec"); executor = (ChannelExec) session.openChannel("exec");
executor.setPty(true); executor.setPty(true);
} catch (JSchException e) { return executor;
throw new SshException(String.format("%s@%s:%d: Error connecting to exec.", username, host, port), e);
} }
@Override
public String toString() {
return "ChannelExec(" + JschSshClient.this.toString() + ")";
}
};
class ExecConnection implements Connection<ExecResponse> {
private final String command;
private ChannelExec executor;
ExecConnection(String command) {
this.command = checkNotNull(command, "command");
}
@Override
public void clear() {
if (executor != null)
executor.disconnect();
}
@Override
public ExecResponse create() throws Exception {
executor = acquire(execConnection);
executor.setCommand(command); executor.setCommand(command);
ByteArrayOutputStream error = new ByteArrayOutputStream(); ByteArrayOutputStream error = new ByteArrayOutputStream();
executor.setErrStream(error); executor.setErrStream(error);
@ -278,22 +389,31 @@ public class JschSshClient implements SshClient {
String errorString = error.toString(); String errorString = error.toString();
int errorStatus = executor.getExitStatus(); int errorStatus = executor.getExitStatus();
int i = 0; int i = 0;
while ((errorStatus = executor.getExitStatus()) == -1 && i < this.sshRetries) String message = String.format("bad status -1 %s", toString());
backoffForAttempt(++i, String.format("%s@%s:%d: bad status: -1", username, host, port)); while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) {
if (errorStatus == -1) logger.warn("<< " + message);
throw new SshException(String.format("%s@%s:%d: received exit status %d executing %s", username, host, backoffForAttempt(++i, message);
port, executor.getExitStatus(), command));
return new ExecResponse(outputString, errorString, errorStatus);
} catch (Exception e) {
throw new SshException(String
.format("%s@%s:%d: Error executing command: %s", username, host, port, command), e);
} }
if (errorStatus == -1)
throw new SshException(message);
return new ExecResponse(outputString, errorString, errorStatus);
} finally { } finally {
if (executor != null) if (executor != null)
executor.disconnect(); executor.disconnect();
} }
} }
@Override
public String toString() {
return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]";
}
};
public ExecResponse exec(String command) {
return acquire(new ExecConnection(command));
}
@Override @Override
public String getHostAddress() { public String getHostAddress() {
return this.host; return this.host;