diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java index 598d11e133f..aed1adec365 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContent.java @@ -155,6 +155,8 @@ public class HttpContent implements Callback, Closeable @Override public void succeeded() { + if (isConsumed()) + return; if (iterator instanceof Callback) ((Callback)iterator).succeeded(); } @@ -162,6 +164,8 @@ public class HttpContent implements Callback, Closeable @Override public void failed(Throwable x) { + if (isConsumed()) + return; if (iterator instanceof Callback) ((Callback)iterator).failed(x); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java index 310d4a663c4..17e8c83c8fd 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientStreamTest.java @@ -57,6 +57,7 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.toolchain.test.MavenTestingUtils; import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.Assert; @@ -669,6 +670,51 @@ public class HttpClientStreamTest extends AbstractHttpClientServerTest Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testUploadWithDeferredContentAvailableCallbacksNotifiedOnce() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), new ByteArrayOutputStream()); + } + }); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger succeeds = new AtomicInteger(); + try (DeferredContentProvider content = new DeferredContentProvider()) + { + // Make the content immediately available. + content.offer(ByteBuffer.allocate(1024), new Callback.Adapter() + { + @Override + public void succeeded() + { + Thread.dumpStack(); + succeeds.incrementAndGet(); + } + }); + + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .content(content) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + if (result.isSucceeded() && result.getResponse().getStatus() == 200) + latch.countDown(); + } + }); + } + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertEquals(1, succeeds.get()); + } + @Test public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception {