This commit is contained in:
Clebert Suconic 2019-05-29 15:09:25 -04:00
commit 9e3dbde394
2 changed files with 148 additions and 124 deletions

View File

@ -323,19 +323,24 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
*/ */
@Override @Override
protected void performCachedLargeMessageDeletes() { protected void performCachedLargeMessageDeletes() {
largeMessagesToDelete.forEach((messageId, largeServerMessage) -> { storageManagerLock.writeLock().lock();
SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE); try {
try { largeMessagesToDelete.forEach((messageId, largeServerMessage) -> {
msg.delete(); SequentialFile msg = createFileForLargeMessage(messageId, LargeMessageExtension.DURABLE);
} catch (Exception e) { try {
ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId); msg.delete();
} } catch (Exception e) {
if (replicator != null) { ActiveMQServerLogger.LOGGER.journalErrorDeletingMessage(e, messageId);
replicator.largeMessageDelete(messageId, JournalStorageManager.this); }
} if (replicator != null) {
confirmLargeMessage(largeServerMessage); replicator.largeMessageDelete(messageId, JournalStorageManager.this);
}); }
largeMessagesToDelete.clear(); confirmLargeMessage(largeServerMessage);
});
largeMessagesToDelete.clear();
} finally {
storageManagerLock.writeLock().unlock();
}
} }
protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) { protected SequentialFile createFileForLargeMessage(final long messageID, final boolean durable) {

View File

@ -15,134 +15,153 @@
*/ */
package org.apache.activemq.artemis.core.persistence.impl.journal; package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import java.util.stream.Stream;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.impl.JournalLoader;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.util.stream.Collectors.toList;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class JournalStorageManagerTest { public class JournalStorageManagerTest {
ScheduledExecutorService dumbScheduler = new ScheduledExecutorService() { @Parameterized.Parameter
@Override public JournalType journalType;
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return null;
}
@Override @Parameterized.Parameters(name = "journal type={0}")
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { public static Collection<Object[]> getParams() {
return null; final JournalType[] values = JournalType.values();
} return Stream.of(JournalType.values())
.map(journalType -> new Object[]{journalType})
.collect(toList());
}
@Override private static ExecutorService executor;
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { private static ExecutorService ioExecutor;
return null; private static ExecutorService testExecutor;
}
@Override @BeforeClass
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { public static void initExecutors() {
return null; executor = Executors.newSingleThreadExecutor();
} //to allow concurrent compaction and I/O operations
ioExecutor = Executors.newFixedThreadPool(2);
testExecutor = Executors.newSingleThreadExecutor();
}
@Override @AfterClass
public void shutdown() { public static void destroyExecutors() {
ioExecutor.shutdownNow();
} executor.shutdownNow();
testExecutor.shutdownNow();
@Override }
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return null;
}
@Override
public Future<?> submit(Runnable task) {
return null;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout,
TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(Runnable command) {
}
};
ExecutorFactory dumbExecutor = new ExecutorFactory() {
@Override
public ArtemisExecutor getExecutor() {
return new ArtemisExecutor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
}
};
/** /**
* Test of fixJournalFileSize method, of class JournalStorageManager. * Test of fixJournalFileSize method, of class JournalStorageManager.
*/ */
@Test @Test
public void testFixJournalFileSize() { public void testFixJournalFileSize() {
JournalStorageManager manager = new JournalStorageManager(new ConfigurationImpl(), null, dumbExecutor, dumbScheduler, dumbExecutor); if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096)); Assert.assertEquals(4096, manager.fixJournalFileSize(1024, 4096));
Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096)); Assert.assertEquals(4096, manager.fixJournalFileSize(4098, 4096));
Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096)); Assert.assertEquals(8192, manager.fixJournalFileSize(8192, 4096));
} }
@Test(timeout = 20_000)
public void testStopReplicationDoesNotDeadlockWhileStopping() throws Exception {
if (journalType == JournalType.ASYNCIO) {
assumeTrue("AIO is not supported on this platform", AIOSequentialFileFactory.isSupported());
}
final ConfigurationImpl configuration = new ConfigurationImpl().setJournalType(journalType);
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
final ExecutorFactory ioExecutorFactory = new OrderedExecutorFactory(ioExecutor);
final JournalStorageManager manager = spy(new JournalStorageManager(configuration, null, executorFactory, null, ioExecutorFactory));
manager.start();
manager.loadBindingJournal(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
final PostOffice postOffice = mock(PostOffice.class);
final JournalLoader journalLoader = mock(JournalLoader.class);
manager.loadMessageJournal(postOffice, null, null, null, null, null, null, journalLoader);
final ReplicationManager replicationManager = mock(ReplicationManager.class);
final PagingManager pagingManager = mock(PagingManager.class);
when(pagingManager.getStoreNames()).thenReturn(new SimpleString[0]);
manager.startReplication(replicationManager, pagingManager, UUID.randomUUID().toString(), false, 0);
final LargeServerMessage largeMessage = manager.createLargeMessage(manager.generateID() + 1, new CoreMessage());
largeMessage.setDurable(true);
when(replicationManager.isSynchronizing()).thenReturn(true);
largeMessage.deleteFile();
final long pendingRecordID = largeMessage.getPendingRecordID();
final AtomicReference<CompletableFuture<Void>> stopReplication = new AtomicReference<>();
doAnswer(invocation -> {
final CompletableFuture<Void> finished = new CompletableFuture<>();
final CountDownLatch beforeStopReplication;
if (stopReplication.compareAndSet(null, finished)) {
beforeStopReplication = new CountDownLatch(1);
//before the deadlock fix:
//manager::stop is already owning the large message lock here
//but not yet the manager read lock
testExecutor.execute(() -> {
beforeStopReplication.countDown();
try {
//it needs to acquire the manager write lock
//and large message lock next
manager.stopReplication();
finished.complete(null);
} catch (Exception e) {
finished.completeExceptionally(e);
}
});
} else {
beforeStopReplication = null;
}
if (beforeStopReplication != null) {
beforeStopReplication.await();
//do not remove this sleep: before the deadlock fix
//it was needed to give manager::stopReplication the chance to acquire
//the manager write lock before manager::stop
TimeUnit.MILLISECONDS.sleep(500);
}
//confirmPendingLargeMessage will acquire manager read lock
return invocation.callRealMethod();
}).when(manager).confirmPendingLargeMessage(pendingRecordID);
manager.stop();
final CompletableFuture<Void> stoppedReplication = stopReplication.get();
Assert.assertNotNull(stoppedReplication);
stoppedReplication.get();
}
} }