From 8c36f2c0d8e89252a150fe081216792c426d7d7a Mon Sep 17 00:00:00 2001 From: Aled Sage Date: Wed, 11 Jul 2012 14:49:26 +0100 Subject: [PATCH] Issue #1016: fix jsch hanging --- .../org/jclouds/ssh/jsch/JschSshClient.java | 55 +++++++--- .../ssh/jsch/JschSshClientLiveTest.java | 101 +++++++++++++++++- 2 files changed, 135 insertions(+), 21 deletions(-) diff --git a/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java b/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java index 85eb3e8d00..7d5418ef56 100644 --- a/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java +++ b/drivers/jsch/src/main/java/org/jclouds/ssh/jsch/JschSshClient.java @@ -33,6 +33,7 @@ import static org.jclouds.crypto.SshKeys.sha1PrivateKey; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.net.ConnectException; import javax.annotation.PreDestroy; @@ -158,6 +159,11 @@ public class JschSshClient implements SshClient { T create() throws Exception; } + public static interface ConnectionWithStreams extends Connection { + InputStream getInputStream(); + InputStream getErrStream(); + } + protected > T acquire(C connection) { connection.clear(); String errorMessage = String.format("(%s) error acquiring %s", toString(), connection); @@ -334,14 +340,20 @@ public class JschSshClient implements SshClient { sessionConnection.clear(); } - protected Connection execConnection(final String command) { + protected ConnectionWithStreams execConnection(final String command) { checkNotNull(command, "command"); - return new Connection() { + return new ConnectionWithStreams() { private ChannelExec executor = null; - + private InputStream inputStream; + private InputStream errStream; + @Override public void clear() { + if (inputStream != null) + Closeables.closeQuietly(inputStream); + if (errStream != null) + Closeables.closeQuietly(errStream); if (executor != null) executor.disconnect(); } @@ -353,12 +365,23 @@ public class JschSshClient implements SshClient { executor = (ChannelExec) sessionConnection.getSession().openChannel(channel); executor.setPty(true); executor.setCommand(command); - ByteArrayOutputStream error = new ByteArrayOutputStream(); - executor.setErrStream(error); + inputStream = executor.getInputStream(); + errStream = executor.getErrStream(); executor.connect(); + return executor; } + @Override + public InputStream getInputStream() { + return inputStream; + } + + @Override + public InputStream getErrStream() { + return errStream; + } + @Override public String toString() { return "ChannelExec()"; @@ -384,8 +407,10 @@ public class JschSshClient implements SshClient { @Override public ExecResponse create() throws Exception { try { - executor = acquire(execConnection(command)); - String outputString = Strings2.toStringAndClose(executor.getInputStream()); + ConnectionWithStreams connection = execConnection(command); + executor = acquire(connection); + String outputString = Strings2.toStringAndClose(connection.getInputStream()); + String errorString = Strings2.toStringAndClose(connection.getErrStream()); int errorStatus = executor.getExitStatus(); int i = 0; String message = String.format("bad status -1 %s", toString()); @@ -395,12 +420,6 @@ public class JschSshClient implements SshClient { } if (errorStatus == -1) throw new SshException(message); - // be careful as this can hang reading - // com.jcraft.jsch.Channel$MyPipedInputStream when there's a slow - // network connection - // String errorString = - // Strings2.toStringAndClose(executor.getErrStream()); - String errorString = ""; return new ExecResponse(outputString, errorString, errorStatus); } finally { clear(); @@ -431,7 +450,7 @@ public class JschSshClient implements SshClient { private final String command; private ChannelExec executor = null; private Session sessionConnection; - + ExecChannelConnection(String command) { this.command = checkNotNull(command, "command"); } @@ -451,10 +470,12 @@ public class JschSshClient implements SshClient { String channel = "exec"; executor = (ChannelExec) sessionConnection.openChannel(channel); executor.setCommand(command); - ByteArrayOutputStream error = new ByteArrayOutputStream(); - executor.setErrStream(error); + executor.setErrStream(new ByteArrayOutputStream()); + InputStream inputStream = executor.getInputStream(); + InputStream errStream = executor.getErrStream(); + OutputStream outStream = executor.getOutputStream(); executor.connect(); - return new ExecChannel(executor.getOutputStream(), executor.getInputStream(), executor.getErrStream(), + return new ExecChannel(outStream, inputStream, errStream, new Supplier() { @Override diff --git a/drivers/jsch/src/test/java/org/jclouds/ssh/jsch/JschSshClientLiveTest.java b/drivers/jsch/src/test/java/org/jclouds/ssh/jsch/JschSshClientLiveTest.java index f2a098b826..0019055f41 100644 --- a/drivers/jsch/src/test/java/org/jclouds/ssh/jsch/JschSshClientLiveTest.java +++ b/drivers/jsch/src/test/java/org/jclouds/ssh/jsch/JschSshClientLiveTest.java @@ -19,6 +19,8 @@ package org.jclouds.ssh.jsch; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertTrue; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -29,6 +31,12 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.jclouds.compute.domain.ExecChannel; import org.jclouds.compute.domain.ExecResponse; @@ -46,6 +54,10 @@ import com.google.common.base.Strings; import com.google.common.base.Suppliers; import com.google.common.io.Closeables; import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Guice; import com.google.inject.Injector; @@ -147,6 +159,7 @@ public class JschSshClientLiveTest { } } + @Test public void testPutAndGet() throws IOException { temp = File.createTempFile("foo", "bar"); temp.deleteOnExit(); @@ -157,19 +170,99 @@ public class JschSshClientLiveTest { assertEquals(contents, "rabbit"); } + @Test public void testGetEtcPassword() throws IOException { Payload input = setupClient().get("/etc/passwd"); String contents = Strings2.toStringAndClose(input.getInput()); assert contents.indexOf("root") >= 0 : "no root in " + contents; } + @Test public void testExecHostname() throws IOException { - ExecResponse response = setupClient().exec("hostname"); - assertEquals(response.getError(), ""); - assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName() - : sshHost); + SshClient client = setupClient(); + try { + ExecResponse response = client.exec("hostname"); + assertEquals(response.getExitStatus(), 0); + assertEquals(response.getError(), ""); + assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName() + : sshHost); + } finally { + client.disconnect(); + } } + @Test + public void testExecInvalidCommand() throws IOException { + SshClient client = setupClient(); + try { + ExecResponse response = client.exec("thisCommandDoesNotExist"); + assertNotEquals(response.getExitStatus(), 0); + assertTrue(response.getOutput().contains("not found") || response.getError().contains("not found"), + "stdout="+response.getOutput()+"; stderr="+response.getError()); + } finally { + client.disconnect(); + } + } + + // Added for issue #1016. + @Test(invocationCount=100) + public void testExecHostnameRepeatedlyWithDifferentSessions() throws Exception { + testExecHostname(); + } + + // Added for issue #1016. + @Test + public void testExecHostnameRepeatedlyWithSameSessions() throws Exception { + final SshClient client = setupClient(); + + try { + for (int i = 0; i < 100; i++) { + ExecResponse response = client.exec("hostname"); + assertEquals(response.getError(), ""); + assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName() + : sshHost); + //System.out.println("completed (sequentially) "+i); + } + } finally { + client.disconnect(); + } + } + + // Added for issue #1016. + // Note that some commands fail the first few attempt, but with default retries at 5 they do pass (for me locally). + // The error is "JSchException: channel is not opened". + // With the thread-pool size at 100, you get failures a lot more often. + @Test + public void testExecHostnameConcurrentlyWithSameSessions() throws Exception { + final SshClient client = setupClient(); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)); + List> futures = new ArrayList>(); + try { + final AtomicInteger count = new AtomicInteger(); + for (int i = 0; i < 100; i++) { + futures.add(executor.submit(new Callable() { + @Override + public ExecResponse call() { + ExecResponse response = client.exec("hostname"); + //System.out.println("completed (concurrently) "+count.incrementAndGet()); + return response; + + } + })); + } + List responses = Futures.allAsList(futures).get(3000, TimeUnit.SECONDS); + for (ExecResponse response : responses) { + assertEquals(response.getError(), ""); + assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName() + : sshHost); + } + } finally { + executor.shutdownNow(); + client.disconnect(); + } + } + + @Test public void testExecChannelTakesStdinAndNoEchoOfCharsInOuputAndOutlivesClient() throws IOException { SshClient client = setupClient(); ExecChannel response = client.execChannel("cat <