478021 - Client sending Connection: close does not shutdown output.

Fixed behavior of HttpGenerator to change its persistent also for
requests.

Reworked HttpSenderOverHTTP to send headers via IteratingCallback, so
that multiple generation steps can be made to produce SHUTDOWN_OUT.
This commit is contained in:
Simone Bordet 2015-09-23 22:21:51 +02:00
parent f063df4200
commit 8c21871cf0
3 changed files with 286 additions and 86 deletions

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
public class HttpSenderOverHTTP extends HttpSender
{
@ -52,77 +53,9 @@ public class HttpSenderOverHTTP extends HttpSender
@Override
protected void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback)
{
Request request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
long contentLength = requestContent == null ? -1 : requestContent.getLength();
String path = request.getPath();
String query = request.getQuery();
if (query != null)
path += "?" + query;
MetaData.Request requestInfo = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
try
{
HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
ByteBuffer header = bufferPool.acquire(client.getRequestBufferSize(), false);
ByteBuffer chunk = null;
ByteBuffer contentBuffer = null;
boolean lastContent = false;
if (!expects100Continue(request))
{
content.advance();
contentBuffer = content.getByteBuffer();
lastContent = content.isLast();
}
while (true)
{
HttpGenerator.Result result = generator.generateRequest(requestInfo, header, chunk, contentBuffer, lastContent);
switch (result)
{
case NEED_CHUNK:
{
chunk = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
{
int size = 1;
boolean hasChunk = chunk != null;
if (hasChunk)
++size;
boolean hasContent = contentBuffer != null;
if (hasContent)
++size;
ByteBuffer[] toWrite = new ByteBuffer[size];
ByteBuffer[] toRecycle = new ByteBuffer[hasChunk ? 2 : 1];
toWrite[0] = header;
toRecycle[0] = header;
if (hasChunk)
{
toWrite[1] = chunk;
toRecycle[1] = chunk;
}
if (hasContent)
toWrite[toWrite.length - 1] = contentBuffer;
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
endPoint.write(new ByteBufferRecyclerCallback(callback, bufferPool, toRecycle), toWrite);
return;
}
case DONE:
{
// The headers have already been generated, perhaps by a concurrent abort.
callback.failed(new HttpRequestException("Could not generate headers", request));
return;
}
default:
{
callback.failed(new IllegalStateException(result.toString()));
return;
}
}
}
new HeadersCallback(exchange, content, callback).iterate();
}
catch (Throwable x)
{
@ -145,6 +78,8 @@ public class HttpSenderOverHTTP extends HttpSender
ByteBuffer contentBuffer = content.getByteBuffer();
boolean lastContent = content.isLast();
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated content: {} - {}", result, generator);
switch (result)
{
case NEED_CHUNK:
@ -168,17 +103,19 @@ public class HttpSenderOverHTTP extends HttpSender
}
case CONTINUE:
{
break;
if (lastContent)
break;
callback.succeeded();
return;
}
case DONE:
{
assert generator.isEnd();
callback.succeeded();
return;
}
default:
{
throw new IllegalStateException();
throw new IllegalStateException(result.toString());
}
}
}
@ -208,6 +145,8 @@ public class HttpSenderOverHTTP extends HttpSender
private void shutdownOutput()
{
if (LOG.isDebugEnabled())
LOG.debug("Request shutdown output {}", getHttpExchange().getRequest());
getHttpChannel().getHttpConnection().getEndPoint().shutdownOutput();
}
@ -217,6 +156,144 @@ public class HttpSenderOverHTTP extends HttpSender
return String.format("%s[%s]", super.toString(), generator);
}
private class HeadersCallback extends IteratingCallback
{
private final HttpExchange exchange;
private final Callback callback;
private final MetaData.Request metaData;
private ByteBuffer headerBuffer;
private ByteBuffer chunkBuffer;
private ByteBuffer contentBuffer;
private boolean lastContent;
private boolean generated;
public HeadersCallback(HttpExchange exchange, HttpContent content, Callback callback)
{
super(false);
this.exchange = exchange;
this.callback = callback;
Request request = exchange.getRequest();
ContentProvider requestContent = request.getContent();
long contentLength = requestContent == null ? -1 : requestContent.getLength();
String path = request.getPath();
String query = request.getQuery();
if (query != null)
path += "?" + query;
metaData = new MetaData.Request(request.getMethod(), new HttpURI(path), request.getVersion(), request.getHeaders(), contentLength);
if (!expects100Continue(request))
{
content.advance();
contentBuffer = content.getByteBuffer();
lastContent = content.isLast();
}
}
@Override
protected Action process() throws Exception
{
HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
while (true)
{
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated headers: {} - ", result, generator);
switch (result)
{
case NEED_HEADER:
{
headerBuffer = bufferPool.acquire(client.getRequestBufferSize(), false);
break;
}
case NEED_CHUNK:
{
chunkBuffer = bufferPool.acquire(HttpGenerator.CHUNK_SIZE, false);
break;
}
case FLUSH:
{
EndPoint endPoint = getHttpChannel().getHttpConnection().getEndPoint();
if (chunkBuffer == null)
{
if (contentBuffer == null)
endPoint.write(this, headerBuffer);
else
endPoint.write(this, headerBuffer, contentBuffer);
}
else
{
if (contentBuffer == null)
endPoint.write(this, headerBuffer, chunkBuffer);
else
endPoint.write(this, headerBuffer, chunkBuffer, contentBuffer);
}
generated = true;
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
shutdownOutput();
return Action.SUCCEEDED;
}
case CONTINUE:
{
if (generated)
return Action.SUCCEEDED;
break;
}
case DONE:
{
if (generated)
return Action.SUCCEEDED;
// The headers have already been generated by some
// other thread, perhaps by a concurrent abort().
throw new HttpRequestException("Could not generate headers", exchange.getRequest());
}
default:
{
throw new IllegalStateException(result.toString());
}
}
}
}
@Override
public void succeeded()
{
release();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
release();
callback.failed(x);
super.failed(x);
}
@Override
protected void onCompleteSuccess()
{
super.onCompleteSuccess();
callback.succeeded();
}
private void release()
{
HttpClient client = getHttpChannel().getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(headerBuffer);
headerBuffer = null;
if (chunkBuffer != null)
bufferPool.release(chunkBuffer);
chunkBuffer = null;
}
}
private class ByteBufferRecyclerCallback implements Callback
{
private final Callback callback;

View File

@ -0,0 +1,122 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
public class ClientConnectionCloseTest extends AbstractHttpClientServerTest
{
public ClientConnectionCloseTest(SslContextFactory sslContextFactory)
{
super(sslContextFactory);
}
@Test
public void testClientConnectionCloseShutdownOutputWithoutRequestContent() throws Exception
{
testClientConnectionCloseShutdownOutput(null);
}
@Test
public void testClientConnectionCloseShutdownOutputWithRequestContent() throws Exception
{
testClientConnectionCloseShutdownOutput(new StringContentProvider("data", StandardCharsets.UTF_8));
}
@Test
public void testClientConnectionCloseShutdownOutputWithChunkedRequestContent() throws Exception
{
DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public long getLength()
{
return -1;
}
};
content.offer(ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)));
content.close();
testClientConnectionCloseShutdownOutput(content);
}
private void testClientConnectionCloseShutdownOutput(ContentProvider content) throws Exception
{
AtomicReference<EndPoint> ref = new AtomicReference<>();
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ref.set(baseRequest.getHttpChannel().getEndPoint());
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
response.setStatus(HttpStatus.OK_200);
}
});
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/ctx/path")
.header(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString())
.content(content)
.send();
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait for the FIN to arrive to the server
Thread.sleep(1000);
// Do not read from the server because it will trigger
// the send of the TLS Close Message before the response.
EndPoint serverEndPoint = ref.get();
ByteBuffer buffer = BufferUtil.allocate(1);
int read = serverEndPoint.fill(buffer);
Assert.assertEquals(-1, read);
}
}

View File

@ -41,7 +41,7 @@ public class HttpGenerator
{
private final static Logger LOG = Log.getLogger(HttpGenerator.class);
public final static boolean __STRICT=Boolean.getBoolean("org.eclipse.jetty.http.HttpGenerator.STRICT");
public final static boolean __STRICT=Boolean.getBoolean("org.eclipse.jetty.http.HttpGenerator.STRICT");
private final static byte[] __colon_space = new byte[] {':',' '};
private final static HttpHeaderValue[] CLOSE = {HttpHeaderValue.CLOSE};
@ -87,7 +87,7 @@ public class HttpGenerator
{
this(false,false);
}
/* ------------------------------------------------------------------------------- */
public HttpGenerator(boolean sendServerVersion,boolean sendXPoweredBy)
{
@ -160,7 +160,7 @@ public class HttpGenerator
{
return _noContent;
}
/* ------------------------------------------------------------ */
public void setPersistent(boolean persistent)
{
@ -212,7 +212,11 @@ public class HttpGenerator
// If we have not been told our persistence, set the default
if (_persistent==null)
_persistent=(info.getVersion().ordinal() > HttpVersion.HTTP_1_0.ordinal());
{
_persistent=info.getVersion().ordinal() > HttpVersion.HTTP_1_0.ordinal();
if (!_persistent && HttpMethod.CONNECT.is(info.getMethod()))
_persistent=true;
}
// prepare the header
int pos=BufferUtil.flipToFill(header);
@ -278,12 +282,9 @@ public class HttpGenerator
}
if (last)
{
_state=State.COMPLETING;
return len>0?Result.FLUSH:Result.CONTINUE;
}
return Result.FLUSH;
return len>0?Result.FLUSH:Result.CONTINUE;
}
case COMPLETING:
@ -330,7 +331,7 @@ public class HttpGenerator
{
return generateResponse(info,false,header,chunk,content,last);
}
/* ------------------------------------------------------------ */
public Result generateResponse(MetaData.Response info, boolean head, ByteBuffer header, ByteBuffer chunk, ByteBuffer content, boolean last) throws IOException
{
@ -597,7 +598,7 @@ public class HttpGenerator
String v = field.getValue();
if (v==null || v.length()==0)
continue; // rfc7230 does not allow no value
HttpHeader h = field.getHeader();
switch (h==null?HttpHeader.UNKNOWN:h)
@ -664,9 +665,9 @@ public class HttpGenerator
case CLOSE:
{
close=true;
_persistent=false;
if (response!=null)
{
_persistent=false;
if (_endOfContent == EndOfContent.UNKNOWN_CONTENT)
_endOfContent=EndOfContent.EOF_CONTENT;
}
@ -959,7 +960,7 @@ public class HttpGenerator
for (int i=0;i<l;i++)
{
char c=s.charAt(i);
if (c<0 || c>0xff || c=='\r' || c=='\n'|| c==':')
buffer.put((byte)'?');
else
@ -973,7 +974,7 @@ public class HttpGenerator
for (int i=0;i<l;i++)
{
char c=s.charAt(i);
if (c<0 || c>0xff || c=='\r' || c=='\n')
buffer.put((byte)' ');
else
@ -1006,7 +1007,7 @@ public class HttpGenerator
}
}
public static void putTo(HttpFields fields, ByteBuffer bufferInFillMode)
public static void putTo(HttpFields fields, ByteBuffer bufferInFillMode)
{
for (HttpField field : fields)
{