Improvements and tests for #297104: added handling of context information.
git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1560 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
parent
5948ea79eb
commit
fd9b4b8ae9
|
@ -5,13 +5,14 @@ import java.net.InetSocketAddress;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.SocketChannel;
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
import org.eclipse.jetty.http.HttpGenerator;
|
|
||||||
import org.eclipse.jetty.http.HttpMethods;
|
import org.eclipse.jetty.http.HttpMethods;
|
||||||
import org.eclipse.jetty.http.HttpParser;
|
import org.eclipse.jetty.http.HttpParser;
|
||||||
import org.eclipse.jetty.io.Buffer;
|
import org.eclipse.jetty.io.Buffer;
|
||||||
|
@ -186,6 +187,7 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
* <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code>
|
* <p>CONNECT requests may have authentication headers such as <code>Proxy-Authorization</code>
|
||||||
* that authenticate the client with the proxy.</p>
|
* that authenticate the client with the proxy.</p>
|
||||||
*
|
*
|
||||||
|
* @param baseRequest Jetty-specific http request
|
||||||
* @param request the http request
|
* @param request the http request
|
||||||
* @param response the http response
|
* @param response the http response
|
||||||
* @param serverAddress the remote server address in the form {@code host:port}
|
* @param serverAddress the remote server address in the form {@code host:port}
|
||||||
|
@ -235,25 +237,28 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ClientToProxyConnection clientToProxy = prepareConnections(channel, buffer);
|
ConcurrentMap<String, Object> context = new ConcurrentHashMap<String, Object>();
|
||||||
|
prepareContext(request, context);
|
||||||
|
|
||||||
|
ClientToProxyConnection clientToProxy = prepareConnections(context, channel, buffer);
|
||||||
|
|
||||||
// CONNECT expects a 200 response
|
// CONNECT expects a 200 response
|
||||||
response.setStatus(HttpServletResponse.SC_OK);
|
response.setStatus(HttpServletResponse.SC_OK);
|
||||||
|
|
||||||
// Prevent close
|
// Prevent close
|
||||||
((HttpGenerator)baseRequest.getConnection().getGenerator()).setPersistent(true);
|
baseRequest.getConnection().getGenerator().setPersistent(true);
|
||||||
|
|
||||||
// close to force last flush it so that the client receives it
|
// Close to force last flush it so that the client receives it
|
||||||
response.getOutputStream().close();
|
response.getOutputStream().close();
|
||||||
|
|
||||||
upgradeConnection(request, response, clientToProxy);
|
upgradeConnection(request, response, clientToProxy);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientToProxyConnection prepareConnections(SocketChannel channel, Buffer buffer)
|
private ClientToProxyConnection prepareConnections(ConcurrentMap<String, Object> context, SocketChannel channel, Buffer buffer)
|
||||||
{
|
{
|
||||||
HttpConnection httpConnection = HttpConnection.getCurrentConnection();
|
HttpConnection httpConnection = HttpConnection.getCurrentConnection();
|
||||||
ProxyToServerConnection proxyToServer = newProxyToServerConnection(buffer);
|
ProxyToServerConnection proxyToServer = newProxyToServerConnection(context, buffer);
|
||||||
ClientToProxyConnection clientToProxy = newClientToProxyConnection(channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
|
ClientToProxyConnection clientToProxy = newClientToProxyConnection(context, channel, httpConnection.getEndPoint(), httpConnection.getTimeStamp());
|
||||||
clientToProxy.setConnection(proxyToServer);
|
clientToProxy.setConnection(proxyToServer);
|
||||||
proxyToServer.setConnection(clientToProxy);
|
proxyToServer.setConnection(clientToProxy);
|
||||||
return clientToProxy;
|
return clientToProxy;
|
||||||
|
@ -275,14 +280,14 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ClientToProxyConnection newClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timeStamp)
|
protected ClientToProxyConnection newClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timeStamp)
|
||||||
{
|
{
|
||||||
return new ClientToProxyConnection(channel, endPoint, timeStamp);
|
return new ClientToProxyConnection(context, channel, endPoint, timeStamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ProxyToServerConnection newProxyToServerConnection(Buffer buffer)
|
protected ProxyToServerConnection newProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer buffer)
|
||||||
{
|
{
|
||||||
return new ProxyToServerConnection(buffer);
|
return new ProxyToServerConnection(context, buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
|
private SocketChannel connectToServer(HttpServletRequest request, String host, int port) throws IOException
|
||||||
|
@ -311,6 +316,10 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
return channel;
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
|
private void upgradeConnection(HttpServletRequest request, HttpServletResponse response, Connection connection) throws IOException
|
||||||
{
|
{
|
||||||
// Set the new connection as request attribute and change the status to 101
|
// Set the new connection as request attribute and change the status to 101
|
||||||
|
@ -330,11 +339,12 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
* <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p>
|
* <p>Reads (with non-blocking semantic) into the given {@code buffer} from the given {@code endPoint}.</p>
|
||||||
* @param endPoint the endPoint to read from
|
* @param endPoint the endPoint to read from
|
||||||
* @param buffer the buffer to read data into
|
* @param buffer the buffer to read data into
|
||||||
|
* @param context the context information related to the connection
|
||||||
* @return the number of bytes read (possibly 0 since the read is non-blocking)
|
* @return the number of bytes read (possibly 0 since the read is non-blocking)
|
||||||
* or -1 if the channel has been closed remotely
|
* or -1 if the channel has been closed remotely
|
||||||
* @throws IOException if the endPoint cannot be read
|
* @throws IOException if the endPoint cannot be read
|
||||||
*/
|
*/
|
||||||
protected int read(EndPoint endPoint, Buffer buffer) throws IOException
|
protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
|
||||||
{
|
{
|
||||||
return endPoint.fill(buffer);
|
return endPoint.fill(buffer);
|
||||||
}
|
}
|
||||||
|
@ -344,9 +354,11 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
*
|
*
|
||||||
* @param endPoint the endPoint to write to
|
* @param endPoint the endPoint to write to
|
||||||
* @param buffer the buffer to write
|
* @param buffer the buffer to write
|
||||||
|
* @param context the context information related to the connection
|
||||||
* @throws IOException if the buffer cannot be written
|
* @throws IOException if the buffer cannot be written
|
||||||
|
* @return the number of bytes written
|
||||||
*/
|
*/
|
||||||
protected int write(EndPoint endPoint, Buffer buffer) throws IOException
|
protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
|
||||||
{
|
{
|
||||||
if (buffer == null)
|
if (buffer == null)
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -425,13 +437,15 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
{
|
{
|
||||||
private final CountDownLatch _ready = new CountDownLatch(1);
|
private final CountDownLatch _ready = new CountDownLatch(1);
|
||||||
private final Buffer _buffer = new IndirectNIOBuffer(1024);
|
private final Buffer _buffer = new IndirectNIOBuffer(1024);
|
||||||
|
private final ConcurrentMap<String, Object> _context;
|
||||||
private volatile Buffer _data;
|
private volatile Buffer _data;
|
||||||
private volatile ClientToProxyConnection _toClient;
|
private volatile ClientToProxyConnection _toClient;
|
||||||
private volatile long _timestamp;
|
private volatile long _timestamp;
|
||||||
private volatile SelectChannelEndPoint _endPoint;
|
private volatile SelectChannelEndPoint _endPoint;
|
||||||
|
|
||||||
public ProxyToServerConnection(Buffer data)
|
public ProxyToServerConnection(ConcurrentMap<String, Object> context, Buffer data)
|
||||||
{
|
{
|
||||||
|
_context = context;
|
||||||
_data = data;
|
_data = data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -442,14 +456,14 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
{
|
{
|
||||||
if (_data != null)
|
if (_data != null)
|
||||||
{
|
{
|
||||||
int written = write(_endPoint, _data);
|
int written = write(_endPoint, _data, _context);
|
||||||
_logger.debug("ProxyToServer: written to server {} bytes", written, null);
|
_logger.debug("ProxyToServer: written to server {} bytes", written, null);
|
||||||
_data = null;
|
_data = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
int read = read(_endPoint, _buffer);
|
int read = read(_endPoint, _buffer, _context);
|
||||||
|
|
||||||
if (read == -1)
|
if (read == -1)
|
||||||
{
|
{
|
||||||
|
@ -462,7 +476,7 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
break;
|
break;
|
||||||
|
|
||||||
_logger.debug("ProxyToServer: read from server {} bytes {}", read, _endPoint);
|
_logger.debug("ProxyToServer: read from server {} bytes {}", read, _endPoint);
|
||||||
int written = write(_toClient._endPoint, _buffer);
|
int written = write(_toClient._endPoint, _buffer, _context);
|
||||||
_logger.debug("ProxyToServer: written to client {} bytes", written, null);
|
_logger.debug("ProxyToServer: written to client {} bytes", written, null);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
|
@ -568,14 +582,16 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
public class ClientToProxyConnection implements Connection
|
public class ClientToProxyConnection implements Connection
|
||||||
{
|
{
|
||||||
private final Buffer _buffer = new IndirectNIOBuffer(1024);
|
private final Buffer _buffer = new IndirectNIOBuffer(1024);
|
||||||
|
private final ConcurrentMap<String, Object> _context;
|
||||||
private final SocketChannel _channel;
|
private final SocketChannel _channel;
|
||||||
private final EndPoint _endPoint;
|
private final EndPoint _endPoint;
|
||||||
private final long _timestamp;
|
private final long _timestamp;
|
||||||
private volatile ProxyToServerConnection _toServer;
|
private volatile ProxyToServerConnection _toServer;
|
||||||
private boolean _firstTime = true;
|
private boolean _firstTime = true;
|
||||||
|
|
||||||
public ClientToProxyConnection(SocketChannel channel, EndPoint endPoint, long timestamp)
|
public ClientToProxyConnection(ConcurrentMap<String, Object> context, SocketChannel channel, EndPoint endPoint, long timestamp)
|
||||||
{
|
{
|
||||||
|
_context = context;
|
||||||
_channel = channel;
|
_channel = channel;
|
||||||
_endPoint = endPoint;
|
_endPoint = endPoint;
|
||||||
_timestamp = timestamp;
|
_timestamp = timestamp;
|
||||||
|
@ -596,7 +612,7 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
int read = read(_endPoint, _buffer);
|
int read = read(_endPoint, _buffer, _context);
|
||||||
|
|
||||||
if (read == -1)
|
if (read == -1)
|
||||||
{
|
{
|
||||||
|
@ -609,7 +625,7 @@ public class ProxyHandler extends HandlerWrapper
|
||||||
break;
|
break;
|
||||||
|
|
||||||
_logger.debug("ClientToProxy: read from client {} bytes {}", read, _endPoint);
|
_logger.debug("ClientToProxy: read from client {} bytes {}", read, _endPoint);
|
||||||
int written = write(_toServer._endPoint, _buffer);
|
int written = write(_toServer._endPoint, _buffer, _context);
|
||||||
_logger.debug("ClientToProxy: written to server {} bytes", written, null);
|
_logger.debug("ClientToProxy: written to server {} bytes", written, null);
|
||||||
}
|
}
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -7,11 +7,15 @@ import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.nio.channels.SocketChannel;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.ServletOutputStream;
|
import javax.servlet.ServletOutputStream;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.io.Buffer;
|
||||||
|
import org.eclipse.jetty.io.EndPoint;
|
||||||
import org.eclipse.jetty.server.Request;
|
import org.eclipse.jetty.server.Request;
|
||||||
import org.eclipse.jetty.server.Server;
|
import org.eclipse.jetty.server.Server;
|
||||||
|
|
||||||
|
@ -374,6 +378,91 @@ public class ProxyHandlerConnectTest extends AbstractProxyHandlerTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCONNECTAndPOSTWithContext() throws Exception
|
||||||
|
{
|
||||||
|
final String contextKey = "contextKey";
|
||||||
|
final String contextValue = "contextValue";
|
||||||
|
|
||||||
|
// Replace the default ProxyHandler with a subclass to test context information passing
|
||||||
|
proxy.stop();
|
||||||
|
proxy.setHandler(new ProxyHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected boolean handleAuthentication(HttpServletRequest request, HttpServletResponse response, String address) throws ServletException, IOException
|
||||||
|
{
|
||||||
|
request.setAttribute(contextKey, contextValue);
|
||||||
|
return super.handleAuthentication(request, response, address);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected SocketChannel connect(HttpServletRequest request, String host, int port) throws IOException
|
||||||
|
{
|
||||||
|
assertEquals(contextValue, request.getAttribute(contextKey));
|
||||||
|
return super.connect(request, host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void prepareContext(HttpServletRequest request, ConcurrentMap<String, Object> context)
|
||||||
|
{
|
||||||
|
// Transfer data from the HTTP request to the connection context
|
||||||
|
assertEquals(contextValue, request.getAttribute(contextKey));
|
||||||
|
context.put(contextKey, request.getAttribute(contextKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int read(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
|
||||||
|
{
|
||||||
|
assertEquals(contextValue, context.get(contextKey));
|
||||||
|
return super.read(endPoint, buffer, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Object> context) throws IOException
|
||||||
|
{
|
||||||
|
assertEquals(contextValue, context.get(contextKey));
|
||||||
|
return super.write(endPoint, buffer, context);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
proxy.start();
|
||||||
|
|
||||||
|
String hostPort = "localhost:" + serverConnector.getLocalPort();
|
||||||
|
String request = "" +
|
||||||
|
"CONNECT " + hostPort + " HTTP/1.1\r\n" +
|
||||||
|
"Host: " + hostPort + "\r\n" +
|
||||||
|
"\r\n";
|
||||||
|
Socket socket = newSocket();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
OutputStream output = socket.getOutputStream();
|
||||||
|
BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
|
||||||
|
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
// Expect 200 OK from the CONNECT request
|
||||||
|
Response response = readResponse(input);
|
||||||
|
assertEquals("200", response.getCode());
|
||||||
|
|
||||||
|
String body = "0123456789ABCDEF";
|
||||||
|
request = "" +
|
||||||
|
"POST /echo HTTP/1.1\r\n" +
|
||||||
|
"Host: " + hostPort + "\r\n" +
|
||||||
|
"Content-Length: " + body.length() + "\r\n" +
|
||||||
|
"\r\n" +
|
||||||
|
body;
|
||||||
|
output.write(request.getBytes("UTF-8"));
|
||||||
|
output.flush();
|
||||||
|
|
||||||
|
response = readResponse(input);
|
||||||
|
assertEquals("200", response.getCode());
|
||||||
|
assertEquals("POST /echo\r\n" + body, response.getBody());
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
socket.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class ServerHandler extends AbstractHandler
|
private class ServerHandler extends AbstractHandler
|
||||||
{
|
{
|
||||||
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
public void handle(String target, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException, ServletException
|
||||||
|
|
Loading…
Reference in New Issue