Made ProxyServlet and AsyncProxyServlet more extensible by adding

factory methods for relevant objects used in the implementation.
This commit is contained in:
Simone Bordet 2014-05-21 09:11:40 +02:00
parent 7745e8b6c1
commit 9e1cac33d0
2 changed files with 71 additions and 33 deletions

View File

@ -46,10 +46,15 @@ public class AsyncProxyServlet extends ProxyServlet
{
ServletInputStream input = request.getInputStream();
DeferredContentProvider provider = new DeferredContentProvider();
input.setReadListener(new StreamReader(proxyRequest, request, provider));
input.setReadListener(newReadListener(proxyRequest, request, provider));
return provider;
}
protected ReadListener newReadListener(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
{
return new StreamReader(proxyRequest, request, provider);
}
@Override
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
{
@ -59,7 +64,7 @@ public class AsyncProxyServlet extends ProxyServlet
StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
if (writeListener == null)
{
writeListener = new StreamWriter(request, proxyResponse);
writeListener = newWriteListener(request, proxyResponse);
request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
// Set the data to write before calling setWriteListener(), because
@ -82,7 +87,12 @@ public class AsyncProxyServlet extends ProxyServlet
onResponseFailure(request, response, proxyResponse, x);
}
}
protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
{
return new StreamWriter(request, proxyResponse);
}
public static class Transparent extends AsyncProxyServlet
{
private final TransparentDelegate delegate = new TransparentDelegate(this);
@ -101,14 +111,14 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
private class StreamReader implements ReadListener, Callback
protected class StreamReader implements ReadListener, Callback
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
private final Request proxyRequest;
private final HttpServletRequest request;
private final DeferredContentProvider provider;
public StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
protected StreamReader(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider)
{
this.proxyRequest = proxyRequest;
this.request = request;
@ -131,7 +141,7 @@ public class AsyncProxyServlet extends ProxyServlet
if (read > 0)
{
_log.debug("{} proxying content to upstream: {} bytes", requestId, read);
provider.offer(ByteBuffer.wrap(buffer, 0, read), this);
onRequestContent(proxyRequest, request, provider, buffer, 0, read, this);
// Do not call isReady() so that we can apply backpressure.
break;
}
@ -140,6 +150,11 @@ public class AsyncProxyServlet extends ProxyServlet
_log.debug("{} asynchronous read pending on {}", requestId, input);
}
protected void onRequestContent(Request proxyRequest, HttpServletRequest request, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
{
provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
}
@Override
public void onAllDataRead() throws IOException
{
@ -174,7 +189,7 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
private class StreamWriter implements WriteListener
protected class StreamWriter implements WriteListener
{
private final HttpServletRequest request;
private final Response proxyResponse;
@ -184,14 +199,14 @@ public class AsyncProxyServlet extends ProxyServlet
private int length;
private Callback callback;
private StreamWriter(HttpServletRequest request, Response proxyResponse)
protected StreamWriter(HttpServletRequest request, Response proxyResponse)
{
this.request = request;
this.proxyResponse = proxyResponse;
this.state = WriteState.IDLE;
}
private void data(byte[] bytes, int offset, int length, Callback callback)
protected void data(byte[] bytes, int offset, int length, Callback callback)
{
if (state != WriteState.IDLE)
throw new WritePendingException();
@ -235,7 +250,7 @@ public class AsyncProxyServlet extends ProxyServlet
}
}
private void complete()
protected void complete()
{
buffer = null;
offset = 0;

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
@ -477,32 +478,17 @@ public class ProxyServlet extends HttpServlet
proxyRequest.getHeaders().toString().trim());
}
proxyRequest.send(new ProxyResponseListener(request, response));
proxyRequest.send(newProxyResponseListener(request, response));
}
protected ContentProvider proxyRequestContent(final Request proxyRequest, final HttpServletRequest request) throws IOException
{
return new InputStreamContentProvider(request.getInputStream())
{
@Override
public long getLength()
{
return request.getContentLength();
}
return new ProxyInputStreamContentProvider(proxyRequest, request, request.getInputStream());
}
@Override
protected ByteBuffer onRead(byte[] buffer, int offset, int length)
{
_log.debug("{} proxying content to upstream: {} bytes", getRequestId(request), length);
return super.onRead(buffer, offset, length);
}
@Override
protected void onReadFailure(Throwable failure)
{
onClientRequestFailure(proxyRequest, request, failure);
}
};
protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
{
return new ProxyResponseListener(request, response);
}
protected void onClientRequestFailure(Request proxyRequest, HttpServletRequest request, Throwable failure)
@ -716,12 +702,12 @@ public class ProxyServlet extends HttpServlet
}
}
private class ProxyResponseListener extends Response.Listener.Adapter
protected class ProxyResponseListener extends Response.Listener.Adapter
{
private final HttpServletRequest request;
private final HttpServletResponse response;
public ProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
protected ProxyResponseListener(HttpServletRequest request, HttpServletResponse response)
{
this.request = request;
this.response = response;
@ -811,4 +797,41 @@ public class ProxyServlet extends HttpServlet
_log.debug("{} proxying complete", getRequestId(request));
}
}
protected class ProxyInputStreamContentProvider extends InputStreamContentProvider
{
private final Request proxyRequest;
private final HttpServletRequest request;
protected ProxyInputStreamContentProvider(Request proxyRequest, HttpServletRequest request, InputStream input)
{
super(input);
this.proxyRequest = proxyRequest;
this.request = request;
}
@Override
public long getLength()
{
return request.getContentLength();
}
@Override
protected ByteBuffer onRead(byte[] buffer, int offset, int length)
{
_log.debug("{} proxying content to upstream: {} bytes", getRequestId(request), length);
return onRequestContent(proxyRequest, request, buffer, offset, length);
}
protected ByteBuffer onRequestContent(Request proxyRequest, final HttpServletRequest request, byte[] buffer, int offset, int length)
{
return super.onRead(buffer, offset, length);
}
@Override
protected void onReadFailure(Throwable failure)
{
onClientRequestFailure(proxyRequest, request, failure);
}
}
}