ARTEMIS-2293 addPacket in LargeMessageControllerImpl won't notifyAll for exception

This commit is contained in:
Justin Bertram 2021-11-02 14:52:17 -05:00 committed by clebertsuconic
parent f31122a722
commit 82645aa4e9
2 changed files with 29 additions and 2 deletions

View File

@ -162,14 +162,14 @@ public class LargeMessageControllerImpl implements LargeMessageController {
flowControlCredit = flowControlSize;
notifyAll();
if (streamEnded) {
outStream.close();
}
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorAddingPacket(e);
handledException = e;
} finally {
notifyAll();
}
} else {
if (fileCache != null) {

View File

@ -467,6 +467,33 @@ public class LargeMessageBufferTest extends ActiveMQTestBase {
assertTrue("It was supposed to wait at least 1 second", System.currentTimeMillis() - time > 1000);
}
@Test
public void testStreamDataWaitCompletionOnException() throws Exception {
final LargeMessageControllerImpl outBuffer = new LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 5000);
class FakeOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
throw new IOException();
}
}
outBuffer.setOutputStream(new FakeOutputStream());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
outBuffer.waitCompletion(0);
fail("supposed to throw an exception");
} catch (ActiveMQException e) {
latch.countDown();
}
}).start();
outBuffer.addPacket(RandomUtil.randomBytes(), 1, true);
assertTrue("The IOException should trigger an immediate failure", latch.await(3, TimeUnit.SECONDS));
}
@Test
public void testStreamDataWaitCompletionOnSlowComingBuffer() throws Exception {
final LargeMessageControllerImpl outBuffer = new LargeMessageControllerImpl(new FakeConsumerInternal(), 5, 1000);