Fixes #8584 - HttpRequest.send() never returns
Fixed handling of the idle timeout in case the SOCKS proxy does not reply to the SOCKS bytes. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
ccb2cc0637
commit
58f0e0744b
|
@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -140,12 +141,21 @@ public class Socks4Proxy extends ProxyConfiguration.Proxy
|
||||||
@Override
|
@Override
|
||||||
public void failed(Throwable x)
|
public void failed(Throwable x)
|
||||||
{
|
{
|
||||||
close();
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("SOCKS4 failure", x);
|
||||||
|
getEndPoint().close(x);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
Promise<Connection> promise = (Promise<Connection>)context.get(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
||||||
promise.failed(x);
|
promise.failed(x);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean onIdleExpired()
|
||||||
|
{
|
||||||
|
failed(new TimeoutException("Idle timeout expired"));
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFillable()
|
public void onFillable()
|
||||||
{
|
{
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
|
|
||||||
package org.eclipse.jetty.client;
|
package org.eclipse.jetty.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -21,11 +22,15 @@ import java.nio.channels.ServerSocketChannel;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
import javax.net.ssl.SSLSocket;
|
import javax.net.ssl.SSLSocket;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.api.Request;
|
||||||
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
|
||||||
|
import org.eclipse.jetty.client.util.FutureResponseListener;
|
||||||
import org.eclipse.jetty.http.HttpScheme;
|
import org.eclipse.jetty.http.HttpScheme;
|
||||||
import org.eclipse.jetty.io.ClientConnector;
|
import org.eclipse.jetty.io.ClientConnector;
|
||||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
|
@ -34,19 +39,22 @@ import org.junit.jupiter.api.AfterEach;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class Socks4ProxyTest
|
public class Socks4ProxyTest
|
||||||
{
|
{
|
||||||
private ServerSocketChannel server;
|
private ServerSocketChannel proxy;
|
||||||
private HttpClient client;
|
private HttpClient client;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void prepare() throws Exception
|
public void prepare() throws Exception
|
||||||
{
|
{
|
||||||
server = ServerSocketChannel.open();
|
proxy = ServerSocketChannel.open();
|
||||||
server.bind(new InetSocketAddress("localhost", 0));
|
proxy.bind(new InetSocketAddress("localhost", 0));
|
||||||
|
|
||||||
ClientConnector connector = new ClientConnector();
|
ClientConnector connector = new ClientConnector();
|
||||||
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
QueuedThreadPool clientThreads = new QueuedThreadPool();
|
||||||
|
@ -62,13 +70,13 @@ public class Socks4ProxyTest
|
||||||
public void dispose() throws Exception
|
public void dispose() throws Exception
|
||||||
{
|
{
|
||||||
client.stop();
|
client.stop();
|
||||||
server.close();
|
proxy.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSocks4Proxy() throws Exception
|
public void testSocks4Proxy() throws Exception
|
||||||
{
|
{
|
||||||
int proxyPort = server.socket().getLocalPort();
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
client.getProxyConfiguration().getProxies().add(new Socks4Proxy("localhost", proxyPort));
|
client.getProxyConfiguration().getProxies().add(new Socks4Proxy("localhost", proxyPort));
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
@ -91,7 +99,7 @@ public class Socks4ProxyTest
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
});
|
});
|
||||||
|
|
||||||
try (SocketChannel channel = server.accept())
|
try (SocketChannel channel = proxy.accept())
|
||||||
{
|
{
|
||||||
int socks4MessageLength = 9;
|
int socks4MessageLength = 9;
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
||||||
|
@ -130,7 +138,7 @@ public class Socks4ProxyTest
|
||||||
@Test
|
@Test
|
||||||
public void testSocks4ProxyWithSplitResponse() throws Exception
|
public void testSocks4ProxyWithSplitResponse() throws Exception
|
||||||
{
|
{
|
||||||
int proxyPort = server.socket().getLocalPort();
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
client.getProxyConfiguration().getProxies().add(new Socks4Proxy("localhost", proxyPort));
|
client.getProxyConfiguration().getProxies().add(new Socks4Proxy("localhost", proxyPort));
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
@ -150,7 +158,7 @@ public class Socks4ProxyTest
|
||||||
result.getFailure().printStackTrace();
|
result.getFailure().printStackTrace();
|
||||||
});
|
});
|
||||||
|
|
||||||
try (SocketChannel channel = server.accept())
|
try (SocketChannel channel = proxy.accept())
|
||||||
{
|
{
|
||||||
int socks4MessageLength = 9;
|
int socks4MessageLength = 9;
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
||||||
|
@ -189,7 +197,7 @@ public class Socks4ProxyTest
|
||||||
public void testSocks4ProxyWithTLSServer() throws Exception
|
public void testSocks4ProxyWithTLSServer() throws Exception
|
||||||
{
|
{
|
||||||
String proxyHost = "localhost";
|
String proxyHost = "localhost";
|
||||||
int proxyPort = server.socket().getLocalPort();
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
|
|
||||||
String serverHost = "127.0.0.13"; // Server host different from proxy host.
|
String serverHost = "127.0.0.13"; // Server host different from proxy host.
|
||||||
int serverPort = proxyPort + 1; // Any port will do.
|
int serverPort = proxyPort + 1; // Any port will do.
|
||||||
|
@ -221,7 +229,7 @@ public class Socks4ProxyTest
|
||||||
result.getFailure().printStackTrace();
|
result.getFailure().printStackTrace();
|
||||||
});
|
});
|
||||||
|
|
||||||
try (SocketChannel channel = server.accept())
|
try (SocketChannel channel = proxy.accept())
|
||||||
{
|
{
|
||||||
int socks4MessageLength = 9;
|
int socks4MessageLength = 9;
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
ByteBuffer buffer = ByteBuffer.allocate(socks4MessageLength);
|
||||||
|
@ -269,4 +277,69 @@ public class Socks4ProxyTest
|
||||||
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestTimeoutWhenSocksProxyDoesNotRespond() throws Exception
|
||||||
|
{
|
||||||
|
String proxyHost = "localhost";
|
||||||
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
|
client.getProxyConfiguration().getProxies().add(new Socks4Proxy(proxyHost, proxyPort));
|
||||||
|
|
||||||
|
long timeout = 1000;
|
||||||
|
Request request = client.newRequest("localhost", proxyPort + 1)
|
||||||
|
.timeout(timeout, TimeUnit.MILLISECONDS);
|
||||||
|
FutureResponseListener listener = new FutureResponseListener(request);
|
||||||
|
request.send(listener);
|
||||||
|
|
||||||
|
try (SocketChannel ignored = proxy.accept())
|
||||||
|
{
|
||||||
|
// Accept the connection, but do not reply and don't close.
|
||||||
|
|
||||||
|
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * timeout, TimeUnit.MILLISECONDS));
|
||||||
|
assertThat(x.getCause(), instanceOf(TimeoutException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIdleTimeoutWhenSocksProxyDoesNotRespond() throws Exception
|
||||||
|
{
|
||||||
|
String proxyHost = "localhost";
|
||||||
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
|
client.getProxyConfiguration().getProxies().add(new Socks4Proxy(proxyHost, proxyPort));
|
||||||
|
long idleTimeout = 1000;
|
||||||
|
client.setIdleTimeout(idleTimeout);
|
||||||
|
|
||||||
|
Request request = client.newRequest("localhost", proxyPort + 1);
|
||||||
|
FutureResponseListener listener = new FutureResponseListener(request);
|
||||||
|
request.send(listener);
|
||||||
|
|
||||||
|
try (SocketChannel ignored = proxy.accept())
|
||||||
|
{
|
||||||
|
// Accept the connection, but do not reply and don't close.
|
||||||
|
|
||||||
|
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(2 * idleTimeout, TimeUnit.MILLISECONDS));
|
||||||
|
assertThat(x.getCause(), instanceOf(TimeoutException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSocksProxyClosesConnectionImmediately() throws Exception
|
||||||
|
{
|
||||||
|
String proxyHost = "localhost";
|
||||||
|
int proxyPort = proxy.socket().getLocalPort();
|
||||||
|
client.getProxyConfiguration().getProxies().add(new Socks4Proxy(proxyHost, proxyPort));
|
||||||
|
|
||||||
|
Request request = client.newRequest("localhost", proxyPort + 1);
|
||||||
|
FutureResponseListener listener = new FutureResponseListener(request);
|
||||||
|
request.send(listener);
|
||||||
|
|
||||||
|
try (SocketChannel channel = proxy.accept())
|
||||||
|
{
|
||||||
|
// Immediately close the connection.
|
||||||
|
channel.close();
|
||||||
|
|
||||||
|
ExecutionException x = assertThrows(ExecutionException.class, () -> listener.get(5, TimeUnit.SECONDS));
|
||||||
|
assertThat(x.getCause(), instanceOf(IOException.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue