diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java index 629405b8a7..9e67f8d900 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JDBCJournalStorageManager.java @@ -108,6 +108,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager { idGenerator.persistCurrentID(); } + deletingLargeMessageTasks.await(30, TimeUnit.SECONDS); + final CountDownLatch latch = new CountDownLatch(1); executor.execute(new Runnable() { @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 2ae475c592..df4fec375b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer; import org.jboss.logging.Logger; @@ -86,6 +87,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager { protected ReplicationManager replicator; + protected final ReusableLatch deletingLargeMessageTasks = new ReusableLatch(); + public JournalStorageManager(final Configuration config, final CriticalAnalyzer analyzer, final ExecutorFactory executorFactory, @@ -272,6 +275,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager { idGenerator.persistCurrentID(); } + deletingLargeMessageTasks.await(30, TimeUnit.SECONDS); + final CountDownLatch latch = new CountDownLatch(1); try { executor.execute(new Runnable() { @@ -517,6 +522,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager { }; + deletingLargeMessageTasks.countUp(); getContext(true).executeOnCompletion(new IOCallback() { @Override public void done() { @@ -525,11 +531,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager { } else { executor.execute(deleteAction); } + + deletingLargeMessageTasks.countDown(); } @Override public void onError(int errorCode, String errorMessage) { - + deletingLargeMessageTasks.countDown(); } }); } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java index a0580941be..4e2741d41e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManagerTest.java @@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.impl.JournalLoader; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.junit.AfterClass; import org.junit.Assert; @@ -49,6 +51,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.Mockito; import static java.util.stream.Collectors.toList; import static org.hamcrest.Matchers.is; @@ -212,4 +215,74 @@ public class JournalStorageManagerTest extends ActiveMQTestBase { } } + @Test + public void testDeletingLargeMessagePendingTasksOnShutdown() throws Exception { + if (journalType == JournalType.ASYNCIO) { + assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported()); + } + final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType); + final ExecutorFactory executorFactory = spy(new OrderedExecutorFactory(executor)); + final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor); + final ArtemisExecutor artemisExecutor = executorFactory.getExecutor(); + final ArtemisExecutor artemisExecutorWrapper = spy(artemisExecutor); + Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutorWrapper); + final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory); + manager.start(); + manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); + final PostOffice postOffice = mock(PostOffice.class); + final JournalLoader journalLoader = mock(JournalLoader.class); + manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader); + final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage().setDurable(true)); + final SequentialFile file = largeMessage.getFile(); + + boolean fileExists = file.exists(); + manager.getContext(true).storeLineUp(); + Assert.assertTrue(fileExists); + manager.deleteLargeMessageFile(largeMessage); + + final Thread currentThread = Thread.currentThread(); + final CountDownLatch beforeLatch = new CountDownLatch(1); + final CountDownLatch afterStopLatch = new CountDownLatch(1); + + //Simulate an executor task that begins after store done and ends after manager stop begins. + artemisExecutor.execute(() -> { + try { + //Wait until thread is ready to start executing manager stop. + Assert.assertTrue(beforeLatch.await(30000, TimeUnit.MILLISECONDS)); + + //Wait until thread executing manager stop is waiting for another thread. + Assert.assertTrue(Wait.waitFor(() -> currentThread.getState() == Thread.State.TIMED_WAITING, 30000)); + } catch (Exception ignore) { + } + }); + + Mockito.doAnswer(invocationOnMock -> { + invocationOnMock.callRealMethod(); + + if (Thread.currentThread().equals(currentThread) && + invocationOnMock.getArgument(0).getClass().getName().contains(JournalStorageManager.class.getName())) { + + //Simulate an executor task that ends after manager stop. + artemisExecutor.execute(() -> { + try { + //Wait until manager stop is executed. + afterStopLatch.await(30000, TimeUnit.MILLISECONDS); + } catch (Exception ignore) { + } + }); + } + + return null; + }).when(artemisExecutorWrapper).execute(Mockito.any(Runnable.class)); + + manager.getContext(true).done(); + + beforeLatch.countDown(); + manager.stop(); + fileExists = file.exists(); + afterStopLatch.countDown(); + + Assert.assertFalse(fileExists); + } + }