444764 - HttpClient notifies callbacks for last chunk of content twice.

Fixed by filtering out notification of the callbacks in case the
HttpContent is already consumed.
This commit is contained in:
Simone Bordet 2014-09-22 18:16:49 +02:00
parent 709ac00f8b
commit 1cd367ae7c
2 changed files with 50 additions and 0 deletions

View File

@ -155,6 +155,8 @@ public class HttpContent implements Callback, Closeable
@Override
public void succeeded()
{
if (isConsumed())
return;
if (iterator instanceof Callback)
((Callback)iterator).succeeded();
}
@ -162,6 +164,8 @@ public class HttpContent implements Callback, Closeable
@Override
public void failed(Throwable x)
{
if (isConsumed())
return;
if (iterator instanceof Callback)
((Callback)iterator).failed(x);
}

View File

@ -57,6 +57,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
@ -669,6 +670,51 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentAvailableCallbacksNotifiedOnce() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger succeeds = new AtomicInteger();
try (DeferredContentProvider content = new DeferredContentProvider())
{
// Make the content immediately available.
content.offer(ByteBuffer.allocate(1024), new Callback.Adapter()
{
@Override
public void succeeded()
{
Thread.dumpStack();
succeeds.incrementAndGet();
}
});
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.content(content)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown();
}
});
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, succeeds.get());
}
@Test
public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception
{