ARTEMIS-876 Deprecate JMSServerManager and update JMS bootstrap
This commit is contained in:
parent
683ae68989
commit
b742a357c5
|
@ -18,14 +18,20 @@ package org.apache.activemq.artemis.integration;
|
|||
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
|
||||
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.dto.ServerDTO;
|
||||
import org.apache.activemq.artemis.integration.bootstrap.ActiveMQBootstrapLogger;
|
||||
import org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration;
|
||||
import org.apache.activemq.artemis.jms.server.config.TopicConfiguration;
|
||||
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
|
||||
|
||||
|
@ -52,12 +58,48 @@ public class FileBroker implements Broker {
|
|||
|
||||
//todo if we start to pullout more configs from the main config then we should pull out the configuration objects from factories if available
|
||||
FileConfiguration configuration = new FileConfiguration();
|
||||
|
||||
// Keep this as we still want to parse destinations in the <jms> element
|
||||
FileJMSConfiguration jmsConfiguration = new FileJMSConfiguration();
|
||||
|
||||
FileDeploymentManager fileDeploymentManager = new FileDeploymentManager(configurationUrl);
|
||||
fileDeploymentManager.addDeployable(configuration).addDeployable(jmsConfiguration);
|
||||
fileDeploymentManager.readConfiguration();
|
||||
|
||||
/**
|
||||
* This is a bit of a hack for backwards config compatibility since we no longer want to start the broker
|
||||
* using the JMSServerManager which would normally deploy JMS destinations. Here we take the JMS destination
|
||||
* configurations from the parsed JMS configuration and add them to the core configuration.
|
||||
*
|
||||
* It's also important here that we are adding them to the core ADDRESS configurations as those will be
|
||||
* deployed first and therefore their configuration will take precedence over other legacy queue configurations
|
||||
* which are deployed later. This is so we can maintain support for configurations like those found in the
|
||||
* bridge and divert examples where there are JMS and core queues with the same name (which was itself a bit
|
||||
* of a hack).
|
||||
*
|
||||
* This should be removed when support for the old "jms" configuation element is also removed.
|
||||
*/
|
||||
{
|
||||
for (JMSQueueConfiguration jmsQueueConfig : jmsConfiguration.getQueueConfigurations()) {
|
||||
List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
|
||||
coreAddressConfigurations.add(new CoreAddressConfiguration()
|
||||
.setName(jmsQueueConfig.getName())
|
||||
.addRoutingType(RoutingType.ANYCAST)
|
||||
.addQueueConfiguration(new CoreQueueConfiguration()
|
||||
.setAddress(jmsQueueConfig.getName())
|
||||
.setName(jmsQueueConfig.getName())
|
||||
.setFilterString(jmsQueueConfig.getSelector())
|
||||
.setRoutingType(RoutingType.ANYCAST)));
|
||||
}
|
||||
|
||||
for (TopicConfiguration topicConfig : jmsConfiguration.getTopicConfigurations()) {
|
||||
List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations();
|
||||
coreAddressConfigurations.add(new CoreAddressConfiguration()
|
||||
.setName(topicConfig.getName())
|
||||
.addRoutingType(RoutingType.MULTICAST));
|
||||
}
|
||||
}
|
||||
|
||||
components = fileDeploymentManager.buildService(securityManager, ManagementFactory.getPlatformMBeanServer());
|
||||
|
||||
ArrayList<ActiveMQComponent> componentsByStartOrder = getComponentsByStartOrder(components);
|
||||
|
|
|
@ -360,6 +360,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// whether this is an exclusive divert
|
||||
private static boolean DEFAULT_DIVERT_EXCLUSIVE = false;
|
||||
|
||||
// how the divert should handle the message's routing type
|
||||
private static String DEFAULT_DIVERT_ROUTING_TYPE = RoutingType.STRIP.toString();
|
||||
|
||||
// If true then the server will request a backup on another node
|
||||
private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false;
|
||||
|
||||
|
@ -1018,6 +1021,13 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_DIVERT_EXCLUSIVE;
|
||||
}
|
||||
|
||||
/**
|
||||
* how the divert should handle the message's routing type
|
||||
*/
|
||||
public static String getDefaultDivertRoutingType() {
|
||||
return DEFAULT_DIVERT_ROUTING_TYPE;
|
||||
}
|
||||
|
||||
/**
|
||||
* If true then the server will request a backup on another node
|
||||
*/
|
||||
|
|
|
@ -899,6 +899,16 @@ public interface ActiveMQServerControl {
|
|||
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
|
||||
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
|
||||
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
|
||||
@Parameter(name = "address", desc = "Address to divert from") String address,
|
||||
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
|
||||
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
|
||||
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
|
||||
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
|
||||
|
||||
@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
|
||||
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;
|
||||
|
||||
|
|
|
@ -65,4 +65,10 @@ public interface DivertControl {
|
|||
*/
|
||||
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert")
|
||||
String getTransformerClassName();
|
||||
|
||||
/**
|
||||
* Returns the routing type used by this divert.
|
||||
*/
|
||||
@Attribute(desc = "routing type used by this divert")
|
||||
String getRoutingType();
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
|
|||
|
||||
public enum RoutingType {
|
||||
|
||||
MULTICAST, ANYCAST;
|
||||
MULTICAST, ANYCAST, STRIP, PASS;
|
||||
|
||||
public byte getType() {
|
||||
switch (this) {
|
||||
|
@ -26,6 +26,10 @@ public enum RoutingType {
|
|||
return 0;
|
||||
case ANYCAST:
|
||||
return 1;
|
||||
case STRIP:
|
||||
return 2;
|
||||
case PASS:
|
||||
return 3;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
|
@ -37,6 +41,10 @@ public enum RoutingType {
|
|||
return MULTICAST;
|
||||
case 1:
|
||||
return ANYCAST;
|
||||
case 2:
|
||||
return STRIP;
|
||||
case 3:
|
||||
return PASS;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -424,8 +424,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
|
|||
}
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue was created by another client/admin between the query check and send create queue packet
|
||||
}
|
||||
catch (ActiveMQException e) {
|
||||
} catch (ActiveMQException e) {
|
||||
throw JMSExceptionHelper.convertFromActiveMQException(e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -311,8 +311,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
} else {
|
||||
throw new InvalidDestinationException("Destination " + jbd.getName() + " does not exist");
|
||||
}
|
||||
}
|
||||
catch (ActiveMQQueueExistsException e) {
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// Queue was created between our query and create queue request. Ignore.
|
||||
}
|
||||
|
||||
|
@ -655,8 +654,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
|
|||
if (response.isAutoCreateJmsQueues()) {
|
||||
try {
|
||||
session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true);
|
||||
}
|
||||
catch (ActiveMQQueueExistsException e) {
|
||||
} catch (ActiveMQQueueExistsException e) {
|
||||
// The queue was created by another client/admin between the query check and send create queue packet
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.activemq.artemis.spi.core.naming.BindingRegistry;
|
|||
/**
|
||||
* The JMS Management interface.
|
||||
*/
|
||||
@Deprecated
|
||||
public interface JMSServerManager extends ActiveMQComponent {
|
||||
|
||||
String getVersion();
|
||||
|
|
|
@ -55,18 +55,16 @@ public class FileJMSConfiguration extends JMSConfigurationImpl implements Deploy
|
|||
|
||||
private static final boolean DEFAULT_QUEUE_DURABILITY = true;
|
||||
|
||||
private boolean parsed = false;
|
||||
|
||||
@Override
|
||||
public void parse(Element config, URL url) throws Exception {
|
||||
parseConfiguration(config);
|
||||
setConfigurationUrl(url);
|
||||
parsed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isParsed() {
|
||||
return parsed;
|
||||
// always return false here so that the FileDeploymentManager will not invoke buildService()
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -109,6 +109,7 @@ import org.w3c.dom.Element;
|
|||
* If a JMSConfiguration object is used, the JMS resources can not be
|
||||
* redeployed.
|
||||
*/
|
||||
@Deprecated
|
||||
public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback {
|
||||
|
||||
private static final String REJECT_FILTER = ActiveMQServerImpl.GENERIC_IGNORED_FILTER;
|
||||
|
|
|
@ -48,7 +48,7 @@ public class CoreAddressConfiguration implements Serializable {
|
|||
return routingTypes;
|
||||
}
|
||||
|
||||
public CoreAddressConfiguration addDeliveryMode(RoutingType routingType) {
|
||||
public CoreAddressConfiguration addRoutingType(RoutingType routingType) {
|
||||
routingTypes.add(routingType);
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -118,8 +118,9 @@ public class CoreQueueConfiguration implements Serializable {
|
|||
return routingType;
|
||||
}
|
||||
|
||||
public void setRoutingType(RoutingType routingType) {
|
||||
public CoreQueueConfiguration setRoutingType(RoutingType routingType) {
|
||||
this.routingType = routingType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config;
|
|||
import java.io.Serializable;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
||||
public class DivertConfiguration implements Serializable {
|
||||
|
@ -39,6 +40,8 @@ public class DivertConfiguration implements Serializable {
|
|||
|
||||
private String transformerClassName = null;
|
||||
|
||||
private RoutingType routingType = RoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||
|
||||
public DivertConfiguration() {
|
||||
}
|
||||
|
||||
|
@ -70,6 +73,10 @@ public class DivertConfiguration implements Serializable {
|
|||
return transformerClassName;
|
||||
}
|
||||
|
||||
public RoutingType getRoutingType() {
|
||||
return routingType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param name the name to set
|
||||
*/
|
||||
|
@ -130,6 +137,14 @@ public class DivertConfiguration implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param routingType the routingType to set
|
||||
*/
|
||||
public DivertConfiguration setRoutingType(final RoutingType routingType) {
|
||||
this.routingType = routingType;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
|
@ -141,6 +156,7 @@ public class DivertConfiguration implements Serializable {
|
|||
result = prime * result + ((name == null) ? 0 : name.hashCode());
|
||||
result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
|
||||
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
|
||||
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -185,6 +201,11 @@ public class DivertConfiguration implements Serializable {
|
|||
return false;
|
||||
} else if (!transformerClassName.equals(other.transformerClassName))
|
||||
return false;
|
||||
if (routingType == null) {
|
||||
if (other.routingType != null)
|
||||
return false;
|
||||
} else if (!routingType.equals(other.routingType))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config.impl;
|
|||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
|
||||
|
@ -164,6 +165,19 @@ public final class Validators {
|
|||
}
|
||||
};
|
||||
|
||||
public static final Validator ROUTING_TYPE = new Validator() {
|
||||
@Override
|
||||
public void validate(final String name, final Object value) {
|
||||
String val = (String) value;
|
||||
if (val == null || !val.equals(RoutingType.ANYCAST.toString()) &&
|
||||
!val.equals(RoutingType.MULTICAST.toString()) &&
|
||||
!val.equals(RoutingType.PASS.toString()) &&
|
||||
!val.equals(RoutingType.STRIP.toString())) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
public static final Validator MAX_QUEUE_CONSUMERS = new Validator() {
|
||||
@Override
|
||||
public void validate(String name, Object value) {
|
||||
|
|
|
@ -945,10 +945,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
for (int j = 0; j < children.getLength(); j++) {
|
||||
Node child = children.item(j);
|
||||
if (child.getNodeName().equals("multicast")) {
|
||||
addressConfiguration.addDeliveryMode(RoutingType.MULTICAST);
|
||||
addressConfiguration.addRoutingType(RoutingType.MULTICAST);
|
||||
queueConfigurations.addAll(parseQueueConfigurations((Element) child, RoutingType.MULTICAST));
|
||||
} else if (child.getNodeName().equals("anycast")) {
|
||||
addressConfiguration.addDeliveryMode(RoutingType.ANYCAST);
|
||||
addressConfiguration.addRoutingType(RoutingType.ANYCAST);
|
||||
queueConfigurations.addAll(parseQueueConfigurations((Element) child, RoutingType.ANYCAST));
|
||||
}
|
||||
}
|
||||
|
@ -1570,6 +1570,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK);
|
||||
|
||||
RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.ROUTING_TYPE));
|
||||
|
||||
String filterString = null;
|
||||
|
||||
NodeList children = e.getChildNodes();
|
||||
|
@ -1582,7 +1584,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
}
|
||||
}
|
||||
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType);
|
||||
|
||||
mainConfig.getDivertConfigurations().add(config);
|
||||
}
|
||||
|
|
|
@ -1895,11 +1895,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
final boolean exclusive,
|
||||
final String filterString,
|
||||
final String transformerClassName) throws Exception {
|
||||
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(final String name,
|
||||
final String routingName,
|
||||
final String address,
|
||||
final String forwardingAddress,
|
||||
final boolean exclusive,
|
||||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final String routingType) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName);
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(RoutingType.valueOf(routingType));
|
||||
server.deployDivert(config);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
|
|
|
@ -98,6 +98,16 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoutingType() {
|
||||
clearIO();
|
||||
try {
|
||||
return configuration.getRoutingType().toString();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUniqueName() {
|
||||
clearIO();
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
|||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.BindingsFactory;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction;
|
||||
|
@ -190,6 +191,9 @@ public class SimpleAddressManager implements AddressManager {
|
|||
@Override
|
||||
public AddressInfo addOrUpdateAddressInfo(AddressInfo addressInfo) {
|
||||
AddressInfo from = addAddressInfo(addressInfo);
|
||||
if (from != null) {
|
||||
ActiveMQServerLogger.LOGGER.info("Address " + addressInfo.getName() + " exists already as " + from + ", updating instead with: " + addressInfo);
|
||||
}
|
||||
return (from == null) ? addressInfo : updateAddressInfo(from, addressInfo);
|
||||
}
|
||||
|
||||
|
@ -198,6 +202,7 @@ public class SimpleAddressManager implements AddressManager {
|
|||
for (RoutingType routingType : to.getRoutingTypes()) {
|
||||
from.addRoutingType(routingType);
|
||||
}
|
||||
ActiveMQServerLogger.LOGGER.info("Update result: " + from);
|
||||
return from;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -410,4 +410,7 @@ public interface ActiveMQMessageBundle {
|
|||
IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType,
|
||||
String address,
|
||||
Set<RoutingType> supportedRoutingTypes);
|
||||
|
||||
@Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
IllegalArgumentException invalidRoutingType(String val);
|
||||
}
|
||||
|
|
|
@ -1872,7 +1872,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
Filter filter = FilterImpl.createFilter(config.getFilterString());
|
||||
|
||||
Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager);
|
||||
Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType());
|
||||
|
||||
Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
|
||||
|
||||
|
@ -2398,6 +2398,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
AddressInfo result = postOffice.addOrUpdateAddressInfo(addressInfo);
|
||||
|
||||
// TODO: is this the right way to do this?
|
||||
// TODO: deal with possible duplicates, may be adding new records when old ones already exist
|
||||
long txID = storageManager.generateID();
|
||||
storageManager.addAddressBinding(txID, addressInfo);
|
||||
storageManager.commitBindings(txID);
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.impl;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.server.Divert;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.core.server.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.jboss.logging.Logger;
|
||||
|
@ -49,6 +51,8 @@ public class DivertImpl implements Divert {
|
|||
|
||||
private final StorageManager storageManager;
|
||||
|
||||
private final RoutingType routingType;
|
||||
|
||||
public DivertImpl(final SimpleString forwardAddress,
|
||||
final SimpleString uniqueName,
|
||||
final SimpleString routingName,
|
||||
|
@ -56,7 +60,8 @@ public class DivertImpl implements Divert {
|
|||
final Filter filter,
|
||||
final Transformer transformer,
|
||||
final PostOffice postOffice,
|
||||
final StorageManager storageManager) {
|
||||
final StorageManager storageManager,
|
||||
final RoutingType routingType) {
|
||||
this.forwardAddress = forwardAddress;
|
||||
|
||||
this.uniqueName = uniqueName;
|
||||
|
@ -72,6 +77,8 @@ public class DivertImpl implements Divert {
|
|||
this.postOffice = postOffice;
|
||||
|
||||
this.storageManager = storageManager;
|
||||
|
||||
this.routingType = routingType;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -97,6 +104,20 @@ public class DivertImpl implements Divert {
|
|||
|
||||
copy.setExpiration(message.getExpiration());
|
||||
|
||||
switch (routingType) {
|
||||
case ANYCAST:
|
||||
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
|
||||
break;
|
||||
case MULTICAST:
|
||||
copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
|
||||
break;
|
||||
case STRIP:
|
||||
copy.removeProperty(Message.HDR_ROUTING_TYPE);
|
||||
break;
|
||||
case PASS:
|
||||
break;
|
||||
}
|
||||
|
||||
if (transformer != null) {
|
||||
copy = transformer.transform(copy);
|
||||
}
|
||||
|
|
|
@ -1576,6 +1576,22 @@
|
|||
</xsd:element>
|
||||
|
||||
<xsd:element ref="filter" maxOccurs="1" minOccurs="0"/>
|
||||
|
||||
<xsd:element name="routing-type" default="STRIP" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
how should the routing-type on the diverted messages be set?
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:simpleType>
|
||||
<xsd:restriction base="xsd:string">
|
||||
<xsd:enumeration value="ANYCAST"/>
|
||||
<xsd:enumeration value="MULTICAST"/>
|
||||
<xsd:enumeration value="STRIP"/>
|
||||
<xsd:enumeration value="PASS"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
</xsd:element>
|
||||
</xsd:all>
|
||||
|
||||
<xsd:attribute name="name" type="xsd:ID" use="required">
|
||||
|
|
|
@ -22,11 +22,6 @@ under the License.
|
|||
xmlns="urn:activemq"
|
||||
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
|
||||
|
||||
<jms xmlns="urn:activemq:jms">
|
||||
<!--the queue used by the example-->
|
||||
<queue name="exampleQueue"/>
|
||||
</jms>
|
||||
|
||||
<core xmlns="urn:activemq:core">
|
||||
|
||||
<bindings-directory>./data/messaging/bindings</bindings-directory>
|
||||
|
|
|
@ -121,6 +121,142 @@ public class DivertTest extends ActiveMQTestBase {
|
|||
Assert.assertNull(consumer2.receiveImmediate());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
|
||||
final String testAddress = "testAddress";
|
||||
|
||||
final String forwardAddress = "forwardAddress";
|
||||
|
||||
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress);
|
||||
|
||||
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
|
||||
|
||||
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
|
||||
|
||||
server.start();
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
final SimpleString queueName1 = new SimpleString("queue1");
|
||||
|
||||
final SimpleString queueName2 = new SimpleString("queue2");
|
||||
|
||||
session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
|
||||
|
||||
session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
|
||||
|
||||
session.start();
|
||||
|
||||
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
|
||||
|
||||
ClientConsumer consumer1 = session.createConsumer(queueName1);
|
||||
|
||||
ClientConsumer consumer2 = session.createConsumer(queueName2);
|
||||
|
||||
final int numMessages = 1;
|
||||
|
||||
final SimpleString propKey = new SimpleString("testkey");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session.createMessage(false);
|
||||
|
||||
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
|
||||
|
||||
message.putIntProperty(propKey, i);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer2.receive(DivertTest.TIMEOUT);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer2.receiveImmediate());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleExclusiveDivertWithRoutingType() throws Exception {
|
||||
final String testAddress = "testAddress";
|
||||
|
||||
final String forwardAddress = "forwardAddress";
|
||||
|
||||
DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true);
|
||||
|
||||
Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf);
|
||||
|
||||
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
|
||||
|
||||
server.start();
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
|
||||
final SimpleString queueName1 = new SimpleString("queue1");
|
||||
|
||||
final SimpleString queueName2 = new SimpleString("queue2");
|
||||
|
||||
session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false);
|
||||
|
||||
session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false);
|
||||
|
||||
session.start();
|
||||
|
||||
ClientProducer producer = session.createProducer(new SimpleString(testAddress));
|
||||
|
||||
ClientConsumer consumer1 = session.createConsumer(queueName1);
|
||||
|
||||
final int numMessages = 1;
|
||||
|
||||
final SimpleString propKey = new SimpleString("testkey");
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session.createMessage(false);
|
||||
|
||||
message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
|
||||
|
||||
message.putIntProperty(propKey, i);
|
||||
|
||||
producer.send(message);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(DivertTest.TIMEOUT);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
Assert.assertEquals(i, message.getObjectProperty(propKey));
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleDivertWithExpiry() throws Exception {
|
||||
final String testAddress = "testAddress";
|
||||
|
|
|
@ -712,6 +712,18 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(String name,
|
||||
String routingName,
|
||||
String address,
|
||||
String forwardingAddress,
|
||||
boolean exclusive,
|
||||
String filterString,
|
||||
String transformerClassName,
|
||||
String routingType) throws Exception {
|
||||
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroyDivert(String name) throws Exception {
|
||||
proxy.invokeOperation("destroyDivert", name);
|
||||
|
|
|
@ -61,6 +61,11 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
|
|||
return (String) proxy.retrieveAttributeValue("transformerClassName");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoutingType() {
|
||||
return (String) proxy.retrieveAttributeValue("routingType");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getUniqueName() {
|
||||
return (String) proxy.retrieveAttributeValue("uniqueName");
|
||||
|
|
Loading…
Reference in New Issue