Fixes #2225 - Test failure: StreamResetTest.testBlockingWriteAfterStreamReceivingReset.

The problem was caused by the reset arriving to the server
_before_ the commit callback was invoked.
Now waiting for the commit callback to complete before
sending the reset to the server.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2018-03-29 12:31:44 +02:00
parent e6eac94898
commit 072442a5e5
2 changed files with 68 additions and 46 deletions

View File

@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream; import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener; import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServlet;
@ -244,12 +243,13 @@ public class StreamResetTest extends AbstractTest
@Test @Test
public void testBlockingWriteAfterStreamReceivingReset() throws Exception public void testBlockingWriteAfterStreamReceivingReset() throws Exception
{ {
final CountDownLatch resetLatch = new CountDownLatch(1); CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1); CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
{ {
Charset charset = StandardCharsets.UTF_8; Charset charset = StandardCharsets.UTF_8;
byte[] data = "AFTER RESET".getBytes(charset); byte[] data = "AFTER RESET".getBytes(charset);
@ -258,11 +258,15 @@ public class StreamResetTest extends AbstractTest
response.setContentType("text/plain;charset=" + charset.name()); response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.length * 10); response.setContentLength(data.length * 10);
response.flushBuffer(); response.flushBuffer();
// Wait for the commit callback to complete.
commitLatch.countDown();
try try
{ {
// Wait for the reset to happen. // Wait for the reset to be sent.
Assert.assertTrue(resetLatch.await(10, TimeUnit.SECONDS)); Assert.assertTrue(resetLatch.await(5, TimeUnit.SECONDS));
// Wait for the reset to arrive to the server and be processed.
Thread.sleep(1000);
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {
@ -282,7 +286,7 @@ public class StreamResetTest extends AbstractTest
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {
throw new InterruptedIOException();
} }
catch (IOException x) catch (IOException x)
{ {
@ -299,23 +303,33 @@ public class StreamResetTest extends AbstractTest
@Override @Override
public void onHeaders(Stream stream, HeadersFrame frame) public void onHeaders(Stream stream, HeadersFrame frame)
{ {
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); try
resetLatch.countDown(); {
commitLatch.await(5, TimeUnit.SECONDS);
Callback.Completable completable = new Callback.Completable();
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), completable);
completable.thenRun(resetLatch::countDown);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
} }
}); });
Assert.assertTrue(dataLatch.await(10, TimeUnit.SECONDS)); Assert.assertTrue(dataLatch.await(5, TimeUnit.SECONDS));
} }
@Test @Test
public void testAsyncWriteAfterStreamReceivingReset() throws Exception public void testAsyncWriteAfterStreamReceivingReset() throws Exception
{ {
final CountDownLatch resetLatch = new CountDownLatch(1); CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1); CountDownLatch resetLatch = new CountDownLatch(1);
CountDownLatch dataLatch = new CountDownLatch(1);
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException protected void doGet(HttpServletRequest request, final HttpServletResponse response) throws IOException
{ {
Charset charset = StandardCharsets.UTF_8; Charset charset = StandardCharsets.UTF_8;
final ByteBuffer data = ByteBuffer.wrap("AFTER RESET".getBytes(charset)); final ByteBuffer data = ByteBuffer.wrap("AFTER RESET".getBytes(charset));
@ -324,6 +338,8 @@ public class StreamResetTest extends AbstractTest
response.setContentType("text/plain;charset=" + charset.name()); response.setContentType("text/plain;charset=" + charset.name());
response.setContentLength(data.remaining()); response.setContentLength(data.remaining());
response.flushBuffer(); response.flushBuffer();
// Wait for the commit callback to complete.
commitLatch.countDown();
try try
{ {
@ -339,10 +355,7 @@ public class StreamResetTest extends AbstractTest
// Write some content asynchronously after the stream has been reset. // Write some content asynchronously after the stream has been reset.
final AsyncContext context = request.startAsync(); final AsyncContext context = request.startAsync();
new Thread() new Thread(() ->
{
@Override
public void run()
{ {
try try
{ {
@ -365,8 +378,7 @@ public class StreamResetTest extends AbstractTest
{ {
x.printStackTrace(); x.printStackTrace();
} }
} }).start();
}.start();
} }
}); });
@ -378,8 +390,17 @@ public class StreamResetTest extends AbstractTest
@Override @Override
public void onHeaders(Stream stream, HeadersFrame frame) public void onHeaders(Stream stream, HeadersFrame frame)
{ {
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); try
resetLatch.countDown(); {
commitLatch.await(5, TimeUnit.SECONDS);
Callback.Completable completable = new Callback.Completable();
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), completable);
completable.thenRun(resetLatch::countDown);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
} }
}); });
@ -439,7 +460,7 @@ public class StreamResetTest extends AbstractTest
context.addServlet(new ServletHolder(new HttpServlet() context.addServlet(new ServletHolder(new HttpServlet()
{ {
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{ {
phaser.get().countDown(); phaser.get().countDown();
IO.copy(request.getInputStream(), response.getOutputStream()); IO.copy(request.getInputStream(), response.getOutputStream());
@ -526,7 +547,7 @@ public class StreamResetTest extends AbstractTest
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{ {
try try
{ {
@ -578,7 +599,7 @@ public class StreamResetTest extends AbstractTest
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response)
{ {
AsyncContext asyncContext = request.startAsync(); AsyncContext asyncContext = request.startAsync();
asyncContext.start(() -> asyncContext.start(() ->
@ -642,7 +663,7 @@ public class StreamResetTest extends AbstractTest
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response)
{ {
try try
{ {
@ -694,7 +715,7 @@ public class StreamResetTest extends AbstractTest
start(new HttpServlet() start(new HttpServlet()
{ {
@Override @Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{ {
AsyncContext asyncContext = request.startAsync(); AsyncContext asyncContext = request.startAsync();
ServletOutputStream output = response.getOutputStream(); ServletOutputStream output = response.getOutputStream();

View File

@ -303,9 +303,10 @@ public class HttpTransportOverHTTP2 implements HttpTransport
commit = this.commit; commit = this.commit;
if (state == State.WRITING) if (state == State.WRITING)
{ {
this.state = State.IDLE;
callback = this.callback; callback = this.callback;
this.callback = null; this.callback = null;
this.state = State.IDLE; this.commit = false;
} }
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -330,13 +331,13 @@ public class HttpTransportOverHTTP2 implements HttpTransport
if (state == State.WRITING) if (state == State.WRITING)
{ {
this.state = State.FAILED; this.state = State.FAILED;
this.failure = failure;
callback = this.callback; callback = this.callback;
this.callback = null; this.callback = null;
this.failure = failure;
} }
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug(String.format("HTTP2 Response #%d/%h failed to %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush"), failure); LOG.debug(String.format("HTTP2 Response #%d/%h %s %s", stream.getId(), stream.getSession(), commit ? "commit" : "flush", callback == null ? "ignored" : "failed"), failure);
if (callback != null) if (callback != null)
callback.failed(failure); callback.failed(failure);
} }