From 158157260ce9f496e2814c2dd6f5e19aaefbd762 Mon Sep 17 00:00:00 2001 From: gtully Date: Thu, 25 Nov 2021 15:29:40 +0000 Subject: [PATCH] ARTEMIS-2097 - via elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569 scenario - avoid paging, if address is full chain another broker and produce to the head, consume from the tail using producer and consumer roles to partition connections. When tail is drained, drop it. - adds a option to treat an idle consumer as slow - adds basic support for credit based address blocking ARTEMIS-2097 - adds some more visiblity to address memory usage and balancer attribute modifier operations --- .../activemq/artemis/logs/AuditLogger.java | 23 + .../api/core/management/AddressControl.java | 25 +- .../management/BrokerBalancerControl.java | 12 + .../management/impl/AddressControlImpl.java | 60 ++ .../impl/BrokerBalancerControlImpl.java | 20 + .../artemis/core/paging/PagingManager.java | 3 + .../artemis/core/paging/PagingStore.java | 6 + .../core/paging/impl/PagingManagerImpl.java | 3 +- .../core/paging/impl/PagingStoreImpl.java | 58 +- .../core/server/ActiveMQServerLogger.java | 9 + .../core/server/balancing/BrokerBalancer.java | 14 +- .../balancing/targets/TargetKeyResolver.java | 13 +- .../server/embedded/EmbeddedActiveMQ.java | 4 + .../artemis/core/server/impl/QueueImpl.java | 2 +- docs/user-manual/en/address-model.md | 4 +- docs/user-manual/en/management.md | 9 +- docs/user-manual/en/slow-consumers.md | 4 +- .../integration/amqp/AmqpFlowControlTest.java | 84 ++- .../amqp/JMSClientTestSupport.java | 2 +- .../balancing/ElasticQueueTest.java | 700 ++++++++++++++++++ .../client/MessageCounterTest.java | 3 +- .../integration/client/SlowConsumerTest.java | 35 + .../management/AddressControlTest.java | 2 + .../AddressControlUsingCoreTest.java | 15 + .../management/BrokerBalancerControlTest.java | 25 + .../storage/PersistMultiThreadTest.java | 13 + .../core/paging/impl/PagingStoreImplTest.java | 211 ++++++ .../tests/unit/util/FakePagingManager.java | 2 +- 28 files changed, 1333 insertions(+), 28 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java index f249c751c3..5878295e27 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java @@ -2886,5 +2886,28 @@ public interface AuditLogger extends BasicLogger { @Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT) void purgeAddressFailure(String user, String queueName); + static void getAddressLimitPercent(Object source) { + BASE_LOGGER.getAddressLimitPercent(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601753, value = "User {0} is getting address limit % on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT) + void getAddressLimitPercent(String user, Object source, Object... args); + + static void block(Object source) { + BASE_LOGGER.block(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601754, value = "User {0} is blocking target resource: {1}", format = Message.Format.MESSAGE_FORMAT) + void block(String user, Object source); + + static void unBlock(Object source) { + BASE_LOGGER.unBlock(getCaller(), source); + } + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 601755, value = "User {0} is unblocking target resource: {1}", format = Message.Format.MESSAGE_FORMAT) + void unBlock(String user, Object source); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java index beef4c30af..a78c1dd062 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/AddressControl.java @@ -105,6 +105,25 @@ public interface AddressControl { @Attribute(desc = "whether this address is paging") boolean isPaging() throws Exception; + /** + * Returns the % of memory limit that is currently in use + * + * @throws Exception + */ + @Attribute(desc = "the % of memory limit (global or local) that is in use by this address") + int getAddressLimitPercent() throws Exception; + + /** + * Blocks message production to this address by limiting credit + * @return true if production is blocked + * @throws Exception + */ + @Operation(desc = "Stops message production to this address, typically with flow control.", impact = MBeanOperationInfo.ACTION) + boolean block() throws Exception; + + @Operation(desc = "Resumes message production to this address, if previously blocked.", impact = MBeanOperationInfo.ACTION) + void unblock() throws Exception; + /** * Returns the number of bytes used by each page for this address. */ @@ -171,7 +190,7 @@ public interface AddressControl { /** - * Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues. + * Pauses all the queues bound to this address. Messages are no longer delivered to all its bounded queues. * Newly added queue will be paused too until resume is called. * @throws java.lang.Exception */ @@ -179,7 +198,7 @@ public interface AddressControl { void pause() throws Exception; /** - * Pauses all the queues bound to this address.Messages are no longer delivered to all its bounded queues.Newly added queue will be paused too until resume is called. + * Pauses all the queues bound to this address. Messages are no longer delivered to all its bounded queues. Newly added queue will be paused too until resume is called. * @param persist if true, the pause state will be persisted. * @throws java.lang.Exception */ @@ -187,7 +206,7 @@ public interface AddressControl { void pause(@Parameter(name = "persist", desc = "if true, the pause state will be persisted.") boolean persist) throws Exception; /** - * Resume all the queues bound of this address.Messages are delivered again to all its bounded queues. + * Resume all the queues bound of this address. Messages are delivered again to all its bounded queues. * @throws java.lang.Exception */ @Operation(desc = "Resumes the queues bound to this address", impact = MBeanOperationInfo.ACTION) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerBalancerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerBalancerControl.java index 983888f0be..f781bc1ea3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerBalancerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BrokerBalancerControl.java @@ -28,4 +28,16 @@ public interface BrokerBalancerControl { @Operation(desc = "Get the target associated with key as JSON", impact = MBeanOperationInfo.INFO) String getTargetAsJSON(@Parameter(desc = "a key", name = "key") String key); + + @Operation(desc = "Set the local target filter regular expression", impact = MBeanOperationInfo.ACTION) + void setLocalTargetFilter(@Parameter(desc = "the regular expression", name = "regExp") String regExp); + + @Operation(desc = "Get the local target filter regular expression", impact = MBeanOperationInfo.INFO) + String getLocalTargetFilter(); + + @Operation(desc = "Set the target key filter regular expression", impact = MBeanOperationInfo.ACTION) + void setTargetKeyFilter(@Parameter(desc = "the regular expression", name = "regExp") String regExp); + + @Operation(desc = "Get the target key filter regular expression", impact = MBeanOperationInfo.INFO) + String getTargetKeyFilter(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java index ac45a3059b..d499e270f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java @@ -321,6 +321,66 @@ public class AddressControlImpl extends AbstractControl implements AddressContro } } + @Override + public int getAddressLimitPercent() throws Exception { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.getAddressLimitPercent(this.addressInfo); + } + clearIO(); + try { + final PagingStore pagingStore = getPagingStore(); + if (pagingStore == null) { + return 0; + } + return pagingStore.getAddressLimitPercent(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.debug("Failed to get address limit %", e); + return -1; + } finally { + blockOnIO(); + } + } + + @Override + public boolean block() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.block(this.addressInfo); + } + clearIO(); + boolean result = false; + try { + final PagingStore pagingStore = getPagingStore(); + if (pagingStore != null) { + pagingStore.block(); + result = true; + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.debug("Failed to block", e); + + } finally { + blockOnIO(); + } + return result; + } + + @Override + public void unblock() { + if (AuditLogger.isBaseLoggingEnabled()) { + AuditLogger.unBlock(this.addressInfo); + } + clearIO(); + try { + final PagingStore pagingStore = getPagingStore(); + if (pagingStore != null) { + pagingStore.unblock(); + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.debug("Failed to unblock", e); + } finally { + blockOnIO(); + } + } + @Override public int getNumberOfPages() { if (AuditLogger.isBaseLoggingEnabled()) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java index eb637ec8ff..857f20a53b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BrokerBalancerControlImpl.java @@ -107,6 +107,26 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker return null; } + @Override + public void setLocalTargetFilter(String regExp) { + balancer.setLocalTargetFilter(regExp); + } + + @Override + public String getLocalTargetFilter() { + return balancer.getLocalTargetFilter(); + } + + @Override + public void setTargetKeyFilter(String regExp) { + balancer.getTargetKeyResolver().setKeyFilter(regExp); + } + + @Override + public String getTargetKeyFilter() { + return balancer.getTargetKeyResolver().getKeyFilter(); + } + @Override protected MBeanOperationInfo[] fillMBeanOperationInfo() { return MBeanInfoHelper.getMBeanOperationsInfo(BrokerBalancerControl.class); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java index 9586fa502a..7b1f2d2924 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingManager.java @@ -130,4 +130,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository checkMemory(runWhenAvailable); } + default long getMaxSize() { + return 0; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index 275805ec2b..ddc12e6733 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -193,4 +193,10 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener void enableCleanup(); void destroy() throws Exception; + + int getAddressLimitPercent(); + + void block(); + + void unblock(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java index d8c90bcc0f..579d10b295 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java @@ -116,6 +116,7 @@ public final class PagingManagerImpl implements PagingManager { this.managementAddress = managementAddress; } + @Override public long getMaxSize() { return maxSize; } @@ -276,7 +277,7 @@ public final class PagingManagerImpl implements PagingManager { @Override public boolean isGlobalFull() { - return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize; + return diskFull || maxSize > 0 && globalSizeBytes.get() >= maxSize; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index 8cd3ed3d99..89eadcfda8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -126,6 +126,8 @@ public class PagingStoreImpl implements PagingStore { private volatile boolean blocking = false; + private volatile boolean blockedViaAddressControl = false; + private long rejectThreshold; public PagingStoreImpl(final SimpleString address, @@ -695,6 +697,13 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) { + if (blockedViaAddressControl) { + if (runWhenAvailable != null) { + onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + } + return false; + } + if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (isFull()) { if (runOnFailure && runWhenAvailable != null) { @@ -703,7 +712,7 @@ public class PagingStoreImpl implements PagingStore { return false; } } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) { - if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) { + if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) { onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); @@ -711,7 +720,7 @@ public class PagingStoreImpl implements PagingStore { // has been added, but the check to execute was done before the element was added // NOTE! We do not fix this race by locking the whole thing, doing this check provides // MUCH better performance in a highly concurrent environment - if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) { + if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) { // run it now runWhenAvailable.run(); } else { @@ -770,8 +779,8 @@ public class PagingStoreImpl implements PagingStore { return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get()); } - public boolean checkReleaseMemory(boolean globalOversized, long newSize) { - if (!globalOversized && (newSize <= maxSize || maxSize < 0)) { + public boolean checkReleaseMemory(boolean globalFull, long newSize) { + if (!blockedViaAddressControl && !globalFull && (newSize < maxSize || maxSize < 0)) { if (!onMemoryFreedRunnables.isEmpty()) { executor.execute(this::memoryReleased); if (blocking) { @@ -799,12 +808,6 @@ public class PagingStoreImpl implements PagingStore { if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { if (full) { - if (!printedDropMessagesWarning) { - printedDropMessagesWarning = true; - - ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); - } - if (message.isLargeMessage()) { ((LargeServerMessage) message).deleteFile(); } @@ -814,6 +817,10 @@ public class PagingStoreImpl implements PagingStore { } // Address is full, we just pretend we are paging, and drop the data + if (!printedDropMessagesWarning) { + printedDropMessagesWarning = true; + ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize()); + } return true; } else { return false; @@ -1150,7 +1157,36 @@ public class PagingStoreImpl implements PagingStore { // To be used on isDropMessagesWhenFull @Override public boolean isFull() { - return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull(); + return maxSize > 0 && getAddressSize() >= maxSize || pagingManager.isGlobalFull(); + } + + + @Override + public int getAddressLimitPercent() { + final long currentUsage = getAddressSize(); + if (maxSize > 0) { + return (int) (currentUsage * 100 / maxSize); + } else if (pagingManager.isUsingGlobalSize()) { + return (int) (currentUsage * 100 / pagingManager.getMaxSize()); + } + return 0; + } + + @Override + public void block() { + if (!blockedViaAddressControl) { + ActiveMQServerLogger.LOGGER.blockingViaControl(address); + } + blockedViaAddressControl = true; + } + + @Override + public void unblock() { + if (blockedViaAddressControl) { + ActiveMQServerLogger.LOGGER.unblockingViaControl(address); + } + blockedViaAddressControl = false; + checkReleasedMemory(); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 657cb66dcf..6dd51b3638 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -2207,4 +2207,13 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.INFO) @Message(id = 224113, value = "Auto removing Address {0}", format = Message.Format.MESSAGE_FORMAT) void autoRemoveAddress(String name); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224114, value = "Address control block, blocking message production on address ''{0}''. Clients will not get further credit.", format = Message.Format.MESSAGE_FORMAT) + void blockingViaControl(SimpleString addressName); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 224115, value = "Address control unblock of address ''{0}''. Clients will be granted credit as normal.", format = Message.Format.MESSAGE_FORMAT) + void unblockingViaControl(SimpleString addressName); + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java index a39c73844a..ae43e11cc1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java @@ -49,7 +49,7 @@ public class BrokerBalancer implements ActiveMQComponent { private final TargetResult localTarget; - private final Pattern localTargetFilter; + private volatile Pattern localTargetFilter; private final Pool pool; @@ -214,6 +214,18 @@ public class BrokerBalancer implements ActiveMQComponent { return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT; } + public void setLocalTargetFilter(String regExp) { + if (regExp == null || regExp.trim().isEmpty()) { + this.localTargetFilter = null; + } else { + this.localTargetFilter = Pattern.compile(regExp); + } + } + + public TargetKeyResolver getTargetKeyResolver() { + return targetKeyResolver; + } + private String transform(String key) { String result = key; if (transformer != null) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java index 6a409d03ec..28b8e49957 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java @@ -38,7 +38,7 @@ public class TargetKeyResolver { private final TargetKey key; - private final Pattern keyFilter; + private volatile Pattern keyFilter; public TargetKey getKey() { @@ -51,8 +51,7 @@ public class TargetKeyResolver { public TargetKeyResolver(TargetKey key, String keyFilter) { this.key = key; - - this.keyFilter = keyFilter != null ? Pattern.compile(keyFilter) : null; + setKeyFilter(keyFilter); } public String resolve(Connection connection, String clientID, String username) { @@ -137,4 +136,12 @@ public class TargetKeyResolver { return keyValue; } + + public void setKeyFilter(String regExp) { + if (regExp == null || regExp.isBlank()) { + this.keyFilter = null; + } else { + this.keyFilter = Pattern.compile(regExp); + } + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java index 8a6512608e..fe0ed5e455 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java @@ -107,6 +107,10 @@ public class EmbeddedActiveMQ { return this; } + public Configuration getConfiguration() { + return configuration; + } + public ActiveMQServer getActiveMQServer() { return activeMQServer; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index d3da7e9e6b..fccf078e2c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -4550,7 +4550,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (consumer instanceof ServerConsumerImpl) { ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer; float consumerRate = serverConsumer.getRate(); - if (consumerRate < thresholdInMsgPerSecond) { + if (consumerRate < thresholdInMsgPerSecond || (consumerRate == 0 && thresholdInMsgPerSecond == 0)) { RemotingConnection connection = null; ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer(); RemotingService remotingService = server.getRemotingService(); diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md index 740086c590..ecdc8167d4 100644 --- a/docs/user-manual/en/address-model.md +++ b/docs/user-manual/en/address-model.md @@ -833,7 +833,9 @@ address, if it exists. `slow-consumer-threshold`. The minimum rate of message consumption allowed before a consumer is considered "slow." Measured in units specified by the slow-consumer-threshold-measurement-unit configuration option. Default is `-1` - (i.e. disabled); any other valid value must be greater than 0. + (i.e. disabled); any other value must be greater than 0 to ensure a queue +has messages, and it is the actual consumer that is slow. A value of 0 will +allow a consumer with no messages pending to be considered slow. Read more about [slow consumers](slow-consumers.md). `slow-consumer-threshold-measurement-unit`. The units used to measure the diff --git a/docs/user-manual/en/management.md b/docs/user-manual/en/management.md index 49a4c33312..f725e039b5 100644 --- a/docs/user-manual/en/management.md +++ b/docs/user-manual/en/management.md @@ -127,13 +127,20 @@ Individual addresses can be managed using the `AddressControl` interface. `removeRole()` methods. You can list all the roles associated to the queue with the `getRoles()` method -- Pausing and resuming Address +- Pausing and resuming an Address The `AddressControl` can pause and resume an address and all the queues that are bound to it. Newly added queue will be paused too until the address is resumed. Thus all messages sent to the address will be received but not delivered. When it is resumed, delivering will occur again. +- Blocking and un blocking an Address + + The `AddressControl` can block and unblock an address. A blocked address will not issue + any more credit to existing producers. New producers will not be granted any credit. + When the address is unblocked, credit granting will resume. In this way, it is possible + to drain all the queues associated with an address to quiesce a broker in a managed way. + ### Queue Management The bulk of the management API deals with queues. The `QueueControl` interface diff --git a/docs/user-manual/en/slow-consumers.md b/docs/user-manual/en/slow-consumers.md index fc85d38a00..80ce65f869 100644 --- a/docs/user-manual/en/slow-consumers.md +++ b/docs/user-manual/en/slow-consumers.md @@ -7,8 +7,8 @@ If messages build up in the consumer's server-side queue then memory will begin filling up and the broker may enter paging mode which would impact performance negatively. However, criteria can be set so that consumers which don't acknowledge messages quickly enough can -potentially be disconnected from the broker which in the case of a -non-durable JMS subscriber would allow the broker to remove the +potentially be disconnected from the broker, which in the case of a +non-durable JMS subscriber, would allow the broker to remove the subscription and all of its messages freeing up valuable server resources. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java index 714c113d97..1d911508f7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlTest.java @@ -24,14 +24,18 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.ResourceAllocationException; import javax.jms.Session; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -85,6 +89,39 @@ public class AmqpFlowControlTest extends JMSClientTestSupport { } } + @Test(timeout = 60000) + public void testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked() throws Exception { + AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI)); + AmqpConnection connection = addConnection(client.connect()); + + try { + AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer); + addressControl.block(); + AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(getQueueName()); + assertTrue("Should get 0 credit", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return 0 == sender.getSender().getCredit(); + } + }, 5000, 20)); + + addressControl.unblock(); + assertTrue("Should now get issued one credit", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return 1 == sender.getSender().getCredit(); + } + }, 5000, 20)); + sender.close(); + + AmqpSender sender2 = session.createSender(getQueueName()); + assertEquals("Should only be issued one credit", 1, sender2.getSender().getCredit()); + } finally { + connection.close(); + } + } + @Test(timeout = 60000) public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception { AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI)); @@ -109,7 +146,7 @@ public class AmqpFlowControlTest extends JMSClientTestSupport { } @Test(timeout = 60000) - public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception { + public void testAddressIsBlockedForOtherProducersWhenFull() throws Exception { Connection connection = createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination d = session.createQueue(getQueueName()); @@ -130,6 +167,51 @@ public class AmqpFlowControlTest extends JMSClientTestSupport { assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD); } + @Test(timeout = 60000) + public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked() throws Exception { + Connection connection = createConnection(new URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination d = session.createQueue(getQueueName()); + final MessageProducer p = session.createProducer(d); + + final CountDownLatch running = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(1); + + AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer); + + assertTrue("blocked ok", addressControl.block()); + + // one credit + p.send(session.createBytesMessage()); + + // this send will block, no credit + new Thread(new Runnable() { + @Override + public void run() { + try { + running.countDown(); + p.send(session.createBytesMessage()); + } catch (JMSException ignored) { + } finally { + done.countDown(); + } + } + }).start(); + + assertTrue(running.await(5, TimeUnit.SECONDS)); + + assertFalse(done.await(200, TimeUnit.MILLISECONDS)); + + addressControl.unblock(); + + assertTrue(done.await(5, TimeUnit.SECONDS)); + + // good to go again + p.send(session.createBytesMessage()); + + assertEquals(3, addressControl.getMessageCount()); + } + @Test(timeout = 60000) public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception { fillAddress(getQueueName()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index b3d1393928..0643f6a6c6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -154,7 +154,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, start); } - private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException { + protected Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException { JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI); Connection connection = trackJMSConnection(factory.createConnection(username, password)); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java new file mode 100644 index 0000000000..e70c4e8057 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/ElasticQueueTest.java @@ -0,0 +1,700 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.balancing; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; +import javax.security.auth.Subject; +import java.io.File; +import java.net.URI; +import java.util.Set; +import java.util.Stack; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.management.AddressControl; +import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; +import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal; +import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.junit.After; +import org.junit.Test; + +public class ElasticQueueTest extends ActiveMQTestBase { + + static final String qName = "EQ"; + static final SimpleString qNameSimple = SimpleString.toSimpleString(qName); + + final int base_port = 61616; + final Stack nodes = new Stack<>(); + private final String balancerConfigName = "role_name_sharder"; + + private final ExecutorService executorService = Executors.newFixedThreadPool(3); + @After + public void cleanup() { + for (EmbeddedActiveMQ activeMQ : nodes) { + try { + activeMQ.stop(); + } catch (Throwable ignored) { + } + } + nodes.clear(); + executorService.shutdownNow(); + } + + String urlForNodes(Stack nodes) { + StringBuilder builder = new StringBuilder("failover:("); + int port_start = base_port; + for (EmbeddedActiveMQ ignored : nodes) { + if (port_start != base_port) { + builder.append(","); + } + builder.append("amqp://localhost:").append(port_start++); + } + // fast reconnect, randomize to get to all brokers and timeout sends that block on no credit + builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=1&jms.sendTimeout=" + 1000); + return builder.toString(); + } + + // allow tracking of failover reconnects + static class ConnectionListener implements JmsConnectionListener { + + AtomicInteger connectionCount; + + ConnectionListener(AtomicInteger connectionCount) { + this.connectionCount = connectionCount; + } + + @Override + public void onConnectionEstablished(URI uri) { + } + + @Override + public void onConnectionFailure(Throwable throwable) { + } + + @Override + public void onConnectionInterrupted(URI uri) { + } + + @Override + public void onConnectionRestored(URI uri) { + connectionCount.incrementAndGet(); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) { + } + + @Override + public void onSessionClosed(Session session, Throwable throwable) { + } + + @Override + public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) { + } + + @Override + public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) { + } + } + + // slow consumer + class EQConsumer implements Runnable { + + final AtomicInteger consumedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + final AtomicInteger delayMillis; + private final String url; + long lastConsumed = 0; + + EQConsumer(String url) { + this(url, 500); + } + + EQConsumer(String url, int delay) { + this.url = url; + this.delayMillis = new AtomicInteger(delay); + } + + @Override + public void run() { + + try { + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName)); + + while (!done.get()) { + Message receivedMessage = messageConsumer.receiveNoWait(); + if (receivedMessage != null) { + consumedCount.incrementAndGet(); + lastConsumed = receivedMessage.getLongProperty("PID"); + receivedMessage.acknowledge(); + } + TimeUnit.MILLISECONDS.sleep(delayMillis.get()); + } + } catch (JMSException okTryAgainWithNewConnection) { + } + } + } catch (Exception outOfHere) { + outOfHere.printStackTrace(); + } + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + // regular producer + static class EQProducer implements Runnable { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + private final String url; + + EQProducer(String url) { + this.url = url; + } + + @Override + public void run() { + + URI connectedToUri = null; + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + + + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + while (!done.get()) { + connectedToUri = connection.getConnectedURI(); + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + producedCount.incrementAndGet(); + } + } catch (JMSException expected) { + System.out.println("expected send failure: " + expected.toString() + " PID: " + producedCount.get() + ", uri: " + connectedToUri); + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + } + + // combined producer/ async consumer + static class EQProducerAsyncConsumer implements Runnable { + + final AtomicInteger producedCount = new AtomicInteger(); + final AtomicInteger connectionCount = new AtomicInteger(); + final AtomicBoolean done = new AtomicBoolean(); + final AtomicBoolean producerDone = new AtomicBoolean(); + final AtomicInteger consumerSleepMillis = new AtomicInteger(1000); + private final String url; + final AtomicInteger consumedCount = new AtomicInteger(); + private final String user; + private long lastConsumed; + + EQProducerAsyncConsumer(String url, String user) { + this.url = url; + this.user = user; + } + + @Override + public void run() { + + while (!done.get()) { + JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url); + + try (JmsConnection connection = (JmsConnection) factory.createConnection()) { + + // track disconnects via faiover listener + connectionCount.incrementAndGet(); + connection.addConnectionListener(new ConnectionListener(connectionCount)); + connection.start(); + + Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName)); + // consume async + messageConsumer.setMessageListener(message -> { + consumedCount.incrementAndGet(); + try { + lastConsumed = message.getLongProperty("PID"); + if (!producerDone.get()) { + TimeUnit.MILLISECONDS.sleep(consumerSleepMillis.get()); + } + message.acknowledge(); + } catch (JMSException | InterruptedException ignored) { + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(session.createQueue(qName)); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + while (!done.get()) { + if (!producerDone.get()) { + message.setLongProperty("PID", producedCount.get() + 1); + messageProducer.send(message); + producedCount.incrementAndGet(); + } else { + // just hang about and let the consumer listener work + TimeUnit.SECONDS.sleep(5); + } + } + } catch (JMSException | InterruptedException ignored) { + } + } + } + + public long getLastProduced() { + return producedCount.get(); + } + + public long getLastConsumed() { + return lastConsumed; + } + } + + MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); + + // hardwire authenticaton to map USER to EQ_USER etc + final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() { + @Override + public Subject authenticate(String user, + String password, + RemotingConnection remotingConnection, + String securityDomain) { + Subject subject = null; + if (validateUser(user, password)) { + subject = new Subject(); + subject.getPrincipals().add(new UserPrincipal(user)); + subject.getPrincipals().add(new RolePrincipal("EQ_" + user)); + if (user.equals("BOTH")) { + subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER")); + subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER")); + } + } + return subject; + } + + @Override + public boolean authorize(Subject subject, Set roles, CheckType checkType, String address) { + return true; + } + + @Override + public boolean validateUser(final String username, final String password) { + return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH")); + } + + @Override + public boolean validateUserAndRole(final String username, + final String password, + final Set requiredRoles, + final CheckType checkType) { + return username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"); + } + }; + + final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true); + final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true); + + + /* + use case is dispatch from memory, with non-blocking producers + producers add to the head of the broker chain, consumers receive from the tail + when head == tail we are back to one broker for that address, the end of the chain + */ + private void prepareNodesAndStartCombinedHeadTail() throws Exception { + AddressSettings blockingQueue = new AddressSettings(); + blockingQueue + .setMaxSizeBytes(100 * 1024) + .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL) + .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(1) + .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in! + + Configuration baseConfig = new ConfigurationImpl(); + baseConfig.getAddressesSettings().put(qName, blockingQueue); + + BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration(); + balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix + baseConfig.addBalancerConfiguration(balancerConfiguration); + + // prepare two nodes + for (int nodeId = 0; nodeId < 2; nodeId++) { + Configuration configuration = baseConfig.copy(); + configuration.setName("Node" + nodeId); + configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName())); + configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=1000;amqpMinCredits=300"); + nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration)); + nodes.get(nodeId).setSecurityManager(customSecurityManager); + nodes.get(nodeId).setMbeanServer(mBeanServer); + } + + // node0 initially handles both producer & consumer (head & tail) + nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER"); + nodes.get(0).start(); + } + + @Test (timeout = 60000) + public void testScale0_1() throws Exception { + + prepareNodesAndStartCombinedHeadTail(); + + // slow consumer, delay on each message received + EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes)); + executorService.submit(eqConsumer); + + // verify consumer reconnects on no messages + assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1, 5000, 200)); + + EQProducer eqProducer = new EQProducer(urlForNodes(nodes)); + executorService.submit(eqProducer); + + // verify producer reconnects on fail full! + assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1, 10000, 200)); + + // operator mode, poll queue control - to allow producer to continue, activate next broker in the 'chain' + AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + + assertTrue(Wait.waitFor(() -> { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Want 100% on Head&Tail, usage % " + usage); + return usage == 100; + },10000, 500)); + + // stop producer on Node0, only accept consumers + BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl0.setLocalTargetFilter("CONSUMER"); + + // start node1 exclusively for Producer + nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER"); + nodes.get(1).start(); + + // auto created address when producer connects + AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + assertTrue("Producer is on Head, Node1", Wait.waitFor(() -> { + try { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Node1 (head) usage % " + usage); + return usage > 10; + } catch (javax.management.InstanceNotFoundException notYetReadyExpected) { + } + return false; + },5000, 200)); + + // wait for Node0 to drain + eqConsumer.delayMillis.set(0); // fast + assertTrue(Wait.waitFor(() -> { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Want 0, Node0 (tail) usage % " + usage); + return usage == 0; + },20000, 500)); + balancerControl0.setLocalTargetFilter(""); // Node0 is out of service, Node1 (new head&tail) is where it is all at going forward! + + BrokerBalancerControl balancerControl1 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node1NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl1.setLocalTargetFilter("CONSUMER|PRODUCER"); // Node1 is serving (head & tail) + + // back to one element in the chain + nodes.get(0).stop(); + + eqConsumer.delayMillis.set(500); // slow + + assertTrue("New head&tail, Node1 full", Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Node1 usage % " + usage); + return usage == 100; + },10000, 200)); + + // stop the producer + eqProducer.done.set(true); + + eqConsumer.delayMillis.set(0); // fast again + + // wait for Node1 to drain + assertTrue(Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Want 0, on producer complete, Node1 usage % " + usage); + return usage == 0; + }, 10000, 200)); + + assertTrue("Got all produced", Wait.waitFor(() -> { + System.out.println("consumed pid: " + eqConsumer.getLastConsumed() + ", produced: " + eqProducer.getLastProduced()); + return (eqProducer.getLastProduced() == eqConsumer.getLastConsumed()); + }, 4000, 100)); + + eqConsumer.done.set(true); + + nodes.get(1).stop(); + } + + // Q: what happens for a producer/consumer connection? + // A: we can limit it to a PRODUCER role, and it can only send, with addressControl.pause() the consumer + // will get nothing to avoid out of order messages, b/c it is connected to the head broker, not the tail! + // Some pure CONSUMER role needs to drain the tail in this case. + @Test (timeout = 60000) + public void testScale0_1_CombinedProducerConsumerConnectionWithProducerRole() throws Exception { + + prepareNodesAndStartCombinedHeadTail(); + + EQProducerAsyncConsumer eqProducerConsumer = new EQProducerAsyncConsumer(urlForNodes(nodes), "PRODUCER"); + executorService.submit(eqProducerConsumer); + + AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + assertTrue(Wait.waitFor(() -> { + try { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Head&Tail usage % " + usage); + return usage == 100; + } catch (javax.management.InstanceNotFoundException notYetReadyExpected) { + } + return false; + },10000, 200)); + + assertTrue("producer got full error and reconnected", Wait.waitFor(() -> eqProducerConsumer.connectionCount.get() > 2)); + + long lastProducedToHeadTail = eqProducerConsumer.getLastProduced(); + + // stop producer on Node0, only accept consumers. make it a tail broker + BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl0.setLocalTargetFilter("CONSUMER"); + + // start new head exclusively for Producer + nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER"); + nodes.get(1).start(); + + // ensure nothing can be consumed from the head + AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + assertTrue(Wait.waitFor(() -> { + try { + addressControl1.pause(); + return true; + } catch (javax.management.InstanceNotFoundException notYetReadyExpected) { + } + return false; + },10000, 200)); + + // need another connection to drain tail + EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes), 0); + executorService.submit(eqConsumer); + + // wait for tail to drain + assertTrue(Wait.waitFor(() -> { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Tail usage % " + usage); + return usage == 0; + },10000, 200)); + + assertTrue(Wait.waitFor(() -> { + System.out.println("drain tail, lastProduced: " + lastProducedToHeadTail + ", consumed: " + eqConsumer.getLastConsumed()); + return lastProducedToHeadTail == eqConsumer.getLastConsumed(); + },5000, 100)); + + eqConsumer.done.set(true); + + balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl0.setLocalTargetFilter(""); // out of service + + nodes.get(0).stop(); + + // resume consumption on new head + addressControl1.resume(); + + // head should fill + assertTrue(Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Head&Tail usage % " + usage); + return usage == 100; + },10000, 200)); + + eqProducerConsumer.producerDone.set(true); + + // head should drain + assertTrue(Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Node1 usage % " + usage); + return usage == 0; + },10000, 200)); + + assertTrue(Wait.waitFor(() -> { + System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed()); + return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed(); + },5000, 100)); + + eqProducerConsumer.done.set(true); + + nodes.get(1).stop(); + } + + + // If we had a producer block (based on credit) it could also consume but not produce if we allow + // it to have both roles. With both roles, we need to be able to turn off production and best with credit. + // blocking credit takes effect for new links, existing producers will see the FAIL exception. + // Blocked producers make use of jms.sendTimeout to error out. + @Test (timeout = 60000) + public void testScale0_1_CombinedRoleConnection() throws Exception { + + prepareNodesAndStartCombinedHeadTail(); + + EQProducerAsyncConsumer eqProducerConsumer = new EQProducerAsyncConsumer(urlForNodes(nodes), "BOTH"); + executorService.submit(eqProducerConsumer); + + AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + assertTrue(Wait.waitFor(() -> { + try { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Head&Tail usage % " + usage); + return usage == 100; + } catch (javax.management.InstanceNotFoundException notYetReadyExpected) { + } + return false; + },20000, 200)); + + assertTrue("producer got full error and reconnected", Wait.waitFor(() -> eqProducerConsumer.connectionCount.get() > 0)); + + // stop producer on Node0, only accept consumers. make it a tail broker + BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl0.setTargetKeyFilter("(?<=^EQ_)CONSUMER"); // because both roles present, we need to filter the roles with an exact match, otherwise we get the first one! + // ensure nothing more can be produced + addressControl0.block(); + System.out.println("Tail blocked!"); + + // start new head exclusively for Producer + nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setTargetKeyFilter("(?<=^EQ_)PRODUCER"); // just accept the producer role as key + nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter(null); // initially won't accept any till we pause + + // new Head needs the address configured, such that we can start the balancer with the address paused + nodes.get(1).getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(qName).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST))); + nodes.get(1).start(); + + // ensure nothing can be consumed from the head + AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer); + assertTrue(Wait.waitFor(() -> { + try { + addressControl1.pause(); + return true; + } catch (javax.management.InstanceNotFoundException notYetReadyExpected) { + } + return false; + }, 5000, 100)); + BrokerBalancerControl balancerControl1 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node1NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer); + balancerControl1.setLocalTargetFilter("PRODUCER"); + System.out.println("Head enabled for producers... limit: " + addressControl1.getAddressLimitPercent()); + + // let the consumer run as fast as possible + eqProducerConsumer.consumerSleepMillis.set(0); + + // wait for tail to drain, connection should bounce due to the slow consumer strategy and get to consume from the tail + assertTrue(Wait.waitFor(() -> { + int usage = addressControl0.getAddressLimitPercent(); + System.out.println("Want 0, tail usage % " + usage); + return usage == 0; + }, 20000, 200)); + + System.out.println("Tail drained!"); + balancerControl0.setLocalTargetFilter(null); // out of service + + // resume consumers on the new head&tail + addressControl1.resume(); + + // slow down consumers again + eqProducerConsumer.consumerSleepMillis.set(2000); + + // head should fill + assertTrue(Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("want 100%, head&tail usage % " + usage); + return usage == 100; + }, 20000, 200)); + + eqProducerConsumer.producerDone.set(true); + + // head should drain + assertTrue(Wait.waitFor(() -> { + int usage = addressControl1.getAddressLimitPercent(); + System.out.println("Want 0, head&tail usage % " + usage); + return usage == 0; + }, 20000, 200)); + + assertTrue(Wait.waitFor(() -> { + System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed()); + return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed(); + }, 20000, 200)); + + eqProducerConsumer.done.set(true); + + nodes.get(1).stop(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageCounterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageCounterTest.java index c3708fee45..dfd3aaaa3f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageCounterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MessageCounterTest.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -70,7 +71,7 @@ public class MessageCounterTest extends ActiveMQTestBase { session.commit(); session.start(); - Assert.assertEquals(100, getMessageCount(server.getPostOffice(), QUEUE.toString())); + Wait.waitFor(() -> 100 == getMessageCount(server.getPostOffice(), QUEUE.toString()), 500, 10); ClientConsumer consumer = session.createConsumer(QUEUE, null, false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index c69f37e40f..0f381a8d9e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -612,6 +612,41 @@ public class SlowConsumerTest extends ActiveMQTestBase { Wait.assertEquals(0, queue::getConsumerCount); } + + @Test + public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception { + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(1); + addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND); + addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + server.getAddressSettingsRepository().removeMatch(QUEUE.toString()); + server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + final Queue queue = server.locateQueue(QUEUE); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = addClientSession(sf.createSession(false, true, true, false)); + + ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); + + int messages = 1; + + for (int i = 0; i < messages; i++) { + producer.send(session.createMessage(true)); + } + + ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); + session.start(); + consumer.receive(500).individualAcknowledge(); + assertEquals(1, queue.getConsumerCount()); + + // gets whacked! + Wait.assertEquals(0, queue::getConsumerCount, 2000, 100); + } + /** * This test creates 3 consumers on one queue. A producer sends * messages at a rate of 2 messages per second. Each consumer diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 73b39e15a8..9b1e52fdf0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -768,6 +768,7 @@ public class AddressControlTest extends ManagementTestBase { Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize ); final long exactSizeValueBeforeRestart = addressControl.getAddressSize(); + final int exactPercentBeforeRestart = addressControl.getAddressLimitPercent(); // restart to reload journal server.stop(); @@ -776,6 +777,7 @@ public class AddressControlTest extends ManagementTestBase { addressControl = createManagementControl(address); Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize ); Assert.assertEquals(exactSizeValueBeforeRestart, addressControl.getAddressSize()); + Assert.assertEquals(exactPercentBeforeRestart, addressControl.getAddressLimitPercent()); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java index e160dfe255..2ebc38f9a5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlUsingCoreTest.java @@ -91,6 +91,21 @@ public class AddressControlUsingCoreTest extends AddressControlTest { return (boolean) proxy.retrieveAttributeValue("paging"); } + @Override + public int getAddressLimitPercent() throws Exception { + return (int) proxy.retrieveAttributeValue("addressLimitPercent", Integer.class); + } + + @Override + public boolean block() throws Exception { + return (boolean) proxy.invokeOperation("block"); + } + + @Override + public void unblock() throws Exception { + proxy.invokeOperation("unBlock"); + } + @Override public long getNumberOfBytesPerPage() throws Exception { return (long) proxy.retrieveAttributeValue("numberOfBytesPerPage"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java index 005de61a05..2469a5065d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BrokerBalancerControlTest.java @@ -143,6 +143,31 @@ public class BrokerBalancerControlTest extends BalancingTestBase { Assert.assertNull(connectorData); } + @Test + public void testLocalTargetAccessors() throws Exception { + BrokerBalancerControl brokerBalancerControl = getBrokerBalancerControlForLocalTarget(); + + assertNull(brokerBalancerControl.getLocalTargetFilter()); + final String v = "EQ"; + brokerBalancerControl.setLocalTargetFilter(v); + assertEquals(v, brokerBalancerControl.getLocalTargetFilter()); + + brokerBalancerControl.setLocalTargetFilter(""); + assertNull(brokerBalancerControl.getLocalTargetFilter()); + + brokerBalancerControl.setLocalTargetFilter(null); + assertNull(brokerBalancerControl.getLocalTargetFilter()); + + assertNull(brokerBalancerControl.getTargetKeyFilter()); + brokerBalancerControl.setTargetKeyFilter(v); + assertEquals(v, brokerBalancerControl.getTargetKeyFilter()); + brokerBalancerControl.setTargetKeyFilter(""); + assertNull(brokerBalancerControl.getTargetKeyFilter()); + + brokerBalancerControl.setTargetKeyFilter(null); + assertNull(brokerBalancerControl.getTargetKeyFilter()); + } + @Test public void testGetLocalTargetAsJSON() throws Exception { BrokerBalancerControl brokerBalancerControl = getBrokerBalancerControlForLocalTarget(); diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 09c883184f..4b154bfb7e 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -477,5 +477,18 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { @Override public void destroy() throws Exception { } + + @Override + public int getAddressLimitPercent() { + return 0; + } + + @Override + public void block() { + } + + @Override + public void unblock() { + } } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 424f8f9a0b..772868749f 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -65,6 +65,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; import org.junit.After; @@ -852,6 +853,216 @@ public class PagingStoreImplTest extends ActiveMQTestBase { } } + @Test + public void testGetAddressLimitPercent() throws Exception { + SequentialFileFactory factory = new FakeSequentialFileFactory(); + + PagingStoreFactory storeFactory = new FakeStoreFactory(factory); + + PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, + createMockManager(), createStorageManagerMock(), factory, storeFactory, + PagingStoreImplTest.destinationTestName, + new AddressSettings() + .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK), + getExecutorFactory().getExecutor(), true); + + store.start(); + try { + assertEquals(0, store.getAddressLimitPercent()); + store.addSize(100); + + // no limit set + assertEquals(0, store.getAddressLimitPercent()); + + store.applySetting(new AddressSettings().setMaxSizeBytes(1000)); + assertEquals(10, store.getAddressLimitPercent()); + + store.addSize(900); + assertEquals(100, store.getAddressLimitPercent()); + + store.addSize(900); + assertEquals(190, store.getAddressLimitPercent()); + + store.addSize(-900); + assertEquals(100, store.getAddressLimitPercent()); + + store.addSize(-1); + assertEquals(99, store.getAddressLimitPercent()); + + } finally { + store.stop(); + } + } + + @Test + public void testGetAddressLimitPercentGlobalSize() throws Exception { + SequentialFileFactory factory = new FakeSequentialFileFactory(); + + PagingStoreFactory storeFactory = new FakeStoreFactory(factory); + + final AtomicLong limit = new AtomicLong(); + AtomicLong globalSize = new AtomicLong(); + PagingManager pagingManager = new FakePagingManager() { + @Override + public boolean isUsingGlobalSize() { + return limit.get() > 0; + } + + @Override + public FakePagingManager addSize(int s) { + globalSize.addAndGet(s); + return this; + } + + @Override + public long getGlobalSize() { + return globalSize.get(); + } + + @Override + public boolean isGlobalFull() { + return globalSize.get() >= limit.get(); + } + + @Override + public long getMaxSize() { + return limit.get(); + } + }; + + PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, + pagingManager, createStorageManagerMock(), factory, storeFactory, + PagingStoreImplTest.destinationTestName, + new AddressSettings() + .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK), + getExecutorFactory().getExecutor(), true); + + store.start(); + try { + // no usage yet + assertEquals(0, store.getAddressLimitPercent()); + + store.addSize(100); + + // no global limit set + assertEquals(0, store.getAddressLimitPercent()); + + // set a global limit + limit.set(1000); + assertEquals(10, store.getAddressLimitPercent()); + + store.addSize(900); + assertEquals(100, store.getAddressLimitPercent()); + + store.addSize(900); + assertEquals(190, store.getAddressLimitPercent()); + + store.addSize(-900); + assertEquals(100, store.getAddressLimitPercent()); + + store.addSize(-1); + assertEquals(99, store.getAddressLimitPercent()); + + } finally { + store.stop(); + } + } + + @Test + public void testBlockUnblock() throws Exception { + SequentialFileFactory factory = new FakeSequentialFileFactory(); + + PagingStoreFactory storeFactory = new FakeStoreFactory(factory); + + PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, + createMockManager(), createStorageManagerMock(), factory, storeFactory, + PagingStoreImplTest.destinationTestName, + new AddressSettings() + .setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK), + getExecutorFactory().getExecutor(), true); + + store.start(); + try { + final AtomicInteger calls = new AtomicInteger(); + final Runnable trackMemoryChecks = new Runnable() { + @Override + public void run() { + calls.incrementAndGet(); + } + }; + store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); + store.addSize(100); + store.checkMemory(trackMemoryChecks); + assertEquals(1, calls.get()); + + store.block(); + store.checkMemory(trackMemoryChecks); + assertEquals(1, calls.get()); + + store.unblock(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return 2 == calls.get(); + } + }, 1000, 50)); + + store.addSize(900); + assertEquals(100, store.getAddressLimitPercent()); + + // address full blocks + store.checkMemory(trackMemoryChecks); + assertEquals(2, calls.get()); + + store.block(); + + // release memory + store.addSize(-900); + assertEquals(10, store.getAddressLimitPercent()); + + // not yet released memory checks b/c of blocked + assertEquals(2, calls.get()); + + store.unblock(); + + // now released + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return 3 == calls.get(); + } + }, 1000, 50)); + + // reverse - unblock while full does not release + store.block(); + + store.addSize(900); + assertEquals(100, store.getAddressLimitPercent()); + + store.checkMemory(trackMemoryChecks); + assertEquals("no change", 3, calls.get()); + assertEquals("no change to be sure to be sure!", 3, calls.get()); + + store.unblock(); + assertEquals("no change after unblock", 3, calls.get()); + + store.addSize(-900); + assertEquals(10, store.getAddressLimitPercent()); + + assertTrue("change", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + return 4 == calls.get(); + } + }, 1000, 50)); + + + } finally { + store.stop(); + } + } + /** * @return */ diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java index d014fc5043..a7ee37e48e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/FakePagingManager.java @@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.server.files.FileStoreMonitor; -public final class FakePagingManager implements PagingManager { +public class FakePagingManager implements PagingManager { @Override public void addBlockedStore(PagingStore store) {