From 270b383e80296fb47dba6a719ef1616ddcaab1ef Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Sat, 3 Nov 2018 16:37:26 +0100 Subject: [PATCH] ARTEMIS-1710 Allow management msgs to exceed global-max-size limit --- .../artemis/cli/commands/tools/DBOption.java | 4 +- .../amqp/broker/AMQPSessionCallback.java | 8 +- .../protocol/openwire/amq/AMQSession.java | 25 +++-- .../management/impl/AddressControlImpl.java | 28 ++++-- .../core/paging/impl/PagingManagerImpl.java | 18 +++- .../AbstractJournalStorageManager.java | 3 + .../core/postoffice/impl/BindingsImpl.java | 6 +- .../core/postoffice/impl/PostOfficeImpl.java | 6 +- .../artemis/core/server/QueueConfig.java | 8 +- .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../server/impl/PostOfficeJournalLoader.java | 2 +- .../core/server/impl/ScaleDownHandler.java | 13 ++- .../core/server/impl/ServerSessionImpl.java | 4 +- .../server/files/FileMoveManagerTest.java | 3 +- .../region/policy/DestinationProxy.java | 15 ++- .../integration/paging/GlobalPagingTest.java | 95 +++++++++++++++++++ .../replication/ReplicationTest.java | 2 +- .../postoffice/impl/BindingsImplTest.java | 2 +- 18 files changed, 203 insertions(+), 41 deletions(-) diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java index aa0b47a3f8..2f2d8e9140 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/DBOption.java @@ -241,11 +241,11 @@ public class DBOption extends OptionalLocking { storageManager, 1000L, scheduledExecutorService, executorFactory, false, null); - pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository); + pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress()); } else { storageManager = new JournalStorageManager(config, EmptyCriticalAnalyzer.getInstance(), executorFactory, executorFactory); PagingStoreFactory pageStoreFactory = new PagingStoreFactoryNIO(storageManager, config.getPagingLocation(), 1000L, scheduledExecutorService, executorFactory, true, null); - pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository); + pagingmanager = new PagingManagerImpl(pageStoreFactory, addressSettingsRepository, configuration.getManagementAddress()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 3d8ae5a934..61816af43a 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -486,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback { try { PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddressSimpleString()); - if (store.isRejectingMessages()) { + if (store != null && store.isRejectingMessages()) { // We drop pre-settled messages (and abort any associated Tx) if (delivery.remotelySettled()) { if (transaction != null) { @@ -585,7 +585,11 @@ public class AMQPSessionCallback implements SessionCallback { pagingManager.checkMemory(runnable); } else { final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(runnable); + if (store != null) { + store.checkMemory(runnable); + } else { + runnable.run(); + } } } catch (Exception e) { throw new RuntimeException(e); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 0429297b89..bad9bc5c82 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -418,9 +418,13 @@ public class AMQSession implements SessionCallback { //non-persistent messages goes here, by default we stop reading from //transport connection.getTransportConnection().setAutoRead(false); - if (!store.checkMemory(enableAutoReadAndTtl)) { - enableAutoReadAndTtl(); - throw new ResourceAllocationException("Queue is full " + address); + if (store != null) { + if (!store.checkMemory(enableAutoReadAndTtl)) { + enableAutoReadAndTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + } else { + enableAutoReadAndTtl.run(); } getCoreSession().send(coreMsg, false, dest.isTemporary()); @@ -443,7 +447,7 @@ public class AMQSession implements SessionCallback { final AtomicInteger count, final org.apache.activemq.artemis.api.core.Message coreMsg, final SimpleString address) throws ResourceAllocationException { - if (!store.checkMemory(false, () -> { + final Runnable task = () -> { Exception exceptionToSend = null; try { @@ -496,10 +500,15 @@ public class AMQSession implements SessionCallback { }); } } - })) { - this.connection.getContext().setDontSendReponse(false); - connection.enableTtl(); - throw new ResourceAllocationException("Queue is full " + address); + }; + if (store != null) { + if (!store.checkMemory(false, task)) { + this.connection.getContext().setDontSendReponse(false); + connection.enableTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + } else { + task.run(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index 0eb39e0ea3..b24c370dac 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -201,17 +201,29 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public long getNumberOfBytesPerPage() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).getPageSizeBytes(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return 0; + } + return pagingStore.getPageSizeBytes(); } finally { blockOnIO(); } } + private PagingStore getPagingStore() throws Exception { + return pagingManager.getPageStore(addressInfo.getName()); + } + @Override public long getAddressSize() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).getAddressSize(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return 0; + } + return pagingStore.getAddressSize(); } finally { blockOnIO(); } @@ -240,7 +252,11 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public boolean isPaging() throws Exception { clearIO(); try { - return pagingManager.getPageStore(addressInfo.getName()).isPaging(); + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return false; + } + return pagingStore.isPaging(); } finally { blockOnIO(); } @@ -250,12 +266,12 @@ public class AddressControlImpl extends AbstractControl implements AddressContro public int getNumberOfPages() throws Exception { clearIO(); try { - PagingStore pageStore = pagingManager.getPageStore(addressInfo.getName()); + final PagingStore pageStore = getPagingStore(); - if (!pageStore.isPaging()) { + if (pageStore == null || !pageStore.isPaging()) { return 0; } else { - return pagingManager.getPageStore(addressInfo.getName()).getNumberOfPages(); + return pageStore.getNumberOfPages(); } } finally { blockOnIO(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index 07f5ad7a7d..357fda2ef6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -86,6 +86,8 @@ public final class PagingManagerImpl implements PagingManager { private ActiveMQScheduledComponent scheduledComponent = null; + private final SimpleString managementAddress; + // Static // -------------------------------------------------------------------------------------------------------------------------- @@ -105,17 +107,25 @@ public final class PagingManagerImpl implements PagingManager { public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository, - final long maxSize) { + final long maxSize, + final SimpleString managementAddress) { pagingStoreFactory = pagingSPI; this.addressSettingsRepository = addressSettingsRepository; addressSettingsRepository.registerListener(this); this.maxSize = maxSize; this.memoryExecutor = pagingSPI.newExecutor(); + this.managementAddress = managementAddress; } public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository addressSettingsRepository) { - this(pagingSPI, addressSettingsRepository, -1); + this(pagingSPI, addressSettingsRepository, -1, null); + } + + public PagingManagerImpl(final PagingStoreFactory pagingSPI, + final HierarchicalRepository addressSettingsRepository, + final SimpleString managementAddress) { + this(pagingSPI, addressSettingsRepository, -1, managementAddress); } @Override @@ -329,6 +339,9 @@ public final class PagingManagerImpl implements PagingManager { */ @Override public PagingStore getPageStore(final SimpleString storeName) throws Exception { + if (managementAddress != null && storeName.startsWith(managementAddress)) { + return null; + } PagingStore store = stores.get(storeName); if (store != null) { @@ -438,6 +451,7 @@ public final class PagingManagerImpl implements PagingManager { } private PagingStore newStore(final SimpleString address) throws Exception { + assert managementAddress == null || (managementAddress != null && !address.startsWith(managementAddress)); syncLock.readLock().lock(); try { PagingStore store = stores.get(address); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index 4eaa08d84b..9624c01a28 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1244,6 +1244,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp if (queueInfo != null) { SimpleString address = queueInfo.getAddress(); PagingStore store = pagingManager.getPageStore(address); + if (store == null) { + return null; + } subs = store.getCursorProvider().getSubscription(queueID); pageSubscriptions.put(queueID, subs); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index c20e988380..56abddbc09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -30,7 +30,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; -import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -62,13 +61,10 @@ public final class BindingsImpl implements Bindings { private final GroupingHandler groupingHandler; - private final PagingStore pageStore; - private final SimpleString name; - public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler, final PagingStore pageStore) { + public BindingsImpl(final SimpleString name, final GroupingHandler groupingHandler) { this.groupingHandler = groupingHandler; - this.pageStore = pageStore; this.name = name; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index e573f85ea8..5346d6c736 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1252,7 +1252,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding for (Map.Entry entry : context.getContexListing().entrySet()) { PagingStore store = pagingManager.getPageStore(entry.getKey()); - if (storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { + if (store != null && storageManager.addToPage(store, message, context.getTransaction(), entry.getValue())) { if (message.isLargeMessage()) { confirmLargeMessageSend(tx, message); } @@ -1696,9 +1696,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding } @Override - public Bindings createBindings(final SimpleString address) throws Exception { + public Bindings createBindings(final SimpleString address) { GroupingHandler groupingHandler = server.getGroupingHandler(); - BindingsImpl bindings = new BindingsImpl(address, groupingHandler, pagingManager.getPageStore(address)); + BindingsImpl bindings = new BindingsImpl(address, groupingHandler); if (groupingHandler != null) { groupingHandler.addListener(bindings); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index e682891163..53226b41d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.FilterUtils; import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; public final class QueueConfig { @@ -202,7 +203,12 @@ public final class QueueConfig { final PageSubscription pageSubscription; if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) { try { - pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable); + final PagingStore pageStore = this.pagingManager.getPageStore(address); + if (pageStore != null) { + pageSubscription = pageStore.getCursorProvider().createSubscription(id, filter, durable); + } else { + pageSubscription = null; + } } catch (Exception e) { throw new IllegalStateException(e); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f14c1d285d..aad9cecf35 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2384,7 +2384,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { @Override public PagingManager createPagingManager() throws Exception { - return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize()); + return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository, configuration.getGlobalMaxSize(), configuration.getManagementAddress()); } protected PagingStoreFactory getPagingStoreFactory() throws Exception { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index cf1d145607..5dba891343 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -363,7 +363,7 @@ public class PostOfficeJournalLoader implements JournalLoader { // This can't be true! assert (perQueue != null); - if (store.checkPageFileExists(pageId.intValue())) { + if (store != null && store.checkPageFileExists(pageId.intValue())) { // on this case we need to recalculate the records Page pg = store.createPage(pageId.intValue()); pg.open(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java index 02fe1bf269..7585165b31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java @@ -154,7 +154,9 @@ public class ScaleDownHandler { Transaction tx = new TransactionImpl(storageManager); - pageStore.disableCleanup(); + if (pageStore != null) { + pageStore.disableCleanup(); + } try { @@ -240,8 +242,10 @@ public class ScaleDownHandler { return messageCount; } finally { - pageStore.enableCleanup(); - pageStore.getCursorProvider().scheduleCleanup(); + if (pageStore != null) { + pageStore.enableCleanup(); + pageStore.getCursorProvider().scheduleCleanup(); + } } } @@ -556,6 +560,9 @@ public class ScaleDownHandler { public boolean lookup(MessageReference reference) throws Exception { if (reference.isPaged()) { + if (store == null) { + return false; + } PageSubscription subscription = store.getCursorProvider().getSubscription(queue.getID()); if (subscription.contains((PagedReference) reference)) { return true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 0516335f69..d13cd76014 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1536,7 +1536,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString addr = removePrefix(address); PagingStore store = server.getPagingManager().getPageStore(addr); - if (!store.checkMemory(new Runnable() { + if (store == null) { + callback.sendProducerCreditsMessage(credits, address); + } else if (!store.checkMemory(new Runnable() { @Override public void run() { callback.sendProducerCreditsMessage(credits, address); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java index f47c827571..bf8cfb2e58 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.impl.PagingManagerImpl; @@ -304,7 +305,7 @@ public class FileMoveManagerTest { PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new OrderedExecutorFactory(threadPool), true, null); - PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1); + PagingManagerImpl managerImpl = new PagingManagerImpl(storeFactory, addressSettings, -1, ActiveMQDefaultConfiguration.getDefaultManagementAddress()); managerImpl.start(); diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java index 3f6c2524d6..a9acce4067 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/DestinationProxy.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.broker.ConnectionContext; @@ -153,7 +154,11 @@ public class DestinationProxy implements Destination { @Override public long getUsage() { try { - return server.getPagingManager().getPageStore(view.getAddress()).getAddressSize(); + final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress()); + if (pageStore == null) { + return 0; + } + return pageStore.getAddressSize(); } catch (Exception e) { throw new RuntimeException(e); } @@ -221,9 +226,13 @@ public class DestinationProxy implements Destination { @Override public int getPercentUsage() { - long total = 0; + final long total; try { - total = server.getPagingManager().getPageStore(view.getAddress()).getMaxSize(); + final PagingStore pageStore = server.getPagingManager().getPageStore(view.getAddress()); + if (pageStore == null) { + return 0; + } + total = pageStore.getMaxSize(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java index 4c7753284f..d4cbdd3331 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/GlobalPagingTest.java @@ -19,15 +19,23 @@ package org.apache.activemq.artemis.tests.integration.paging; import java.nio.ByteBuffer; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientRequestor; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.StoreConfiguration; +import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; @@ -192,4 +200,91 @@ public class GlobalPagingTest extends PagingTest { session.commit(); } + @Test + public void testManagementAddressCannotPageOrChangeGlobalSize() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + final ActiveMQServer server = createServer(true, config, PagingTest.PAGE_SIZE, -1); + + try { + final SimpleString managementAddress = server.getConfiguration().getManagementAddress(); + server.getConfiguration().setGlobalMaxSize(1); + server.start(); + + final ServerLocator locator = createInVMNonHALocator() + .setBlockOnNonDurableSend(true) + .setBlockOnDurableSend(true) + .setBlockOnAcknowledge(true); + + try (ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true)) { + + session.start(); + + if (server.locateQueue(managementAddress) == null) { + + session.createQueue(managementAddress, managementAddress, null, true); + } + + final Queue managementQueue = server.locateQueue(managementAddress); + + Assert.assertNull(managementQueue.getPageSubscription()); + + Assert.assertNull(server.getPagingManager().getPageStore(managementAddress)); + + final SimpleString address = SimpleString.toSimpleString("queue"); + + if (server.locateQueue(address) == null) { + + session.createQueue(address, address, null, true); + } + + final CountDownLatch startSendMessages = new CountDownLatch(1); + + final PagingManager pagingManager = server.getPagingManager(); + + final long globalSize = pagingManager.getGlobalSize(); + + final Thread globalSizeChecker = new Thread(() -> { + startSendMessages.countDown(); + while (!Thread.currentThread().isInterrupted()) { + Assert.assertEquals(globalSize, pagingManager.getGlobalSize()); + } + }); + + globalSizeChecker.start(); + + try (ClientRequestor requestor = new ClientRequestor(session, managementAddress)) { + + ClientMessage message = session.createMessage(false); + + ManagementHelper.putAttribute(message, "queue." + address.toString(), "messageCount"); + + Assert.assertTrue("bodySize = " + message.getBodySize() + " must be > of globalMaxSize = " + server.getConfiguration().getGlobalMaxSize(), message.getBodySize() > server.getConfiguration().getGlobalMaxSize()); + + startSendMessages.await(); + + for (int i = 0; i < 100; i++) { + try { + ClientMessage reply = requestor.request(message); + Assert.assertEquals(0L, ManagementHelper.getResult(reply)); + } catch (ActiveMQAddressFullException e) { + Assert.fail(e.getMessage()); + return; + } + } + + } finally { + globalSizeChecker.interrupt(); + } + } + + } finally { + server.stop(true); + } + } + } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index 07c02c3e56..c27ea686b9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -598,7 +598,7 @@ public final class ReplicationTest extends ActiveMQTestBase { final ExecutorFactory executorFactory, final HierarchicalRepository addressSettingsRepository) throws Exception { - PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository); + PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), 1000, null, executorFactory, false, null), addressSettingsRepository, configuration.getManagementAddress()); paging.start(); return paging; diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java index 830e61f263..40202e5ce3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/BindingsImplTest.java @@ -70,7 +70,7 @@ public class BindingsImplTest extends ActiveMQTestBase { private void internalTest(final boolean route) throws Exception { final FakeBinding fake = new FakeBinding(new SimpleString("a")); - final Bindings bind = new BindingsImpl(null, null, null); + final Bindings bind = new BindingsImpl(null, null); bind.addBinding(fake); bind.addBinding(new FakeBinding(new SimpleString("a"))); bind.addBinding(new FakeBinding(new SimpleString("a")));