431642 - Implement ProxyServlet using Servlet 3.1 async I/O.

Intermediate commit that implements asynchronous content in HttpClient,
for the HTTP protocol, passing the tests.

This work needs to be extended to FCGI and SPDY and finally implement
the asynchronous proxy servlet.
This commit is contained in:
Simone Bordet 2014-04-19 12:36:15 +02:00
parent 1185febb74
commit aeb27cd461
8 changed files with 291 additions and 87 deletions

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -48,8 +49,8 @@ import org.eclipse.jetty.util.log.Logger;
* is available</li>
* <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
* <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
* <li>{@link #responseContent(HttpExchange, ByteBuffer)}, when HTTP content is available; this is the only method
* that may be invoked multiple times with different buffers containing different content</li>
* <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
* method that may be invoked multiple times with different buffers containing different content</li>
* <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
* </ol>
* At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
@ -237,7 +238,7 @@ public abstract class HttpReceiver
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response headers {}{}{}", response, System.getProperty("line.separator"), response.getHeaders().toString().trim());
LOG.debug("Response headers {}{}{}", response, System.lineSeparator(), response.getHeaders().toString().trim());
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyHeaders(exchange.getConversation().getResponseListeners(), response);
@ -269,7 +270,7 @@ public abstract class HttpReceiver
* @param buffer the response HTTP content buffer
* @return whether the processing should continue
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
out: while (true)
{
@ -292,18 +293,18 @@ public abstract class HttpReceiver
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
ContentDecoder decoder = this.decoder;
if (decoder != null)
{
buffer = decoder.decode(buffer);
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.getProperty("line.separator"), BufferUtil.toDetailString(buffer));
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
}
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer);
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
return true;
}

View File

@ -27,6 +27,8 @@ import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -106,35 +108,17 @@ public class ResponseNotifier
}
}
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer)
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer, Callback callback)
{
// TODO: we need to create a "cumulative" callback that keeps track of how many listeners
// TODO: are invoked, and how many of these actually invoked the callback, and eventually
// TODO: call the callback passed to this method.
// Slice the buffer to avoid that listeners peek into data they should not look at.
buffer = buffer.slice();
if (!buffer.hasRemaining())
return;
// Optimized to avoid allocations of iterator instances
for (int i = 0; i < listeners.size(); ++i)
{
Response.ResponseListener listener = listeners.get(i);
if (listener instanceof Response.ContentListener)
{
// The buffer was sliced, so we always clear it (position=0, limit=capacity)
// before passing it to the listener that may consume it.
buffer.clear();
notifyContent((Response.ContentListener)listener, response, buffer);
}
}
ContentCallback contentCallback = new ContentCallback(listeners, response, buffer, callback);
contentCallback.iterate();
}
private void notifyContent(Response.ContentListener listener, Response response, ByteBuffer buffer)
private void notifyContent(Response.AsyncContentListener listener, Response response, ByteBuffer buffer, Callback callback)
{
try
{
listener.onContent(response, buffer);
listener.onContent(response, buffer, callback);
}
catch (Throwable x)
{
@ -222,7 +206,8 @@ public class ResponseNotifier
}
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
// TODO: handle callback
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), new Callback.Adapter());
notifySuccess(listeners, response);
}
@ -243,7 +228,8 @@ public class ResponseNotifier
}
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
// TODO: handle callback
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()), new Callback.Adapter());
notifyFailure(listeners, response, failure);
}
@ -252,4 +238,51 @@ public class ResponseNotifier
forwardFailure(listeners, response, responseFailure);
notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
private class ContentCallback extends IteratingNestedCallback
{
private final List<Response.ResponseListener> listeners;
private final Response response;
private final ByteBuffer buffer;
private int index;
private ContentCallback(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer, Callback callback)
{
super(callback);
this.listeners = listeners;
this.response = response;
// Slice the buffer to avoid that listeners peek into data they should not look at.
this.buffer = buffer.slice();
}
@Override
protected Action process() throws Exception
{
if (index == listeners.size())
return Action.SUCCEEDED;
Response.ResponseListener listener = listeners.get(index);
if (listener instanceof Response.AsyncContentListener)
{
// The buffer was sliced, so we always clear it
// (clear => position=0, limit=capacity) before
// passing it to the listener that may consume it.
buffer.clear();
ResponseNotifier.this.notifyContent((Response.AsyncContentListener)listener, response, buffer, this);
return Action.SCHEDULED;
}
else
{
succeeded();
return Action.SCHEDULED;
}
}
@Override
public void succeeded()
{
++index;
super.succeeded();
}
}
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client.http;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
@ -32,12 +33,13 @@ import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
private final HttpParser parser = new HttpParser(this);
private ByteBuffer buffer;
private boolean shutdown;
public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
@ -58,63 +60,75 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
public void receive()
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
try
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
if (process())
{
while (true)
bufferPool.release(buffer);
// Don't linger the buffer around if we are idle.
buffer = null;
}
}
private boolean process()
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
ByteBuffer buffer = this.buffer;
while (true)
{
try
{
// Connection may be closed in a parser callback
if (connection.isClosed())
{
LOG.debug("{} closed", connection);
break;
if (LOG.isDebugEnabled())
LOG.debug("{} closed", connection);
return true;
}
if (!parse(buffer))
return false;
int read = endPoint.fill(buffer);
// Avoid boxing of variable 'read'
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
if (!parse(buffer))
return false;
}
else if (read == 0)
{
fillInterested();
return true;
}
else
{
int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
LOG.debug("Read {} bytes from {}", read, endPoint);
if (read > 0)
{
parse(buffer);
}
else if (read == 0)
{
fillInterested();
break;
}
else
{
shutdown();
break;
}
shutdown();
return true;
}
}
}
catch (EofException x)
{
LOG.ignore(x);
failAndClose(x);
}
catch (Exception x)
{
LOG.debug(x);
failAndClose(x);
}
finally
{
bufferPool.release(buffer);
catch (Throwable x)
{
LOG.debug(x);
failAndClose(x);
return true;
}
}
}
private void parse(ByteBuffer buffer)
private boolean parse(ByteBuffer buffer)
{
while (buffer.hasRemaining())
parser.parseNext(buffer);
{
if (parser.parseNext(buffer))
return parser.isStart();
}
return true;
}
private void fillInterested()
@ -195,13 +209,33 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
if (exchange == null)
return false;
// TODO: need to create the callback here, then check whether it has completed
// TODO: after the call to responseContent. If it has, return false.
// TODO: if it has not, return true, and when will be invoked, we need to
// TODO: proceed with parsing.
final AtomicBoolean completed = new AtomicBoolean();
Callback callback = new Callback()
{
@Override
public void succeeded()
{
if (!completed.compareAndSet(false, true))
{
LOG.debug("Content consumed asynchronously, resuming processing");
if (process())
{
// TODO: release the buffer to the pool !
}
}
}
responseContent(exchange, buffer);
return false;
@Override
public void failed(Throwable x)
{
failAndClose(x);
}
};
responseContent(exchange, buffer, callback);
// Return false to have the parser continue parsing.
// TODO: there is a race here: when this thread returns true, the parser is still running
// TODO: some stateful code that may be changed concurrently by the callback thread.
return completed.compareAndSet(false, true);
}
@Override

View File

@ -235,10 +235,14 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
synchronized (lock)
{
chunk = current;
--size;
lock.notify();
if (chunk != null)
{
--size;
lock.notify();
}
}
chunk.callback.succeeded();
if (chunk != null)
chunk.callback.succeeded();
}
@Override
@ -251,7 +255,8 @@ public class DeferredContentProvider implements AsyncContentProvider, Closeable
failure = x;
lock.notify();
}
chunk.callback.failed(x);
if (chunk != null)
chunk.callback.failed(x);
}
}

View File

@ -0,0 +1,127 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientAsyncContentTest extends AbstractHttpClientServerTest
{
public HttpClientAsyncContentTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Test
public void testSmallAsyncContent() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
ServletOutputStream output = response.getOutputStream();
output.write(65);
output.flush();
output.write(66);
}
});
final AtomicInteger contentCount = new AtomicInteger();
final AtomicReference<Callback> callbackRef = new AtomicReference<>();
final AtomicReference<CountDownLatch> contentLatch = new AtomicReference<>(new CountDownLatch(1));
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseContentAsync(new Response.AsyncContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
contentCount.incrementAndGet();
callbackRef.set(callback);
contentLatch.get().countDown();
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completeLatch.countDown();
}
});
Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
Callback callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
TimeUnit.MILLISECONDS.sleep(1000);
Assert.assertEquals(1, contentCount.get());
// Succeed the content callback to proceed with parsing.
callbackRef.set(null);
contentLatch.set(new CountDownLatch(1));
callback.succeeded();
Assert.assertTrue(contentLatch.get().await(555, TimeUnit.SECONDS));
callback = callbackRef.get();
// Wait a while to be sure that the parsing does not proceed.
TimeUnit.MILLISECONDS.sleep(1000);
Assert.assertEquals(2, contentCount.get());
Assert.assertEquals(1, completeLatch.getCount());
// Succeed the content callback to proceed with parsing.
callbackRef.set(null);
contentLatch.set(new CountDownLatch(1));
callback.succeeded();
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(2, contentCount.get());
}
public void test() throws Exception
{
try (Socket socket = new Socket())
{
System.out.println("socket = " + socket);
}
}
}

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
public class HttpChannelOverFCGI extends HttpChannel
{
@ -105,7 +106,8 @@ public class HttpChannelOverFCGI extends HttpChannel
protected boolean content(ByteBuffer buffer)
{
HttpExchange exchange = getHttpExchange();
return exchange != null && receiver.responseContent(exchange, buffer);
// TODO: handle callback properly
return exchange != null && receiver.responseContent(exchange, buffer, new Callback.Adapter());
}
protected boolean responseSuccess()

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.util.Callback;
public class HttpReceiverOverFCGI extends HttpReceiver
{
@ -51,9 +52,9 @@ public class HttpReceiverOverFCGI extends HttpReceiver
}
@Override
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer)
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
{
return super.responseContent(exchange, buffer);
return super.responseContent(exchange, buffer, callback);
}
@Override

View File

@ -123,7 +123,8 @@ public class HttpReceiverOverSPDY extends HttpReceiver implements StreamFrameLis
{
int length = dataInfo.length();
// TODO: avoid data copy here
boolean process = responseContent(exchange, dataInfo.asByteBuffer(false));
// TODO: handle callback properly
boolean process = responseContent(exchange, dataInfo.asByteBuffer(false), new Callback.Adapter());
dataInfo.consume(length);
if (process)