diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 5a40a2c4061..cc83372c437 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -215,6 +215,10 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl LOG.debug("Closed {}", this); } + public void release(Connection connection) + { + } + public void close(Connection connection) { } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java index 6bba6a58205..73c13ad45da 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java @@ -140,8 +140,11 @@ public abstract class PoolingHttpDestination extends HttpD protected abstract void send(C connection, HttpExchange exchange); - public void release(C connection) + @Override + public void release(Connection c) { + @SuppressWarnings("unchecked") + C connection = (C)c; LOG.debug("{} released", connection); HttpClient client = getHttpClient(); if (client.isRunning()) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index a6f7de6ec3b..1c33cf0518d 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -28,8 +28,6 @@ import org.eclipse.jetty.fcgi.generator.Flusher; import org.eclipse.jetty.fcgi.generator.Generator; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; -import org.eclipse.jetty.http.HttpHeader; -import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.IdleTimeout; @@ -83,42 +81,43 @@ public class HttpChannelOverFCGI extends HttpChannel return receiver.abort(cause); } - protected void responseBegin(int code, String reason) + protected boolean responseBegin(int code, String reason) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - { - exchange.getResponse().version(version).status(code).reason(reason); - receiver.responseBegin(exchange); - } + if (exchange == null) + return false; + exchange.getResponse().version(version).status(code).reason(reason); + return receiver.responseBegin(exchange); } - protected void responseHeader(HttpField field) + protected boolean responseHeader(HttpField field) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - receiver.responseHeader(exchange, field); + return exchange != null && receiver.responseHeader(exchange, field); } - protected void responseHeaders() + protected boolean responseHeaders() { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - receiver.responseHeaders(exchange); + return exchange != null && receiver.responseHeaders(exchange); } - protected void content(ByteBuffer buffer) + protected boolean content(ByteBuffer buffer) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - receiver.responseContent(exchange, buffer); + return exchange != null && receiver.responseContent(exchange, buffer); } - protected void responseSuccess() + protected boolean responseSuccess() { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - receiver.responseSuccess(exchange); + return exchange != null && receiver.responseSuccess(exchange); + } + + protected boolean responseFailure(Throwable failure) + { + HttpExchange exchange = getHttpExchange(); + return exchange != null && receiver.responseFailure(failure); } @Override @@ -126,12 +125,10 @@ public class HttpChannelOverFCGI extends HttpChannel { super.exchangeTerminated(result); idle.onClose(); - boolean close = result.isFailed(); HttpFields responseHeaders = result.getResponse().getHeaders(); - close |= responseHeaders.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()); - if (close) - connection.close(); - else + if (result.isFailed()) + connection.close(result.getFailure()); + else if (!connection.closeByHTTP(responseHeaders)) connection.release(this); } @@ -154,7 +151,8 @@ public class HttpChannelOverFCGI extends HttpChannel @Override protected void onIdleExpired(TimeoutException timeout) { - LOG.debug("Idle timeout for request {}", request); + if (LOG.isDebugEnabled()) + LOG.debug("Idle timeout for request {}", request); connection.abort(timeout); } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java index eed8c892a60..00c6778d5e3 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpClientTransportOverFCGI.java @@ -69,7 +69,7 @@ public class HttpClientTransportOverFCGI extends AbstractHttpClientTransport public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException { HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY); - HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination); + HttpConnectionOverFCGI connection = new HttpConnectionOverFCGI(endPoint, destination, isMultiplexed()); LOG.debug("Created {}", connection); @SuppressWarnings("unchecked") Promise promise = (Promise)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY); diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 262a7c498c9..f14143bfb0c 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -31,7 +31,6 @@ import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpExchange; -import org.eclipse.jetty.client.PoolingHttpDestination; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -39,6 +38,9 @@ import org.eclipse.jetty.fcgi.FCGI; import org.eclipse.jetty.fcgi.generator.Flusher; import org.eclipse.jetty.fcgi.parser.ClientParser; import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -55,14 +57,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private final AtomicBoolean closed = new AtomicBoolean(); private final Flusher flusher; private final HttpDestination destination; + private final boolean multiplexed; private final Delegate delegate; private final ClientParser parser; - public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination) + public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, boolean multiplexed) { super(endPoint, destination.getHttpClient().getExecutor(), destination.getHttpClient().isDispatchIO()); - this.flusher = new Flusher(endPoint); this.destination = destination; + this.multiplexed = multiplexed; + this.flusher = new Flusher(endPoint); this.delegate = new Delegate(destination); this.parser = new ClientParser(new ResponseListener()); requests.addLast(0); @@ -103,7 +107,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec while (true) { int read = endPoint.fill(buffer); - if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read' + if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'. LOG.debug("Read {} bytes from {}", read, endPoint); if (read > 0) { @@ -124,7 +128,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec catch (Exception x) { LOG.debug(x); - // TODO: fail and close ? + close(x); } finally { @@ -140,7 +144,12 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private void shutdown() { - close(new EOFException()); + // Close explicitly only if we are idle, since the request may still + // be in progress, otherwise close only if we can fail the responses. + if (channels.isEmpty()) + close(); + else + failAndClose(new EOFException()); } @Override @@ -153,13 +162,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec protected void release(HttpChannelOverFCGI channel) { channels.remove(channel.getRequest()); - if (destination instanceof PoolingHttpDestination) - { - @SuppressWarnings("unchecked") - PoolingHttpDestination fcgiDestination = - (PoolingHttpDestination)destination; - fcgiDestination.release(this); - } + destination.release(this); } @Override @@ -168,7 +171,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec close(new AsynchronousCloseException()); } - private void close(Throwable failure) + protected void close(Throwable failure) { if (closed.compareAndSet(false, true)) { @@ -184,6 +187,16 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec } } + protected boolean closeByHTTP(HttpFields fields) + { + if (multiplexed) + return false; + if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())) + return false; + close(); + return true; + } + protected void abort(Throwable failure) { for (HttpChannelOverFCGI channel : channels.values()) @@ -195,6 +208,15 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec channels.clear(); } + private void failAndClose(Throwable failure) + { + boolean result = false; + for (HttpChannelOverFCGI channel : channels.values()) + result |= channel.responseFailure(failure); + if (result) + close(failure); + } + private int acquireRequest() { synchronized (requests) @@ -322,8 +344,23 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec HttpChannelOverFCGI channel = channels.get(request); if (channel != null) { - channel.responseSuccess(); - releaseRequest(request); + if (channel.responseSuccess()) + releaseRequest(request); + } + else + { + noChannel(request); + } + } + + @Override + public void onFailure(int request, Throwable failure) + { + HttpChannelOverFCGI channel = channels.get(request); + if (channel != null) + { + if (channel.responseFailure(failure)) + releaseRequest(request); } else { diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/ServerGenerator.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/ServerGenerator.java index 39619268a7b..e113dd29b78 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/ServerGenerator.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/generator/ServerGenerator.java @@ -88,21 +88,36 @@ public class ServerGenerator extends Generator return generateContent(request, buffer, true, false, callback, FCGI.FrameType.STDOUT); } - public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, Callback callback) + public Result generateResponseContent(int request, ByteBuffer content, boolean lastContent, boolean aborted, Callback callback) { - Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT); - if (lastContent) + if (aborted) { - // Generate the FCGI_END_REQUEST - request &= 0xFF_FF; - ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false); - BufferUtil.clearToFill(endRequestBuffer); - endRequestBuffer.putInt(0x01_03_00_00 + request); - endRequestBuffer.putInt(0x00_08_00_00); - endRequestBuffer.putLong(0x00L); - endRequestBuffer.flip(); - result = result.append(endRequestBuffer, true); + Result result = new Result(byteBufferPool, callback); + if (lastContent) + result.append(generateEndRequest(request, true), true); + else + result.append(BufferUtil.EMPTY_BUFFER, false); + return result; } - return result; + else + { + Result result = generateContent(request, content, false, lastContent, callback, FCGI.FrameType.STDOUT); + if (lastContent) + result.append(generateEndRequest(request, false), true); + return result; + } + } + + private ByteBuffer generateEndRequest(int request, boolean aborted) + { + request &= 0xFF_FF; + ByteBuffer endRequestBuffer = byteBufferPool.acquire(8, false); + BufferUtil.clearToFill(endRequestBuffer); + endRequestBuffer.putInt(0x01_03_00_00 + request); + endRequestBuffer.putInt(0x00_08_00_00); + endRequestBuffer.putInt(aborted ? 1 : 0); + endRequestBuffer.putInt(0); + endRequestBuffer.flip(); + return endRequestBuffer; } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java index 4ba70b44e22..d7a7dc6604d 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/ClientParser.java @@ -98,5 +98,13 @@ public class ClientParser extends Parser for (StreamContentParser streamParser : streamParsers) streamParser.end(request); } + + @Override + public void onFailure(int request, Throwable failure) + { + listener.onFailure(request, failure); + for (StreamContentParser streamParser : streamParsers) + streamParser.end(request); + } } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java index 419536af77d..1f77eaf0ea6 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/EndRequestContentParser.java @@ -107,8 +107,12 @@ public class EndRequestContentParser extends ContentParser private void onEnd() { - // TODO: if protocol != 0, invoke an error callback - listener.onEnd(getRequest()); + if (application != 0) + listener.onFailure(getRequest(), new Exception("FastCGI application returned code " + application)); + else if (protocol != 0) + listener.onFailure(getRequest(), new Exception("FastCGI server returned code " + protocol)); + else + listener.onEnd(getRequest()); } private void reset() diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java index 577219c301c..5739ef32012 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/parser/Parser.java @@ -100,6 +100,8 @@ public abstract class Parser public void onEnd(int request); + public void onFailure(int request, Throwable failure); + public static class Adapter implements Listener { @Override @@ -121,6 +123,12 @@ public abstract class Parser public void onEnd(int request) { } + + @Override + public void onFailure(int request, Throwable failure) + { + + } } } diff --git a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java index 5f80ed2827c..826ed84b43e 100644 --- a/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java +++ b/jetty-fcgi/fcgi-client/src/test/java/org/eclipse/jetty/fcgi/parser/ClientParserTest.java @@ -111,7 +111,7 @@ public class ClientParserTest ByteBufferPool byteBufferPool = new MappedByteBufferPool(); ServerGenerator generator = new ServerGenerator(byteBufferPool); Generator.Result result1 = generator.generateResponseHeaders(id, 200, "OK", fields, null); - Generator.Result result2 = generator.generateResponseContent(id, null, true, null); + Generator.Result result2 = generator.generateResponseContent(id, null, true, false, null); final AtomicInteger verifier = new AtomicInteger(); ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter() @@ -162,7 +162,7 @@ public class ClientParserTest ByteBufferPool byteBufferPool = new MappedByteBufferPool(); ServerGenerator generator = new ServerGenerator(byteBufferPool); Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null); - Generator.Result result2 = generator.generateResponseContent(id, content, true, null); + Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null); final AtomicInteger verifier = new AtomicInteger(); ClientParser parser = new ClientParser(new ClientParser.Listener.Adapter() @@ -214,7 +214,7 @@ public class ClientParserTest ByteBufferPool byteBufferPool = new MappedByteBufferPool(); ServerGenerator generator = new ServerGenerator(byteBufferPool); Generator.Result result1 = generator.generateResponseHeaders(id, code, "OK", fields, null); - Generator.Result result2 = generator.generateResponseContent(id, content, true, null); + Generator.Result result2 = generator.generateResponseContent(id, content, true, false, null); final AtomicInteger totalLength = new AtomicInteger(); final AtomicBoolean verifier = new AtomicBoolean(); diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java index fd62656164e..91459ed7798 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/HttpTransportOverFCGI.java @@ -24,6 +24,8 @@ import org.eclipse.jetty.fcgi.generator.Flusher; import org.eclipse.jetty.fcgi.generator.Generator; import org.eclipse.jetty.fcgi.generator.ServerGenerator; import org.eclipse.jetty.http.HttpGenerator; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.server.HttpTransport; import org.eclipse.jetty.util.BufferUtil; @@ -35,6 +37,8 @@ public class HttpTransportOverFCGI implements HttpTransport private final Flusher flusher; private final int request; private volatile boolean head; + private volatile boolean shutdown; + private volatile boolean aborted; public HttpTransportOverFCGI(ByteBufferPool byteBufferPool, Flusher flusher, int request) { @@ -47,13 +51,15 @@ public class HttpTransportOverFCGI implements HttpTransport public void send(HttpGenerator.ResponseInfo info, ByteBuffer content, boolean lastContent, Callback callback) { boolean head = this.head = info.isHead(); + boolean shutdown = this.shutdown = info.getHttpFields().contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()); + if (head) { if (lastContent) { Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(), info.getHttpFields(), new Callback.Adapter()); - Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback); + Generator.Result contentResult = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback); flusher.flush(headersResult, contentResult); } else @@ -67,9 +73,12 @@ public class HttpTransportOverFCGI implements HttpTransport { Generator.Result headersResult = generator.generateResponseHeaders(request, info.getStatus(), info.getReason(), info.getHttpFields(), new Callback.Adapter()); - Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, callback); + Generator.Result contentResult = generator.generateResponseContent(request, content, lastContent, aborted, callback); flusher.flush(headersResult, contentResult); } + + if (lastContent && shutdown) + flusher.shutdown(); } @Override @@ -79,7 +88,7 @@ public class HttpTransportOverFCGI implements HttpTransport { if (lastContent) { - Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, callback); + Generator.Result result = generator.generateResponseContent(request, BufferUtil.EMPTY_BUFFER, lastContent, aborted, callback); flusher.flush(result); } else @@ -90,18 +99,22 @@ public class HttpTransportOverFCGI implements HttpTransport } else { - Generator.Result result = generator.generateResponseContent(request, content, lastContent, callback); + Generator.Result result = generator.generateResponseContent(request, content, lastContent, aborted, callback); flusher.flush(result); } + + if (lastContent && shutdown) + flusher.shutdown(); + } + + @Override + public void abort() + { + aborted = true; } @Override public void completed() { } - - @Override - public void abort() - { - } } diff --git a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java index c69253e4ded..2e8187966e0 100644 --- a/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java +++ b/jetty-fcgi/fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/ServerFCGIConnection.java @@ -175,5 +175,17 @@ public class ServerFCGIConnection extends AbstractConnection channel.dispatch(); } } + + @Override + public void onFailure(int request, Throwable failure) + { + HttpChannelOverFCGI channel = channels.remove(request); + if (LOG.isDebugEnabled()) + LOG.debug("Request {} failure on {}: {}", request, channel, failure); + if (channel != null) + { + channel.badMessage(400, failure.toString()); + } + } } } diff --git a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java index 103a326eabb..2146f6305be 100644 --- a/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java +++ b/jetty-fcgi/fcgi-server/src/test/java/org/eclipse/jetty/fcgi/server/HttpClientTest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -40,6 +41,8 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BytesContentProvider; +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.client.util.FutureResponseListener; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.IO; @@ -551,4 +554,77 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testEarlyEOF() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + // Promise some content, then flush the headers, then fail to send the content. + response.setContentLength(16); + response.flushBuffer(); + throw new NullPointerException(); + } + }); + + try + { + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .timeout(5, TimeUnit.SECONDS) + .send(); + Assert.fail(); + } + catch (ExecutionException x) + { + // Expected. + } + } + + @Test + public void testSmallContentDelimitedByEOFWithSlowRequest() throws Exception + { + testContentDelimitedByEOFWithSlowRequest(1024); + } + + @Test + public void testBigContentDelimitedByEOFWithSlowRequest() throws Exception + { + testContentDelimitedByEOFWithSlowRequest(128 * 1024); + } + + private void testContentDelimitedByEOFWithSlowRequest(int length) throws Exception + { + final byte[] data = new byte[length]; + new Random().nextBytes(data); + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + response.setHeader("Connection", "close"); + response.getOutputStream().write(data); + } + }); + + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(new byte[]{0})); + Request request = client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content); + FutureResponseListener listener = new FutureResponseListener(request); + request.send(listener); + // Wait some time to simulate a slow request. + Thread.sleep(1000); + content.close(); + + ContentResponse response = listener.get(5, TimeUnit.SECONDS); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertArrayEquals(data, response.getContent()); + } }