This closes #1647 ARTEMIS-1365 Advisory consumers listed in Console

This commit is contained in:
Andy Taylor 2017-11-20 08:38:06 +00:00
commit 9a8055bd3f
15 changed files with 424 additions and 60 deletions

View File

@ -26,17 +26,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
public class PrefixUtil {
public static Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType,
Map<SimpleString, RoutingType> prefixes) {
for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
if (address.startsWith(entry.getKey())) {
return new Pair<>(removePrefix(address, entry.getKey()), entry.getValue());
}
}
return new Pair<>(address, defaultRoutingType);
}
public static Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes,
Map<SimpleString, RoutingType> prefixes) {
@ -59,7 +48,7 @@ public class PrefixUtil {
return address;
}
private static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
public static SimpleString removePrefix(SimpleString string, SimpleString prefix) {
return string.subSeq(prefix.length(), string.length());
}
}

View File

@ -67,6 +67,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
import org.apache.activemq.artemis.core.server.TempQueueObserver;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RefsOperation;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@ -726,15 +727,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void addDestination(DestinationInfo info) throws Exception {
boolean created = false;
ActiveMQDestination dest = info.getDestination();
if (!protocolManager.isSupportAdvisory() && AdvisorySupport.isAdvisoryTopic(dest)) {
return;
}
SimpleString qName = SimpleString.toSimpleString(dest.getPhysicalName());
if (server.locateQueue(qName) == null) {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(dest.getPhysicalName());
AddressInfo addressInfo = new AddressInfo(qName, dest.isTopic() ? RoutingType.MULTICAST : RoutingType.ANYCAST);
if (AdvisorySupport.isAdvisoryTopic(dest) && protocolManager.isSuppressInternalManagementObjects()) {
addressInfo.setInternal(true);
}
if (dest.isQueue() && (addressSettings.isAutoCreateQueues() || dest.isTemporary())) {
internalSession.createQueue(qName, qName, RoutingType.ANYCAST, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
internalSession.createQueue(addressInfo, qName, null, dest.isTemporary(), !dest.isTemporary(), !dest.isTemporary());
created = true;
} else if (dest.isTopic() && (addressSettings.isAutoCreateAddresses() || dest.isTemporary())) {
internalSession.createAddress(qName, RoutingType.MULTICAST, !dest.isTemporary());
internalSession.createAddress(addressInfo, !dest.isTemporary());
created = true;
}
}
@ -783,6 +791,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
List<AMQConsumer> consumersList = amqSession.createConsumer(info, new SlowConsumerDetection());
if (consumersList.size() == 0) {
return;
}
this.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumersList);
ss.addConsumer(info);
@ -878,6 +889,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return state.getTempDestinations();
}
public boolean isSuppressInternalManagementObjects() {
return protocolManager.isSuppressInternalManagementObjects();
}
public boolean isSuppportAdvisory() {
return protocolManager.isSupportAdvisory();
}
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override

View File

@ -119,6 +119,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private long maxInactivityDurationInitalDelay = 10 * 1000L;
private boolean useKeepAlive = true;
private boolean supportAdvisory = true;
//prevents advisory addresses/queues to be registered
//to management service
private boolean suppressInternalManagementObjects = true;
private final OpenWireMessageConverter internalConverter;
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
@ -348,6 +353,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
Command command,
ConsumerId targetConsumerId,
String originalConnectionId) throws Exception {
if (!this.isSupportAdvisory()) {
return;
}
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
if (originalConnectionId == null) {
@ -583,4 +591,20 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public OpenWireMessageConverter getInternalConverter() {
return internalConverter;
}
public boolean isSupportAdvisory() {
return supportAdvisory;
}
public void setSupportAdvisory(boolean supportAdvisory) {
this.supportAdvisory = supportAdvisory;
}
public boolean isSuppressInternalManagementObjects() {
return suppressInternalManagementObjects;
}
public void setSuppressInternalManagementObjects(boolean suppressInternalManagementObjects) {
this.suppressInternalManagementObjects = suppressInternalManagementObjects;
}
}

View File

@ -63,11 +63,15 @@ public class AMQConsumer {
private AtomicInteger currentWindow;
private long messagePullSequence = 0;
private MessagePullHandler messagePullHandler;
//internal means we don't expose
//it's address/queue to management service
private boolean internalAddress = false;
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
ConsumerInfo info,
ScheduledExecutorService scheduledPool) {
ScheduledExecutorService scheduledPool,
boolean internalAddress) {
this.session = amqSession;
this.openwireDestination = d;
this.info = info;
@ -77,6 +81,7 @@ public class AMQConsumer {
if (prefetchSize == 0) {
messagePullHandler = new MessagePullHandler();
}
this.internalAddress = internalAddress;
}
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@ -143,7 +148,10 @@ public class AMQConsumer {
AddressInfo addressInfo = session.getCoreServer().getAddressInfo(address);
if (addressInfo != null) {
addressInfo.addRoutingType(RoutingType.MULTICAST);
} else {
addressInfo = new AddressInfo(address, RoutingType.MULTICAST);
}
addressInfo.setInternal(internalAddress);
if (isDurable) {
queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
@ -166,16 +174,15 @@ public class AMQConsumer {
session.getCoreSession().deleteQueue(queueName);
// Create the new one
session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true);
session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
}
} else {
session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, false, true);
session.getCoreSession().createQueue(addressInfo, queueName, selector, false, true);
}
} else {
queueName = new SimpleString(UUID.randomUUID().toString());
session.getCoreSession().createQueue(address, queueName, RoutingType.MULTICAST, selector, true, false);
session.getCoreSession().createQueue(addressInfo, queueName, selector, true, false);
}
return queueName;

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -159,6 +160,13 @@ public class AMQSession implements SessionCallback {
List<AMQConsumer> consumersList = new java.util.LinkedList<>();
for (ActiveMQDestination openWireDest : dests) {
boolean isInternalAddress = false;
if (AdvisorySupport.isAdvisoryTopic(dest)) {
if (!connection.isSuppportAdvisory()) {
continue;
}
isInternalAddress = connection.isSuppressInternalManagementObjects();
}
if (openWireDest.isQueue()) {
SimpleString queueName = new SimpleString(convertWildcard(openWireDest.getPhysicalName()));
@ -166,7 +174,7 @@ public class AMQSession implements SessionCallback {
throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
}
}
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool, isInternalAddress);
long nativeID = consumerIDGenerator.generateID();
consumer.init(slowConsumerDetectionListener, nativeID);

View File

@ -20,6 +20,7 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -350,7 +351,7 @@ public abstract class VersionedStompFrameHandler {
if (typeHeader != null) {
routingType = RoutingType.valueOf(typeHeader);
} else {
routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB();
routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(new AddressInfo(new SimpleString(destination))).getRoutingType();
}
return routingType;
}

View File

@ -318,6 +318,10 @@ public interface ActiveMQServer extends ServiceComponent {
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;

View File

@ -115,6 +115,12 @@ public interface ServerSession extends SecurityAuth {
boolean temporary,
boolean durable) throws Exception;
Queue createQueue(AddressInfo address,
SimpleString name,
SimpleString filterString,
boolean temporary,
boolean durable) throws Exception;
/**
* Create queue with default delivery mode
*
@ -150,6 +156,13 @@ public interface ServerSession extends SecurityAuth {
boolean durable,
boolean autoCreated) throws Exception;
Queue createQueue(AddressInfo addressInfo,
SimpleString name,
SimpleString filterString,
boolean temporary,
boolean durable,
boolean autoCreated) throws Exception;
AddressInfo createAddress(SimpleString address,
Set<RoutingType> routingTypes,
boolean autoCreated) throws Exception;
@ -158,6 +171,9 @@ public interface ServerSession extends SecurityAuth {
RoutingType routingType,
boolean autoCreated) throws Exception;
AddressInfo createAddress(AddressInfo addressInfo,
boolean autoCreated) throws Exception;
void deleteQueue(SimpleString name) throws Exception;
ServerConsumer createConsumer(long consumerID,
@ -270,13 +286,11 @@ public interface ServerSession extends SecurityAuth {
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param address the address to inspect
* @param defaultRoutingType the {@code org.apache.activemq.artemis.api.core.RoutingType} to return if no prefix
* match is found.
* @param addressInfo the address to inspect
* @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
* name and the {@code org.apache.activemq.artemis.api.core.RoutingType} corresponding to the that prefix.
*/
Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType);
AddressInfo getAddressAndRoutingType(AddressInfo addressInfo);
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.

View File

@ -1674,6 +1674,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
@Deprecated
public Queue createQueue(SimpleString address,
RoutingType routingType,
SimpleString queueName,
@ -1688,6 +1689,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return createQueue(address, routingType, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
}
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
}
@Deprecated
@Override
public Queue createQueue(final SimpleString address,
@ -2666,6 +2672,113 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return postOffice.getAddressInfo(address);
}
public Queue createQueue(final AddressInfo addrInfo,
final SimpleString queueName,
final SimpleString filterString,
final SimpleString user,
final boolean durable,
final boolean temporary,
final boolean ignoreIfExists,
final boolean transientQueue,
final boolean autoCreated,
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean autoCreateAddress) throws Exception {
final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
if (binding != null) {
if (ignoreIfExists) {
return binding.getQueue();
} else {
throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName, binding.getAddress());
}
}
final Filter filter = FilterImpl.createFilter(filterString);
final long txID = storageManager.generateID();
final long queueID = storageManager.generateID();
final QueueConfig.Builder queueConfigBuilder;
final SimpleString addressToUse = addrInfo == null ? queueName : addrInfo.getName();
queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressToUse);
AddressInfo info = postOffice.getAddressInfo(addressToUse);
RoutingType routingType = addrInfo == null ? null : addrInfo.getRoutingType();
RoutingType rt = (routingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : routingType);
if (autoCreateAddress) {
if (info == null) {
final AddressInfo addressInfo = new AddressInfo(addressToUse, rt);
addressInfo.setAutoCreated(true);
addressInfo.setInternal(addrInfo == null ? false : addrInfo.isInternal());
addAddressInfo(addressInfo);
} else if (!info.getRoutingTypes().contains(rt)) {
Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.addAll(info.getRoutingTypes());
routingTypes.add(rt);
updateAddressInfo(info.getName(), routingTypes);
}
} else if (info == null) {
throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(addressToUse);
} else if (!info.getRoutingTypes().contains(rt)) {
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(rt, info.getName().toString(), info.getRoutingTypes());
}
final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(addrInfo.getRoutingType()).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).build();
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
final Queue queue = queueFactory.createQueueWith(queueConfig);
if (transientQueue) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else {
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
}
final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());
if (queue.isDurable()) {
storageManager.addQueueBinding(txID, localQueueBinding);
}
try {
postOffice.addBinding(localQueueBinding);
if (queue.isDurable()) {
storageManager.commitBindings(txID);
}
} catch (Exception e) {
try {
if (durable) {
storageManager.rollbackBindings(txID);
}
final PageSubscription pageSubscription = queue.getPageSubscription();
try {
queue.close();
} finally {
if (pageSubscription != null) {
pageSubscription.destroy();
}
}
} catch (Throwable ignored) {
logger.debug(ignored.getMessage(), ignored);
}
throw e;
}
if (addrInfo == null || !addrInfo.isInternal()) {
managementService.registerQueue(queue, queue.getAddress(), storageManager);
}
callPostQueueCreationCallbacks(queue.getName());
callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.afterCreateQueue(queue) : null);
return queue;
}
@Override
public Queue createQueue(final SimpleString address,
final RoutingType routingType,

View File

@ -16,17 +16,16 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.utils.PrefixUtil;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.RoutingType;
public class AddressInfo {
//from openwire
public static final SimpleString ADVISORY_TOPIC = new SimpleString("ActiveMQ.Advisory.");
private long id;
private final SimpleString name;
@ -35,6 +34,8 @@ public class AddressInfo {
private Set<RoutingType> routingTypes;
private boolean internal = false;
public AddressInfo(SimpleString name) {
this(name, new HashSet<>());
}
@ -130,6 +131,27 @@ public class AddressInfo {
}
public boolean isInternal() {
return this.name.startsWith(ADVISORY_TOPIC);
return this.internal;
}
public void setInternal(boolean internal) {
this.internal = internal;
}
public AddressInfo create(SimpleString name, RoutingType routingType) {
AddressInfo info = new AddressInfo(name, routingType);
info.setInternal(this.internal);
return info;
}
public AddressInfo getAddressAndRoutingType(Map<SimpleString, RoutingType> prefixes) {
for (Map.Entry<SimpleString, RoutingType> entry : prefixes.entrySet()) {
if (this.getName().startsWith(entry.getKey())) {
AddressInfo newAddressInfo = this.create(PrefixUtil.removePrefix(this.getName(), entry.getKey()), entry.getValue());
return newAddressInfo;
}
}
return this;
}
}

View File

@ -548,9 +548,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public Queue createQueue(final SimpleString address,
public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false);
}
public Queue createQueue(final AddressInfo addressInfo,
final SimpleString name,
final RoutingType routingType,
final SimpleString filterString,
final boolean temporary,
final boolean durable,
@ -559,18 +563,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
final boolean autoCreated) throws Exception {
final SimpleString unPrefixedName = removePrefix(name);
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
AddressInfo art = getAddressAndRoutingType(addressInfo);
if (durable) {
// make sure the user has privileges to create this queue
securityCheck(address, name, CheckType.CREATE_DURABLE_QUEUE, this);
securityCheck(addressInfo.getName(), name, CheckType.CREATE_DURABLE_QUEUE, this);
} else {
securityCheck(address, name, CheckType.CREATE_NON_DURABLE_QUEUE, this);
securityCheck(addressInfo.getName(), name, CheckType.CREATE_NON_DURABLE_QUEUE, this);
}
server.checkQueueCreationLimit(getUsername());
Queue queue = server.createQueue(art.getA(), art.getB(), unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateAddresses());
Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses());
if (temporary) {
// Temporary queue in core simply means the queue will be deleted if
@ -591,13 +595,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
if (logger.isDebugEnabled()) {
logger.debug("Queue " + unPrefixedName + " created on address " + address +
" with filter=" + filterString + " temporary = " +
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
logger.debug("Queue " + unPrefixedName + " created on address " + addressInfo.getName() +
" with filter=" + filterString + " temporary = " +
temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
}
return queue;
}
@Override
public Queue createQueue(final SimpleString address,
final SimpleString name,
final RoutingType routingType,
final SimpleString filterString,
final boolean temporary,
final boolean durable,
final int maxConsumers,
final boolean purgeOnNoConsumers,
final boolean autoCreated) throws Exception {
return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, autoCreated);
}
@Override
@ -612,6 +628,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return createQueue(address, name, routingType, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), autoCreated);
}
@Override
public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception {
AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), autoCreated);
}
@Override
public AddressInfo createAddress(final SimpleString address,
Set<RoutingType> routingTypes,
@ -626,10 +648,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public AddressInfo createAddress(final SimpleString address,
RoutingType routingType,
final boolean autoCreated) throws Exception {
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(address, routingType);
securityCheck(art.getA(), CheckType.CREATE_ADDRESS, this);
server.addOrUpdateAddressInfo(new AddressInfo(art.getA(), art.getB()).setAutoCreated(autoCreated));
return server.getAddressInfo(art.getA());
return createAddress(new AddressInfo(address, routingType), autoCreated);
}
@Override
public AddressInfo createAddress(AddressInfo addressInfo, boolean autoCreated) throws Exception {
AddressInfo art = getAddressAndRoutingType(addressInfo);
securityCheck(art.getName(), CheckType.CREATE_ADDRESS, this);
server.addOrUpdateAddressInfo(art.setAutoCreated(autoCreated));
return server.getAddressInfo(art.getName());
}
@Override
@ -1672,12 +1699,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
} */
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
AddressInfo art = getAddressAndRoutingType(new AddressInfo(msg.getAddressSimpleString(), routingType));
// Consumer
// check the user has write access to this address.
try {
securityCheck(art.getA(), CheckType.SEND, this);
securityCheck(art.getName(), CheckType.SEND, this);
} catch (ActiveMQException e) {
if (!autoCommitSends && tx != null) {
tx.markAsRollbackOnly(e);
@ -1695,8 +1722,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
try {
routingContext.setAddress(art.getA());
routingContext.setRoutingType(art.getB());
routingContext.setAddress(art.getName());
routingContext.setRoutingType(art.getRoutingType());
result = postOffice.route(msg, routingContext, direct);
@ -1738,12 +1765,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType) {
public AddressInfo getAddressAndRoutingType(AddressInfo addressInfo) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
return addressInfo.getAddressAndRoutingType(prefixes);
}
return new Pair<>(address, defaultRoutingType);
return addressInfo;
}
@Override

View File

@ -233,17 +233,25 @@ public class ManagementServiceImpl implements ManagementService {
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.ADDRESS + address);
}
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final AddressInfo addressInfo,
final StorageManager storageManager) throws Exception {
QueueControlImpl queueControl = new QueueControlImpl(queue, address.toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
if (addressInfo.isInternal()) {
if (logger.isDebugEnabled()) {
logger.debug("won't register internal queue: " + queue);
}
return;
}
QueueControlImpl queueControl = new QueueControlImpl(queue, addressInfo.getName().toString(), postOffice, storageManager, securityStore, addressSettingsRepository);
if (messageCounterManager != null) {
MessageCounter counter = new MessageCounter(queue.getName().toString(), null, queue, false, queue.isDurable(), messageCounterManager.getMaxDayCount());
queueControl.setMessageCounter(counter);
messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
}
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName(), queue.getRoutingType());
ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
registerInJMX(objectName, queueControl);
registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
@ -251,6 +259,12 @@ public class ManagementServiceImpl implements ManagementService {
logger.debug("registered queue " + objectName);
}
}
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
final StorageManager storageManager) throws Exception {
registerQueue(queue, new AddressInfo(address), storageManager);
}
@Override
public synchronized void unregisterQueue(final SimpleString name, final SimpleString address, RoutingType routingType) throws Exception {

View File

@ -23,6 +23,7 @@ import java.util.Set;
/**
* allows objects to be mapped against a regex pattern and held in order in a list
*/
//tmp comment: Can we use AddressInfo as the 'match' key?
public interface HierarchicalRepository<T> {
void disableListeners();

View File

@ -178,6 +178,33 @@ maxInactivityDurationInitalDelay. The shortest duration is taken for the connect
More details please see [ActiveMQ InactivityMonitor](http://activemq.apache.org/activemq-inactivitymonitor.html).
### Disable/Enable Advisories
By default, advisory topics ([ActiveMQ Advisory](http://activemq.apache.org/advisory-message.html))
are created in order to send certain type of advisory messages to listening clients. As a result,
advisory addresses and queues will be displayed on the management console, along with user deployed
addresses and queues. This sometimes cause confusion because the advisory objects are internally
managed without user being aware of them. In addition, users may not want the advisory topics at all
(they cause extra resources and performance penalty) and it is convenient to disable them at all
from the broker side.
The protocol provides two parameters to control advisory behaviors on the broker side.
* supportAdvisory
Whether or not the broker supports advisory messages. If the value is true, advisory addresses/
queues will be created. If the value is false, no advisory addresses/queues are created. Default
value is true.
* suppressInternalManagementObjects
Whether or not the advisory addresses/queues, if any, will be registered to management service
(e.g. JMX registry). If set to true, no advisory addresses/queues will be registered. If set to
false, those are registered and will be displayed on the management console. Default value is
true.
The two parameters are configured on openwire acceptors, via URLs or API. For example:
<acceptor name="artemis">tcp://127.0.0.1:61616?protocols=CORE,AMQP,OPENWIRE;supportAdvisory=true;suppressInternalManagementObjects=false</acceptor>
## MQTT
MQTT is a light weight, client to server, publish / subscribe messaging protocol. MQTT has been specifically

View File

@ -17,24 +17,35 @@
package org.apache.activemq.artemis.tests.integration.openwire.management;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.advisory.ConsumerEventSource;
import org.apache.activemq.advisory.ProducerEventSource;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQSession;
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
import org.apache.activemq.artemis.tests.integration.openwire.OpenWireTestBase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
@RunWith(Parameterized.class)
public class OpenWireManagementTest extends OpenWireTestBase {
private ActiveMQServerControl serverControl;
@ -44,6 +55,27 @@ public class OpenWireManagementTest extends OpenWireTestBase {
private ConnectionFactory factory;
@Parameterized.Parameters(name = "useDefault={0},supportAdvisory={1},suppressJmx={2}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[][] {
{true, false, false},
{false, true, false},
{false, true, true},
{false, false, false},
{false, false, true}
});
}
private boolean useDefault;
private boolean supportAdvisory;
private boolean suppressJmx;
public OpenWireManagementTest(boolean useDefault, boolean supportAdvisory, boolean suppressJmx) {
this.useDefault = useDefault;
this.supportAdvisory = supportAdvisory;
this.suppressJmx = suppressJmx;
}
@Before
@Override
public void setUp() throws Exception {
@ -55,6 +87,19 @@ public class OpenWireManagementTest extends OpenWireTestBase {
@Override
protected void extraServerConfig(Configuration serverConfig) {
serverConfig.setJMXManagementEnabled(true);
if (useDefault) {
//don't set parameters explicitly
return;
}
Set<TransportConfiguration> acceptorConfigs = serverConfig.getAcceptorConfigurations();
for (TransportConfiguration tconfig : acceptorConfigs) {
if ("netty".equals(tconfig.getName())) {
Map<String, Object> params = tconfig.getExtraParams();
params.put("supportAdvisory", supportAdvisory);
params.put("suppressInternalManagementObjects", suppressJmx);
System.out.println("Now use properties: " + params);
}
}
}
@Test
@ -67,7 +112,7 @@ public class OpenWireManagementTest extends OpenWireTestBase {
String[] addresses = serverControl.getAddressNames();
assertEquals(3, addresses.length);
for (String addr : addresses) {
assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
assertFalse(addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX));
}
try (Connection connection = factory.createConnection()) {
@ -88,11 +133,61 @@ public class OpenWireManagementTest extends OpenWireTestBase {
//after that point several advisory addresses are created.
//make sure they are not accessible via management api.
addresses = serverControl.getAddressNames();
boolean hasInternalAddress = false;
for (String addr : addresses) {
assertFalse(addr.startsWith(AddressInfo.ADVISORY_TOPIC.toString()));
hasInternalAddress = addr.startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
if (hasInternalAddress) {
break;
}
}
assertEquals(!useDefault && supportAdvisory && !suppressJmx, hasInternalAddress);
consumerEventSource.stop();
producerEventSource.stop();
}
}
@Test
public void testHiddenInternalQueue() throws Exception {
server.createQueue(queueName1, RoutingType.ANYCAST, queueName1, null, true, false, -1, false, true);
String[] queues = serverControl.getQueueNames();
assertEquals(1, queues.length);
for (String queue : queues) {
assertFalse(checkQueueFromInternalAddress(queue));
}
try (Connection connection = factory.createConnection()) {
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName1.toString());
//this causes advisory queues to be created
session.createProducer(destination);
queues = serverControl.getQueueNames();
boolean hasInternal = false;
String targetQueue = null;
for (String queue : queues) {
hasInternal = checkQueueFromInternalAddress(queue);
if (hasInternal) {
targetQueue = queue;
break;
}
}
assertEquals("targetQueue: " + targetQueue, !useDefault && supportAdvisory && !suppressJmx, hasInternal);
}
}
private boolean checkQueueFromInternalAddress(String queue) throws JMSException, ActiveMQException {
try (Connection coreConn = coreCf.createConnection()) {
ActiveMQSession session = (ActiveMQSession) coreConn.createSession();
ClientSession coreSession = session.getCoreSession();
ClientSession.QueueQuery query = coreSession.queueQuery(new SimpleString(queue));
assertTrue("Queue doesn't exist: " + queue, query.isExists());
SimpleString qAddr = query.getAddress();
return qAddr.toString().startsWith(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
}
}
}