Further progress on 431642 (async proxy servlet).

This commit is contained in:
Simone Bordet 2014-04-11 09:45:21 +02:00
parent a96d7c388c
commit 4975cae881
9 changed files with 137 additions and 17 deletions

View File

@ -450,19 +450,27 @@ public class HttpRequest implements Request
@Override
public Request onResponseContent(final Response.ContentListener listener)
{
this.responseListeners.add(new Response.ContentListener()
this.responseListeners.add(new Response.AsyncContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content)
public void onContent(Response response, ByteBuffer content, Callback callback)
{
listener.onContent(response, content);
try
{
listener.onContent(response, content);
callback.succeeded();
}
catch (Exception x)
{
callback.failed(x);
}
}
});
return this;
}
@Override
public Request onResponseContent(final Response.AsyncContentListener listener)
public Request onResponseContentAsync(final Response.AsyncContentListener listener)
{
this.responseListeners.add(new Response.AsyncContentListener()
{

View File

@ -108,6 +108,10 @@ public class ResponseNotifier
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer)
{
// TODO: we need to create a "cumulative" callback that keeps track of how many listeners
// TODO: are invoked, and how many of these actually invoked the callback, and eventually
// TODO: call the callback passed to this method.
// Slice the buffer to avoid that listeners peek into data they should not look at.
buffer = buffer.slice();
if (!buffer.hasRemaining())

View File

@ -338,15 +338,16 @@ public interface Request
Request onResponseHeaders(Response.HeadersListener listener);
/**
* @param listener a listener for response content events
* @param listener a consuming listener for response content events
* @return this request object
* @deprecated Use {@link #onResponseContent(Response.AsyncContentListener)} instead.
*/
@Deprecated
Request onResponseContent(Response.ContentListener listener);
// TODO: JAVADOCS
Request onResponseContent(Response.AsyncContentListener listener);
/**
* @param listener an asynchronous listener for response content events
* @return this request object
*/
Request onResponseContentAsync(Response.AsyncContentListener listener);
/**
* @param listener a listener for response success event

View File

@ -241,7 +241,15 @@ public interface Response
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
callback.succeeded();
try
{
onContent(response, content);
callback.succeeded();
}
catch (Exception x)
{
callback.failed(x);
}
}
@Override

View File

@ -182,6 +182,11 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
// TODO: need to create the callback here, then check whether it has completed
// TODO: after the call to responseContent. If it has, return false.
// TODO: if it has not, return true, and when will be invoked, we need to
// TODO: proceed with parsing.
responseContent(exchange, buffer);
return false;
}

View File

@ -68,6 +68,7 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.TestingDir;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -983,6 +984,13 @@ public class HttpClientTest extends AbstractHttpClientServerTest
counter.incrementAndGet();
}
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
// Should not be invoked
counter.incrementAndGet();
}
@Override
public void onSuccess(Response response)
{
@ -1010,6 +1018,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
.onResponseHeader(listener)
.onResponseHeaders(listener)
.onResponseContent(listener)
.onResponseContentAsync(listener)
.onResponseSuccess(listener)
.onResponseFailure(listener)
.send(listener);

View File

@ -23,13 +23,20 @@ import java.nio.ByteBuffer;
import javax.servlet.AsyncContext;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.util.Callback;
public class AsyncProxyServlet extends ProxyServlet
{
private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
@Override
protected ContentProvider proxyRequestContent(AsyncContext asyncContext, final int requestId) throws IOException
{
@ -39,6 +46,23 @@ public class AsyncProxyServlet extends ProxyServlet
return provider;
}
@Override
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback) throws IOException
{
StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
if (writeListener == null)
{
writeListener = new StreamWriter(request.getAsyncContext());
request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
response.getOutputStream().setWriteListener(writeListener);
}
_log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
if (writeListener.data(buffer, offset, length, callback))
writeListener.onWritePossible();
else
;// TODO: fail callback
}
private class StreamReader implements ReadListener, Callback
{
private final byte[] buffer = new byte[512];
@ -64,7 +88,7 @@ public class AsyncProxyServlet extends ProxyServlet
{
int read = input.read(buffer);
_log.debug("Asynchronous read {} bytes from {}", read, input);
if (read >= 0)
if (read > 0)
{
_log.debug("{} proxying content to upstream: {} bytes", requestId, read);
provider.offer(ByteBuffer.wrap(buffer, 0, read), this);
@ -103,4 +127,64 @@ public class AsyncProxyServlet extends ProxyServlet
// complete the async context since we cannot throw an exception from here.
}
}
private class StreamWriter implements WriteListener
{
private final AsyncContext asyncContext;
private byte[] buffer;
private int offset;
private int length;
private volatile Callback callback;
private StreamWriter(AsyncContext asyncContext)
{
this.asyncContext = asyncContext;
}
private boolean data(byte[] bytes, int offset, int length, Callback callback)
{
if (this.callback != null)
return false;
this.buffer = bytes;
this.offset = offset;
this.length = length;
this.callback = callback;
return true;
}
@Override
public void onWritePossible() throws IOException
{
if (callback == null)
{
ServletOutputStream output = asyncContext.getResponse().getOutputStream();
output.write(buffer, offset, length);
if (output.isReady())
complete();
}
else
{
// If we have a pending callback, it means
// that the write blocked but is now complete.
complete();
}
}
private void complete()
{
this.buffer = null;
this.offset = 0;
this.length = 0;
Callback callback = this.callback;
this.callback = null;
callback.succeeded();
}
@Override
public void onError(Throwable t)
{
// TODO:
}
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
@ -51,6 +50,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -531,7 +531,7 @@ public class ProxyServlet extends HttpServlet
}
}
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length) throws IOException
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback) throws IOException
{
response.getOutputStream().write(buffer, offset, length);
_log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
@ -732,7 +732,7 @@ public class ProxyServlet extends HttpServlet
}
@Override
public void onContent(Response proxyResponse, ByteBuffer content)
public void onContent(Response proxyResponse, ByteBuffer content, Callback callback)
{
byte[] buffer;
int offset;
@ -751,7 +751,7 @@ public class ProxyServlet extends HttpServlet
try
{
onResponseContent(request, response, proxyResponse, buffer, offset, length);
onResponseContent(request, response, proxyResponse, buffer, offset, length, callback);
}
catch (IOException x)
{

View File

@ -65,6 +65,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.StdErrLog;
@ -797,7 +798,7 @@ public class ProxyServletTest
}
@Override
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length) throws IOException
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback) throws IOException
{
// Accumulate the response content
ByteArrayOutputStream baos = temp.get(request.getRequestURI());
@ -807,7 +808,7 @@ public class ProxyServletTest
temp.put(request.getRequestURI(), baos);
}
baos.write(buffer, offset, length);
super.onResponseContent(request, response, proxyResponse, buffer, offset, length);
super.onResponseContent(request, response, proxyResponse, buffer, offset, length, callback);
}
@Override