ARTEMIS-2415 JDBCJournal miss pending tasks during shutdown
Wait deleting large message tasks during stop.
This commit is contained in:
parent
84ccb7e491
commit
a85f029106
|
@ -108,6 +108,8 @@ public class JDBCJournalStorageManager extends JournalStorageManager {
|
||||||
idGenerator.persistCurrentID();
|
idGenerator.persistCurrentID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -65,6 +65,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
||||||
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
|
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
|
||||||
import org.jboss.logging.Logger;
|
import org.jboss.logging.Logger;
|
||||||
|
|
||||||
|
@ -86,6 +87,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
|
|
||||||
protected ReplicationManager replicator;
|
protected ReplicationManager replicator;
|
||||||
|
|
||||||
|
protected final ReusableLatch deletingLargeMessageTasks = new ReusableLatch();
|
||||||
|
|
||||||
public JournalStorageManager(final Configuration config,
|
public JournalStorageManager(final Configuration config,
|
||||||
final CriticalAnalyzer analyzer,
|
final CriticalAnalyzer analyzer,
|
||||||
final ExecutorFactory executorFactory,
|
final ExecutorFactory executorFactory,
|
||||||
|
@ -272,6 +275,8 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
idGenerator.persistCurrentID();
|
idGenerator.persistCurrentID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deletingLargeMessageTasks.await(30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
try {
|
try {
|
||||||
executor.execute(new Runnable() {
|
executor.execute(new Runnable() {
|
||||||
|
@ -517,6 +522,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
deletingLargeMessageTasks.countUp();
|
||||||
getContext(true).executeOnCompletion(new IOCallback() {
|
getContext(true).executeOnCompletion(new IOCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void done() {
|
public void done() {
|
||||||
|
@ -525,11 +531,13 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
||||||
} else {
|
} else {
|
||||||
executor.execute(deleteAction);
|
executor.execute(deleteAction);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deletingLargeMessageTasks.countDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(int errorCode, String errorMessage) {
|
public void onError(int errorCode, String errorMessage) {
|
||||||
|
deletingLargeMessageTasks.countDown();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
||||||
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
|
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
import org.apache.activemq.artemis.utils.ExecutorFactory;
|
||||||
|
import org.apache.activemq.artemis.utils.Wait;
|
||||||
|
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
|
||||||
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -49,6 +51,7 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
@ -212,4 +215,74 @@ public class JournalStorageManagerTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeletingLargeMessagePendingTasksOnShutdown() throws Exception {
|
||||||
|
if (journalType == JournalType.ASYNCIO) {
|
||||||
|
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
|
||||||
|
}
|
||||||
|
final Configuration configuration = createDefaultInVMConfig().setJournalType(journalType);
|
||||||
|
final ExecutorFactory executorFactory = spy(new OrderedExecutorFactory(executor));
|
||||||
|
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
|
||||||
|
final ArtemisExecutor artemisExecutor = executorFactory.getExecutor();
|
||||||
|
final ArtemisExecutor artemisExecutorWrapper = spy(artemisExecutor);
|
||||||
|
Mockito.when(executorFactory.getExecutor()).thenReturn(artemisExecutorWrapper);
|
||||||
|
final JournalStorageManager manager = new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory);
|
||||||
|
manager.start();
|
||||||
|
manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
|
||||||
|
final PostOffice postOffice = mock(PostOffice.class);
|
||||||
|
final JournalLoader journalLoader = mock(JournalLoader.class);
|
||||||
|
manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
|
||||||
|
final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage().setDurable(true));
|
||||||
|
final SequentialFile file = largeMessage.getFile();
|
||||||
|
|
||||||
|
boolean fileExists = file.exists();
|
||||||
|
manager.getContext(true).storeLineUp();
|
||||||
|
Assert.assertTrue(fileExists);
|
||||||
|
manager.deleteLargeMessageFile(largeMessage);
|
||||||
|
|
||||||
|
final Thread currentThread = Thread.currentThread();
|
||||||
|
final CountDownLatch beforeLatch = new CountDownLatch(1);
|
||||||
|
final CountDownLatch afterStopLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
//Simulate an executor task that begins after store done and ends after manager stop begins.
|
||||||
|
artemisExecutor.execute(() -> {
|
||||||
|
try {
|
||||||
|
//Wait until thread is ready to start executing manager stop.
|
||||||
|
Assert.assertTrue(beforeLatch.await(30000, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
//Wait until thread executing manager stop is waiting for another thread.
|
||||||
|
Assert.assertTrue(Wait.waitFor(() -> currentThread.getState() == Thread.State.TIMED_WAITING, 30000));
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Mockito.doAnswer(invocationOnMock -> {
|
||||||
|
invocationOnMock.callRealMethod();
|
||||||
|
|
||||||
|
if (Thread.currentThread().equals(currentThread) &&
|
||||||
|
invocationOnMock.getArgument(0).getClass().getName().contains(JournalStorageManager.class.getName())) {
|
||||||
|
|
||||||
|
//Simulate an executor task that ends after manager stop.
|
||||||
|
artemisExecutor.execute(() -> {
|
||||||
|
try {
|
||||||
|
//Wait until manager stop is executed.
|
||||||
|
afterStopLatch.await(30000, TimeUnit.MILLISECONDS);
|
||||||
|
} catch (Exception ignore) {
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}).when(artemisExecutorWrapper).execute(Mockito.any(Runnable.class));
|
||||||
|
|
||||||
|
manager.getContext(true).done();
|
||||||
|
|
||||||
|
beforeLatch.countDown();
|
||||||
|
manager.stop();
|
||||||
|
fileExists = file.exists();
|
||||||
|
afterStopLatch.countDown();
|
||||||
|
|
||||||
|
Assert.assertFalse(fileExists);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue