Merge branch '360665'

This commit is contained in:
Simone Bordet 2011-11-04 15:54:26 +01:00
commit f21752dbe3
6 changed files with 447 additions and 139 deletions

View File

@ -357,34 +357,27 @@ public class HttpConnection extends AbstractConnection implements Dumpable
complete = true;
}
}
// if the endpoint is closed, but the parser incomplete
if (!_endp.isOpen() && !(_parser.isComplete()||_parser.isIdle()))
{
// we wont be called again so let the parser see the close
complete=true;
_parser.parseAvailable();
// TODO should not need this
if (!(_parser.isComplete()||_parser.isIdle()))
{
LOG.warn("Incomplete {} {}",_parser,_endp);
if (_exchange!=null && !_exchange.isDone())
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
}
}
}
}
if (_endp.isInputShutdown() && !_parser.isComplete() && !_parser.isIdle())
// if the endpoint is closed, but the parser incomplete
if ((!_endp.isOpen()||_endp.isInputShutdown()) && !(_parser.isComplete()||_parser.isIdle()))
{
if (_exchange!=null && !_exchange.isDone())
// we wont be called again so let the parser see the close
complete=true;
_parser.parseAvailable();
// parsing again may have changed the parser state, so check again
if (!(_parser.isComplete()||_parser.isIdle()))
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(new EOFException("Incomplete"));
LOG.warn("Incomplete {} {}",_parser,_endp);
HttpExchange exchange = _exchange;
if (exchange!=null && !exchange.isDone())
{
exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
exchange.getEventListener().onException(new EOFException("Incomplete"));
}
_endp.close();
}
_endp.close();
}
if (complete || failed)

View File

@ -15,7 +15,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.ConnectException;
import java.net.ProtocolException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@ -65,7 +65,7 @@ public class HttpDestination implements Dumpable
private List<HttpCookie> _cookies;
HttpDestination(HttpClient client, Address address, boolean ssl)
{
_client = client;
@ -524,7 +524,7 @@ public class HttpDestination implements Dumpable
// Add any known authorizations
if (_authorizations != null)
{
Authentication auth = (Authentication)_authorizations.match(ex.getURI());
Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
if (auth != null)
(auth).setCredentials(ex);
}
@ -665,7 +665,7 @@ public class HttpDestination implements Dumpable
AggregateLifeCycle.dump(out,indent,_connections);
}
}
private class ConnectExchange extends ContentExchange
{
private final SelectConnector.ProxySelectChannelEndPoint proxyEndPoint;
@ -687,13 +687,18 @@ public class HttpDestination implements Dumpable
@Override
protected void onResponseComplete() throws IOException
{
if (getResponseStatus() == HttpStatus.OK_200)
int responseStatus = getResponseStatus();
if (responseStatus == HttpStatus.OK_200)
{
proxyEndPoint.upgrade();
}
else if(responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
{
onExpire();
}
else
{
onConnectionFailed(new ConnectException(exchange.getAddress().toString()));
onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() +":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus + " while trying to request: " + exchange.getAddress().toString()));
}
}
@ -702,5 +707,22 @@ public class HttpDestination implements Dumpable
{
HttpDestination.this.onConnectionFailed(x);
}
@Override
protected void onException(Throwable x)
{
_queue.remove(exchange);
exchange.setStatus(STATUS_EXCEPTED);
exchange.getEventListener().onException(x);
}
@Override
protected void onExpire()
{
_queue.remove(exchange);
exchange.setStatus(STATUS_EXPIRED);
exchange.getEventListener().onExpire();
}
}
}

View File

@ -15,14 +15,11 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSession;
@ -107,7 +104,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
else
{
channel.configureBlocking(false);
channel.connect(address.toSocketAddress());
channel.connect(address.toSocketAddress());
_selectorManager.register(channel,destination);
ConnectTimeout connectTimeout = new ConnectTimeout(channel,destination);
_httpClient.schedule(connectTimeout,_httpClient.getConnectTimeout());
@ -232,7 +229,7 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
Timeout.Task connectTimeout = _connectingChannels.remove(channel);
if (connectTimeout != null)
connectTimeout.cancel();
if (attachment instanceof HttpDestination)
((HttpDestination)attachment).onConnectionFailed(ex);
else
@ -280,12 +277,14 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
public static class ProxySelectChannelEndPoint extends SslSelectChannelEndPoint
{
private final SelectChannelEndPoint plainEndPoint;
private final EnforceOverrideEndPointMethods enforcer;
private volatile boolean upgraded = false;
public ProxySelectChannelEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey key, Buffers sslBuffers, SSLEngine engine, int maxIdleTimeout) throws IOException
{
super(sslBuffers, channel, selectSet, key, engine, maxIdleTimeout);
this.plainEndPoint = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTimeout);
this.enforcer = new EnforceOverrideEndPointMethods();
}
public void upgrade()
@ -295,179 +294,338 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector
public void shutdownOutput() throws IOException
{
if (upgraded)
super.shutdownOutput();
else
plainEndPoint.shutdownOutput();
enforcer.shutdownOutput();
}
public boolean isOutputShutdown()
{
return enforcer.isOutputShutdown();
}
public void shutdownInput() throws IOException
{
enforcer.shutdownInput();
}
public boolean isInputShutdown()
{
return enforcer.isInputShutdown();
}
public void close() throws IOException
{
if (upgraded)
super.close();
else
plainEndPoint.close();
enforcer.close();
}
public int fill(Buffer buffer) throws IOException
{
if (upgraded)
return super.fill(buffer);
else
return plainEndPoint.fill(buffer);
return enforcer.fill(buffer);
}
public int flush(Buffer buffer) throws IOException
{
if (upgraded)
return super.flush(buffer);
else
return plainEndPoint.flush(buffer);
return enforcer.flush(buffer);
}
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
if (upgraded)
return super.flush(header, buffer, trailer);
else
return plainEndPoint.flush(header, buffer, trailer);
return enforcer.flush(header, buffer, trailer);
}
public String getLocalAddr()
{
if (upgraded)
return super.getLocalAddr();
else
return plainEndPoint.getLocalAddr();
return enforcer.getLocalAddr();
}
public String getLocalHost()
{
if (upgraded)
return super.getLocalHost();
else
return plainEndPoint.getLocalHost();
return enforcer.getLocalHost();
}
public int getLocalPort()
{
if (upgraded)
return super.getLocalPort();
else
return plainEndPoint.getLocalPort();
return enforcer.getLocalPort();
}
public String getRemoteAddr()
{
if (upgraded)
return super.getRemoteAddr();
else
return plainEndPoint.getRemoteAddr();
return enforcer.getRemoteAddr();
}
public String getRemoteHost()
{
if (upgraded)
return super.getRemoteHost();
else
return plainEndPoint.getRemoteHost();
return enforcer.getRemoteHost();
}
public int getRemotePort()
{
if (upgraded)
return super.getRemotePort();
else
return plainEndPoint.getRemotePort();
return enforcer.getRemotePort();
}
public boolean isBlocking()
{
if (upgraded)
return super.isBlocking();
else
return plainEndPoint.isBlocking();
return enforcer.isBlocking();
}
public boolean isBufferred()
{
if (upgraded)
return super.isBufferred();
else
return plainEndPoint.isBufferred();
return enforcer.isBufferred();
}
public boolean blockReadable(long millisecs) throws IOException
{
if (upgraded)
return super.blockReadable(millisecs);
else
return plainEndPoint.blockReadable(millisecs);
return enforcer.blockReadable(millisecs);
}
public boolean blockWritable(long millisecs) throws IOException
{
if (upgraded)
return super.blockWritable(millisecs);
else
return plainEndPoint.blockWritable(millisecs);
return enforcer.blockWritable(millisecs);
}
public boolean isOpen()
{
if (upgraded)
return super.isOpen();
else
return plainEndPoint.isOpen();
return enforcer.isOpen();
}
public Object getTransport()
{
if (upgraded)
return super.getTransport();
else
return plainEndPoint.getTransport();
return enforcer.getTransport();
}
public boolean isBufferingInput()
{
if (upgraded)
return super.isBufferingInput();
else
return plainEndPoint.isBufferingInput();
return enforcer.isBufferingInput();
}
public boolean isBufferingOutput()
{
if (upgraded)
return super.isBufferingOutput();
else
return plainEndPoint.isBufferingOutput();
return enforcer.isBufferingOutput();
}
public void flush() throws IOException
{
if (upgraded)
super.flush();
else
plainEndPoint.flush();
enforcer.flush();
}
public int getMaxIdleTime()
{
if (upgraded)
return super.getMaxIdleTime();
else
return plainEndPoint.getMaxIdleTime();
return enforcer.getMaxIdleTime();
}
public void setMaxIdleTime(int timeMs) throws IOException
{
if (upgraded)
super.setMaxIdleTime(timeMs);
else
plainEndPoint.setMaxIdleTime(timeMs);
enforcer.setMaxIdleTime(timeMs);
}
/**
* The only reason this class exist is to enforce that
* {@link ProxySelectChannelEndPoint} overrides all methods of {@link EndPoint}.
* Therefore, if a method is added to {@link EndPoint}, this class
* won't compile anymore, will need an implementation, and one must remember
* to override the correspondent method in {@link ProxySelectChannelEndPoint}.
*/
private class EnforceOverrideEndPointMethods implements EndPoint
{
public void shutdownOutput() throws IOException
{
if (upgraded)
ProxySelectChannelEndPoint.super.shutdownOutput();
else
plainEndPoint.shutdownOutput();
}
public boolean isOutputShutdown()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isOutputShutdown();
else
return plainEndPoint.isOutputShutdown();
}
public void shutdownInput() throws IOException
{
if (upgraded)
ProxySelectChannelEndPoint.super.shutdownInput();
else
plainEndPoint.shutdownInput();
}
public boolean isInputShutdown()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isInputShutdown();
else
return plainEndPoint.isInputShutdown();
}
public void close() throws IOException
{
if (upgraded)
ProxySelectChannelEndPoint.super.close();
else
plainEndPoint.close();
}
public int fill(Buffer buffer) throws IOException
{
if (upgraded)
return ProxySelectChannelEndPoint.super.fill(buffer);
else
return plainEndPoint.fill(buffer);
}
public int flush(Buffer buffer) throws IOException
{
if (upgraded)
return ProxySelectChannelEndPoint.super.flush(buffer);
else
return plainEndPoint.flush(buffer);
}
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
if (upgraded)
return ProxySelectChannelEndPoint.super.flush(header, buffer, trailer);
else
return plainEndPoint.flush(header, buffer, trailer);
}
public String getLocalAddr()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getLocalAddr();
else
return plainEndPoint.getLocalAddr();
}
public String getLocalHost()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getLocalHost();
else
return plainEndPoint.getLocalHost();
}
public int getLocalPort()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getLocalPort();
else
return plainEndPoint.getLocalPort();
}
public String getRemoteAddr()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getRemoteAddr();
else
return plainEndPoint.getRemoteAddr();
}
public String getRemoteHost()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getRemoteHost();
else
return plainEndPoint.getRemoteHost();
}
public int getRemotePort()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getRemotePort();
else
return plainEndPoint.getRemotePort();
}
public boolean isBlocking()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isBlocking();
else
return plainEndPoint.isBlocking();
}
public boolean isBufferred()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isBufferred();
else
return plainEndPoint.isBufferred();
}
public boolean blockReadable(long millisecs) throws IOException
{
if (upgraded)
return ProxySelectChannelEndPoint.super.blockReadable(millisecs);
else
return plainEndPoint.blockReadable(millisecs);
}
public boolean blockWritable(long millisecs) throws IOException
{
if (upgraded)
return ProxySelectChannelEndPoint.super.blockWritable(millisecs);
else
return plainEndPoint.blockWritable(millisecs);
}
public boolean isOpen()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isOpen();
else
return plainEndPoint.isOpen();
}
public Object getTransport()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getTransport();
else
return plainEndPoint.getTransport();
}
public boolean isBufferingInput()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isBufferingInput();
else
return plainEndPoint.isBufferingInput();
}
public boolean isBufferingOutput()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.isBufferingOutput();
else
return plainEndPoint.isBufferingOutput();
}
public void flush() throws IOException
{
if (upgraded)
ProxySelectChannelEndPoint.super.flush();
else
plainEndPoint.flush();
}
public int getMaxIdleTime()
{
if (upgraded)
return ProxySelectChannelEndPoint.super.getMaxIdleTime();
else
return plainEndPoint.getMaxIdleTime();
}
public void setMaxIdleTime(int timeMs) throws IOException
{
if (upgraded)
ProxySelectChannelEndPoint.super.setMaxIdleTime(timeMs);
else
plainEndPoint.setMaxIdleTime(timeMs);
}
}
}
}

View File

@ -0,0 +1,126 @@
// ========================================================================
// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.ProtocolException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ConnectHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/* ------------------------------------------------------------ */
/**
* This UnitTest class executes two tests. Both will send a http request to https://google.com through a misbehaving proxy server.
* <p/>
* The first test runs against a proxy which simply closes the connection (as nginx does) for a connect request. The second proxy server always responds with a
* 500 error.
* <p/>
* The expected result for both tests is an exception and the HttpExchange should have status HttpExchange.STATUS_EXCEPTED.
*/
public class HttpsViaBrokenHttpProxyTest
{
private Server _proxy = new Server();
private HttpClient _client = new HttpClient();
@Before
public void init() throws Exception
{
// setup proxies with different behaviour
_proxy.addConnector(new SelectChannelConnector());
_proxy.setHandler(new BadBehavingConnectHandler());
_proxy.start();
int proxyClosingConnectionPort = _proxy.getConnectors()[0].getLocalPort();
_client.setProxy(new Address("localhost", proxyClosingConnectionPort));
_client.start();
}
@After
public void destroy() throws Exception
{
_client.stop();
_proxy.stop();
}
@Test
public void httpsViaProxyThatClosesConnectionOnConnectRequestTest() throws Exception
{
sendRequestThroughProxy(new ContentExchange(), "close", 9);
}
@Test
public void httpsViaProxyThatReturns500ErrorTest() throws Exception
{
HttpExchange exchange = new ContentExchange()
{
@Override
protected void onException(Throwable x)
{
// Suppress logging for expected exception
if (!(x instanceof ProtocolException))
super.onException(x);
}
};
sendRequestThroughProxy(exchange, "error500", 9);
}
@Test
public void httpsViaProxyThatReturns504ErrorTest() throws Exception
{
sendRequestThroughProxy(new ContentExchange(), "error504", 8);
}
private void sendRequestThroughProxy(HttpExchange exchange, String desiredBehaviour, int exptectedStatus) throws Exception
{
String url = "https://" + desiredBehaviour + ".com/";
exchange.setURL(url);
exchange.addRequestHeader("behaviour", desiredBehaviour);
_client.send(exchange);
assertEquals(HttpExchange.toState(exptectedStatus) + " status awaited", exptectedStatus, exchange.waitForDone());
}
private class BadBehavingConnectHandler extends ConnectHandler
{
@Override
protected void handleConnect(Request baseRequest, HttpServletRequest request, HttpServletResponse response, String serverAddress)
throws ServletException, IOException
{
if (serverAddress.contains("close"))
{
HttpConnection.getCurrentConnection().getEndPoint().close();
}
else if (serverAddress.contains("error500"))
{
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
}
else if (serverAddress.contains("error504"))
{
response.setStatus(HttpStatus.GATEWAY_TIMEOUT_504);
}
baseRequest.setHandled(true);
}
}
}

View File

@ -1,9 +1,13 @@
package org.eclipse.jetty.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@ -26,9 +30,6 @@ import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ProxyTunnellingTest
{
private Server server;
@ -41,7 +42,7 @@ public class ProxyTunnellingTest
{
return proxyConnector.getLocalPort();
}
protected void startSSLServer(Handler handler) throws Exception
{
SslSelectChannelConnector connector = new SslSelectChannelConnector();
@ -217,11 +218,11 @@ public class ProxyTunnellingTest
ContentExchange exchange = new ContentExchange(true)
{
@Override
protected void onConnectionFailed(Throwable x)
protected void onException(Throwable x)
{
latch.countDown();
}
};
exchange.setMethod(HttpMethods.GET);
String body = "BODY";

View File

@ -112,10 +112,18 @@ public class ChannelEndPoint implements EndPoint
Socket socket= ((SocketChannel)_channel).socket();
if (!socket.isClosed())
{
if(socket.isOutputShutdown())
socket.close();
else if (!socket.isInputShutdown())
socket.shutdownInput();
try
{
if (!socket.isInputShutdown())
socket.shutdownInput();
if (socket.isOutputShutdown())
socket.close();
}
catch (SocketException e)
{
LOG.ignore(e);
}
}
}
}
@ -132,12 +140,12 @@ public class ChannelEndPoint implements EndPoint
{
try
{
if (!socket.isOutputShutdown())
socket.shutdownOutput();
if (socket.isInputShutdown())
socket.close();
else if (!socket.isOutputShutdown())
socket.shutdownOutput();
}
catch(SocketException e)
catch (SocketException e)
{
LOG.ignore(e);
}
@ -174,7 +182,7 @@ public class ChannelEndPoint implements EndPoint
{
final NIOBuffer nbuf = (NIOBuffer)buf;
final ByteBuffer bbuf=nbuf.getByteBuffer();
//noinspection SynchronizationOnLocalVariableOrMethodParameter
try
{
@ -211,7 +219,7 @@ public class ChannelEndPoint implements EndPoint
{
LOG.ignore(xx);
}
if (len>0)
throw x;
len=-1;
@ -221,7 +229,7 @@ public class ChannelEndPoint implements EndPoint
{
throw new IOException("Not Implemented");
}
return len;
}