Merged branch 'jetty-9.1.x' into 'master'.
This commit is contained in:
commit
3717115059
|
@ -215,6 +215,10 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
|
|||
LOG.debug("Closed {}", this);
|
||||
}
|
||||
|
||||
public void release(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
public void close(Connection connection)
|
||||
{
|
||||
}
|
||||
|
|
|
@ -140,8 +140,11 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
|
|||
|
||||
protected abstract void send(C connection, HttpExchange exchange);
|
||||
|
||||
public void release(C connection)
|
||||
@Override
|
||||
public void release(Connection c)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
C connection = (C)c;
|
||||
LOG.debug("{} released", connection);
|
||||
HttpClient client = getHttpClient();
|
||||
if (client.isRunning())
|
||||
|
|
|
@ -28,8 +28,6 @@ import org.eclipse.jetty.fcgi.generator.Flusher;
|
|||
import org.eclipse.jetty.fcgi.generator.Generator;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.io.IdleTimeout;
|
||||
|
||||
|
@ -83,42 +81,43 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
return receiver.abort(cause);
|
||||
}
|
||||
|
||||
protected void responseBegin(int code, String reason)
|
||||
protected boolean responseBegin(int code, String reason)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
{
|
||||
exchange.getResponse().version(version).status(code).reason(reason);
|
||||
receiver.responseBegin(exchange);
|
||||
}
|
||||
if (exchange == null)
|
||||
return false;
|
||||
exchange.getResponse().version(version).status(code).reason(reason);
|
||||
return receiver.responseBegin(exchange);
|
||||
}
|
||||
|
||||
protected void responseHeader(HttpField field)
|
||||
protected boolean responseHeader(HttpField field)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
receiver.responseHeader(exchange, field);
|
||||
return exchange != null && receiver.responseHeader(exchange, field);
|
||||
}
|
||||
|
||||
protected void responseHeaders()
|
||||
protected boolean responseHeaders()
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
receiver.responseHeaders(exchange);
|
||||
return exchange != null && receiver.responseHeaders(exchange);
|
||||
}
|
||||
|
||||
protected void content(ByteBuffer buffer)
|
||||
protected boolean content(ByteBuffer buffer)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
receiver.responseContent(exchange, buffer);
|
||||
return exchange != null && receiver.responseContent(exchange, buffer);
|
||||
}
|
||||
|
||||
protected void responseSuccess()
|
||||
protected boolean responseSuccess()
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
if (exchange != null)
|
||||
receiver.responseSuccess(exchange);
|
||||
return exchange != null && receiver.responseSuccess(exchange);
|
||||
}
|
||||
|
||||
protected boolean responseFailure(Throwable failure)
|
||||
{
|
||||
HttpExchange exchange = getHttpExchange();
|
||||
return exchange != null && receiver.responseFailure(failure);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -126,12 +125,10 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
{
|
||||
super.exchangeTerminated(result);
|
||||
idle.onClose();
|
||||
boolean close = result.isFailed();
|
||||
HttpFields responseHeaders = result.getResponse().getHeaders();
|
||||
close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
|
||||
if (close)
|
||||
connection.close();
|
||||
else
|
||||
if (result.isFailed())
|
||||
connection.close(result.getFailure());
|
||||
else if (!connection.closeByHTTP(responseHeaders))
|
||||
connection.release(this);
|
||||
}
|
||||
|
||||
|
@ -154,7 +151,8 @@ public class HttpChannelOverFCGI extends HttpChannel
|
|||
@Override
|
||||
protected void onIdleExpired(TimeoutException timeout)
|
||||
{
|
||||
LOG.debug("Idle timeout for request {}", request);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Idle timeout for request {}", request);
|
||||
connection.abort(timeout);
|
||||
}
|
||||
|
||||
|
|
|
@ -69,7 +69,7 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport
|
|||
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||
{
|
||||
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
|
||||
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination);
|
||||
HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, isMultiplexed());
|
||||
LOG.debug("Created {}", connection);
|
||||
@SuppressWarnings("unchecked")
|
||||
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.eclipse.jetty.client.HttpClient;
|
|||
import org.eclipse.jetty.client.HttpConnection;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.PoolingHttpDestination;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
|
@ -39,6 +38,9 @@ import org.eclipse.jetty.fcgi.FCGI;
|
|||
import org.eclipse.jetty.fcgi.generator.Flusher;
|
||||
import org.eclipse.jetty.fcgi.parser.ClientParser;
|
||||
import org.eclipse.jetty.http.HttpField;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
|
@ -55,14 +57,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final Flusher flusher;
|
||||
private final HttpDestination destination;
|
||||
private final boolean multiplexed;
|
||||
private final Delegate delegate;
|
||||
private final ClientParser parser;
|
||||
|
||||
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination)
|
||||
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed)
|
||||
{
|
||||
super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO());
|
||||
this.flusher = new Flusher(endPoint);
|
||||
this.destination = destination;
|
||||
this.multiplexed = multiplexed;
|
||||
this.flusher = new Flusher(endPoint);
|
||||
this.delegate = new Delegate(destination);
|
||||
this.parser = new ClientParser(new ResponseListener());
|
||||
requests.addLast(0);
|
||||
|
@ -103,7 +107,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
while (true)
|
||||
{
|
||||
int read = endPoint.fill(buffer);
|
||||
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'
|
||||
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
|
||||
LOG.debug("Read {} bytes from {}", read, endPoint);
|
||||
if (read > 0)
|
||||
{
|
||||
|
@ -124,7 +128,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
catch (Exception x)
|
||||
{
|
||||
LOG.debug(x);
|
||||
// TODO: fail and close ?
|
||||
close(x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
@ -140,7 +144,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
|
||||
private void shutdown()
|
||||
{
|
||||
close(new EOFException());
|
||||
// Close explicitly only if we are idle, since the request may still
|
||||
// be in progress, otherwise close only if we can fail the responses.
|
||||
if (channels.isEmpty())
|
||||
close();
|
||||
else
|
||||
failAndClose(new EOFException());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -153,13 +162,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
protected void release(HttpChannelOverFCGI channel)
|
||||
{
|
||||
channels.remove(channel.getRequest());
|
||||
if (destination instanceof PoolingHttpDestination)
|
||||
{
|
||||
@SuppressWarnings("unchecked")
|
||||
PoolingHttpDestination<HttpConnectionOverFCGI> fcgiDestination =
|
||||
(PoolingHttpDestination<HttpConnectionOverFCGI>)destination;
|
||||
fcgiDestination.release(this);
|
||||
}
|
||||
destination.release(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -168,7 +171,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
close(new AsynchronousCloseException());
|
||||
}
|
||||
|
||||
private void close(Throwable failure)
|
||||
protected void close(Throwable failure)
|
||||
{
|
||||
if (closed.compareAndSet(false, true))
|
||||
{
|
||||
|
@ -184,6 +187,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
}
|
||||
}
|
||||
|
||||
protected boolean closeByHTTP(HttpFields fields)
|
||||
{
|
||||
if (multiplexed)
|
||||
return false;
|
||||
if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))
|
||||
return false;
|
||||
close();
|
||||
return true;
|
||||
}
|
||||
|
||||
protected void abort(Throwable failure)
|
||||
{
|
||||
for (HttpChannelOverFCGI channel : channels.values())
|
||||
|
@ -195,6 +208,15 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
channels.clear();
|
||||
}
|
||||
|
||||
private void failAndClose(Throwable failure)
|
||||
{
|
||||
boolean result = false;
|
||||
for (HttpChannelOverFCGI channel : channels.values())
|
||||
result |= channel.responseFailure(failure);
|
||||
if (result)
|
||||
close(failure);
|
||||
}
|
||||
|
||||
private int acquireRequest()
|
||||
{
|
||||
synchronized (requests)
|
||||
|
@ -322,8 +344,23 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
HttpChannelOverFCGI channel = channels.get(request);
|
||||
if (channel != null)
|
||||
{
|
||||
channel.responseSuccess();
|
||||
releaseRequest(request);
|
||||
if (channel.responseSuccess())
|
||||
releaseRequest(request);
|
||||
}
|
||||
else
|
||||
{
|
||||
noChannel(request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(int request, Throwable failure)
|
||||
{
|
||||
HttpChannelOverFCGI channel = channels.get(request);
|
||||
if (channel != null)
|
||||
{
|
||||
if (channel.responseFailure(failure))
|
||||
releaseRequest(request);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -88,21 +88,36 @@ public class ServerGenerator extends Generator
|
|||
return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT);
|
||||
}
|
||||
|
||||
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, boolean aborted, Callback callback)
|
||||
{
|
||||
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
|
||||
if (lastContent)
|
||||
if (aborted)
|
||||
{
|
||||
// Generate the FCGI_END_REQUEST
|
||||
request &= 0xFF_FF;
|
||||
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
|
||||
BufferUtil.clearToFill(endRequestBuffer);
|
||||
endRequestBuffer.putInt(0x01_03_00_00 + request);
|
||||
endRequestBuffer.putInt(0x00_08_00_00);
|
||||
endRequestBuffer.putLong(0x00L);
|
||||
endRequestBuffer.flip();
|
||||
result = result.append(endRequestBuffer, true);
|
||||
Result result = new Result(byteBufferPool, callback);
|
||||
if (lastContent)
|
||||
result.append(generateEndRequest(request, true), true);
|
||||
else
|
||||
result.append(BufferUtil.EMPTY_BUFFER, false);
|
||||
return result;
|
||||
}
|
||||
return result;
|
||||
else
|
||||
{
|
||||
Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT);
|
||||
if (lastContent)
|
||||
result.append(generateEndRequest(request, false), true);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
private ByteBuffer generateEndRequest(int request, boolean aborted)
|
||||
{
|
||||
request &= 0xFF_FF;
|
||||
ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false);
|
||||
BufferUtil.clearToFill(endRequestBuffer);
|
||||
endRequestBuffer.putInt(0x01_03_00_00 + request);
|
||||
endRequestBuffer.putInt(0x00_08_00_00);
|
||||
endRequestBuffer.putInt(aborted ? 1 : 0);
|
||||
endRequestBuffer.putInt(0);
|
||||
endRequestBuffer.flip();
|
||||
return endRequestBuffer;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -98,5 +98,13 @@ public class ClientParser extends Parser
|
|||
for (StreamContentParser streamParser : streamParsers)
|
||||
streamParser.end(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(int request, Throwable failure)
|
||||
{
|
||||
listener.onFailure(request, failure);
|
||||
for (StreamContentParser streamParser : streamParsers)
|
||||
streamParser.end(request);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,8 +107,12 @@ public class EndRequestContentParser extends ContentParser
|
|||
|
||||
private void onEnd()
|
||||
{
|
||||
// TODO: if protocol != 0, invoke an error callback
|
||||
listener.onEnd(getRequest());
|
||||
if (application != 0)
|
||||
listener.onFailure(getRequest(), new Exception("FastCGI application returned code " + application));
|
||||
else if (protocol != 0)
|
||||
listener.onFailure(getRequest(), new Exception("FastCGI server returned code " + protocol));
|
||||
else
|
||||
listener.onEnd(getRequest());
|
||||
}
|
||||
|
||||
private void reset()
|
||||
|
|
|
@ -100,6 +100,8 @@ public abstract class Parser
|
|||
|
||||
public void onEnd(int request);
|
||||
|
||||
public void onFailure(int request, Throwable failure);
|
||||
|
||||
public static class Adapter implements Listener
|
||||
{
|
||||
@Override
|
||||
|
@ -121,6 +123,12 @@ public abstract class Parser
|
|||
public void onEnd(int request)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(int request, Throwable failure)
|
||||
{
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -111,7 +111,7 @@ public class ClientParserTest
|
|||
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
|
||||
ServerGenerator generator = new ServerGenerator(byteBufferPool);
|
||||
Generator.Result result1 = generator.generateResponseHeaders(id, 200, "OK", fields, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, null, true, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, null, true, false, null);
|
||||
|
||||
final AtomicInteger verifier = new AtomicInteger();
|
||||
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
|
||||
|
@ -162,7 +162,7 @@ public class ClientParserTest
|
|||
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
|
||||
ServerGenerator generator = new ServerGenerator(byteBufferPool);
|
||||
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null);
|
||||
|
||||
final AtomicInteger verifier = new AtomicInteger();
|
||||
ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter()
|
||||
|
@ -214,7 +214,7 @@ public class ClientParserTest
|
|||
ByteBufferPool byteBufferPool = new MappedByteBufferPool();
|
||||
ServerGenerator generator = new ServerGenerator(byteBufferPool);
|
||||
Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, content, true, null);
|
||||
Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null);
|
||||
|
||||
final AtomicInteger totalLength = new AtomicInteger();
|
||||
final AtomicBoolean verifier = new AtomicBoolean();
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.eclipse.jetty.fcgi.generator.Flusher;
|
|||
import org.eclipse.jetty.fcgi.generator.Generator;
|
||||
import org.eclipse.jetty.fcgi.generator.ServerGenerator;
|
||||
import org.eclipse.jetty.http.HttpGenerator;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.server.HttpTransport;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
@ -35,6 +37,8 @@ public class HttpTransportOverFCGI implements HttpTransport
|
|||
private final Flusher flusher;
|
||||
private final int request;
|
||||
private volatile boolean head;
|
||||
private volatile boolean shutdown;
|
||||
private volatile boolean aborted;
|
||||
|
||||
public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, Flusher flusher, int request)
|
||||
{
|
||||
|
@ -47,13 +51,15 @@ public class HttpTransportOverFCGI implements HttpTransport
|
|||
public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback)
|
||||
{
|
||||
boolean head = this.head = info.isHead();
|
||||
boolean shutdown = this.shutdown = info.getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
|
||||
|
||||
if (head)
|
||||
{
|
||||
if (lastContent)
|
||||
{
|
||||
Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(),
|
||||
info.getHttpFields(), new Callback.Adapter());
|
||||
Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback);
|
||||
Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback);
|
||||
flusher.flush(headersResult, contentResult);
|
||||
}
|
||||
else
|
||||
|
@ -67,9 +73,12 @@ public class HttpTransportOverFCGI implements HttpTransport
|
|||
{
|
||||
Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(),
|
||||
info.getHttpFields(), new Callback.Adapter());
|
||||
Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, callback);
|
||||
Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, aborted, callback);
|
||||
flusher.flush(headersResult, contentResult);
|
||||
}
|
||||
|
||||
if (lastContent && shutdown)
|
||||
flusher.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -79,7 +88,7 @@ public class HttpTransportOverFCGI implements HttpTransport
|
|||
{
|
||||
if (lastContent)
|
||||
{
|
||||
Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback);
|
||||
Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback);
|
||||
flusher.flush(result);
|
||||
}
|
||||
else
|
||||
|
@ -90,18 +99,22 @@ public class HttpTransportOverFCGI implements HttpTransport
|
|||
}
|
||||
else
|
||||
{
|
||||
Generator.Result result = generator.generateResponseContent(request, content, lastContent, callback);
|
||||
Generator.Result result = generator.generateResponseContent(request, content, lastContent, aborted, callback);
|
||||
flusher.flush(result);
|
||||
}
|
||||
|
||||
if (lastContent && shutdown)
|
||||
flusher.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort()
|
||||
{
|
||||
aborted = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void completed()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort()
|
||||
{
|
||||
}
|
||||
}
|
||||
|
|
|
@ -175,5 +175,17 @@ public class ServerFCGIConnection extends AbstractConnection
|
|||
channel.dispatch();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(int request, Throwable failure)
|
||||
{
|
||||
HttpChannelOverFCGI channel = channels.remove(request);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Request {} failure on {}: {}", request, channel, failure);
|
||||
if (channel != null)
|
||||
{
|
||||
channel.badMessage(400, failure.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.net.URI;
|
|||
import java.net.URLEncoder;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -40,6 +41,8 @@ import org.eclipse.jetty.client.api.Request;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.client.util.DeferredContentProvider;
|
||||
import org.eclipse.jetty.client.util.FutureResponseListener;
|
||||
import org.eclipse.jetty.http.HttpMethod;
|
||||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.toolchain.test.IO;
|
||||
|
@ -551,4 +554,77 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
|
||||
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEarlyEOF() throws Exception
|
||||
{
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
// Promise some content, then flush the headers, then fail to send the content.
|
||||
response.setContentLength(16);
|
||||
response.flushBuffer();
|
||||
throw new NullPointerException();
|
||||
}
|
||||
});
|
||||
|
||||
try
|
||||
{
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
Assert.fail();
|
||||
}
|
||||
catch (ExecutionException x)
|
||||
{
|
||||
// Expected.
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallContentDelimitedByEOFWithSlowRequest() throws Exception
|
||||
{
|
||||
testContentDelimitedByEOFWithSlowRequest(1024);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBigContentDelimitedByEOFWithSlowRequest() throws Exception
|
||||
{
|
||||
testContentDelimitedByEOFWithSlowRequest(128 * 1024);
|
||||
}
|
||||
|
||||
private void testContentDelimitedByEOFWithSlowRequest(int length) throws Exception
|
||||
{
|
||||
final byte[] data = new byte[length];
|
||||
new Random().nextBytes(data);
|
||||
start(new AbstractHandler()
|
||||
{
|
||||
@Override
|
||||
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
|
||||
{
|
||||
baseRequest.setHandled(true);
|
||||
response.setHeader("Connection", "close");
|
||||
response.getOutputStream().write(data);
|
||||
}
|
||||
});
|
||||
|
||||
DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(new byte[]{0}));
|
||||
Request request = client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.content(content);
|
||||
FutureResponseListener listener = new FutureResponseListener(request);
|
||||
request.send(listener);
|
||||
// Wait some time to simulate a slow request.
|
||||
Thread.sleep(1000);
|
||||
content.close();
|
||||
|
||||
ContentResponse response = listener.get(5, TimeUnit.SECONDS);
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertArrayEquals(data, response.getContent());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue