ARTEMIS-3848 High CPU usage on ReadWriteLocks

This is caused by too many entries on the HashMap for ThreadLocals.
Also: I'm reviewing some readlock usage on the StorageManager to simplify things a little bit.
This commit is contained in:
Clebert Suconic 2022-05-25 09:21:51 -04:00 committed by clebertsuconic
parent f14ba5f742
commit a2dd805a2f
13 changed files with 122 additions and 170 deletions

View File

@ -25,27 +25,23 @@ package org.apache.activemq.artemis.utils.actors;
*/
public abstract class HandlerBase {
//marker instance used to recognize if a thread is performing a packet handling
private static final Object DUMMY = Boolean.TRUE;
// this cannot be static as the Actor will be used within another executor. For that reason
// each instance will have its own ThreadLocal.
// ... a thread that has its thread-local map populated with DUMMY while performing a handler
private final ThreadLocal<Object> inHandler = new ThreadLocal<>();
protected void enter() {
assert inHandler.get() == null : "should be null";
inHandler.set(DUMMY);
private static class Counter {
int count = 0;
}
public boolean inHandler() {
final Object dummy = inHandler.get();
return dummy != null;
/** an actor could be used within an OrderedExecutor. So we need this counter to decide if there's a Handler anywhere in the stack trace */
private static final ThreadLocal<Counter> inHandler = ThreadLocal.withInitial(() -> new Counter());
protected static void enter() {
inHandler.get().count++;
}
protected void leave() {
assert inHandler.get() != null : "marker not set";
inHandler.set(null);
public static boolean inHandler() {
return inHandler.get().count > 0;
}
protected static void leave() {
inHandler.get().count--;
}
}

View File

@ -62,7 +62,7 @@ public class NettyConnection implements Connection {
* here for when the connection (or Netty Channel) becomes available again.
*/
private final List<ReadyListener> readyListeners = new ArrayList<>();
private final FastThreadLocal<ArrayList<ReadyListener>> localListenersPool = new FastThreadLocal<>();
private static final FastThreadLocal<ArrayList<ReadyListener>> readyListenersPool = new FastThreadLocal<>();
private final boolean batchingEnabled;
@ -138,51 +138,47 @@ public class NettyConnection implements Connection {
@Override
public final void fireReady(final boolean ready) {
ArrayList<ReadyListener> readyToCall = localListenersPool.get();
if (readyToCall != null) {
localListenersPool.set(null);
// We are reusing a previously created ArrayList for this localArray
ArrayList<ReadyListener> localArrayList = readyListenersPool.get();
if (localArrayList == null) {
localArrayList = new ArrayList<>();
readyListenersPool.set(localArrayList);
} else {
localArrayList.clear();
}
synchronized (readyListeners) {
this.ready = ready;
if (ready) {
final int size = this.readyListeners.size();
if (readyToCall != null) {
readyToCall.ensureCapacity(size);
}
localArrayList.ensureCapacity(size);
try {
for (int i = 0; i < size; i++) {
final ReadyListener readyListener = readyListeners.get(i);
if (readyListener == null) {
break;
}
if (readyToCall == null) {
readyToCall = new ArrayList<>(size);
}
readyToCall.add(readyListener);
localArrayList.add(readyListener);
}
} finally {
readyListeners.clear();
}
}
}
if (readyToCall != null) {
try {
readyToCall.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
readyToCall.clear();
if (localListenersPool.get() != null) {
localListenersPool.set(readyToCall);
try {
localArrayList.forEach(readyListener -> {
try {
readyListener.readyForWriting();
} catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(logOnly);
}
}
});
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToSetChannelReadyForWriting(t);
} finally {
localArrayList.clear();
}
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.paging;
import java.io.File;
import java.util.Collection;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
@ -89,14 +88,11 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
/**
* Write message to page if we are paging.
*
* @param readLock a read lock from the storage manager. This is an encapsulation violation made
* to keep the code less complex. If give {@code null} the method will throw a
* {@link NullPointerException}
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal
* @throws NullPointerException if {@code readLock} is null
*/
boolean page(Message message, Transaction tx, RouteContextList listCtx, ReadLock readLock) throws Exception;
boolean page(Message message, Transaction tx, RouteContextList listCtx) throws Exception;
Page createPage(int page) throws Exception;

View File

@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -897,8 +896,7 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean page(Message message,
final Transaction tx,
RouteContextList listCtx,
final ReadLock managerLock) throws Exception {
RouteContextList listCtx) throws Exception {
if (!running) {
return false;
@ -941,62 +939,53 @@ public class PagingStoreImpl implements PagingStore {
lock.readLock().unlock();
}
if (managerLock != null) {
managerLock.lock();
}
lock.writeLock().lock();
try {
lock.writeLock().lock();
if (!paging) {
return false;
}
try {
if (!paging) {
return false;
}
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
final long transactionID = tx == null ? -1 : tx.getID();
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), transactionID);
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).setPaged();
}
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
int bytesToWrite = pagedMessage.getEncodeSize() + Page.SIZE_RECORD;
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
}
if (tx != null) {
installPageTransaction(tx, listCtx);
}
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
page.write(pagedMessage);
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
return true;
} finally {
lock.writeLock().unlock();
}
if (tx != null) {
installPageTransaction(tx, listCtx);
}
// the apply counter will make sure we write a record on journal
// especially on the case for non transactional sends and paging
// doing this will give us a possibility of recovering the page counters
long persistentSize = pagedMessage.getPersistentSize() > 0 ? pagedMessage.getPersistentSize() : 0;
final Page page = currentPage;
applyPageCounters(tx, page, listCtx, persistentSize);
page.write(pagedMessage);
if (tx == null && syncNonTransactional && message.isDurable()) {
sync();
}
if (logger.isTraceEnabled()) {
logger.tracef("Paging message %s on pageStore %s pageNr=%d", pagedMessage, getStoreName(), page.getPageId());
}
return true;
} finally {
if (managerLock != null) {
managerLock.unlock();
}
lock.writeLock().unlock();
}
}

View File

@ -436,11 +436,6 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
/**
* Write message to page if we are paging.
* <p>
* This is primarily a {@link PagingStore} call, but as with any other call writing persistent
* data, it must go through here. Both for the sake of replication, and also to ensure that it
* takes the locks (storage manager and pagingStore) in the right order. Avoiding thus the
* creation of dead-locks.
*
* @return {@code true} if we are paging and have handled the data, {@code false} if the data
* needs to be sent to the journal

View File

@ -174,12 +174,20 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final ScheduledExecutorService scheduledExecutorService;
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(true);
protected final ReentrantReadWriteLock storageManagerLock = new ReentrantReadWriteLock(false);
// I would rather cache the Closeable instance here..
// I never know when the JRE decides to create a new instance on every call.
// So I'm playing safe here. That's all
protected final ArtemisCloseable unlockCloseable = storageManagerLock.readLock()::unlock;
protected final ArtemisCloseable unlockCloseable = this::unlockCloseable;
protected static final ArtemisCloseable dummyCloseable = () -> { };
private static final ThreadLocal<Boolean> reentrant = ThreadLocal.withInitial(() -> false);
private void unlockCloseable() {
storageManagerLock.readLock().unlock();
reentrant.set(false);
}
protected Journal messageJournal;
@ -395,6 +403,12 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public ArtemisCloseable closeableReadLock() {
if (reentrant.get()) {
return dummyCloseable;
}
reentrant.set(true);
CriticalCloseable measure = measureCritical(CRITICAL_STORE);
storageManagerLock.readLock().lock();
@ -415,20 +429,6 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
}
}
/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeLock() {
storageManagerLock.writeLock().lock();
}
/**
* for internal use and testsuite, don't use it outside of tests
*/
public void writeUnlock() {
storageManagerLock.writeLock().unlock();
}
@Override
public void storeAcknowledge(final long queueID, final long messageID) throws Exception {
try (ArtemisCloseable lock = closeableReadLock()) {
@ -2150,17 +2150,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
@Override
public boolean addToPage(PagingStore store, Message msg, Transaction tx, RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
return store.page(msg, tx, listCtx, storageManagerLock.readLock());
try (ArtemisCloseable closeable = closeableReadLock()) {
return store.page(msg, tx, listCtx);
}
}
private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID) {

View File

@ -642,18 +642,8 @@ public class NullStorageManager implements StorageManager {
Message msg,
Transaction tx,
RouteContextList listCtx) throws Exception {
/**
* Exposing the read-lock here is an encapsulation violation done in order to keep the code
* simpler. The alternative would be to add a second method, say 'verifyPaging', to
* PagingStore.
* <p>
* Adding this second method would also be more surprise prone as it would require a certain
* calling order.
* <p>
* The reasoning is that exposing the lock is more explicit and therefore `less bad`.
*/
if (store != null) {
return store.page(msg, tx, listCtx, null);
return store.page(msg, tx, listCtx);
} else {
return false;
}

View File

@ -1131,7 +1131,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:ERROR -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
</compilerArgs>
</configuration>
</plugin>
@ -1165,7 +1165,7 @@
<arg>--add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/jdk.internal.misc=ALL-UNNAMED</arg>
<arg>-XDcompilePolicy=simple</arg>
<arg>-Xplugin:ErrorProne -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-Xplugin:ErrorProne -Xep:ThreadLocalUsage:ERROR -Xep:MissingOverride:WARN -Xep:NonAtomicVolatileUpdate:ERROR -Xep:SynchronizeOnNonFinalField:ERROR -Xep:StaticQualifiedUsingExpression:ERROR -Xep:WaitNotInLoop:ERROR -XepExcludedPaths:.*/generated-sources/.*</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED</arg>
<arg>-J--add-exports=jdk.compiler/com.sun.tools.javac.main=ALL-UNNAMED</arg>

View File

@ -198,7 +198,7 @@ public class RaceOnCursorIteratorTest extends ActiveMQTestBase {
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(ADDRESS, queue);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
pagingStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS));
return msg;
}

View File

@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
@ -354,8 +353,7 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
@Override
public boolean page(Message message,
Transaction tx,
RouteContextList listCtx,
ReentrantReadWriteLock.ReadLock readLock) throws Exception {
RouteContextList listCtx) throws Exception {
return false;
}

View File

@ -408,7 +408,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock));
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)));
PagedReference readMessage = iterator.next();
@ -441,7 +441,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock));
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@ -471,7 +471,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock));
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)));
}
PagedReference readMessage = iterator.next();
@ -556,7 +556,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock));
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)));
}
if (tx != null) {
@ -775,7 +775,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock));
Assert.assertTrue(pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS)));
}
return pageStore.getNumberOfPages();
@ -872,7 +872,7 @@ public class PageCursorStressTest extends ActiveMQTestBase {
Message msg = new CoreMessage(storage.generateID(), buffer.writerIndex());
msg.getBodyBuffer().writeBytes(buffer, 0, buffer.writerIndex());
msg.putIntProperty("key", i);
pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS), lock);
pageStore.page(msg, ctx.getTransaction(), ctx.getContextListing(ADDRESS));
}
return txImpl;

View File

@ -66,11 +66,11 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
ICoreMessage msg = createMessage(1L, new SimpleString("simple-test"), createRandomBuffer(10));
final RoutingContextImpl ctx = new RoutingContextImpl(null);
Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock));
Assert.assertFalse(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName())));
store.startPaging();
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock));
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName())));
Page page = store.depage();
@ -89,7 +89,7 @@ public class PagingManagerImplTest extends ActiveMQTestBase {
Assert.assertNull(store.depage());
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock));
Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName())));
}

View File

@ -176,7 +176,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Assert.assertTrue(storeImpl.isPaging());
final RoutingContextImpl ctx = new RoutingContextImpl(null);
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName())));
Assert.assertEquals(1, storeImpl.getNumberOfPages());
@ -219,7 +219,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(i, storeImpl, destination, buffer);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName())));
}
@ -286,7 +286,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putIntProperty("page", page);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(fakeQueue.getName(), fakeQueue);
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName())));
if (i > 0 && i % 10 == 0) {
storeImpl.forceAnotherPage();
page++;
@ -372,7 +372,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putIntProperty("page", page);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
ctx.addQueue(fakeQueue.getName(), fakeQueue);
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName()), lock));
Assert.assertTrue(storeImpl.page(msg, ctx.getTransaction(), ctx.getContextListing(storeImpl.getStoreName())));
if (i > 0 && i % 10 == 0) {
storeImpl.forceAnotherPage();
page++;
@ -474,7 +474,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(i, store, destination, buffer);
final RoutingContextImpl ctx = new RoutingContextImpl(null);
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock));
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName())));
}
Assert.assertEquals(2, store.getNumberOfPages());
@ -509,7 +509,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
Message msg = createMessage(1, store, destination, buffers.get(0));
final RoutingContextImpl ctx = new RoutingContextImpl(null);
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName()), lock));
Assert.assertTrue(store.page(msg, ctx.getTransaction(), ctx.getContextListing(store.getStoreName())));
Page newPage = store.depage();
@ -529,14 +529,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
{
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock));
Assert.assertFalse(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName())));
}
store.startPaging();
{
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock));
Assert.assertTrue(store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName())));
}
Page page = store.depage();
@ -619,7 +619,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
// Just using the internal API to remove it from the page file system
Message msg = createMessage(id, storeImpl, destination, createRandomBuffer(id, 5));
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock)) {
if (storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()))) {
buffers.put(id, msg);
} else {
break;
@ -736,7 +736,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
storeImpl2.forceAnotherPage();
final RoutingContextImpl ctx = new RoutingContextImpl(null);
storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName()), lock);
storeImpl2.page(lastMsg, ctx.getTransaction(), ctx.getContextListing(storeImpl2.getStoreName()));
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@ -858,7 +858,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putLongProperty("count", i);
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()), lock)) {
while (!store.page(msg, ctx2.getTransaction(), ctx2.getContextListing(store.getStoreName()))) {
store.startPaging();
}
@ -1282,7 +1282,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
msg.putLongProperty("count", id);
final RoutingContextImpl ctx2 = new RoutingContextImpl(null);
storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()), lock);
storeImpl.page(msg, ctx2.getTransaction(), ctx2.getContextListing(storeImpl.getStoreName()));
}
private CoreMessage createMessage(final long id,