ARTEMIS-2097 - via elastic queue use case test based on ARTEMIS-3365 and ARTEMIS-3569

scenario - avoid paging, if address is full chain another broker and produce to the head, consume from the tail using producer and consumer roles to partition connections. When tail is drained, drop it.
 - adds a option to treat an idle consumer as slow
 - adds basic support for credit based address blocking ARTEMIS-2097
 - adds some more visiblity to address memory usage and balancer attribute modifier operations
This commit is contained in:
gtully 2021-11-25 15:29:40 +00:00 committed by Gary Tully
parent 56299e846a
commit 158157260c
28 changed files with 1333 additions and 28 deletions

View File

@ -2886,5 +2886,28 @@ public interface AuditLogger extends BasicLogger {
@Message(id = 601752, value = "User {0} failed to purge address {1}", format = Message.Format.MESSAGE_FORMAT)
void purgeAddressFailure(String user, String queueName);
static void getAddressLimitPercent(Object source) {
BASE_LOGGER.getAddressLimitPercent(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601753, value = "User {0} is getting address limit % on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getAddressLimitPercent(String user, Object source, Object... args);
static void block(Object source) {
BASE_LOGGER.block(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601754, value = "User {0} is blocking target resource: {1}", format = Message.Format.MESSAGE_FORMAT)
void block(String user, Object source);
static void unBlock(Object source) {
BASE_LOGGER.unBlock(getCaller(), source);
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601755, value = "User {0} is unblocking target resource: {1}", format = Message.Format.MESSAGE_FORMAT)
void unBlock(String user, Object source);
}

View File

@ -105,6 +105,25 @@ public interface AddressControl {
@Attribute(desc = "whether this address is paging")
boolean isPaging() throws Exception;
/**
* Returns the % of memory limit that is currently in use
*
* @throws Exception
*/
@Attribute(desc = "the % of memory limit (global or local) that is in use by this address")
int getAddressLimitPercent() throws Exception;
/**
* Blocks message production to this address by limiting credit
* @return true if production is blocked
* @throws Exception
*/
@Operation(desc = "Stops message production to this address, typically with flow control.", impact = MBeanOperationInfo.ACTION)
boolean block() throws Exception;
@Operation(desc = "Resumes message production to this address, if previously blocked.", impact = MBeanOperationInfo.ACTION)
void unblock() throws Exception;
/**
* Returns the number of bytes used by each page for this address.
*/

View File

@ -28,4 +28,16 @@ public interface BrokerBalancerControl {
@Operation(desc = "Get the target associated with key as JSON", impact = MBeanOperationInfo.INFO)
String getTargetAsJSON(@Parameter(desc = "a key", name = "key") String key);
@Operation(desc = "Set the local target filter regular expression", impact = MBeanOperationInfo.ACTION)
void setLocalTargetFilter(@Parameter(desc = "the regular expression", name = "regExp") String regExp);
@Operation(desc = "Get the local target filter regular expression", impact = MBeanOperationInfo.INFO)
String getLocalTargetFilter();
@Operation(desc = "Set the target key filter regular expression", impact = MBeanOperationInfo.ACTION)
void setTargetKeyFilter(@Parameter(desc = "the regular expression", name = "regExp") String regExp);
@Operation(desc = "Get the target key filter regular expression", impact = MBeanOperationInfo.INFO)
String getTargetKeyFilter();
}

View File

@ -321,6 +321,66 @@ public class AddressControlImpl extends AbstractControl implements AddressContro
}
}
@Override
public int getAddressLimitPercent() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAddressLimitPercent(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore == null) {
return 0;
}
return pagingStore.getAddressLimitPercent();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to get address limit %", e);
return -1;
} finally {
blockOnIO();
}
}
@Override
public boolean block() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.block(this.addressInfo);
}
clearIO();
boolean result = false;
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore != null) {
pagingStore.block();
result = true;
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to block", e);
} finally {
blockOnIO();
}
return result;
}
@Override
public void unblock() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.unBlock(this.addressInfo);
}
clearIO();
try {
final PagingStore pagingStore = getPagingStore();
if (pagingStore != null) {
pagingStore.unblock();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug("Failed to unblock", e);
} finally {
blockOnIO();
}
}
@Override
public int getNumberOfPages() {
if (AuditLogger.isBaseLoggingEnabled()) {

View File

@ -107,6 +107,26 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
return null;
}
@Override
public void setLocalTargetFilter(String regExp) {
balancer.setLocalTargetFilter(regExp);
}
@Override
public String getLocalTargetFilter() {
return balancer.getLocalTargetFilter();
}
@Override
public void setTargetKeyFilter(String regExp) {
balancer.getTargetKeyResolver().setKeyFilter(regExp);
}
@Override
public String getTargetKeyFilter() {
return balancer.getTargetKeyResolver().getKeyFilter();
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(BrokerBalancerControl.class);

View File

@ -130,4 +130,7 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
checkMemory(runWhenAvailable);
}
default long getMaxSize() {
return 0;
}
}

View File

@ -193,4 +193,10 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
void enableCleanup();
void destroy() throws Exception;
int getAddressLimitPercent();
void block();
void unblock();
}

View File

@ -116,6 +116,7 @@ public final class PagingManagerImpl implements PagingManager {
this.managementAddress = managementAddress;
}
@Override
public long getMaxSize() {
return maxSize;
}
@ -276,7 +277,7 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public boolean isGlobalFull() {
return diskFull || maxSize > 0 && globalSizeBytes.get() > maxSize;
return diskFull || maxSize > 0 && globalSizeBytes.get() >= maxSize;
}
@Override

View File

@ -126,6 +126,8 @@ public class PagingStoreImpl implements PagingStore {
private volatile boolean blocking = false;
private volatile boolean blockedViaAddressControl = false;
private long rejectThreshold;
public PagingStoreImpl(final SimpleString address,
@ -695,6 +697,13 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
if (blockedViaAddressControl) {
if (runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
}
return false;
}
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
@ -703,7 +712,7 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() > maxSize || pagingManager.isGlobalFull()) {
if (pagingManager.isDiskFull() || maxSize > 0 && sizeInBytes.get() >= maxSize || pagingManager.isGlobalFull()) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
@ -711,7 +720,7 @@ public class PagingStoreImpl implements PagingStore {
// has been added, but the check to execute was done before the element was added
// NOTE! We do not fix this race by locking the whole thing, doing this check provides
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() <= maxSize || maxSize < 0)) {
if (!pagingManager.isGlobalFull() && (sizeInBytes.get() < maxSize || maxSize < 0)) {
// run it now
runWhenAvailable.run();
} else {
@ -770,8 +779,8 @@ public class PagingStoreImpl implements PagingStore {
return checkReleaseMemory(pagingManager.isGlobalFull(), sizeInBytes.get());
}
public boolean checkReleaseMemory(boolean globalOversized, long newSize) {
if (!globalOversized && (newSize <= maxSize || maxSize < 0)) {
public boolean checkReleaseMemory(boolean globalFull, long newSize) {
if (!blockedViaAddressControl && !globalFull && (newSize < maxSize || maxSize < 0)) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking) {
@ -799,12 +808,6 @@ public class PagingStoreImpl implements PagingStore {
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (full) {
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
}
if (message.isLargeMessage()) {
((LargeServerMessage) message).deleteFile();
}
@ -814,6 +817,10 @@ public class PagingStoreImpl implements PagingStore {
}
// Address is full, we just pretend we are paging, and drop the data
if (!printedDropMessagesWarning) {
printedDropMessagesWarning = true;
ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, sizeInBytes.get(), maxSize, pagingManager.getGlobalSize());
}
return true;
} else {
return false;
@ -1150,7 +1157,36 @@ public class PagingStoreImpl implements PagingStore {
// To be used on isDropMessagesWhenFull
@Override
public boolean isFull() {
return maxSize > 0 && getAddressSize() > maxSize || pagingManager.isGlobalFull();
return maxSize > 0 && getAddressSize() >= maxSize || pagingManager.isGlobalFull();
}
@Override
public int getAddressLimitPercent() {
final long currentUsage = getAddressSize();
if (maxSize > 0) {
return (int) (currentUsage * 100 / maxSize);
} else if (pagingManager.isUsingGlobalSize()) {
return (int) (currentUsage * 100 / pagingManager.getMaxSize());
}
return 0;
}
@Override
public void block() {
if (!blockedViaAddressControl) {
ActiveMQServerLogger.LOGGER.blockingViaControl(address);
}
blockedViaAddressControl = true;
}
@Override
public void unblock() {
if (blockedViaAddressControl) {
ActiveMQServerLogger.LOGGER.unblockingViaControl(address);
}
blockedViaAddressControl = false;
checkReleasedMemory();
}
@Override

View File

@ -2207,4 +2207,13 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224113, value = "Auto removing Address {0}", format = Message.Format.MESSAGE_FORMAT)
void autoRemoveAddress(String name);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224114, value = "Address control block, blocking message production on address ''{0}''. Clients will not get further credit.", format = Message.Format.MESSAGE_FORMAT)
void blockingViaControl(SimpleString addressName);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224115, value = "Address control unblock of address ''{0}''. Clients will be granted credit as normal.", format = Message.Format.MESSAGE_FORMAT)
void unblockingViaControl(SimpleString addressName);
}

View File

@ -49,7 +49,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final TargetResult localTarget;
private final Pattern localTargetFilter;
private volatile Pattern localTargetFilter;
private final Pool pool;
@ -214,6 +214,18 @@ public class BrokerBalancer implements ActiveMQComponent {
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
public void setLocalTargetFilter(String regExp) {
if (regExp == null || regExp.trim().isEmpty()) {
this.localTargetFilter = null;
} else {
this.localTargetFilter = Pattern.compile(regExp);
}
}
public TargetKeyResolver getTargetKeyResolver() {
return targetKeyResolver;
}
private String transform(String key) {
String result = key;
if (transformer != null) {

View File

@ -38,7 +38,7 @@ public class TargetKeyResolver {
private final TargetKey key;
private final Pattern keyFilter;
private volatile Pattern keyFilter;
public TargetKey getKey() {
@ -51,8 +51,7 @@ public class TargetKeyResolver {
public TargetKeyResolver(TargetKey key, String keyFilter) {
this.key = key;
this.keyFilter = keyFilter != null ? Pattern.compile(keyFilter) : null;
setKeyFilter(keyFilter);
}
public String resolve(Connection connection, String clientID, String username) {
@ -137,4 +136,12 @@ public class TargetKeyResolver {
return keyValue;
}
public void setKeyFilter(String regExp) {
if (regExp == null || regExp.isBlank()) {
this.keyFilter = null;
} else {
this.keyFilter = Pattern.compile(regExp);
}
}
}

View File

@ -107,6 +107,10 @@ public class EmbeddedActiveMQ {
return this;
}
public Configuration getConfiguration() {
return configuration;
}
public ActiveMQServer getActiveMQServer() {
return activeMQServer;
}

View File

@ -4550,7 +4550,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate();
if (consumerRate < thresholdInMsgPerSecond) {
if (consumerRate < thresholdInMsgPerSecond || (consumerRate == 0 && thresholdInMsgPerSecond == 0)) {
RemotingConnection connection = null;
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
RemotingService remotingService = server.getRemotingService();

View File

@ -833,7 +833,9 @@ address, if it exists.
`slow-consumer-threshold`. The minimum rate of message consumption allowed
before a consumer is considered "slow." Measured in units specified by the
slow-consumer-threshold-measurement-unit configuration option. Default is `-1`
(i.e. disabled); any other valid value must be greater than 0.
(i.e. disabled); any other value must be greater than 0 to ensure a queue
has messages, and it is the actual consumer that is slow. A value of 0 will
allow a consumer with no messages pending to be considered slow.
Read more about [slow consumers](slow-consumers.md).
`slow-consumer-threshold-measurement-unit`. The units used to measure the

View File

@ -127,13 +127,20 @@ Individual addresses can be managed using the `AddressControl` interface.
`removeRole()` methods. You can list all the roles associated to the queue with
the `getRoles()` method
- Pausing and resuming Address
- Pausing and resuming an Address
The `AddressControl` can pause and resume an address and all the queues that
are bound to it. Newly added queue will be paused too until the address is resumed.
Thus all messages sent to the address will be received but not delivered. When it is
resumed, delivering will occur again.
- Blocking and un blocking an Address
The `AddressControl` can block and unblock an address. A blocked address will not issue
any more credit to existing producers. New producers will not be granted any credit.
When the address is unblocked, credit granting will resume. In this way, it is possible
to drain all the queues associated with an address to quiesce a broker in a managed way.
### Queue Management
The bulk of the management API deals with queues. The `QueueControl` interface

View File

@ -7,8 +7,8 @@ If messages build up in the consumer's server-side queue then memory
will begin filling up and the broker may enter paging mode which would
impact performance negatively. However, criteria can be set so that
consumers which don't acknowledge messages quickly enough can
potentially be disconnected from the broker which in the case of a
non-durable JMS subscriber would allow the broker to remove the
potentially be disconnected from the broker, which in the case of a
non-durable JMS subscriber, would allow the broker to remove the
subscription and all of its messages freeing up valuable server
resources.

View File

@ -24,14 +24,18 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -85,6 +89,39 @@ public class AmqpFlowControlTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testCreditIsNotGivenOnLinkCreationWhileBlockedAndIsGivenOnceThenUnblocked() throws Exception {
AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
AmqpConnection connection = addConnection(client.connect());
try {
AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
addressControl.block();
AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(getQueueName());
assertTrue("Should get 0 credit", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return 0 == sender.getSender().getCredit();
}
}, 5000, 20));
addressControl.unblock();
assertTrue("Should now get issued one credit", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return 1 == sender.getSender().getCredit();
}
}, 5000, 20));
sender.close();
AmqpSender sender2 = session.createSender(getQueueName());
assertEquals("Should only be issued one credit", 1, sender2.getSender().getCredit());
} finally {
connection.close();
}
}
@Test(timeout = 60000)
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
AmqpClient client = createAmqpClient(new URI(singleCreditAcceptorURI));
@ -109,7 +146,7 @@ public class AmqpFlowControlTest extends JMSClientTestSupport {
}
@Test(timeout = 60000)
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
public void testAddressIsBlockedForOtherProducersWhenFull() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination d = session.createQueue(getQueueName());
@ -130,6 +167,51 @@ public class AmqpFlowControlTest extends JMSClientTestSupport {
assertTrue(addressSize >= MAX_SIZE_BYTES_REJECT_THRESHOLD);
}
@Test(timeout = 60000)
public void testSendBlocksWhenAddressBlockedAndCompletesAfterUnblocked() throws Exception {
Connection connection = createConnection(new URI(singleCreditAcceptorURI.replace("tcp", "amqp")), null, null, null, true);
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination d = session.createQueue(getQueueName());
final MessageProducer p = session.createProducer(d);
final CountDownLatch running = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
AddressControl addressControl = ManagementControlHelper.createAddressControl(SimpleString.toSimpleString(getQueueName()), mBeanServer);
assertTrue("blocked ok", addressControl.block());
// one credit
p.send(session.createBytesMessage());
// this send will block, no credit
new Thread(new Runnable() {
@Override
public void run() {
try {
running.countDown();
p.send(session.createBytesMessage());
} catch (JMSException ignored) {
} finally {
done.countDown();
}
}
}).start();
assertTrue(running.await(5, TimeUnit.SECONDS));
assertFalse(done.await(200, TimeUnit.MILLISECONDS));
addressControl.unblock();
assertTrue(done.await(5, TimeUnit.SECONDS));
// good to go again
p.send(session.createBytesMessage());
assertEquals(3, addressControl.getMessageCount());
}
@Test(timeout = 60000)
public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
fillAddress(getQueueName());

View File

@ -154,7 +154,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return createConnection(getBrokerQpidJMSConnectionURI(), username, password, clientId, start);
}
private Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException {
protected Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean start) throws JMSException {
JmsConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = trackJMSConnection(factory.createConnection(username, password));

View File

@ -0,0 +1,700 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.balancing;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.security.auth.Subject;
import java.io.File;
import java.net.URI;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager5;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.junit.After;
import org.junit.Test;
public class ElasticQueueTest extends ActiveMQTestBase {
static final String qName = "EQ";
static final SimpleString qNameSimple = SimpleString.toSimpleString(qName);
final int base_port = 61616;
final Stack<EmbeddedActiveMQ> nodes = new Stack<>();
private final String balancerConfigName = "role_name_sharder";
private final ExecutorService executorService = Executors.newFixedThreadPool(3);
@After
public void cleanup() {
for (EmbeddedActiveMQ activeMQ : nodes) {
try {
activeMQ.stop();
} catch (Throwable ignored) {
}
}
nodes.clear();
executorService.shutdownNow();
}
String urlForNodes(Stack<EmbeddedActiveMQ> nodes) {
StringBuilder builder = new StringBuilder("failover:(");
int port_start = base_port;
for (EmbeddedActiveMQ ignored : nodes) {
if (port_start != base_port) {
builder.append(",");
}
builder.append("amqp://localhost:").append(port_start++);
}
// fast reconnect, randomize to get to all brokers and timeout sends that block on no credit
builder.append(")?failover.randomize=true&failover.maxReconnectAttempts=1&jms.sendTimeout=" + 1000);
return builder.toString();
}
// allow tracking of failover reconnects
static class ConnectionListener implements JmsConnectionListener {
AtomicInteger connectionCount;
ConnectionListener(AtomicInteger connectionCount) {
this.connectionCount = connectionCount;
}
@Override
public void onConnectionEstablished(URI uri) {
}
@Override
public void onConnectionFailure(Throwable throwable) {
}
@Override
public void onConnectionInterrupted(URI uri) {
}
@Override
public void onConnectionRestored(URI uri) {
connectionCount.incrementAndGet();
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) {
}
@Override
public void onSessionClosed(Session session, Throwable throwable) {
}
@Override
public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) {
}
@Override
public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) {
}
}
// slow consumer
class EQConsumer implements Runnable {
final AtomicInteger consumedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
final AtomicInteger delayMillis;
private final String url;
long lastConsumed = 0;
EQConsumer(String url) {
this(url, 500);
}
EQConsumer(String url, int delay) {
this.url = url;
this.delayMillis = new AtomicInteger(delay);
}
@Override
public void run() {
try {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory("CONSUMER", "PASSWORD", url);
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue(qName));
while (!done.get()) {
Message receivedMessage = messageConsumer.receiveNoWait();
if (receivedMessage != null) {
consumedCount.incrementAndGet();
lastConsumed = receivedMessage.getLongProperty("PID");
receivedMessage.acknowledge();
}
TimeUnit.MILLISECONDS.sleep(delayMillis.get());
}
} catch (JMSException okTryAgainWithNewConnection) {
}
}
} catch (Exception outOfHere) {
outOfHere.printStackTrace();
}
}
public long getLastConsumed() {
return lastConsumed;
}
}
// regular producer
static class EQProducer implements Runnable {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
private final String url;
EQProducer(String url) {
this.url = url;
}
@Override
public void run() {
URI connectedToUri = null;
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory("PRODUCER", "PASSWORD", url);
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
while (!done.get()) {
connectedToUri = connection.getConnectedURI();
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
producedCount.incrementAndGet();
}
} catch (JMSException expected) {
System.out.println("expected send failure: " + expected.toString() + " PID: " + producedCount.get() + ", uri: " + connectedToUri);
}
}
}
public long getLastProduced() {
return producedCount.get();
}
}
// combined producer/ async consumer
static class EQProducerAsyncConsumer implements Runnable {
final AtomicInteger producedCount = new AtomicInteger();
final AtomicInteger connectionCount = new AtomicInteger();
final AtomicBoolean done = new AtomicBoolean();
final AtomicBoolean producerDone = new AtomicBoolean();
final AtomicInteger consumerSleepMillis = new AtomicInteger(1000);
private final String url;
final AtomicInteger consumedCount = new AtomicInteger();
private final String user;
private long lastConsumed;
EQProducerAsyncConsumer(String url, String user) {
this.url = url;
this.user = user;
}
@Override
public void run() {
while (!done.get()) {
JmsConnectionFactory factory = new JmsConnectionFactory(user, "PASSWORD", url);
try (JmsConnection connection = (JmsConnection) factory.createConnection()) {
// track disconnects via faiover listener
connectionCount.incrementAndGet();
connection.addConnectionListener(new ConnectionListener(connectionCount));
connection.start();
Session clientSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer messageConsumer = clientSession.createConsumer(clientSession.createQueue(qName));
// consume async
messageConsumer.setMessageListener(message -> {
consumedCount.incrementAndGet();
try {
lastConsumed = message.getLongProperty("PID");
if (!producerDone.get()) {
TimeUnit.MILLISECONDS.sleep(consumerSleepMillis.get());
}
message.acknowledge();
} catch (JMSException | InterruptedException ignored) {
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[1024]);
while (!done.get()) {
if (!producerDone.get()) {
message.setLongProperty("PID", producedCount.get() + 1);
messageProducer.send(message);
producedCount.incrementAndGet();
} else {
// just hang about and let the consumer listener work
TimeUnit.SECONDS.sleep(5);
}
}
} catch (JMSException | InterruptedException ignored) {
}
}
}
public long getLastProduced() {
return producedCount.get();
}
public long getLastConsumed() {
return lastConsumed;
}
}
MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
// hardwire authenticaton to map USER to EQ_USER etc
final ActiveMQSecurityManager5 customSecurityManager = new ActiveMQSecurityManager5() {
@Override
public Subject authenticate(String user,
String password,
RemotingConnection remotingConnection,
String securityDomain) {
Subject subject = null;
if (validateUser(user, password)) {
subject = new Subject();
subject.getPrincipals().add(new UserPrincipal(user));
subject.getPrincipals().add(new RolePrincipal("EQ_" + user));
if (user.equals("BOTH")) {
subject.getPrincipals().add(new RolePrincipal("EQ_PRODUCER"));
subject.getPrincipals().add(new RolePrincipal("EQ_CONSUMER"));
}
}
return subject;
}
@Override
public boolean authorize(Subject subject, Set<Role> roles, CheckType checkType, String address) {
return true;
}
@Override
public boolean validateUser(final String username, final String password) {
return (username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH"));
}
@Override
public boolean validateUserAndRole(final String username,
final String password,
final Set<Role> requiredRoles,
final CheckType checkType) {
return username.equals("CONSUMER") || username.equals("PRODUCER") || username.equals("BOTH");
}
};
final ObjectNameBuilder node0NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node0", true);
final ObjectNameBuilder node1NameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "Node1", true);
/*
use case is dispatch from memory, with non-blocking producers
producers add to the head of the broker chain, consumers receive from the tail
when head == tail we are back to one broker for that address, the end of the chain
*/
private void prepareNodesAndStartCombinedHeadTail() throws Exception {
AddressSettings blockingQueue = new AddressSettings();
blockingQueue
.setMaxSizeBytes(100 * 1024)
.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL)
.setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(1)
.setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in!
Configuration baseConfig = new ConfigurationImpl();
baseConfig.getAddressesSettings().put(qName, blockingQueue);
BrokerBalancerConfiguration balancerConfiguration = new BrokerBalancerConfiguration();
balancerConfiguration.setName(balancerConfigName).setTargetKey(TargetKey.ROLE_NAME).setTargetKeyFilter("(?<=^EQ_).*"); // strip EQ_ prefix
baseConfig.addBalancerConfiguration(balancerConfiguration);
// prepare two nodes
for (int nodeId = 0; nodeId < 2; nodeId++) {
Configuration configuration = baseConfig.copy();
configuration.setName("Node" + nodeId);
configuration.setBrokerInstance(new File(getTestDirfile(), configuration.getName()));
configuration.addAcceptorConfiguration("tcp", "tcp://localhost:" + (base_port + (nodeId)) + "?redirect-to=" + balancerConfigName + ";amqpCredits=1000;amqpMinCredits=300");
nodes.add(new EmbeddedActiveMQ().setConfiguration(configuration));
nodes.get(nodeId).setSecurityManager(customSecurityManager);
nodes.get(nodeId).setMbeanServer(mBeanServer);
}
// node0 initially handles both producer & consumer (head & tail)
nodes.get(0).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER|CONSUMER");
nodes.get(0).start();
}
@Test (timeout = 60000)
public void testScale0_1() throws Exception {
prepareNodesAndStartCombinedHeadTail();
// slow consumer, delay on each message received
EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes));
executorService.submit(eqConsumer);
// verify consumer reconnects on no messages
assertTrue(Wait.waitFor(() -> eqConsumer.connectionCount.get() > 1, 5000, 200));
EQProducer eqProducer = new EQProducer(urlForNodes(nodes));
executorService.submit(eqProducer);
// verify producer reconnects on fail full!
assertTrue(Wait.waitFor(() -> eqProducer.connectionCount.get() > 1, 10000, 200));
// operator mode, poll queue control - to allow producer to continue, activate next broker in the 'chain'
AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue(Wait.waitFor(() -> {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Want 100% on Head&Tail, usage % " + usage);
return usage == 100;
},10000, 500));
// stop producer on Node0, only accept consumers
BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl0.setLocalTargetFilter("CONSUMER");
// start node1 exclusively for Producer
nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER");
nodes.get(1).start();
// auto created address when producer connects
AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue("Producer is on Head, Node1", Wait.waitFor(() -> {
try {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Node1 (head) usage % " + usage);
return usage > 10;
} catch (javax.management.InstanceNotFoundException notYetReadyExpected) {
}
return false;
},5000, 200));
// wait for Node0 to drain
eqConsumer.delayMillis.set(0); // fast
assertTrue(Wait.waitFor(() -> {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Want 0, Node0 (tail) usage % " + usage);
return usage == 0;
},20000, 500));
balancerControl0.setLocalTargetFilter(""); // Node0 is out of service, Node1 (new head&tail) is where it is all at going forward!
BrokerBalancerControl balancerControl1 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node1NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl1.setLocalTargetFilter("CONSUMER|PRODUCER"); // Node1 is serving (head & tail)
// back to one element in the chain
nodes.get(0).stop();
eqConsumer.delayMillis.set(500); // slow
assertTrue("New head&tail, Node1 full", Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Node1 usage % " + usage);
return usage == 100;
},10000, 200));
// stop the producer
eqProducer.done.set(true);
eqConsumer.delayMillis.set(0); // fast again
// wait for Node1 to drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Want 0, on producer complete, Node1 usage % " + usage);
return usage == 0;
}, 10000, 200));
assertTrue("Got all produced", Wait.waitFor(() -> {
System.out.println("consumed pid: " + eqConsumer.getLastConsumed() + ", produced: " + eqProducer.getLastProduced());
return (eqProducer.getLastProduced() == eqConsumer.getLastConsumed());
}, 4000, 100));
eqConsumer.done.set(true);
nodes.get(1).stop();
}
// Q: what happens for a producer/consumer connection?
// A: we can limit it to a PRODUCER role, and it can only send, with addressControl.pause() the consumer
// will get nothing to avoid out of order messages, b/c it is connected to the head broker, not the tail!
// Some pure CONSUMER role needs to drain the tail in this case.
@Test (timeout = 60000)
public void testScale0_1_CombinedProducerConsumerConnectionWithProducerRole() throws Exception {
prepareNodesAndStartCombinedHeadTail();
EQProducerAsyncConsumer eqProducerConsumer = new EQProducerAsyncConsumer(urlForNodes(nodes), "PRODUCER");
executorService.submit(eqProducerConsumer);
AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue(Wait.waitFor(() -> {
try {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Head&Tail usage % " + usage);
return usage == 100;
} catch (javax.management.InstanceNotFoundException notYetReadyExpected) {
}
return false;
},10000, 200));
assertTrue("producer got full error and reconnected", Wait.waitFor(() -> eqProducerConsumer.connectionCount.get() > 2));
long lastProducedToHeadTail = eqProducerConsumer.getLastProduced();
// stop producer on Node0, only accept consumers. make it a tail broker
BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl0.setLocalTargetFilter("CONSUMER");
// start new head exclusively for Producer
nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter("PRODUCER");
nodes.get(1).start();
// ensure nothing can be consumed from the head
AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue(Wait.waitFor(() -> {
try {
addressControl1.pause();
return true;
} catch (javax.management.InstanceNotFoundException notYetReadyExpected) {
}
return false;
},10000, 200));
// need another connection to drain tail
EQConsumer eqConsumer = new EQConsumer(urlForNodes(nodes), 0);
executorService.submit(eqConsumer);
// wait for tail to drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Tail usage % " + usage);
return usage == 0;
},10000, 200));
assertTrue(Wait.waitFor(() -> {
System.out.println("drain tail, lastProduced: " + lastProducedToHeadTail + ", consumed: " + eqConsumer.getLastConsumed());
return lastProducedToHeadTail == eqConsumer.getLastConsumed();
},5000, 100));
eqConsumer.done.set(true);
balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl0.setLocalTargetFilter(""); // out of service
nodes.get(0).stop();
// resume consumption on new head
addressControl1.resume();
// head should fill
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Head&Tail usage % " + usage);
return usage == 100;
},10000, 200));
eqProducerConsumer.producerDone.set(true);
// head should drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Node1 usage % " + usage);
return usage == 0;
},10000, 200));
assertTrue(Wait.waitFor(() -> {
System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
},5000, 100));
eqProducerConsumer.done.set(true);
nodes.get(1).stop();
}
// If we had a producer block (based on credit) it could also consume but not produce if we allow
// it to have both roles. With both roles, we need to be able to turn off production and best with credit.
// blocking credit takes effect for new links, existing producers will see the FAIL exception.
// Blocked producers make use of jms.sendTimeout to error out.
@Test (timeout = 60000)
public void testScale0_1_CombinedRoleConnection() throws Exception {
prepareNodesAndStartCombinedHeadTail();
EQProducerAsyncConsumer eqProducerConsumer = new EQProducerAsyncConsumer(urlForNodes(nodes), "BOTH");
executorService.submit(eqProducerConsumer);
AddressControl addressControl0 = (AddressControl) ManagementControlHelper.createProxy(node0NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue(Wait.waitFor(() -> {
try {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Head&Tail usage % " + usage);
return usage == 100;
} catch (javax.management.InstanceNotFoundException notYetReadyExpected) {
}
return false;
},20000, 200));
assertTrue("producer got full error and reconnected", Wait.waitFor(() -> eqProducerConsumer.connectionCount.get() > 0));
// stop producer on Node0, only accept consumers. make it a tail broker
BrokerBalancerControl balancerControl0 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node0NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl0.setTargetKeyFilter("(?<=^EQ_)CONSUMER"); // because both roles present, we need to filter the roles with an exact match, otherwise we get the first one!
// ensure nothing more can be produced
addressControl0.block();
System.out.println("Tail blocked!");
// start new head exclusively for Producer
nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setTargetKeyFilter("(?<=^EQ_)PRODUCER"); // just accept the producer role as key
nodes.get(1).getConfiguration().getBalancerConfigurations().get(0).setLocalTargetFilter(null); // initially won't accept any till we pause
// new Head needs the address configured, such that we can start the balancer with the address paused
nodes.get(1).getConfiguration().getAddressConfigurations().add(new CoreAddressConfiguration().setName(qName).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new QueueConfiguration(qName).setRoutingType(RoutingType.ANYCAST)));
nodes.get(1).start();
// ensure nothing can be consumed from the head
AddressControl addressControl1 = (AddressControl) ManagementControlHelper.createProxy(node1NameBuilder.getAddressObjectName(qNameSimple), AddressControl.class, mBeanServer);
assertTrue(Wait.waitFor(() -> {
try {
addressControl1.pause();
return true;
} catch (javax.management.InstanceNotFoundException notYetReadyExpected) {
}
return false;
}, 5000, 100));
BrokerBalancerControl balancerControl1 = (BrokerBalancerControl) ManagementControlHelper.createProxy(node1NameBuilder.getBrokerBalancerObjectName(balancerConfigName), BrokerBalancerControl.class, mBeanServer);
balancerControl1.setLocalTargetFilter("PRODUCER");
System.out.println("Head enabled for producers... limit: " + addressControl1.getAddressLimitPercent());
// let the consumer run as fast as possible
eqProducerConsumer.consumerSleepMillis.set(0);
// wait for tail to drain, connection should bounce due to the slow consumer strategy and get to consume from the tail
assertTrue(Wait.waitFor(() -> {
int usage = addressControl0.getAddressLimitPercent();
System.out.println("Want 0, tail usage % " + usage);
return usage == 0;
}, 20000, 200));
System.out.println("Tail drained!");
balancerControl0.setLocalTargetFilter(null); // out of service
// resume consumers on the new head&tail
addressControl1.resume();
// slow down consumers again
eqProducerConsumer.consumerSleepMillis.set(2000);
// head should fill
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("want 100%, head&tail usage % " + usage);
return usage == 100;
}, 20000, 200));
eqProducerConsumer.producerDone.set(true);
// head should drain
assertTrue(Wait.waitFor(() -> {
int usage = addressControl1.getAddressLimitPercent();
System.out.println("Want 0, head&tail usage % " + usage);
return usage == 0;
}, 20000, 200));
assertTrue(Wait.waitFor(() -> {
System.out.println("current head&tail lastProduced: " + eqProducerConsumer.getLastProduced() + ", consumed: " + eqProducerConsumer.getLastConsumed());
return eqProducerConsumer.getLastProduced() == eqProducerConsumer.getLastConsumed();
}, 20000, 200));
eqProducerConsumer.done.set(true);
nodes.get(1).stop();
}
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -70,7 +71,7 @@ public class MessageCounterTest extends ActiveMQTestBase {
session.commit();
session.start();
Assert.assertEquals(100, getMessageCount(server.getPostOffice(), QUEUE.toString()));
Wait.waitFor(() -> 100 == getMessageCount(server.getPostOffice(), QUEUE.toString()), 500, 10);
ClientConsumer consumer = session.createConsumer(QUEUE, null, false);

View File

@ -612,6 +612,41 @@ public class SlowConsumerTest extends ActiveMQTestBase {
Wait.assertEquals(0, queue::getConsumerCount);
}
@Test
public void testKilledOnNoMessagesSoCanBeRebalanced() throws Exception {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1);
addressSettings.setSlowConsumerThresholdMeasurementUnit(MESSAGES_PER_SECOND);
addressSettings.setSlowConsumerThreshold(0); // if there are no messages pending, kill me
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
server.getAddressSettingsRepository().removeMatch(QUEUE.toString());
server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
final Queue queue = server.locateQueue(QUEUE);
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = addClientSession(sf.createSession(false, true, true, false));
ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
int messages = 1;
for (int i = 0; i < messages; i++) {
producer.send(session.createMessage(true));
}
ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
session.start();
consumer.receive(500).individualAcknowledge();
assertEquals(1, queue.getConsumerCount());
// gets whacked!
Wait.assertEquals(0, queue::getConsumerCount, 2000, 100);
}
/**
* This test creates 3 consumers on one queue. A producer sends
* messages at a rate of 2 messages per second. Each consumer

View File

@ -768,6 +768,7 @@ public class AddressControlTest extends ManagementTestBase {
Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize );
final long exactSizeValueBeforeRestart = addressControl.getAddressSize();
final int exactPercentBeforeRestart = addressControl.getAddressLimitPercent();
// restart to reload journal
server.stop();
@ -776,6 +777,7 @@ public class AddressControlTest extends ManagementTestBase {
addressControl = createManagementControl(address);
Assert.assertTrue(addressControl.getAddressSize() > pageLimitNumberOfMessages * payLoadSize );
Assert.assertEquals(exactSizeValueBeforeRestart, addressControl.getAddressSize());
Assert.assertEquals(exactPercentBeforeRestart, addressControl.getAddressLimitPercent());
}

View File

@ -91,6 +91,21 @@ public class AddressControlUsingCoreTest extends AddressControlTest {
return (boolean) proxy.retrieveAttributeValue("paging");
}
@Override
public int getAddressLimitPercent() throws Exception {
return (int) proxy.retrieveAttributeValue("addressLimitPercent", Integer.class);
}
@Override
public boolean block() throws Exception {
return (boolean) proxy.invokeOperation("block");
}
@Override
public void unblock() throws Exception {
proxy.invokeOperation("unBlock");
}
@Override
public long getNumberOfBytesPerPage() throws Exception {
return (long) proxy.retrieveAttributeValue("numberOfBytesPerPage");

View File

@ -143,6 +143,31 @@ public class BrokerBalancerControlTest extends BalancingTestBase {
Assert.assertNull(connectorData);
}
@Test
public void testLocalTargetAccessors() throws Exception {
BrokerBalancerControl brokerBalancerControl = getBrokerBalancerControlForLocalTarget();
assertNull(brokerBalancerControl.getLocalTargetFilter());
final String v = "EQ";
brokerBalancerControl.setLocalTargetFilter(v);
assertEquals(v, brokerBalancerControl.getLocalTargetFilter());
brokerBalancerControl.setLocalTargetFilter("");
assertNull(brokerBalancerControl.getLocalTargetFilter());
brokerBalancerControl.setLocalTargetFilter(null);
assertNull(brokerBalancerControl.getLocalTargetFilter());
assertNull(brokerBalancerControl.getTargetKeyFilter());
brokerBalancerControl.setTargetKeyFilter(v);
assertEquals(v, brokerBalancerControl.getTargetKeyFilter());
brokerBalancerControl.setTargetKeyFilter("");
assertNull(brokerBalancerControl.getTargetKeyFilter());
brokerBalancerControl.setTargetKeyFilter(null);
assertNull(brokerBalancerControl.getTargetKeyFilter());
}
@Test
public void testGetLocalTargetAsJSON() throws Exception {
BrokerBalancerControl brokerBalancerControl = getBrokerBalancerControlForLocalTarget();

View File

@ -477,5 +477,18 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
@Override
public void destroy() throws Exception {
}
@Override
public int getAddressLimitPercent() {
return 0;
}
@Override
public void block() {
}
@Override
public void unblock() {
}
}
}

View File

@ -65,6 +65,7 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
import org.junit.After;
@ -852,6 +853,216 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
}
@Test
public void testGetAddressLimitPercent() throws Exception {
SequentialFileFactory factory = new FakeSequentialFileFactory();
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100,
createMockManager(), createStorageManagerMock(), factory, storeFactory,
PagingStoreImplTest.destinationTestName,
new AddressSettings()
.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
getExecutorFactory().getExecutor(), true);
store.start();
try {
assertEquals(0, store.getAddressLimitPercent());
store.addSize(100);
// no limit set
assertEquals(0, store.getAddressLimitPercent());
store.applySetting(new AddressSettings().setMaxSizeBytes(1000));
assertEquals(10, store.getAddressLimitPercent());
store.addSize(900);
assertEquals(100, store.getAddressLimitPercent());
store.addSize(900);
assertEquals(190, store.getAddressLimitPercent());
store.addSize(-900);
assertEquals(100, store.getAddressLimitPercent());
store.addSize(-1);
assertEquals(99, store.getAddressLimitPercent());
} finally {
store.stop();
}
}
@Test
public void testGetAddressLimitPercentGlobalSize() throws Exception {
SequentialFileFactory factory = new FakeSequentialFileFactory();
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
final AtomicLong limit = new AtomicLong();
AtomicLong globalSize = new AtomicLong();
PagingManager pagingManager = new FakePagingManager() {
@Override
public boolean isUsingGlobalSize() {
return limit.get() > 0;
}
@Override
public FakePagingManager addSize(int s) {
globalSize.addAndGet(s);
return this;
}
@Override
public long getGlobalSize() {
return globalSize.get();
}
@Override
public boolean isGlobalFull() {
return globalSize.get() >= limit.get();
}
@Override
public long getMaxSize() {
return limit.get();
}
};
PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100,
pagingManager, createStorageManagerMock(), factory, storeFactory,
PagingStoreImplTest.destinationTestName,
new AddressSettings()
.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
getExecutorFactory().getExecutor(), true);
store.start();
try {
// no usage yet
assertEquals(0, store.getAddressLimitPercent());
store.addSize(100);
// no global limit set
assertEquals(0, store.getAddressLimitPercent());
// set a global limit
limit.set(1000);
assertEquals(10, store.getAddressLimitPercent());
store.addSize(900);
assertEquals(100, store.getAddressLimitPercent());
store.addSize(900);
assertEquals(190, store.getAddressLimitPercent());
store.addSize(-900);
assertEquals(100, store.getAddressLimitPercent());
store.addSize(-1);
assertEquals(99, store.getAddressLimitPercent());
} finally {
store.stop();
}
}
@Test
public void testBlockUnblock() throws Exception {
SequentialFileFactory factory = new FakeSequentialFileFactory();
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
PagingStoreImpl store = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100,
createMockManager(), createStorageManagerMock(), factory, storeFactory,
PagingStoreImplTest.destinationTestName,
new AddressSettings()
.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK),
getExecutorFactory().getExecutor(), true);
store.start();
try {
final AtomicInteger calls = new AtomicInteger();
final Runnable trackMemoryChecks = new Runnable() {
@Override
public void run() {
calls.incrementAndGet();
}
};
store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
store.addSize(100);
store.checkMemory(trackMemoryChecks);
assertEquals(1, calls.get());
store.block();
store.checkMemory(trackMemoryChecks);
assertEquals(1, calls.get());
store.unblock();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return 2 == calls.get();
}
}, 1000, 50));
store.addSize(900);
assertEquals(100, store.getAddressLimitPercent());
// address full blocks
store.checkMemory(trackMemoryChecks);
assertEquals(2, calls.get());
store.block();
// release memory
store.addSize(-900);
assertEquals(10, store.getAddressLimitPercent());
// not yet released memory checks b/c of blocked
assertEquals(2, calls.get());
store.unblock();
// now released
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return 3 == calls.get();
}
}, 1000, 50));
// reverse - unblock while full does not release
store.block();
store.addSize(900);
assertEquals(100, store.getAddressLimitPercent());
store.checkMemory(trackMemoryChecks);
assertEquals("no change", 3, calls.get());
assertEquals("no change to be sure to be sure!", 3, calls.get());
store.unblock();
assertEquals("no change after unblock", 3, calls.get());
store.addSize(-900);
assertEquals(10, store.getAddressLimitPercent());
assertTrue("change", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return 4 == calls.get();
}
}, 1000, 50));
} finally {
store.stop();
}
}
/**
* @return
*/

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
public final class FakePagingManager implements PagingManager {
public class FakePagingManager implements PagingManager {
@Override
public void addBlockedStore(PagingStore store) {