477878 - HttpClient over HTTP/2 doesn't close upload stream.

Clarified the difference between last and consumed in HttpContent.
Fixed HTTP/2 transport to behave correctly in case of last content.
This commit is contained in:
Simone Bordet 2015-09-25 11:16:38 +02:00
parent 6544af8ce7
commit 3201d0acd2
6 changed files with 242 additions and 26 deletions

View File

@ -67,11 +67,13 @@ public class HttpContent implements Callback, Closeable
{
private static final Logger LOG = Log.getLogger(HttpContent.class);
private static final ByteBuffer AFTER = ByteBuffer.allocate(0);
private static final ByteBuffer CLOSE = ByteBuffer.allocate(0);
private final ContentProvider provider;
private final Iterator<ByteBuffer> iterator;
private ByteBuffer buffer;
private volatile ByteBuffer content;
private ByteBuffer content;
private boolean last;
public HttpContent(ContentProvider provider)
{
@ -92,7 +94,7 @@ public class HttpContent implements Callback, Closeable
*/
public boolean isLast()
{
return !iterator.hasNext();
return last;
}
/**
@ -124,41 +126,50 @@ public class HttpContent implements Callback, Closeable
*/
public boolean advance()
{
boolean advanced;
boolean hasNext;
ByteBuffer bytes;
if (iterator instanceof Synchronizable)
{
synchronized (((Synchronizable)iterator).getLock())
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
return advance(iterator);
}
}
else
{
advanced = iterator.hasNext();
bytes = advanced ? iterator.next() : null;
hasNext = advanced && iterator.hasNext();
return advance(iterator);
}
}
if (advanced)
private boolean advance(Iterator<ByteBuffer> iterator)
{
boolean hasNext = iterator.hasNext();
ByteBuffer bytes = hasNext ? iterator.next() : null;
boolean hasMore = hasNext && iterator.hasNext();
boolean wasLast = last;
last = !hasMore;
if (hasNext)
{
buffer = bytes;
content = bytes == null ? null : bytes.slice();
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to {} chunk {}", hasNext ? "next" : "last", bytes);
LOG.debug("Advanced content to {} chunk {}", hasMore ? "next" : "last", String.valueOf(bytes));
return bytes != null;
}
else
{
if (content != AFTER)
// No more content, but distinguish between last and consumed.
if (wasLast)
{
content = buffer = AFTER;
buffer = content = AFTER;
if (LOG.isDebugEnabled())
LOG.debug("Advanced content past last chunk");
}
else
{
buffer = content = CLOSE;
if (LOG.isDebugEnabled())
LOG.debug("Advanced content to last chunk");
}
return false;
}
}
@ -168,7 +179,7 @@ public class HttpContent implements Callback, Closeable
*/
public boolean isConsumed()
{
return content == AFTER;
return buffer == AFTER;
}
@Override
@ -176,6 +187,8 @@ public class HttpContent implements Callback, Closeable
{
if (isConsumed())
return;
if (buffer == CLOSE)
return;
if (iterator instanceof Callback)
((Callback)iterator).succeeded();
}
@ -185,6 +198,8 @@ public class HttpContent implements Callback, Closeable
{
if (isConsumed())
return;
if (buffer == CLOSE)
return;
if (iterator instanceof Callback)
((Callback)iterator).failed(x);
}

View File

@ -678,7 +678,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
return content.isNonBlocking();
}
@Override
public void succeeded()
{
@ -811,9 +811,9 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
while (true)
{
boolean advanced = content.advance();
boolean consumed = content.isConsumed();
boolean lastContent = content.isLast();
if (LOG.isDebugEnabled())
LOG.debug("Content {} consumed {} for {}", advanced, consumed, exchange.getRequest());
LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest());
if (advanced)
{
@ -821,7 +821,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return Action.SCHEDULED;
}
if (consumed)
if (lastContent)
{
sendContent(exchange, content, lastCallback);
return Action.IDLE;
@ -894,7 +894,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{
return content.isNonBlocking();
}
@Override
public void succeeded()
{

View File

@ -79,7 +79,9 @@ public class HttpSenderOverHTTP extends HttpSender
boolean lastContent = content.isLast();
HttpGenerator.Result result = generator.generateRequest(null, null, chunk, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated content: {} - {}", result, generator);
LOG.debug("Generated content ({} bytes) - {}/{}",
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_CHUNK:
@ -200,7 +202,11 @@ public class HttpSenderOverHTTP extends HttpSender
{
HttpGenerator.Result result = generator.generateRequest(metaData, headerBuffer, chunkBuffer, contentBuffer, lastContent);
if (LOG.isDebugEnabled())
LOG.debug("Generated headers: {} - ", result, generator);
LOG.debug("Generated headers ({} bytes), chunk ({} bytes), content ({} bytes) - {}/{}",
headerBuffer == null ? -1 : headerBuffer.remaining(),
chunkBuffer == null ? -1 : chunkBuffer.remaining(),
contentBuffer == null ? -1 : contentBuffer.remaining(),
result, generator);
switch (result)
{
case NEED_HEADER:

View File

@ -891,8 +891,9 @@ public class HttpGenerator
@Override
public String toString()
{
return String.format("%s{s=%s}",
return String.format("%s@%x{s=%s}",
getClass().getSimpleName(),
hashCode(),
_state);
}

View File

@ -64,9 +64,11 @@ public class HttpSenderOverHTTP2 extends HttpSender
if (content.hasContent() && !expects100Continue(request))
{
if (content.advance())
boolean advanced = content.advance();
boolean lastContent = content.isLast();
if (advanced || lastContent)
{
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), content.isLast());
DataFrame dataFrame = new DataFrame(stream.getId(), content.getByteBuffer(), lastContent);
stream.data(dataFrame, callback);
return;
}
@ -80,6 +82,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
callback.failed(failure);
}
};
// TODO optimize the send of HEADERS and DATA frames.
channel.getSession().newStream(headersFrame, promise, channel.getStreamListener());
}

View File

@ -0,0 +1,191 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
public class AsyncRequestContentTest extends AbstractTest
{
public AsyncRequestContentTest(Transport transport)
{
super(transport);
}
@Test
public void testEmptyDeferredContent() throws Exception
{
start(new ConsumeInputHandler());
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testDeferredContent() throws Exception
{
start(new ConsumeInputHandler());
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
contentProvider.offer(ByteBuffer.wrap(new byte[1]));
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testEmptyInputStream() throws Exception
{
start(new ConsumeInputHandler());
InputStreamContentProvider contentProvider =
new InputStreamContentProvider(new ByteArrayInputStream(new byte[0]));
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStream() throws Exception
{
start(new ConsumeInputHandler());
InputStreamContentProvider contentProvider =
new InputStreamContentProvider(new ByteArrayInputStream(new byte[1]));
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testEmptyOutputStream() throws Exception
{
start(new ConsumeInputHandler());
OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testOutputStream() throws Exception
{
start(new ConsumeInputHandler());
OutputStreamContentProvider contentProvider = new OutputStreamContentProvider();
CountDownLatch latch = new CountDownLatch(1);
client.POST("http://localhost:" + connector.getLocalPort())
.content(contentProvider)
.send(result ->
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == HttpStatus.OK_200)
latch.countDown();
});
OutputStream output = contentProvider.getOutputStream();
output.write(new byte[1]);
output.flush();
contentProvider.close();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
private static class ConsumeInputHandler extends AbstractHandler
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
response.setStatus(HttpStatus.OK_200);
}
}
}