Merged branch 'master' into 'jetty-http2'.

This commit is contained in:
Simone Bordet 2014-07-22 21:14:46 +02:00
commit 467773dbdf
20 changed files with 1149 additions and 611 deletions

View File

@ -251,7 +251,7 @@ public class GZIPContentDecoder implements ContentDecoder
else
{
// Accumulate inflated bytes and loop to see if we have finished
byte[] newOutput = Arrays.copyOf(output, output.length+decoded);
byte[] newOutput = Arrays.copyOf(output, output.length + decoded);
System.arraycopy(bytes, 0, newOutput, output.length, decoded);
output = newOutput;
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
@ -49,9 +50,8 @@ import org.eclipse.jetty.util.log.Logger;
* is available</li>
* <li>{@link #responseHeader(HttpExchange, HttpField)}, when a HTTP field is available</li>
* <li>{@link #responseHeaders(HttpExchange)}, when all HTTP headers are available</li>
* <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available; this is the only
* method that may be invoked multiple times with different buffers containing different content</li>
* <li>{@link #responseSuccess(HttpExchange)}, when the response is complete</li>
* <li>{@link #responseContent(HttpExchange, ByteBuffer, Callback)}, when HTTP content is available</li>
* <li>{@link #responseSuccess(HttpExchange)}, when the response is successful</li>
* </ol>
* At any time, subclasses may invoke {@link #responseFailure(Throwable)} to indicate that the response has failed
* (for example, because of I/O exceptions).
@ -69,7 +69,8 @@ public abstract class HttpReceiver
private final AtomicReference<ResponseState> responseState = new AtomicReference<>(ResponseState.IDLE);
private final HttpChannel channel;
private volatile ContentDecoder decoder;
private ContentDecoder decoder;
private Throwable failure;
protected HttpReceiver(HttpChannel channel)
{
@ -104,7 +105,7 @@ public abstract class HttpReceiver
*/
protected boolean responseBegin(HttpExchange exchange)
{
if (!updateResponseState(ResponseState.IDLE, ResponseState.BEGIN))
if (!updateResponseState(ResponseState.IDLE, ResponseState.TRANSIENT))
return false;
HttpConversation conversation = exchange.getConversation();
@ -127,6 +128,9 @@ public abstract class HttpReceiver
ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response);
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
terminateResponse(exchange, failure);
return true;
}
@ -152,7 +156,7 @@ public abstract class HttpReceiver
case BEGIN:
case HEADER:
{
if (updateResponseState(current, ResponseState.HEADER))
if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@ -188,6 +192,9 @@ public abstract class HttpReceiver
}
}
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
terminateResponse(exchange, failure);
return true;
}
@ -228,7 +235,7 @@ public abstract class HttpReceiver
case BEGIN:
case HEADER:
{
if (updateResponseState(current, ResponseState.HEADERS))
if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@ -261,6 +268,9 @@ public abstract class HttpReceiver
}
}
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
terminateResponse(exchange, failure);
return true;
}
@ -273,7 +283,7 @@ public abstract class HttpReceiver
* @param buffer the response HTTP content buffer
* @return whether the processing should continue
*/
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, Callback callback)
protected boolean responseContent(HttpExchange exchange, ByteBuffer buffer, final Callback callback)
{
out: while (true)
{
@ -283,7 +293,7 @@ public abstract class HttpReceiver
case HEADERS:
case CONTENT:
{
if (updateResponseState(current, ResponseState.CONTENT))
if (updateResponseState(current, ResponseState.TRANSIENT))
break out;
break;
}
@ -298,19 +308,49 @@ public abstract class HttpReceiver
if (LOG.isDebugEnabled())
LOG.debug("Response content {}{}{}", response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ContentDecoder decoder = this.decoder;
if (decoder != null)
if (decoder == null)
{
buffer = decoder.decode(buffer);
// TODO If the decoder consumes all the content, should we return here?
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(buffer));
notifier.notifyContent(listeners, response, buffer, callback);
}
else
{
List<ByteBuffer> decodeds = new ArrayList<>(2);
while (buffer.hasRemaining())
{
ByteBuffer decoded = decoder.decode(buffer);
if (!decoded.hasRemaining())
continue;
decodeds.add(decoded);
if (LOG.isDebugEnabled())
LOG.debug("Response content decoded ({}) {}{}{}", decoder, response, System.lineSeparator(), BufferUtil.toDetailString(decoded));
}
if (decodeds.isEmpty())
{
callback.succeeded();
}
else
{
Callback partial = new Callback.Adapter()
{
@Override
public void failed(Throwable x)
{
callback.failed(x);
}
};
for (int i = 1, size = decodeds.size(); i <= size; ++i)
notifier.notifyContent(listeners, response, decodeds.get(i - 1), i < size ? partial : callback);
}
}
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyContent(exchange.getConversation().getResponseListeners(), response, buffer, callback);
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
terminateResponse(exchange, failure);
return true;
}
@ -332,16 +372,17 @@ public abstract class HttpReceiver
if (!completed)
return false;
// Reset to be ready for another response
responseState.set(ResponseState.IDLE);
// Reset to be ready for another response.
reset();
// Mark atomically the response as terminated and succeeded,
// with respect to concurrency between request and response.
// If there is a non-null result, then both sender and
// receiver are reset and ready to be reused, and the
// connection closed/pooled (depending on the transport).
Result result = exchange.terminateResponse(null);
// It is important to notify *after* we reset and terminate
// because the notification may trigger another request/response.
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response);
@ -349,17 +390,7 @@ public abstract class HttpReceiver
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifySuccess(listeners, response);
if (result != null)
{
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response succeeded {}", response);
notifier.notifyComplete(listeners, result);
if (ordered)
channel.exchangeTerminated(result);
}
terminateResponse(exchange, result);
return true;
}
@ -388,7 +419,20 @@ public abstract class HttpReceiver
if (!completed)
return false;
// Dispose to avoid further responses
this.failure = failure;
// Update the state to avoid more response processing.
boolean fail;
while (true)
{
ResponseState current = responseState.get();
if (updateResponseState(current, ResponseState.FAILURE))
{
fail = current != ResponseState.TRANSIENT;
break;
}
}
dispose();
// Mark atomically the response as terminated and failed,
@ -402,19 +446,45 @@ public abstract class HttpReceiver
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
if (fail)
{
terminateResponse(exchange, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
}
return true;
}
private void terminateResponse(HttpExchange exchange, Throwable failure)
{
Result result = exchange.terminateResponse(failure);
terminateResponse(exchange, result);
}
private void terminateResponse(HttpExchange exchange, Result result)
{
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response complete {}", response);
if (result != null)
{
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response failed {}", response);
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", response);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
channel.exchangeTerminated(result);
}
return true;
}
/**
@ -427,7 +497,6 @@ public abstract class HttpReceiver
protected void reset()
{
decoder = null;
responseState.set(ResponseState.IDLE);
}
/**
@ -440,7 +509,6 @@ public abstract class HttpReceiver
protected void dispose()
{
decoder = null;
responseState.set(ResponseState.FAILURE);
}
public boolean abort(Throwable cause)
@ -464,6 +532,10 @@ public abstract class HttpReceiver
*/
private enum ResponseState
{
/**
* One of the response*() methods is being executed.
*/
TRANSIENT,
/**
* The response is not yet received, the initial state
*/

View File

@ -65,7 +65,8 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private final IteratingCallback contentCallback = new ContentCallback();
private final Callback lastCallback = new LastContentCallback();
private final HttpChannel channel;
private volatile HttpContent content;
private HttpContent content;
private Throwable failure;
protected HttpSender(HttpChannel channel)
{
@ -197,34 +198,40 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
protected boolean queuedToBegin(Request request)
{
if (!updateRequestState(RequestState.QUEUED, RequestState.BEGIN))
if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request begin {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
terminateRequest(getHttpExchange(), failure, false);
return true;
}
protected boolean beginToHeaders(Request request)
{
if (!updateRequestState(RequestState.BEGIN, RequestState.HEADERS))
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request headers {}{}{}", request, System.getProperty("line.separator"), request.getHeaders().toString().trim());
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
terminateRequest(getHttpExchange(), failure, false);
return true;
}
protected boolean headersToCommit(Request request)
{
if (!updateRequestState(RequestState.HEADERS, RequestState.COMMIT))
if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request committed {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
terminateRequest(getHttpExchange(), failure, true);
return true;
}
@ -236,21 +243,19 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case COMMIT:
case CONTENT:
{
if (!updateRequestState(current, RequestState.CONTENT))
if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT))
return false;
if (LOG.isDebugEnabled())
LOG.debug("Request content {}{}{}", request, System.getProperty("line.separator"), BufferUtil.toDetailString(content));
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content);
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
terminateRequest(getHttpExchange(), failure, true);
return true;
}
case FAILURE:
{
return false;
}
default:
{
throw new IllegalStateException(current.toString());
return false;
}
}
}
@ -269,43 +274,28 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!completed)
return false;
// Reset to be ready for another request
requestState.set(RequestState.QUEUED);
// Reset to be ready for another request.
reset();
// Mark atomically the request as terminated and succeeded,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(null);
// It is important to notify completion *after* we reset because
// the notification may trigger another request/response
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request success {}", request);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
if (result != null)
{
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response succeded {}", request);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)
channel.exchangeTerminated(result);
}
terminateRequest(exchange, null, true, result);
return true;
}
case FAILURE:
{
return false;
}
default:
{
throw new IllegalStateException(current.toString());
return false;
}
}
}
@ -322,8 +312,22 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!completed)
return false;
// Dispose to avoid further requests
RequestState requestState = dispose();
this.failure = failure;
// Update the state to avoid more request processing.
RequestState current;
boolean fail;
while (true)
{
current = requestState.get();
if (updateRequestState(current, RequestState.FAILURE))
{
fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
break;
}
}
dispose();
// Mark atomically the request as terminated and failed,
// with respect to concurrency between request and response.
@ -335,8 +339,36 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
boolean notCommitted = isBeforeCommit(requestState);
if (result == null && notCommitted && request.getAbortCause() == null)
if (fail)
{
terminateRequest(exchange, failure, !isBeforeCommit(current), result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
}
return true;
}
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed)
{
if (exchange != null)
{
Result result = exchange.terminateRequest(failure);
terminateRequest(exchange, failure, committed, result);
}
}
private void terminateRequest(HttpExchange exchange, Throwable failure, boolean committed, Result result)
{
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Terminating request {}", request);
if (failure != null && !committed && result == null && request.getAbortCause() == null)
{
// Complete the response from here
if (exchange.responseComplete())
@ -349,18 +381,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (result != null)
{
HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response failed {}", request);
LOG.debug("Request/Response {} {}", failure == null ? "succeeded" : "failed", request);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)
channel.exchangeTerminated(result);
}
return true;
}
/**
@ -398,23 +429,14 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
content.close();
content = null;
requestState.set(RequestState.QUEUED);
senderState.set(SenderState.IDLE);
}
protected RequestState dispose()
protected void dispose()
{
while (true)
{
RequestState current = requestState.get();
if (updateRequestState(current, RequestState.FAILURE))
{
HttpContent content = this.content;
if (content != null)
content.close();
return current;
}
}
HttpContent content = this.content;
if (content != null)
content.close();
}
public void proceed(HttpExchange exchange, Throwable failure)
@ -485,7 +507,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return abortable && anyToFailure(failure);
}
protected boolean updateRequestState(RequestState from, RequestState to)
private boolean updateRequestState(RequestState from, RequestState to)
{
boolean updated = requestState.compareAndSet(from, to);
if (!updated)
@ -505,6 +527,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
switch (requestState)
{
case TRANSIENT:
case QUEUED:
case BEGIN:
case HEADERS:
@ -518,6 +541,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
switch (requestState)
{
case TRANSIENT_CONTENT:
case COMMIT:
case CONTENT:
return true;
@ -534,8 +558,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
/**
* The request states {@link HttpSender} goes through when sending a request.
*/
protected enum RequestState
private enum RequestState
{
/**
* One of the state transition methods is being executed.
*/
TRANSIENT,
/**
* The content transition method is being executed.
*/
TRANSIENT_CONTENT,
/**
* The request is queued, the initial state
*/

View File

@ -196,12 +196,11 @@ public class HttpSenderOverHTTP extends HttpSender
}
@Override
protected RequestState dispose()
protected void dispose()
{
generator.abort();
RequestState result = super.dispose();
super.dispose();
shutdownOutput();
return result;
}
private void shutdownOutput()

View File

@ -144,28 +144,41 @@ public class InputStreamResponseListener extends Listener.Adapter
}
}
@Override
public void onSuccess(Response response)
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing end of content {}{}", EOF, "");
queue.offer(EOF);
signal();
}
@Override
public void onFailure(Response response, Throwable failure)
{
fail(failure);
signal();
}
@Override
public void onComplete(Result result)
{
if (result.isFailed() && failure == null)
fail(result.getFailure());
this.result = result;
if (result.isSucceeded())
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing end of content {}{}", EOF, "");
queue.offer(EOF);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE);
this.failure = result.getFailure();
responseLatch.countDown();
}
resultLatch.countDown();
signal();
}
private void fail(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Queuing failure {} {}", FAILURE, failure);
queue.offer(FAILURE);
this.failure = failure;
responseLatch.countDown();
}
protected boolean await()
{
try

View File

@ -0,0 +1,209 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientGZIPTest extends AbstractHttpClientServerTest
{
public HttpClientGZIPTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Test
public void testGZIPContentEncoding() throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
GZIPOutputStream gzipOutput = new GZIPOutputStream(response.getOutputStream());
gzipOutput.write(data);
gzipOutput.finish();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
@Test
public void testGZIPContentOneByteAtATime() throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
GZIPOutputStream gzipOutput = new GZIPOutputStream(gzipData);
gzipOutput.write(data);
gzipOutput.finish();
ServletOutputStream output = response.getOutputStream();
byte[] gzipBytes = gzipData.toByteArray();
for (byte gzipByte : gzipBytes)
{
output.write(gzipByte);
output.flush();
sleep(100);
}
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
@Test
public void testGZIPContentSentTwiceInOneWrite() throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
GZIPOutputStream gzipOutput = new GZIPOutputStream(gzipData);
gzipOutput.write(data);
gzipOutput.finish();
byte[] gzipBytes = gzipData.toByteArray();
byte[] content = Arrays.copyOf(gzipBytes, 2 * gzipBytes.length);
System.arraycopy(gzipBytes, 0, content, gzipBytes.length, gzipBytes.length);
ServletOutputStream output = response.getOutputStream();
output.write(content);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
byte[] expected = Arrays.copyOf(data, 2 * data.length);
System.arraycopy(data, 0, expected, data.length, data.length);
Assert.assertArrayEquals(expected, response.getContent());
}
@Test
public void testGZIPContentFragmentedBeforeTrailer() throws Exception
{
// There are 8 trailer bytes to gzip encoding.
testGZIPContentFragmented(9);
}
@Test
public void testGZIPContentFragmentedAtTrailer() throws Exception
{
// There are 8 trailer bytes to gzip encoding.
testGZIPContentFragmented(1);
}
private void testGZIPContentFragmented(final int fragment) throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
ByteArrayOutputStream gzipData = new ByteArrayOutputStream();
GZIPOutputStream gzipOutput = new GZIPOutputStream(gzipData);
gzipOutput.write(data);
gzipOutput.finish();
byte[] gzipBytes = gzipData.toByteArray();
byte[] chunk1 = Arrays.copyOfRange(gzipBytes, 0, gzipBytes.length - fragment);
byte[] chunk2 = Arrays.copyOfRange(gzipBytes, gzipBytes.length - fragment, gzipBytes.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
sleep(500);
output.write(chunk2);
output.flush();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
private static void sleep(long ms) throws IOException
{
try
{
TimeUnit.MILLISECONDS.sleep(ms);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}

View File

@ -43,8 +43,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPOutputStream;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
@ -672,32 +670,6 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void test_GZIP_ContentEncoding() throws Exception
{
final byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setHeader("Content-Encoding", "gzip");
GZIPOutputStream gzipOutput = new GZIPOutputStream(response.getOutputStream());
gzipOutput.write(data);
gzipOutput.finish();
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(200, response.getStatus());
Assert.assertArrayEquals(data, response.getContent());
}
@Slow
@Test
public void test_Request_IdleTimeout() throws Exception

View File

@ -0,0 +1,198 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class HttpResponseConcurrentAbortTest extends AbstractHttpClientServerTest
{
private final CountDownLatch callbackLatch = new CountDownLatch(1);
private final CountDownLatch failureLatch = new CountDownLatch(1);
private final CountDownLatch completeLatch = new CountDownLatch(1);
private final AtomicBoolean success = new AtomicBoolean();
public HttpResponseConcurrentAbortTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Test
public void testAbortOnBegin() throws Exception
{
start(new EmptyServerHandler());
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseBegin(new Response.BeginListener()
{
@Override
public void onBegin(Response response)
{
abort(response);
}
})
.send(new TestResponseListener());
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(6, TimeUnit.SECONDS));
Assert.assertTrue(success.get());
}
@Test
public void testAbortOnHeader() throws Exception
{
start(new EmptyServerHandler());
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseHeader(new Response.HeaderListener()
{
@Override
public boolean onHeader(Response response, HttpField field)
{
abort(response);
return true;
}
})
.send(new TestResponseListener());
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(success.get());
}
@Test
public void testAbortOnHeaders() throws Exception
{
start(new EmptyServerHandler());
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseHeaders(new Response.HeadersListener()
{
@Override
public void onHeaders(Response response)
{
abort(response);
}
})
.send(new TestResponseListener());
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(success.get());
}
@Test
public void testAbortOnContent() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
OutputStream output = response.getOutputStream();
output.write(1);
output.flush();
}
});
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseContent(new Response.ContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content)
{
abort(response);
}
})
.send(new TestResponseListener());
Assert.assertTrue(callbackLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(success.get());
}
private void abort(final Response response)
{
Logger logger = Log.getLogger(getClass());
new Thread("abort")
{
@Override
public void run()
{
response.abort(new Exception());
}
}.start();
try
{
// The failure callback must be executed asynchronously.
boolean latched = failureLatch.await(4, TimeUnit.SECONDS);
success.set(latched);
logger.info("SIMON - STEP 1");
// The complete callback must not be executed
// until we return from this callback.
latched = completeLatch.await(1, TimeUnit.SECONDS);
success.set(!latched);
logger.info("SIMON - STEP 2");
callbackLatch.countDown();
}
catch (InterruptedException x)
{
throw new RuntimeException(x);
}
}
private class TestResponseListener extends Response.Listener.Adapter
{
@Override
public void onFailure(Response response, Throwable failure)
{
failureLatch.countDown();
}
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
completeLatch.countDown();
}
}
}

View File

@ -18,23 +18,18 @@
package org.eclipse.jetty.client.http;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.GZIPOutputStream;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
@ -205,62 +200,4 @@ public class HttpReceiverOverHTTPTest
Assert.assertTrue(e.getCause() instanceof HttpResponseException);
}
}
@Test
public void test_Receive_GZIPResponseContent_Fragmented() throws Exception
{
byte[] data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutput = new GZIPOutputStream(baos))
{
gzipOutput.write(data);
}
byte[] gzip = baos.toByteArray();
endPoint.setInput("" +
"HTTP/1.1 200 OK\r\n" +
"Content-Length: " + gzip.length + "\r\n" +
"Content-Encoding: gzip\r\n" +
"\r\n");
HttpRequest request = (HttpRequest)client.newRequest("http://localhost");
final CountDownLatch latch = new CountDownLatch(1);
FutureResponseListener listener = new FutureResponseListener(request)
{
@Override
public void onContent(Response response, ByteBuffer content)
{
boolean hadRemaining=content.hasRemaining();
super.onContent(response, content);
// TODO gzip decoding can pass on empty chunks. Currently ignoring them here, but could be done at the decoder???
if (hadRemaining) // Ignore empty chunks
latch.countDown();
}
};
HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener));
connection.getHttpChannel().associate(exchange);
exchange.requestComplete();
exchange.terminateRequest(null);
connection.getHttpChannel().receive();
endPoint.reset();
ByteBuffer buffer = ByteBuffer.wrap(gzip);
int fragment = buffer.limit() - 1;
buffer.limit(fragment);
endPoint.setInput(buffer);
connection.getHttpChannel().receive();
endPoint.reset();
buffer.limit(gzip.length);
buffer.position(fragment);
endPoint.setInput(buffer);
connection.getHttpChannel().receive();
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertArrayEquals(data, response.getContent());
}
}

View File

@ -28,13 +28,13 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Logger;
public abstract class ProxyConnection extends AbstractConnection
{
protected static final Logger LOG = ConnectHandler.LOG;
private final ForkInvoker<Void> invoker = new ProxyForkInvoker();
private final IteratingCallback pipe = new ProxyIteratingCallback();
private final ByteBufferPool bufferPool;
private final ConcurrentMap<String, Object> context;
private Connection connection;
@ -69,52 +69,7 @@ public abstract class ProxyConnection extends AbstractConnection
@Override
public void onFillable()
{
final ByteBuffer buffer = getByteBufferPool().acquire(getInputBufferSize(), true);
try
{
final int filled = read(getEndPoint(), buffer);
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} bytes", this, filled);
if (filled > 0)
{
write(getConnection().getEndPoint(), buffer, new Callback()
{
@Override
public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("{} wrote {} bytes", this, filled);
bufferPool.release(buffer);
invoker.invoke(null);
}
@Override
public void failed(Throwable x)
{
LOG.debug(this + " failed to write " + filled + " bytes", x);
bufferPool.release(buffer);
connection.close();
}
});
}
else if (filled == 0)
{
bufferPool.release(buffer);
fillInterested();
}
else
{
bufferPool.release(buffer);
connection.getEndPoint().shutdownOutput();
}
}
catch (IOException x)
{
LOG.debug(this + " could not fill", x);
bufferPool.release(buffer);
close();
connection.close();
}
pipe.iterate();
}
protected abstract int read(EndPoint endPoint, ByteBuffer buffer) throws IOException;
@ -130,29 +85,73 @@ public abstract class ProxyConnection extends AbstractConnection
getEndPoint().getRemoteAddress().getPort());
}
private class ProxyForkInvoker extends ForkInvoker<Void> implements Runnable
private class ProxyIteratingCallback extends IteratingCallback
{
private ProxyForkInvoker()
private ByteBuffer buffer;
private int filled;
@Override
protected Action process() throws Exception
{
super(4);
buffer = bufferPool.acquire(getInputBufferSize(), true);
try
{
int filled = this.filled = read(getEndPoint(), buffer);
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} bytes", ProxyConnection.this, filled);
if (filled > 0)
{
write(connection.getEndPoint(), buffer, this);
return Action.SCHEDULED;
}
else if (filled == 0)
{
bufferPool.release(buffer);
fillInterested();
return Action.IDLE;
}
else
{
bufferPool.release(buffer);
connection.getEndPoint().shutdownOutput();
return Action.SUCCEEDED;
}
}
catch (IOException x)
{
LOG.debug(ProxyConnection.this + " could not fill", x);
disconnect();
return Action.SUCCEEDED;
}
}
@Override
public void fork(Void arg)
public void succeeded()
{
getExecutor().execute(this);
}
@Override
public void run()
{
onFillable();
if (LOG.isDebugEnabled())
LOG.debug("{} wrote {} bytes", ProxyConnection.this, filled);
bufferPool.release(buffer);
super.succeeded();
}
@Override
public void call(Void arg)
protected void onCompleteSuccess()
{
onFillable();
}
@Override
protected void onCompleteFailure(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(ProxyConnection.this + " failed to write " + filled + " bytes", x);
disconnect();
}
private void disconnect()
{
bufferPool.release(buffer);
ProxyConnection.this.close();
connection.close();
}
}
}

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.UnavailableException;
import javax.servlet.http.HttpServlet;
@ -516,10 +517,6 @@ public class ProxyServlet extends HttpServlet
protected void onResponseHeaders(HttpServletRequest request, HttpServletResponse response, Response proxyResponse)
{
// Clear the response headers in case it comes with predefined ones.
for (String name : response.getHeaderNames())
response.setHeader(name, null);
for (HttpField field : proxyResponse.getHeaders())
{
String headerName = field.getName();
@ -559,8 +556,15 @@ public class ProxyServlet extends HttpServlet
protected void onResponseFailure(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, Throwable failure)
{
_log.debug(getRequestId(request) + " proxying failed", failure);
if (!response.isCommitted())
if (response.isCommitted())
{
request.setAttribute("org.eclipse.jetty.server.Response.failure", failure);
AsyncContext asyncContext = request.getAsyncContext();
asyncContext.complete();
}
else
{
response.resetBuffer();
if (failure instanceof TimeoutException)
response.setStatus(HttpServletResponse.SC_GATEWAY_TIMEOUT);
else

View File

@ -19,8 +19,10 @@
package org.eclipse.jetty.proxy;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.ConnectException;
@ -30,6 +32,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -42,7 +45,14 @@ import java.util.zip.GZIPOutputStream;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@ -55,11 +65,16 @@ import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
@ -95,6 +110,7 @@ public class ProxyServletTest
private HttpClient client;
private Server proxy;
private ServerConnector proxyConnector;
private ServletContextHandler proxyContext;
private ProxyServlet proxyServlet;
private Server server;
private ServerConnector serverConnector;
@ -112,13 +128,20 @@ public class ProxyServletTest
private void prepareProxy(Map<String, String> initParams) throws Exception
{
proxy = new Server();
proxyConnector = new ServerConnector(proxy);
HttpConfiguration configuration = new HttpConfiguration();
configuration.setSendDateHeader(false);
configuration.setSendServerVersion(false);
String value = initParams.get("outputBufferSize");
if (value != null)
configuration.setOutputBufferSize(Integer.valueOf(value));
proxyConnector = new ServerConnector(proxy, new HttpConnectionFactory(configuration));
proxy.addConnector(proxyConnector);
ServletContextHandler proxyCtx = new ServletContextHandler(proxy, "/", true, false);
proxyContext = new ServletContextHandler(proxy, "/", true, false);
ServletHolder proxyServletHolder = new ServletHolder(proxyServlet);
proxyServletHolder.setInitParameters(initParams);
proxyCtx.addServlet(proxyServletHolder, "/*");
proxyContext.addServlet(proxyServletHolder, "/*");
proxy.start();
@ -899,5 +922,184 @@ public class ProxyServletTest
Assert.assertTrue(response3.getHeaders().containsKey(PROXIED_HEADER));
}
@Test
public void testProxyRequestFailureInTheMiddleOfProxyingSmallContent() throws Exception
{
final long proxyTimeout = 1000;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
prepareProxy(proxyParams);
final CountDownLatch chunk1Latch = new CountDownLatch(1);
final int chunk1 = 'q';
final int chunk2 = 'w';
prepareServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
response.flushBuffer();
// Wait for the client to receive this chunk.
await(chunk1Latch, 5000);
// Send second chunk, must not be received by proxy.
output.write(chunk2);
}
private boolean await(CountDownLatch latch, long ms) throws IOException
{
try
{
return latch.await(ms, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
HttpClient client = prepareClient();
InputStreamResponseListener listener = new InputStreamResponseListener();
int port = serverConnector.getLocalPort();
client.newRequest("localhost", port).send(listener);
// Make the proxy request fail; given the small content, the
// proxy-to-client response is not committed yet so it will be reset.
TimeUnit.MILLISECONDS.sleep(2 * proxyTimeout);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(504, response.getStatus());
// Make sure there is no content, as the proxy-to-client response has been reset.
InputStream input = listener.getInputStream();
Assert.assertEquals(-1, input.read());
chunk1Latch.countDown();
// Result succeeds because a 504 is a valid HTTP response.
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertTrue(result.isSucceeded());
// Make sure the proxy does not receive chunk2.
Assert.assertEquals(-1, input.read());
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
Assert.assertEquals(0, destination.getConnectionPool().getIdleConnections().size());
}
@Test
public void testProxyRequestFailureInTheMiddleOfProxyingBigContent() throws Exception
{
final long proxyTimeout = 1000;
int outputBufferSize = 1024;
Map<String, String> proxyParams = new HashMap<>();
proxyParams.put("timeout", String.valueOf(proxyTimeout));
proxyParams.put("outputBufferSize", String.valueOf(outputBufferSize));
prepareProxy(proxyParams);
final CountDownLatch chunk1Latch = new CountDownLatch(1);
final byte[] chunk1 = new byte[outputBufferSize];
new Random().nextBytes(chunk1);
final int chunk2 = 'w';
prepareServer(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
response.flushBuffer();
// Wait for the client to receive this chunk.
await(chunk1Latch, 5000);
// Send second chunk, must not be received by proxy.
output.write(chunk2);
}
private boolean await(CountDownLatch latch, long ms) throws IOException
{
try
{
return latch.await(ms, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
});
HttpClient client = prepareClient();
InputStreamResponseListener listener = new InputStreamResponseListener();
int port = serverConnector.getLocalPort();
client.newRequest("localhost", port).send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
for (int i = 0; i < chunk1.length; ++i)
Assert.assertEquals(chunk1[i] & 0xFF, input.read());
TimeUnit.MILLISECONDS.sleep(2 * proxyTimeout);
chunk1Latch.countDown();
try
{
// Make sure the proxy does not receive chunk2.
input.read();
Assert.fail();
}
catch (EOFException x)
{
// Expected
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
Assert.assertEquals(0, destination.getConnectionPool().getIdleConnections().size());
}
@Test
public void testResponseHeadersAreNotRemoved() throws Exception
{
prepareProxy();
proxyContext.stop();
final String headerName = "X-Test";
final String headerValue = "test-value";
proxyContext.addFilter(new FilterHolder(new Filter()
{
@Override
public void init(FilterConfig filterConfig) throws ServletException
{
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
{
((HttpServletResponse)response).addHeader(headerName, headerValue);
chain.doFilter(request, response);
}
@Override
public void destroy()
{
}
}), "/*", EnumSet.of(DispatcherType.REQUEST));
proxyContext.start();
prepareServer(new EmptyHttpServlet());
HttpClient client = prepareClient();
ContentResponse response = client.newRequest("localhost", serverConnector.getLocalPort()).send();
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(headerValue, response.getHeaders().get(headerName));
}
// TODO: test proxy authentication
}

View File

@ -353,10 +353,27 @@ public class HttpChannel implements Runnable
_state.completed();
if (!_response.isCommitted() && !_request.isHandled())
{
_response.sendError(404);
}
else
// Complete generating the response
_response.closeOutput();
{
// There is no way in the Servlet API to directly close a connection,
// so we rely on applications to pass this attribute to signal they
// want to hard close the connection, without even closing the output.
Object failure = _request.getAttribute("org.eclipse.jetty.server.Response.failure");
if (failure != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Explicit response failure", failure);
failed();
}
else
{
// Complete generating the response
_response.closeOutput();
}
}
}
catch(EofException|ClosedChannelException e)
{

View File

@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.eclipse.jetty.http.AbstractMetaData;
import org.eclipse.jetty.http.FinalMetaData;
import org.eclipse.jetty.http.HostPortHttpField;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
@ -44,9 +43,6 @@ import org.eclipse.jetty.io.EndPoint;
*/
class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandler, HttpParser.ProxyHandler
{
/**
*
*/
private final HttpConnection _httpConnection;
private String _method;
private String _uri;
@ -100,7 +96,6 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
{
return _fields;
}
};
public HttpChannelOverHttp(HttpConnection httpConnection, Connector connector, HttpConfiguration config, EndPoint endPoint, HttpTransport transport, HttpInput input)
@ -140,7 +135,7 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
public boolean startRequest(String method, String uri, HttpVersion version)
{
_method=method;
_uri=uri.toString();
_uri=uri;
_version=version;
_expect = false;
_expect100Continue = false;
@ -158,7 +153,6 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
request.setServerPort(dPort);
request.setRemoteAddr(InetSocketAddress.createUnresolved(sAddr,sPort));
}
@Override
public void parsedHeader(HttpField field)
@ -255,7 +249,6 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
}
}
@Override
public void earlyEOF()
{
@ -356,9 +349,9 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
@Override
public void failed()
{
_httpConnection._generator.setPersistent(false);
getEndPoint().shutdownOutput();
}
@Override
public boolean messageComplete()
@ -367,10 +360,6 @@ class HttpChannelOverHttp extends HttpChannel implements HttpParser.RequestHandl
return false;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.http.HttpParser.HttpHandler#getHeaderCacheSize()
*/
@Override
public int getHeaderCacheSize()
{

View File

@ -450,7 +450,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_sendCallback.iterate();
}
private class SendCallback extends IteratingCallback
{
private ResponseInfo _info;
@ -623,7 +622,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public String toString()
{
return String.format("%s[i=%s,cb=%s]",super.toString(),getState(),_info,_callback);
return String.format("%s[i=%s,cb=%s]",super.toString(),_info,_callback);
}
}

View File

@ -23,7 +23,9 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
@ -35,27 +37,27 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Quality of Service Filter.
*
* <p/>
* This filter limits the number of active requests to the number set by the "maxRequests" init parameter (default 10).
* If more requests are received, they are suspended and placed on priority queues. Priorities are determined by
* the {@link #getPriority(ServletRequest)} method and are a value between 0 and the value given by the "maxPriority"
* init parameter (default 10), with higher values having higher priority.
* </p><p>
* <p/>
* This filter is ideal to prevent wasting threads waiting for slow/limited
* resources such as a JDBC connection pool. It avoids the situation where all of a
* containers thread pool may be consumed blocking on such a slow resource.
* By limiting the number of active threads, a smaller thread pool may be used as
* the threads are not wasted waiting. Thus more memory may be available for use by
* the active threads.
* </p><p>
* <p/>
* Furthermore, this filter uses a priority when resuming waiting requests. So that if
* a container is under load, and there are many requests waiting for resources,
* the {@link #getPriority(ServletRequest)} method is used, so that more important
@ -63,175 +65,168 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
* maxRequest limit slightly smaller than the containers thread pool and a high priority
* allocated to admin users. Thus regardless of load, admin users would always be
* able to access the web application.
* </p><p>
* <p/>
* The maxRequest limit is policed by a {@link Semaphore} and the filter will wait a short while attempting to acquire
* the semaphore. This wait is controlled by the "waitMs" init parameter and allows the expense of a suspend to be
* avoided if the semaphore is shortly available. If the semaphore cannot be obtained, the request will be suspended
* for the default suspend period of the container or the valued set as the "suspendMs" init parameter.
* </p><p>
* <p/>
* If the "managedAttr" init parameter is set to true, then this servlet is set as a {@link ServletContext} attribute with the
* filter name as the attribute name. This allows context external mechanism (eg JMX via {@link ContextHandler#MANAGED_ATTRIBUTES}) to
* manage the configuration of the filter.
* </p>
*
*
*/
@ManagedObject("Quality of Service Filter")
public class QoSFilter implements Filter
{
final static int __DEFAULT_MAX_PRIORITY=10;
final static int __DEFAULT_PASSES=10;
final static int __DEFAULT_WAIT_MS=50;
final static long __DEFAULT_TIMEOUT_MS = -1;
private static final Logger LOG = Log.getLogger(QoSFilter.class);
final static String MANAGED_ATTR_INIT_PARAM="managedAttr";
final static String MAX_REQUESTS_INIT_PARAM="maxRequests";
final static String MAX_PRIORITY_INIT_PARAM="maxPriority";
final static String MAX_WAIT_INIT_PARAM="waitMs";
final static String SUSPEND_INIT_PARAM="suspendMs";
static final int __DEFAULT_MAX_PRIORITY = 10;
static final int __DEFAULT_PASSES = 10;
static final int __DEFAULT_WAIT_MS = 50;
static final long __DEFAULT_TIMEOUT_MS = -1;
ServletContext _context;
protected long _waitMs;
protected long _suspendMs;
protected int _maxRequests;
static final String MANAGED_ATTR_INIT_PARAM = "managedAttr";
static final String MAX_REQUESTS_INIT_PARAM = "maxRequests";
static final String MAX_PRIORITY_INIT_PARAM = "maxPriority";
static final String MAX_WAIT_INIT_PARAM = "waitMs";
static final String SUSPEND_INIT_PARAM = "suspendMs";
private final String _suspended = "QoSFilter@" + Integer.toHexString(hashCode()) + ".SUSPENDED";
private final String _resumed = "QoSFilter@" + Integer.toHexString(hashCode()) + ".RESUMED";
private long _waitMs;
private long _suspendMs;
private int _maxRequests;
private Semaphore _passes;
private Queue<Continuation>[] _queue;
private ContinuationListener[] _listener;
private String _suspended="QoSFilter@"+this.hashCode();
private Queue<AsyncContext>[] _queues;
private AsyncListener[] _listeners;
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
*/
public void init(FilterConfig filterConfig)
{
_context=filterConfig.getServletContext();
int max_priority=__DEFAULT_MAX_PRIORITY;
if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM)!=null)
max_priority=Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
_queue=new Queue[max_priority+1];
_listener = new ContinuationListener[max_priority + 1];
for (int p=0;p<_queue.length;p++)
int max_priority = __DEFAULT_MAX_PRIORITY;
if (filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM) != null)
max_priority = Integer.parseInt(filterConfig.getInitParameter(MAX_PRIORITY_INIT_PARAM));
_queues = new Queue[max_priority + 1];
_listeners = new AsyncListener[_queues.length];
for (int p = 0; p < _queues.length; ++p)
{
_queue[p]=new ConcurrentLinkedQueue<Continuation>();
final int priority=p;
_listener[p] = new ContinuationListener()
{
public void onComplete(Continuation continuation)
{}
public void onTimeout(Continuation continuation)
{
_queue[priority].remove(continuation);
}
};
_queues[p] = new ConcurrentLinkedQueue<>();
_listeners[p] = new QoSAsyncListener(p);
}
int maxRequests=__DEFAULT_PASSES;
if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM)!=null)
maxRequests=Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
_passes=new Semaphore(maxRequests,true);
int maxRequests = __DEFAULT_PASSES;
if (filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM) != null)
maxRequests = Integer.parseInt(filterConfig.getInitParameter(MAX_REQUESTS_INIT_PARAM));
_passes = new Semaphore(maxRequests, true);
_maxRequests = maxRequests;
long wait = __DEFAULT_WAIT_MS;
if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM)!=null)
wait=Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
_waitMs=wait;
if (filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM) != null)
wait = Integer.parseInt(filterConfig.getInitParameter(MAX_WAIT_INIT_PARAM));
_waitMs = wait;
long suspend = __DEFAULT_TIMEOUT_MS;
if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM)!=null)
suspend=Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
_suspendMs=suspend;
if (filterConfig.getInitParameter(SUSPEND_INIT_PARAM) != null)
suspend = Integer.parseInt(filterConfig.getInitParameter(SUSPEND_INIT_PARAM));
_suspendMs = suspend;
if (_context!=null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
_context.setAttribute(filterConfig.getFilterName(),this);
ServletContext context = filterConfig.getServletContext();
if (context != null && Boolean.parseBoolean(filterConfig.getInitParameter(MANAGED_ATTR_INIT_PARAM)))
context.setAttribute(filterConfig.getFilterName(), this);
}
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest, javax.servlet.ServletResponse, javax.servlet.FilterChain)
*/
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
{
boolean accepted=false;
boolean accepted = false;
try
{
if (request.getAttribute(_suspended)==null)
Boolean suspended = (Boolean)request.getAttribute(_suspended);
if (suspended == null)
{
accepted=_passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
if (accepted)
{
request.setAttribute(_suspended,Boolean.FALSE);
request.setAttribute(_suspended, Boolean.FALSE);
if (LOG.isDebugEnabled())
LOG.debug("Accepted {}", request);
}
else
{
request.setAttribute(_suspended,Boolean.TRUE);
request.setAttribute(_suspended, Boolean.TRUE);
int priority = getPriority(request);
Continuation continuation = ContinuationSupport.getContinuation(request);
if (_suspendMs>0)
continuation.setTimeout(_suspendMs);
continuation.suspend();
continuation.addContinuationListener(_listener[priority]);
_queue[priority].add(continuation);
AsyncContext asyncContext = request.startAsync();
long suspendMs = getSuspendMs();
if (suspendMs > 0)
asyncContext.setTimeout(suspendMs);
asyncContext.addListener(_listeners[priority]);
_queues[priority].add(asyncContext);
if (LOG.isDebugEnabled())
LOG.debug("Suspended {}", request);
return;
}
}
else
{
Boolean suspended=(Boolean)request.getAttribute(_suspended);
if (suspended.booleanValue())
if (suspended)
{
request.setAttribute(_suspended,Boolean.FALSE);
if (request.getAttribute("javax.servlet.resumed")==Boolean.TRUE)
request.setAttribute(_suspended, Boolean.FALSE);
Boolean resumed = (Boolean)request.getAttribute(_resumed);
if (resumed == Boolean.TRUE)
{
_passes.acquire();
accepted=true;
accepted = true;
if (LOG.isDebugEnabled())
LOG.debug("Resumed {}", request);
}
else
{
// Timeout! try 1 more time.
accepted = _passes.tryAcquire(_waitMs,TimeUnit.MILLISECONDS);
accepted = _passes.tryAcquire(getWaitMs(), TimeUnit.MILLISECONDS);
if (LOG.isDebugEnabled())
LOG.debug("Timeout {}", request);
}
}
else
{
// pass through resume of previously accepted request
// Pass through resume of previously accepted request.
_passes.acquire();
accepted = true;
if (LOG.isDebugEnabled())
LOG.debug("Passthrough {}", request);
}
}
if (accepted)
{
chain.doFilter(request,response);
chain.doFilter(request, response);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Rejected {}", request);
((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
}
catch(InterruptedException e)
catch (InterruptedException e)
{
_context.log("QoS",e);
((HttpServletResponse)response).sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
}
finally
{
if (accepted)
{
for (int p=_queue.length;p-->0;)
for (int p = _queues.length - 1; p >= 0; --p)
{
Continuation continutaion=_queue[p].poll();
if (continutaion!=null && continutaion.isSuspended())
AsyncContext asyncContext = _queues[p].poll();
if (asyncContext != null)
{
continutaion.resume();
break;
ServletRequest candidate = asyncContext.getRequest();
Boolean suspended = (Boolean)candidate.getAttribute(_suspended);
if (suspended == Boolean.TRUE)
{
candidate.setAttribute(_resumed, Boolean.TRUE);
asyncContext.dispatch();
break;
}
}
}
_passes.release();
@ -240,40 +235,40 @@ public class QoSFilter implements Filter
}
/**
* Get the request Priority.
* <p> The default implementation assigns the following priorities:<ul>
* <li> 2 - for a authenticated request
* <li> 1 - for a request with valid /non new session
* Computes the request priority.
* <p/>
* The default implementation assigns the following priorities:
* <ul>
* <li> 2 - for an authenticated request
* <li> 1 - for a request with valid / non new session
* <li> 0 - for all other requests.
* </ul>
* This method may be specialised to provide application specific priorities.
* This method may be overridden to provide application specific priorities.
*
* @param request
* @return the request priority
* @param request the incoming request
* @return the computed request priority
*/
protected int getPriority(ServletRequest request)
{
HttpServletRequest baseRequest = (HttpServletRequest)request;
if (baseRequest.getUserPrincipal() != null )
if (baseRequest.getUserPrincipal() != null)
{
return 2;
}
else
{
HttpSession session = baseRequest.getSession(false);
if (session!=null && !session.isNew())
if (session != null && !session.isNew())
return 1;
else
return 0;
}
}
public void destroy()
{
}
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.Filter#destroy()
*/
public void destroy(){}
/* ------------------------------------------------------------ */
/**
* Get the (short) amount of time (in milliseconds) that the filter would wait
* for the semaphore to become available before suspending a request.
@ -286,7 +281,6 @@ public class QoSFilter implements Filter
return _waitMs;
}
/* ------------------------------------------------------------ */
/**
* Set the (short) amount of time (in milliseconds) that the filter would wait
* for the semaphore to become available before suspending a request.
@ -298,7 +292,6 @@ public class QoSFilter implements Filter
_waitMs = value;
}
/* ------------------------------------------------------------ */
/**
* Get the amount of time (in milliseconds) that the filter would suspend
* a request for while waiting for the semaphore to become available.
@ -311,7 +304,6 @@ public class QoSFilter implements Filter
return _suspendMs;
}
/* ------------------------------------------------------------ */
/**
* Set the amount of time (in milliseconds) that the filter would suspend
* a request for while waiting for the semaphore to become available.
@ -323,7 +315,6 @@ public class QoSFilter implements Filter
_suspendMs = value;
}
/* ------------------------------------------------------------ */
/**
* Get the maximum number of requests allowed to be processed
* at the same time.
@ -336,7 +327,6 @@ public class QoSFilter implements Filter
return _maxRequests;
}
/* ------------------------------------------------------------ */
/**
* Set the maximum number of requests allowed to be processed
* at the same time.
@ -345,8 +335,40 @@ public class QoSFilter implements Filter
*/
public void setMaxRequests(int value)
{
_passes = new Semaphore((value-_maxRequests+_passes.availablePermits()), true);
_passes = new Semaphore((value - getMaxRequests() + _passes.availablePermits()), true);
_maxRequests = value;
}
private class QoSAsyncListener implements AsyncListener
{
private final int priority;
public QoSAsyncListener(int priority)
{
this.priority = priority;
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
@Override
public void onComplete(AsyncEvent event) throws IOException
{
}
@Override
public void onTimeout(AsyncEvent event) throws IOException
{
// Remove before it's redispatched, so it won't be
// redispatched again in the finally block below.
_queues[priority].remove(event.getAsyncContext());
}
@Override
public void onError(AsyncEvent event) throws IOException
{
}
}
}

View File

@ -18,16 +18,12 @@
package org.eclipse.jetty.servlets;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URL;
import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.DispatcherType;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServlet;
@ -64,14 +60,14 @@ public class QoSFilterTest
_tester = new ServletTester();
_tester.setContextPath("/context");
_tester.addServlet(TestServlet.class, "/test");
TestServlet.__maxSleepers=0;
TestServlet.__sleepers=0;
TestServlet.__maxSleepers = 0;
TestServlet.__sleepers = 0;
_connectors = new LocalConnector[NUM_CONNECTIONS];
for(int i = 0; i < _connectors.length; ++i)
for (int i = 0; i < _connectors.length; ++i)
_connectors[i] = _tester.createLocalConnector();
_doneRequests = new CountDownLatch(NUM_CONNECTIONS*NUM_LOOPS);
_doneRequests = new CountDownLatch(NUM_CONNECTIONS * NUM_LOOPS);
_tester.start();
}
@ -85,17 +81,17 @@ public class QoSFilterTest
@Test
public void testNoFilter() throws Exception
{
for(int i = 0; i < NUM_CONNECTIONS; ++i )
for (int i = 0; i < NUM_CONNECTIONS; ++i)
{
new Thread(new Worker(i)).start();
}
_doneRequests.await(10,TimeUnit.SECONDS);
_doneRequests.await(10, TimeUnit.SECONDS);
if (TestServlet.__maxSleepers<=MAX_QOS)
if (TestServlet.__maxSleepers <= MAX_QOS)
LOG.warn("TEST WAS NOT PARALLEL ENOUGH!");
else
Assert.assertThat(TestServlet.__maxSleepers,Matchers.lessThanOrEqualTo(NUM_CONNECTIONS));
Assert.assertThat(TestServlet.__maxSleepers, Matchers.lessThanOrEqualTo(NUM_CONNECTIONS));
}
@Test
@ -103,19 +99,19 @@ public class QoSFilterTest
{
FilterHolder holder = new FilterHolder(QoSFilter2.class);
holder.setAsyncSupported(true);
holder.setInitParameter(QoSFilter.MAX_REQUESTS_INIT_PARAM, ""+MAX_QOS);
_tester.getContext().getServletHandler().addFilterWithMapping(holder,"/*",EnumSet.of(DispatcherType.REQUEST,DispatcherType.ASYNC));
holder.setInitParameter(QoSFilter.MAX_REQUESTS_INIT_PARAM, "" + MAX_QOS);
_tester.getContext().getServletHandler().addFilterWithMapping(holder, "/*", EnumSet.of(DispatcherType.REQUEST, DispatcherType.ASYNC));
for(int i = 0; i < NUM_CONNECTIONS; ++i )
for (int i = 0; i < NUM_CONNECTIONS; ++i)
{
new Thread(new Worker(i)).start();
}
_doneRequests.await(10,TimeUnit.SECONDS);
if (TestServlet.__maxSleepers<MAX_QOS)
_doneRequests.await(10, TimeUnit.SECONDS);
if (TestServlet.__maxSleepers < MAX_QOS)
LOG.warn("TEST WAS NOT PARALLEL ENOUGH!");
else
Assert.assertEquals(TestServlet.__maxSleepers,MAX_QOS);
Assert.assertEquals(TestServlet.__maxSleepers, MAX_QOS);
}
@Test
@ -123,22 +119,25 @@ public class QoSFilterTest
{
FilterHolder holder = new FilterHolder(QoSFilter2.class);
holder.setAsyncSupported(true);
holder.setInitParameter(QoSFilter.MAX_REQUESTS_INIT_PARAM, ""+MAX_QOS);
_tester.getContext().getServletHandler().addFilterWithMapping(holder,"/*",EnumSet.of(DispatcherType.REQUEST,DispatcherType.ASYNC));
for(int i = 0; i < NUM_CONNECTIONS; ++i )
holder.setInitParameter(QoSFilter.MAX_REQUESTS_INIT_PARAM, String.valueOf(MAX_QOS));
_tester.getContext().getServletHandler().addFilterWithMapping(holder, "/*", EnumSet.of(DispatcherType.REQUEST, DispatcherType.ASYNC));
for (int i = 0; i < NUM_CONNECTIONS; ++i)
{
new Thread(new Worker2(i)).start();
}
_doneRequests.await(20,TimeUnit.SECONDS);
if (TestServlet.__maxSleepers<MAX_QOS)
_doneRequests.await(20, TimeUnit.SECONDS);
if (TestServlet.__maxSleepers < MAX_QOS)
LOG.warn("TEST WAS NOT PARALLEL ENOUGH!");
else
Assert.assertEquals(TestServlet.__maxSleepers,MAX_QOS);
Assert.assertEquals(TestServlet.__maxSleepers, MAX_QOS);
}
class Worker implements Runnable {
class Worker implements Runnable
{
private int _num;
public Worker(int num)
{
_num = num;
@ -147,32 +146,34 @@ public class QoSFilterTest
@Override
public void run()
{
for (int i=0;i<NUM_LOOPS;i++)
for (int i = 0; i < NUM_LOOPS; i++)
{
HttpTester.Request request = HttpTester.newRequest();
request.setMethod("GET");
request.setHeader("host", "tester");
request.setURI("/context/test?priority="+(_num%QoSFilter.__DEFAULT_MAX_PRIORITY));
request.setHeader("num", _num+"");
request.setURI("/context/test?priority=" + (_num % QoSFilter.__DEFAULT_MAX_PRIORITY));
request.setHeader("num", _num + "");
try
{
String responseString = _connectors[_num].getResponses(BufferUtil.toString(request.generate()));
if(responseString.indexOf("HTTP")!=-1)
if (responseString.contains("HTTP"))
{
_doneRequests.countDown();
}
}
catch (Exception x)
{
assertTrue(false);
Assert.assertTrue(false);
}
}
}
}
class Worker2 implements Runnable {
class Worker2 implements Runnable
{
private int _num;
public Worker2(int num)
{
_num = num;
@ -181,28 +182,25 @@ public class QoSFilterTest
@Override
public void run()
{
URL url=null;
URL url = null;
try
{
String addr = _tester.createConnector(true);
for (int i=0;i<NUM_LOOPS;i++)
for (int i = 0; i < NUM_LOOPS; i++)
{
url=new URL(addr+"/context/test?priority="+(_num%QoSFilter.__DEFAULT_MAX_PRIORITY)+"&n="+_num+"&l="+i);
// System.err.println(_num+"-"+i+" Try "+url);
url = new URL(addr + "/context/test?priority=" + (_num % QoSFilter.__DEFAULT_MAX_PRIORITY) + "&n=" + _num + "&l=" + i);
url.getContent();
_doneRequests.countDown();
// System.err.println(_num+"-"+i+" Got "+IO.toString(in)+" "+_doneRequests.getCount());
}
}
catch(Exception e)
catch (Exception e)
{
LOG.warn(String.valueOf(url));
LOG.debug(e);
LOG.debug("Request " + url + " failed", e);
}
}
}
public static class TestServlet extends HttpServlet implements Servlet
public static class TestServlet extends HttpServlet
{
private static int __sleepers;
private static int __maxSleepers;
@ -212,21 +210,18 @@ public class QoSFilterTest
{
try
{
synchronized(TestServlet.class)
synchronized (TestServlet.class)
{
__sleepers++;
if(__sleepers > __maxSleepers)
if (__sleepers > __maxSleepers)
__maxSleepers = __sleepers;
}
Thread.sleep(50);
synchronized(TestServlet.class)
synchronized (TestServlet.class)
{
// System.err.println(_count++);
__sleepers--;
if(__sleepers > __maxSleepers)
__maxSleepers = __sleepers;
}
response.setContentType("text/plain");
@ -234,7 +229,6 @@ public class QoSFilterTest
}
catch (InterruptedException e)
{
e.printStackTrace();
response.sendError(500);
}
}
@ -246,7 +240,7 @@ public class QoSFilterTest
public int getPriority(ServletRequest request)
{
String p = request.getParameter("priority");
if (p!=null)
if (p != null)
return Integer.parseInt(p);
return 0;
}

View File

@ -2,3 +2,4 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.servlets.LEVEL=DEBUG
#org.eclipse.jetty.servlets.GzipFilter.LEVEL=DEBUG
#org.eclipse.jetty.servlets.QoSFilter.LEVEL=DEBUG

View File

@ -1,135 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
/**
* Utility class that splits calls to {@link #invoke(Object)} into calls to {@link #fork(Object)} or {@link #call(Object)}
* depending on the max number of reentrant calls to {@link #invoke(Object)}.
* <p/>
* This class prevents {@link StackOverflowError}s in case of methods that end up invoking themselves,
* such is common for {@link Callback#succeeded()}.
* <p/>
* Typical use case is:
* <pre>
* public void reentrantMethod(Object param)
* {
* if (condition || tooManyReenters)
* fork(param)
* else
* call(param)
* }
* </pre>
* Calculating {@code tooManyReenters} usually involves using a {@link ThreadLocal} and algebra on the
* number of reentrant invocations, which is factored out in this class for convenience.
* <p />
* The same code using this class becomes:
* <pre>
* private final ForkInvoker invoker = ...;
*
* public void reentrantMethod(Object param)
* {
* invoker.invoke(param);
* }
* </pre>
*
*/
public abstract class ForkInvoker<T>
{
private static final ThreadLocal<Integer> __invocations = new ThreadLocal<Integer>()
{
@Override
protected Integer initialValue()
{
return 0;
}
};
private final int _maxInvocations;
/**
* Creates an instance with the given max number of reentrant calls to {@link #invoke(Object)}
* <p/>
* If {@code maxInvocations} is zero or negative, it is interpreted
* as if the max number of reentrant calls is infinite.
*
* @param maxInvocations the max number of reentrant calls to {@link #invoke(Object)}
*/
public ForkInvoker(int maxInvocations)
{
_maxInvocations = maxInvocations;
}
/**
* Invokes either {@link #fork(Object)} or {@link #call(Object)}.
* If {@link #condition()} returns true, {@link #fork(Object)} is invoked.
* Otherwise, if the max number of reentrant calls is positive and the
* actual number of reentrant invocations exceeds it, {@link #fork(Object)} is invoked.
* Otherwise, {@link #call(Object)} is invoked.
* @param arg TODO
*
* @return true if {@link #fork(Object)} has been called, false otherwise
*/
public boolean invoke(T arg)
{
boolean countInvocations = _maxInvocations > 0;
int invocations = __invocations.get();
if (condition() || countInvocations && invocations > _maxInvocations)
{
fork(arg);
return true;
}
else
{
if (countInvocations)
__invocations.set(invocations + 1);
try
{
call(arg);
return false;
}
finally
{
if (countInvocations)
__invocations.set(invocations);
}
}
}
/**
* Subclasses should override this method returning true if they want
* {@link #invoke(Object)} to call {@link #fork(Object)}.
*
* @return true if {@link #invoke(Object)} should call {@link #fork(Object)}, false otherwise
*/
protected boolean condition()
{
return false;
}
/**
* Executes the forked invocation
* @param arg TODO
*/
public abstract void fork(T arg);
/**
* Executes the direct, non-forked, invocation
* @param arg TODO
*/
public abstract void call(T arg);
}

View File

@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public abstract class IteratingCallback implements Callback
{
/**
* The internal states of this callback
*/
@ -87,10 +86,11 @@ public abstract class IteratingCallback implements Callback
*/
FAILED,
/**
* The ICB has been closed and cannot be reset
* This callback has been closed and cannot be reset.
*/
CLOSED
}
/**
* The indication of the overall progress of the overall job that
* implementations of {@link #process()} must return.
@ -124,14 +124,9 @@ public abstract class IteratingCallback implements Callback
protected IteratingCallback(boolean needReset)
{
_state = new AtomicReference<>(needReset?State.SUCCEEDED:State.INACTIVE);
_state = new AtomicReference<>(needReset ? State.SUCCEEDED : State.INACTIVE);
}
protected State getState()
{
return _state.get();
}
/**
* Method called by {@link #iterate()} to process the sub task.
* <p/>
@ -151,13 +146,21 @@ public abstract class IteratingCallback implements Callback
/**
* Invoked when the overall task has completed successfully.
*
* @see #onCompleteFailure(Throwable)
*/
protected abstract void onCompleteSuccess();
protected void onCompleteSuccess()
{
}
/**
* Invoked when the overall task has completely failed.
* Invoked when the overall task has completed with a failure.
*
* @see #onCompleteSuccess()
*/
protected abstract void onCompleteFailure(Throwable x);
protected void onCompleteFailure(Throwable x)
{
}
/**
* This method must be invoked by applications to start the processing
@ -319,9 +322,10 @@ public abstract class IteratingCallback implements Callback
return;
}
case CLOSED:
// too late!
{
// Too late!
return;
}
default:
{
throw new IllegalStateException(toString());
@ -336,26 +340,29 @@ public abstract class IteratingCallback implements Callback
* {@code super.failed(Throwable)}.
*/
@Override
public final void failed(Throwable x)
public void failed(Throwable x)
{
while (true)
{
State current = _state.get();
switch(current)
switch (current)
{
case SUCCEEDED:
case FAILED:
case INACTIVE:
case CLOSED:
{
// Already complete!.
return;
}
default:
{
if (_state.compareAndSet(current, State.FAILED))
{
onCompleteFailure(x);
return;
}
}
}
}
}
@ -365,20 +372,24 @@ public abstract class IteratingCallback implements Callback
while (true)
{
State current = _state.get();
switch(current)
switch (current)
{
case INACTIVE:
case SUCCEEDED:
case FAILED:
{
if (_state.compareAndSet(current, State.CLOSED))
return;
break;
}
default:
{
if (_state.compareAndSet(current, State.CLOSED))
{
onCompleteFailure(new ClosedChannelException());
return;
}
}
}
}
}
@ -413,10 +424,13 @@ public abstract class IteratingCallback implements Callback
return _state.get() == State.SUCCEEDED;
}
/* ------------------------------------------------------------ */
/** Reset the callback
* <p>A callback can only be reset to INACTIVE from the SUCCEEDED or FAILED states or if it is already INACTIVE.
* @return True if the reset was successful
/**
* Resets this callback.
* <p/>
* A callback can only be reset to INACTIVE from the
* SUCCEEDED or FAILED states or if it is already INACTIVE.
*
* @return true if the reset was successful
*/
public boolean reset()
{