From ac698c33ec59feff64b85fa2ddf5162b28984e24 Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 1 Jun 2011 12:00:22 -0700 Subject: [PATCH] Issue 586:retry ssh on channel failures --- .../org/jclouds/ssh/jsch/JschSshClient.java | 342 ++++++++++++------ 1 file changed, 231 insertions(+), 111 deletions(-) diff --git a/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java b/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java index 4dea7af4e2..f46dec95d3 100644 --- a/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java +++ b/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java @@ -58,9 +58,7 @@ import com.google.inject.Inject; import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; -import com.jcraft.jsch.JSchException; import com.jcraft.jsch.Session; -import com.jcraft.jsch.SftpException; /** * This class needs refactoring. It is not thread safe. @@ -99,7 +97,7 @@ public class JschSshClient implements SshClient { @Inject(optional = true) @Named("jclouds.ssh.retryable_messages") @VisibleForTesting - String retryableMessages = "invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established"; + String retryableMessages = "failed to send channel request,channel is not opened,invalid data,End of IO Stream Read,Connection reset,connection is closed by foreign host,socket is not established"; @Inject(optional = true) @Named("jclouds.ssh.retry_predicate") @@ -128,81 +126,182 @@ public class JschSshClient implements SshClient { this.privateKey = privateKey; } - public Payload get(String path) { - checkNotNull(path, "path"); - - ChannelSftp sftp = getSftp(); - try { - return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp)); - } catch (SftpException e) { - throw new SshException(String.format("%s@%s:%d: Error getting path: %s", username, host, port, path), e); - } - } - - @Override - public void put(String path, Payload contents) { - checkNotNull(path, "path"); - checkNotNull(contents, "contents"); - ChannelSftp sftp = getSftp(); - try { - sftp.put(contents.getInput(), path); - } catch (SftpException e) { - throw new SshException(String.format("%s@%s:%d: Error putting path: %s", username, host, port, path), e); - } finally { - Closeables.closeQuietly(contents); - } - } - @Override public void put(String path, String contents) { put(path, Payloads.newStringPayload(checkNotNull(contents, "contents"))); } - private ChannelSftp getSftp() { - checkConnected(); - logger.debug("%s@%s:%d: Opening sftp Channel.", username, host, port); - ChannelSftp sftp = null; - try { - sftp = (ChannelSftp) session.openChannel("sftp"); - sftp.connect(); - } catch (JSchException e) { - throw new SshException(String.format("%s@%s:%d: Error connecting to sftp.", username, host, port), e); - } - return sftp; + private void checkConnected() { + checkState(session != null && session.isConnected(), String.format("(%s) Session not connected!", toString())); } - private void checkConnected() { - checkState(session != null && session.isConnected(), String.format("%s@%s:%d: SFTP not connected!", username, - host, port)); + public static interface Connection { + void clear(); + + T create() throws Exception; + } + + Connection sessionConnection = new Connection() { + + @Override + public void clear() { + if (session != null && session.isConnected()) { + session.disconnect(); + session = null; + } + } + + @Override + public Session create() throws Exception { + JSch jsch = new JSch(); + session = jsch.getSession(username, host, port); + if (timeout != 0) + session.setTimeout(timeout); + if (password != null) { + session.setPassword(password); + } else { + // jsch wipes out your private key + jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase); + } + java.util.Properties config = new java.util.Properties(); + config.put("StrictHostKeyChecking", "no"); + session.setConfig(config); + session.connect(); + return session; + } + + @Override + public String toString() { + return String.format("Session(%s)", JschSshClient.this.toString()); + } + }; + + protected > T acquire(C connection) { + connection.clear(); + Exception e = null; + String errorMessage = String.format("(%s) error acquiring %s", toString(), connection); + for (int i = 0; i < sshRetries; i++) { + try { + logger.debug(">> (%s) acquiring %s", toString(), connection); + T returnVal = connection.create(); + logger.debug("<< (%s) acquired %s", toString(), returnVal); + return returnVal; + } catch (Exception from) { + e = from; + connection.clear(); + + if (i == sshRetries) + throw propagate(from, errorMessage); + + if (shouldRetry(from)) { + logger.warn("<< " + errorMessage + ": " + from.getMessage()); + backoffForAttempt(i + 1, errorMessage + ": " + from.getMessage()); + continue; + } + throw propagate(from, errorMessage); + } + } + if (e != null) + throw propagate(e, errorMessage); + return null; } @PostConstruct public void connect() { - disconnect(); - Exception e = null; - RETRY_LOOP: for (int i = 0; i < sshRetries; i++) { - try { - newSession(); - e = null; - break RETRY_LOOP; - } catch (Exception from) { - e = from; - disconnect(); + acquire(sessionConnection); + } - if (i == sshRetries) - throw propagate(from); + Connection sftpConnection = new Connection() { - if (shouldRetry(from)) { - backoffForAttempt(i + 1, String.format("%s@%s:%d: connection error: %s", username, host, port, from - .getMessage())); - continue; - } + private ChannelSftp sftp; - throw propagate(from); - } + @Override + public void clear() { + if (sftp != null) + sftp.disconnect(); } - if (e != null) - throw propagate(e); + + @Override + public ChannelSftp create() throws Exception { + checkConnected(); + sftp = (ChannelSftp) session.openChannel("sftp"); + sftp.connect(); + return sftp; + } + + @Override + public String toString() { + return "ChannelSftp(" + JschSshClient.this.toString() + ")"; + } + }; + + class GetConnection implements Connection { + private final String path; + private ChannelSftp sftp; + + GetConnection(String path) { + this.path = checkNotNull(path, "path"); + } + + @Override + public void clear() { + if (sftp != null) + sftp.disconnect(); + } + + @Override + public Payload create() throws Exception { + sftp = acquire(sftpConnection); + return Payloads.newInputStreamPayload(new CloseFtpChannelOnCloseInputStream(sftp.get(path), sftp)); + } + + @Override + public String toString() { + return "Payload(" + JschSshClient.this.toString() + ")[" + path + "]"; + } + }; + + public Payload get(String path) { + return acquire(new GetConnection(path)); + } + + class PutConnection implements Connection { + private final String path; + private final Payload contents; + private ChannelSftp sftp; + + PutConnection(String path, Payload contents) { + this.path = checkNotNull(path, "path"); + this.contents = checkNotNull(contents, "contents"); + } + + @Override + public void clear() { + if (sftp != null) + sftp.disconnect(); + } + + @Override + public Void create() throws Exception { + sftp = acquire(sftpConnection); + try { + sftp.put(contents.getInput(), path); + } finally { + Closeables.closeQuietly(contents); + clear(); + } + return null; + } + + @Override + public String toString() { + return "Put(" + JschSshClient.this.toString() + ")[" + path + "]"; + } + }; + + @Override + public void put(String path, Payload contents) { + acquire(new PutConnection(path, contents)); } @VisibleForTesting @@ -223,52 +322,64 @@ public class JschSshClient implements SshClient { backoffLimitedRetryHandler.imposeBackoffExponentialDelay(200L, 2, retryAttempt, sshRetries, message); } - private void newSession() throws JSchException { - JSch jsch = new JSch(); - session = null; - try { - session = jsch.getSession(username, host, port); - if (timeout != 0) - session.setTimeout(timeout); - logger.debug("%s@%s:%d: Session created.", username, host, port); - if (password != null) { - session.setPassword(password); - } else { - // jsch wipes out your private key - jsch.addIdentity(username, Arrays.copyOf(privateKey, privateKey.length), null, emptyPassPhrase); - } - } catch (JSchException e) { - throw new SshException(String.format("%s@%s:%d: Error creating session.", username, host, port), e); - } - java.util.Properties config = new java.util.Properties(); - config.put("StrictHostKeyChecking", "no"); - session.setConfig(config); - session.connect(); - logger.debug("%s@%s:%d: Session connected.", username, host, port); + private SshException propagate(Exception e, String message) { + message += ": " + e.getMessage(); + logger.error(e, "<< " + message); + throw new SshException(message, e); } - private SshException propagate(Exception e) { - throw new SshException(String.format("%s@%s:%d: Error connecting to session.", username, host, port), e); + @Override + public String toString() { + return String.format("%s@%s:%d", username, host, port); } @PreDestroy public void disconnect() { - if (session != null && session.isConnected()) { - session.disconnect(); - session = null; - } + sessionConnection.clear(); } - public ExecResponse exec(String command) { - checkConnected(); - ChannelExec executor = null; - try { - try { - executor = (ChannelExec) session.openChannel("exec"); - executor.setPty(true); - } catch (JSchException e) { - throw new SshException(String.format("%s@%s:%d: Error connecting to exec.", username, host, port), e); - } + Connection execConnection = new Connection() { + + private ChannelExec executor = null; + + @Override + public void clear() { + if (executor != null) + executor.disconnect(); + } + + @Override + public ChannelExec create() throws Exception { + checkConnected(); + executor = (ChannelExec) session.openChannel("exec"); + executor.setPty(true); + return executor; + } + + @Override + public String toString() { + return "ChannelExec(" + JschSshClient.this.toString() + ")"; + } + + }; + + class ExecConnection implements Connection { + private final String command; + private ChannelExec executor; + + ExecConnection(String command) { + this.command = checkNotNull(command, "command"); + } + + @Override + public void clear() { + if (executor != null) + executor.disconnect(); + } + + @Override + public ExecResponse create() throws Exception { + executor = acquire(execConnection); executor.setCommand(command); ByteArrayOutputStream error = new ByteArrayOutputStream(); executor.setErrStream(error); @@ -278,20 +389,29 @@ public class JschSshClient implements SshClient { String errorString = error.toString(); int errorStatus = executor.getExitStatus(); int i = 0; - while ((errorStatus = executor.getExitStatus()) == -1 && i < this.sshRetries) - backoffForAttempt(++i, String.format("%s@%s:%d: bad status: -1", username, host, port)); + String message = String.format("bad status -1 %s", toString()); + while ((errorStatus = executor.getExitStatus()) == -1 && i < JschSshClient.this.sshRetries) { + logger.warn("<< " + message); + backoffForAttempt(++i, message); + } if (errorStatus == -1) - throw new SshException(String.format("%s@%s:%d: received exit status %d executing %s", username, host, - port, executor.getExitStatus(), command)); + throw new SshException(message); return new ExecResponse(outputString, errorString, errorStatus); - } catch (Exception e) { - throw new SshException(String - .format("%s@%s:%d: Error executing command: %s", username, host, port, command), e); + } finally { + if (executor != null) + executor.disconnect(); } - } finally { - if (executor != null) - executor.disconnect(); } + + @Override + public String toString() { + return "ExecResponse(" + JschSshClient.this.toString() + ")[" + command + "]"; + } + + }; + + public ExecResponse exec(String command) { + return acquire(new ExecConnection(command)); } @Override