mirror of https://github.com/apache/jclouds.git
Issue #1016: fix jsch hanging
This commit is contained in:
parent
6cdad28359
commit
8c36f2c0d8
|
@ -33,6 +33,7 @@ import static org.jclouds.crypto.SshKeys.sha1PrivateKey;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
|
|
||||||
import javax.annotation.PreDestroy;
|
import javax.annotation.PreDestroy;
|
||||||
|
@ -158,6 +159,11 @@ public class JschSshClient implements SshClient {
|
||||||
T create() throws Exception;
|
T create() throws Exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static interface ConnectionWithStreams<T> extends Connection<T> {
|
||||||
|
InputStream getInputStream();
|
||||||
|
InputStream getErrStream();
|
||||||
|
}
|
||||||
|
|
||||||
protected <T, C extends Connection<T>> T acquire(C connection) {
|
protected <T, C extends Connection<T>> T acquire(C connection) {
|
||||||
connection.clear();
|
connection.clear();
|
||||||
String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
|
String errorMessage = String.format("(%s) error acquiring %s", toString(), connection);
|
||||||
|
@ -334,14 +340,20 @@ public class JschSshClient implements SshClient {
|
||||||
sessionConnection.clear();
|
sessionConnection.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connection<ChannelExec> execConnection(final String command) {
|
protected ConnectionWithStreams<ChannelExec> execConnection(final String command) {
|
||||||
checkNotNull(command, "command");
|
checkNotNull(command, "command");
|
||||||
return new Connection<ChannelExec>() {
|
return new ConnectionWithStreams<ChannelExec>() {
|
||||||
|
|
||||||
private ChannelExec executor = null;
|
private ChannelExec executor = null;
|
||||||
|
private InputStream inputStream;
|
||||||
|
private InputStream errStream;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clear() {
|
public void clear() {
|
||||||
|
if (inputStream != null)
|
||||||
|
Closeables.closeQuietly(inputStream);
|
||||||
|
if (errStream != null)
|
||||||
|
Closeables.closeQuietly(errStream);
|
||||||
if (executor != null)
|
if (executor != null)
|
||||||
executor.disconnect();
|
executor.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -353,12 +365,23 @@ public class JschSshClient implements SshClient {
|
||||||
executor = (ChannelExec) sessionConnection.getSession().openChannel(channel);
|
executor = (ChannelExec) sessionConnection.getSession().openChannel(channel);
|
||||||
executor.setPty(true);
|
executor.setPty(true);
|
||||||
executor.setCommand(command);
|
executor.setCommand(command);
|
||||||
ByteArrayOutputStream error = new ByteArrayOutputStream();
|
inputStream = executor.getInputStream();
|
||||||
executor.setErrStream(error);
|
errStream = executor.getErrStream();
|
||||||
executor.connect();
|
executor.connect();
|
||||||
|
|
||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputStream getInputStream() {
|
||||||
|
return inputStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputStream getErrStream() {
|
||||||
|
return errStream;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "ChannelExec()";
|
return "ChannelExec()";
|
||||||
|
@ -384,8 +407,10 @@ public class JschSshClient implements SshClient {
|
||||||
@Override
|
@Override
|
||||||
public ExecResponse create() throws Exception {
|
public ExecResponse create() throws Exception {
|
||||||
try {
|
try {
|
||||||
executor = acquire(execConnection(command));
|
ConnectionWithStreams<ChannelExec> connection = execConnection(command);
|
||||||
String outputString = Strings2.toStringAndClose(executor.getInputStream());
|
executor = acquire(connection);
|
||||||
|
String outputString = Strings2.toStringAndClose(connection.getInputStream());
|
||||||
|
String errorString = Strings2.toStringAndClose(connection.getErrStream());
|
||||||
int errorStatus = executor.getExitStatus();
|
int errorStatus = executor.getExitStatus();
|
||||||
int i = 0;
|
int i = 0;
|
||||||
String message = String.format("bad status -1 %s", toString());
|
String message = String.format("bad status -1 %s", toString());
|
||||||
|
@ -395,12 +420,6 @@ public class JschSshClient implements SshClient {
|
||||||
}
|
}
|
||||||
if (errorStatus == -1)
|
if (errorStatus == -1)
|
||||||
throw new SshException(message);
|
throw new SshException(message);
|
||||||
// be careful as this can hang reading
|
|
||||||
// com.jcraft.jsch.Channel$MyPipedInputStream when there's a slow
|
|
||||||
// network connection
|
|
||||||
// String errorString =
|
|
||||||
// Strings2.toStringAndClose(executor.getErrStream());
|
|
||||||
String errorString = "";
|
|
||||||
return new ExecResponse(outputString, errorString, errorStatus);
|
return new ExecResponse(outputString, errorString, errorStatus);
|
||||||
} finally {
|
} finally {
|
||||||
clear();
|
clear();
|
||||||
|
@ -451,10 +470,12 @@ public class JschSshClient implements SshClient {
|
||||||
String channel = "exec";
|
String channel = "exec";
|
||||||
executor = (ChannelExec) sessionConnection.openChannel(channel);
|
executor = (ChannelExec) sessionConnection.openChannel(channel);
|
||||||
executor.setCommand(command);
|
executor.setCommand(command);
|
||||||
ByteArrayOutputStream error = new ByteArrayOutputStream();
|
executor.setErrStream(new ByteArrayOutputStream());
|
||||||
executor.setErrStream(error);
|
InputStream inputStream = executor.getInputStream();
|
||||||
|
InputStream errStream = executor.getErrStream();
|
||||||
|
OutputStream outStream = executor.getOutputStream();
|
||||||
executor.connect();
|
executor.connect();
|
||||||
return new ExecChannel(executor.getOutputStream(), executor.getInputStream(), executor.getErrStream(),
|
return new ExecChannel(outStream, inputStream, errStream,
|
||||||
new Supplier<Integer>() {
|
new Supplier<Integer>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -19,6 +19,8 @@
|
||||||
package org.jclouds.ssh.jsch;
|
package org.jclouds.ssh.jsch;
|
||||||
|
|
||||||
import static org.testng.Assert.assertEquals;
|
import static org.testng.Assert.assertEquals;
|
||||||
|
import static org.testng.Assert.assertNotEquals;
|
||||||
|
import static org.testng.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -29,6 +31,12 @@ import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintStream;
|
import java.io.PrintStream;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.jclouds.compute.domain.ExecChannel;
|
import org.jclouds.compute.domain.ExecChannel;
|
||||||
import org.jclouds.compute.domain.ExecResponse;
|
import org.jclouds.compute.domain.ExecResponse;
|
||||||
|
@ -46,6 +54,10 @@ import com.google.common.base.Strings;
|
||||||
import com.google.common.base.Suppliers;
|
import com.google.common.base.Suppliers;
|
||||||
import com.google.common.io.Closeables;
|
import com.google.common.io.Closeables;
|
||||||
import com.google.common.net.HostAndPort;
|
import com.google.common.net.HostAndPort;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
|
|
||||||
|
@ -147,6 +159,7 @@ public class JschSshClientLiveTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testPutAndGet() throws IOException {
|
public void testPutAndGet() throws IOException {
|
||||||
temp = File.createTempFile("foo", "bar");
|
temp = File.createTempFile("foo", "bar");
|
||||||
temp.deleteOnExit();
|
temp.deleteOnExit();
|
||||||
|
@ -157,19 +170,99 @@ public class JschSshClientLiveTest {
|
||||||
assertEquals(contents, "rabbit");
|
assertEquals(contents, "rabbit");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testGetEtcPassword() throws IOException {
|
public void testGetEtcPassword() throws IOException {
|
||||||
Payload input = setupClient().get("/etc/passwd");
|
Payload input = setupClient().get("/etc/passwd");
|
||||||
String contents = Strings2.toStringAndClose(input.getInput());
|
String contents = Strings2.toStringAndClose(input.getInput());
|
||||||
assert contents.indexOf("root") >= 0 : "no root in " + contents;
|
assert contents.indexOf("root") >= 0 : "no root in " + contents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testExecHostname() throws IOException {
|
public void testExecHostname() throws IOException {
|
||||||
ExecResponse response = setupClient().exec("hostname");
|
SshClient client = setupClient();
|
||||||
|
try {
|
||||||
|
ExecResponse response = client.exec("hostname");
|
||||||
|
assertEquals(response.getExitStatus(), 0);
|
||||||
|
assertEquals(response.getError(), "");
|
||||||
|
assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName()
|
||||||
|
: sshHost);
|
||||||
|
} finally {
|
||||||
|
client.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExecInvalidCommand() throws IOException {
|
||||||
|
SshClient client = setupClient();
|
||||||
|
try {
|
||||||
|
ExecResponse response = client.exec("thisCommandDoesNotExist");
|
||||||
|
assertNotEquals(response.getExitStatus(), 0);
|
||||||
|
assertTrue(response.getOutput().contains("not found") || response.getError().contains("not found"),
|
||||||
|
"stdout="+response.getOutput()+"; stderr="+response.getError());
|
||||||
|
} finally {
|
||||||
|
client.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Added for issue #1016.
|
||||||
|
@Test(invocationCount=100)
|
||||||
|
public void testExecHostnameRepeatedlyWithDifferentSessions() throws Exception {
|
||||||
|
testExecHostname();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Added for issue #1016.
|
||||||
|
@Test
|
||||||
|
public void testExecHostnameRepeatedlyWithSameSessions() throws Exception {
|
||||||
|
final SshClient client = setupClient();
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
ExecResponse response = client.exec("hostname");
|
||||||
|
assertEquals(response.getError(), "");
|
||||||
|
assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName()
|
||||||
|
: sshHost);
|
||||||
|
//System.out.println("completed (sequentially) "+i);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
client.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Added for issue #1016.
|
||||||
|
// Note that some commands fail the first few attempt, but with default retries at 5 they do pass (for me locally).
|
||||||
|
// The error is "JSchException: channel is not opened".
|
||||||
|
// With the thread-pool size at 100, you get failures a lot more often.
|
||||||
|
@Test
|
||||||
|
public void testExecHostnameConcurrentlyWithSameSessions() throws Exception {
|
||||||
|
final SshClient client = setupClient();
|
||||||
|
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
|
||||||
|
List<ListenableFuture<ExecResponse>> futures = new ArrayList<ListenableFuture<ExecResponse>>();
|
||||||
|
try {
|
||||||
|
final AtomicInteger count = new AtomicInteger();
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
futures.add(executor.submit(new Callable<ExecResponse>() {
|
||||||
|
@Override
|
||||||
|
public ExecResponse call() {
|
||||||
|
ExecResponse response = client.exec("hostname");
|
||||||
|
//System.out.println("completed (concurrently) "+count.incrementAndGet());
|
||||||
|
return response;
|
||||||
|
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
List<ExecResponse> responses = Futures.allAsList(futures).get(3000, TimeUnit.SECONDS);
|
||||||
|
for (ExecResponse response : responses) {
|
||||||
assertEquals(response.getError(), "");
|
assertEquals(response.getError(), "");
|
||||||
assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName()
|
assertEquals(response.getOutput().trim(), "localhost".equals(sshHost) ? InetAddress.getLocalHost().getHostName()
|
||||||
: sshHost);
|
: sshHost);
|
||||||
}
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdownNow();
|
||||||
|
client.disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testExecChannelTakesStdinAndNoEchoOfCharsInOuputAndOutlivesClient() throws IOException {
|
public void testExecChannelTakesStdinAndNoEchoOfCharsInOuputAndOutlivesClient() throws IOException {
|
||||||
SshClient client = setupClient();
|
SshClient client = setupClient();
|
||||||
ExecChannel response = client.execChannel("cat <<EOF");
|
ExecChannel response = client.execChannel("cat <<EOF");
|
||||||
|
|
Loading…
Reference in New Issue