This commit is contained in:
Clebert Suconic 2019-01-17 17:51:52 -05:00
commit a04f8053b4
2 changed files with 99 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
@ -187,7 +188,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
public synchronized void incrementDelayDeletionCount() {
delayDeletionCount.incrementAndGet();
try {
incrementRefCount();
if (paged) {
RefCountMessageListener tmpContext = super.getContext();
setContext(null);
incrementRefCount();
setContext(tmpContext);
} else {
incrementRefCount();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorIncrementDelayDeletionCount(e);
}
@ -226,7 +235,15 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
@Override
public synchronized int decrementRefCount() throws Exception {
int currentRefCount = super.decrementRefCount();
int currentRefCount;
if (paged) {
RefCountMessageListener tmpContext = super.getContext();
setContext(null);
currentRefCount = super.decrementRefCount();
setContext(tmpContext);
} else {
currentRefCount = super.decrementRefCount();
}
// We use <= as this could be used by load.
// because of a failure, no references were loaded, so we have 0... and we still need to delete the associated
@ -234,7 +251,6 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
if (delayDeletionCount.get() <= 0) {
checkDelete();
}
return currentRefCount;
}
@ -534,5 +550,4 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
}
}
}

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -2475,6 +2476,84 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
}
@Test
public void testGlobalSizeBytesAndAddressSizeOnPage() throws Exception {
testGlobalSizeBytesAndAddressSize(true);
}
@Test
public void testGlobalSizeBytesAndAddressSize() throws Exception {
testGlobalSizeBytesAndAddressSize(false);
}
public void testGlobalSizeBytesAndAddressSize(boolean isPage) throws Exception {
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.start();
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
ClientSession session = sf.createSession(false, false);
LargeServerMessageImpl fileMessage = new LargeServerMessageImpl((JournalStorageManager) server.getStorageManager());
fileMessage.setMessageID(1005);
for (int i = 0; i < largeMessageSize; i++) {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}
fileMessage.releaseResources();
session.createQueue(ADDRESS, ADDRESS, true);
PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
if (isPage) {
store.startPaging();
}
ClientProducer prod = session.createProducer(ADDRESS);
prod.send(fileMessage);
fileMessage.deleteFile();
session.commit();
if (isPage) {
server.getPagingManager().getPageStore(ADDRESS).getCursorProvider().clearCache();
}
if (isPage) {
Assert.assertEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
} else {
Assert.assertNotEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
Assert.assertNotEquals(0, server.getPagingManager().getGlobalSize());
}
session.start();
ClientConsumer cons = session.createConsumer(ADDRESS);
ClientMessage msg = cons.receive(5000);
Assert.assertNotNull(msg);
msg.acknowledge();
session.commit();
Assert.assertEquals(0, server.getPagingManager().getPageStore(ADDRESS).getAddressSize());
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
session.close();
cons.close();
}
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------