This commit is contained in:
Clebert Suconic 2020-05-26 20:31:19 -04:00
commit 0c3ced60ff
19 changed files with 386 additions and 40 deletions

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core;
/**
* An operation failed because an address exists on the server.
*/
public final class ActiveMQDivertDoesNotExistException extends ActiveMQException {
public ActiveMQDivertDoesNotExistException() {
super(ActiveMQExceptionType.DIVERT_DOES_NOT_EXIST);
}
public ActiveMQDivertDoesNotExistException(String msg) {
super(ActiveMQExceptionType.DIVERT_DOES_NOT_EXIST, msg);
}
}

View File

@ -261,6 +261,12 @@ public enum ActiveMQExceptionType {
public ActiveMQException createException(String msg) { public ActiveMQException createException(String msg) {
return new ActiveMQReplicationTimeooutException(msg); return new ActiveMQReplicationTimeooutException(msg);
} }
},
DIVERT_DOES_NOT_EXIST(221) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQDivertDoesNotExistException(msg);
}
}; };
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP; private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

View File

@ -2662,4 +2662,12 @@ public interface AuditLogger extends BasicLogger {
@LogMessage(level = Logger.Level.INFO) @LogMessage(level = Logger.Level.INFO)
@Message(id = 601726, value = "User {0} failed to browse messages from queue {1}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 601726, value = "User {0} failed to browse messages from queue {1}", format = Message.Format.MESSAGE_FORMAT)
void browseMessagesFailure(String user, String queueName); void browseMessagesFailure(String user, String queueName);
static void updateDivert(Object source, Object... args) {
LOGGER.updateDivert(getCaller(), source, arrayToString(args));
}
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601727, value = "User {0} is updating a divert on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void updateDivert(String user, Object source, Object... args);
} }

View File

@ -1573,6 +1573,17 @@ public interface ActiveMQServerControl {
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON, @Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception; @Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
/**
* update a divert
*/
@Operation(desc = "Update a divert", impact = MBeanOperationInfo.ACTION)
void updateDivert(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "transformerProperties", desc = "Configuration properties of the divert's transformer") Map<String, String> transformerProperties,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION) @Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception; void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;

View File

@ -97,6 +97,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType; import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
@ -3415,6 +3416,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
} }
} }
@Override
public void updateDivert(final String name,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
final Map<String, String> transformerProperties,
final String routingType) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.updateDivert(this.server, name, forwardingAddress, filterString,
transformerClassName, transformerProperties, routingType);
}
checkStarted();
clearIO();
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null :
new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
DivertConfiguration config = new DivertConfiguration().setName(name).setForwardingAddress(forwardingAddress).
setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).
setRoutingType(ComponentConfigurationRoutingType.valueOf(routingType));
final Divert divert = server.updateDivert(config);
if (divert == null) {
throw ActiveMQMessageBundle.BUNDLE.divertDoesNotExist(config.getName());
}
} finally {
blockOnIO();
}
}
@Override @Override
public void destroyDivert(final String name) throws Exception { public void destroyDivert(final String name) throws Exception {
if (AuditLogger.isEnabled()) { if (AuditLogger.isEnabled()) {

View File

@ -19,14 +19,17 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo; import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo; import javax.management.MBeanOperationInfo;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.DivertControl; import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.transformer.RegisteredTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.logs.AuditLogger; import org.apache.activemq.artemis.logs.AuditLogger;
public class DivertControlImpl extends AbstractControl implements DivertControl { public class DivertControlImpl extends AbstractControl implements DivertControl {
@ -37,8 +40,6 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
private final Divert divert; private final Divert divert;
private final DivertConfiguration configuration;
private final String internalNamingPrefix; private final String internalNamingPrefix;
// Static -------------------------------------------------------- // Static --------------------------------------------------------
@ -49,11 +50,9 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
public DivertControlImpl(final Divert divert, public DivertControlImpl(final Divert divert,
final StorageManager storageManager, final StorageManager storageManager,
final DivertConfiguration configuration,
final String internalNamingPrefix) throws Exception { final String internalNamingPrefix) throws Exception {
super(DivertControl.class, storageManager); super(DivertControl.class, storageManager);
this.divert = divert; this.divert = divert;
this.configuration = configuration;
this.internalNamingPrefix = internalNamingPrefix; this.internalNamingPrefix = internalNamingPrefix;
} }
@ -64,7 +63,7 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getAddress(); return divert.getAddress().toString();
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -77,7 +76,8 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getFilterString(); Filter filter = divert.getFilter();
return filter != null ? filter.getFilterString().toString() : null;
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -90,7 +90,7 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getForwardingAddress(); return divert.getForwardAddress().toString();
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -116,7 +116,9 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName(); Transformer transformer = divert.getTransformer();
return transformer != null ? (transformer instanceof RegisteredTransformer ?
((RegisteredTransformer)transformer).getTransformer() : transformer).getClass().getName() : null;
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -137,7 +139,9 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties(); Transformer transformer = divert.getTransformer();
return transformer != null && transformer instanceof RegisteredTransformer ?
((RegisteredTransformer)transformer).getProperties() : Collections.emptyMap();
} finally { } finally {
blockOnIO(); blockOnIO();
} }
@ -150,7 +154,7 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
} }
clearIO(); clearIO();
try { try {
return configuration.getRoutingType().toString(); return divert.getRoutingType().toString();
} finally { } finally {
blockOnIO(); blockOnIO();
} }

View File

@ -31,8 +31,6 @@ public class DivertBinding implements Binding {
private final Divert divert; private final Divert divert;
private final Filter filter;
private final SimpleString uniqueName; private final SimpleString uniqueName;
private final SimpleString routingName; private final SimpleString routingName;
@ -48,8 +46,6 @@ public class DivertBinding implements Binding {
this.divert = divert; this.divert = divert;
filter = divert.getFilter();
uniqueName = divert.getUniqueName(); uniqueName = divert.getUniqueName();
routingName = divert.getRoutingName(); routingName = divert.getRoutingName();
@ -64,7 +60,7 @@ public class DivertBinding implements Binding {
@Override @Override
public Filter getFilter() { public Filter getFilter() {
return filter; return divert.getFilter();
} }
@Override @Override
@ -129,7 +125,7 @@ public class DivertBinding implements Binding {
", divert=" + ", divert=" +
divert + divert +
", filter=" + ", filter=" +
filter + divert.getFilter() +
", uniqueName=" + ", uniqueName=" +
uniqueName + uniqueName +
", routingName=" + ", routingName=" +

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException; import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException; import org.apache.activemq.artemis.api.core.ActiveMQDeleteAddressException;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException; import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQDivertDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException; import org.apache.activemq.artemis.api.core.ActiveMQDuplicateMetaDataException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@ -488,4 +489,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229230, value = "Failed to bind acceptor {0} to {1}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 229230, value = "Failed to bind acceptor {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
IllegalStateException failedToBind(String acceptor, String hostPort, @Cause Exception e); IllegalStateException failedToBind(String acceptor, String hostPort, @Cause Exception e);
@Message(id = 229231, value = "Divert Does Not Exist: {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQDivertDoesNotExistException divertDoesNotExist(String divert);
} }

View File

@ -664,7 +664,9 @@ public interface ActiveMQServer extends ServiceComponent {
FederationManager getFederationManager(); FederationManager getFederationManager();
void deployDivert(DivertConfiguration config) throws Exception; Divert deployDivert(DivertConfiguration config) throws Exception;
Divert updateDivert(DivertConfiguration config) throws Exception;
void destroyDivert(SimpleString name) throws Exception; void destroyDivert(SimpleString name) throws Exception;

View File

@ -22,6 +22,8 @@ import org.apache.activemq.artemis.core.server.transformer.Transformer;
public interface Divert extends Bindable { public interface Divert extends Bindable {
SimpleString getAddress();
Filter getFilter(); Filter getFilter();
boolean isExclusive(); boolean isExclusive();
@ -33,4 +35,14 @@ public interface Divert extends Bindable {
Transformer getTransformer(); Transformer getTransformer();
SimpleString getForwardAddress(); SimpleString getForwardAddress();
ComponentConfigurationRoutingType getRoutingType();
void setFilter(Filter filter);
void setTransformer(Transformer transformer);
void setForwardAddress(SimpleString forwardAddress);
void setRoutingType(ComponentConfigurationRoutingType routingType);
} }

View File

@ -2544,7 +2544,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override @Override
public void deployDivert(DivertConfiguration config) throws Exception { public Divert deployDivert(DivertConfiguration config) throws Exception {
if (config.getName() == null) { if (config.getName() == null) {
throw ActiveMQMessageBundle.BUNDLE.divertWithNoName(); throw ActiveMQMessageBundle.BUNDLE.divertWithNoName();
} }
@ -2552,13 +2552,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (config.getAddress() == null) { if (config.getAddress() == null) {
ActiveMQServerLogger.LOGGER.divertWithNoAddress(); ActiveMQServerLogger.LOGGER.divertWithNoAddress();
return; return null;
} }
if (config.getForwardingAddress() == null) { if (config.getForwardingAddress() == null) {
ActiveMQServerLogger.LOGGER.divertWithNoForwardingAddress(); ActiveMQServerLogger.LOGGER.divertWithNoForwardingAddress();
return; return null;
} }
SimpleString sName = new SimpleString(config.getName()); SimpleString sName = new SimpleString(config.getName());
@ -2566,7 +2566,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (postOffice.getBinding(sName) != null) { if (postOffice.getBinding(sName) != null) {
ActiveMQServerLogger.LOGGER.divertBindingAlreadyExists(sName); ActiveMQServerLogger.LOGGER.divertBindingAlreadyExists(sName);
return; return null;
} }
SimpleString sAddress = new SimpleString(config.getAddress()); SimpleString sAddress = new SimpleString(config.getAddress());
@ -2575,13 +2575,53 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Filter filter = FilterImpl.createFilter(config.getFilterString()); Filter filter = FilterImpl.createFilter(config.getFilterString());
Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType()); Divert divert = new DivertImpl(sName, sAddress, new SimpleString(config.getForwardingAddress()),
new SimpleString(config.getRoutingName()), config.isExclusive(),
filter, transformer, postOffice, storageManager, config.getRoutingType());
Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert); Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert);
postOffice.addBinding(binding); postOffice.addBinding(binding);
managementService.registerDivert(divert, config); managementService.registerDivert(divert);
return divert;
}
@Override
public Divert updateDivert(DivertConfiguration config) throws Exception {
final DivertBinding divertBinding = (DivertBinding) postOffice.getBinding(SimpleString.toSimpleString(config.getName()));
if (divertBinding == null) {
return null;
}
final Divert divert = divertBinding.getDivert();
Filter filter = FilterImpl.createFilter(config.getFilterString());
if (filter != null && !filter.equals(divert.getFilter())) {
divert.setFilter(filter);
}
if (config.getTransformerConfiguration() != null) {
getServiceRegistry().removeDivertTransformer(divert.getUniqueName().toString());
Transformer transformer = getServiceRegistry().getDivertTransformer(
config.getName(), config.getTransformerConfiguration());
divert.setTransformer(transformer);
}
if (config.getForwardingAddress() != null) {
SimpleString forwardAddress = SimpleString.toSimpleString(config.getForwardingAddress());
if (!forwardAddress.equals(config)) {
divert.setForwardAddress(forwardAddress);
}
}
if (config.getRoutingType() != null && divert.getRoutingType() != config.getRoutingType()) {
divert.setRoutingType(config.getRoutingType());
}
return divert;
} }
@Override @Override

View File

@ -37,7 +37,9 @@ public class DivertImpl implements Divert {
private final PostOffice postOffice; private final PostOffice postOffice;
private final SimpleString forwardAddress; private final SimpleString address;
private volatile SimpleString forwardAddress;
private final SimpleString uniqueName; private final SimpleString uniqueName;
@ -45,16 +47,17 @@ public class DivertImpl implements Divert {
private final boolean exclusive; private final boolean exclusive;
private final Filter filter; private volatile Filter filter;
private final Transformer transformer; private volatile Transformer transformer;
private final StorageManager storageManager; private final StorageManager storageManager;
private final ComponentConfigurationRoutingType routingType; private volatile ComponentConfigurationRoutingType routingType;
public DivertImpl(final SimpleString forwardAddress, public DivertImpl(final SimpleString uniqueName,
final SimpleString uniqueName, final SimpleString address,
final SimpleString forwardAddress,
final SimpleString routingName, final SimpleString routingName,
final boolean exclusive, final boolean exclusive,
final Filter filter, final Filter filter,
@ -62,6 +65,8 @@ public class DivertImpl implements Divert {
final PostOffice postOffice, final PostOffice postOffice,
final StorageManager storageManager, final StorageManager storageManager,
final ComponentConfigurationRoutingType routingType) { final ComponentConfigurationRoutingType routingType) {
this.address = address;
this.forwardAddress = forwardAddress; this.forwardAddress = forwardAddress;
this.uniqueName = uniqueName; this.uniqueName = uniqueName;
@ -153,6 +158,11 @@ public class DivertImpl implements Divert {
return exclusive; return exclusive;
} }
@Override
public SimpleString getAddress() {
return address;
}
@Override @Override
public Filter getFilter() { public Filter getFilter() {
return filter; return filter;
@ -168,6 +178,31 @@ public class DivertImpl implements Divert {
return forwardAddress; return forwardAddress;
} }
@Override
public ComponentConfigurationRoutingType getRoutingType() {
return routingType;
}
@Override
public void setFilter(Filter filter) {
this.filter = filter;
}
@Override
public void setTransformer(Transformer transformer) {
this.transformer = transformer;
}
@Override
public void setForwardAddress(SimpleString forwardAddress) {
this.forwardAddress = forwardAddress;
}
@Override
public void setRoutingType(ComponentConfigurationRoutingType routingType) {
this.routingType = routingType;
}
/* (non-Javadoc) /* (non-Javadoc)
* @see java.lang.Object#toString() * @see java.lang.Object#toString()
*/ */

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory; import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.ServiceRegistry; import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.transformer.RegisteredTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ClassloadingUtil;
@ -249,7 +250,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
if (transformerConfiguration != null && transformerConfiguration.getClassName() != null) { if (transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
try { try {
transformer = loadClass(transformerConfiguration.getClassName()); transformer = new RegisteredTransformer(loadClass(transformerConfiguration.getClassName()));
transformer.init(Collections.unmodifiableMap(transformerConfiguration.getProperties())); transformer.init(Collections.unmodifiableMap(transformerConfiguration.getProperties()));
} catch (Exception e) { } catch (Exception e) {
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerConfiguration.getClassName()); throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerConfiguration.getClassName());

View File

@ -30,7 +30,6 @@ import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
@ -104,7 +103,7 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void unregisterAcceptors(); void unregisterAcceptors();
void registerDivert(Divert divert, DivertConfiguration config) throws Exception; void registerDivert(Divert divert) throws Exception;
void unregisterDivert(SimpleString name, SimpleString address) throws Exception; void unregisterDivert(SimpleString name, SimpleString address) throws Exception;

View File

@ -57,7 +57,6 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl; import org.apache.activemq.artemis.core.management.impl.AcceptorControlImpl;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.management.impl.AddressControlImpl; import org.apache.activemq.artemis.core.management.impl.AddressControlImpl;
@ -289,11 +288,11 @@ public class ManagementServiceImpl implements ManagementService {
} }
@Override @Override
public synchronized void registerDivert(final Divert divert, final DivertConfiguration config) throws Exception { public synchronized void registerDivert(final Divert divert) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), config.getAddress()); ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString());
DivertControl divertControl = new DivertControlImpl(divert, storageManager, config, messagingServer.getInternalNamingPrefix()); DivertControl divertControl = new DivertControlImpl(divert, storageManager, messagingServer.getInternalNamingPrefix());
registerInJMX(objectName, divertControl); registerInJMX(objectName, divertControl);
registerInRegistry(ResourceNames.DIVERT + config.getName(), divertControl); registerInRegistry(ResourceNames.DIVERT + divert.getUniqueName(), divertControl);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("registered divert " + objectName); logger.debug("registered divert " + objectName);

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.transformer;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
public class RegisteredTransformer implements Transformer {
private final Transformer transformer;
private Map<String, String> properties;
public Transformer getTransformer() {
return transformer;
}
public Map<String, String> getProperties() {
return properties;
}
public RegisteredTransformer(Transformer transformer) {
this.transformer = transformer;
}
@Override
public void init(Map<String, String> properties) {
this.properties = properties;
this.transformer.init(properties);
}
@Override
public Message transform(Message message) {
return transformer.transform(message);
}
}

View File

@ -33,7 +33,6 @@ import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager; import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
@ -278,7 +277,7 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
} }
@Override @Override
public void registerDivert(Divert divert, DivertConfiguration config) throws Exception { public void registerDivert(Divert divert) throws Exception {
} }

View File

@ -34,6 +34,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -1320,6 +1321,110 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
} }
@Test
public void testCreateAndUpdateDivert() throws Exception {
String address = RandomUtil.randomString();
String name = RandomUtil.randomString();
String routingName = RandomUtil.randomString();
String forwardingAddress = RandomUtil.randomString();
String updatedForwardingAddress = RandomUtil.randomString();
ActiveMQServerControl serverControl = createManagementControl();
checkNoResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
assertEquals(0, serverControl.getDivertNames().length);
serverControl.createDivert(name.toString(), routingName, address, forwardingAddress, true, null, null);
checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
DivertControl divertControl = ManagementControlHelper.createDivertControl(name.toString(), address, mbeanServer);
assertEquals(name.toString(), divertControl.getUniqueName());
assertEquals(address, divertControl.getAddress());
assertEquals(forwardingAddress, divertControl.getForwardingAddress());
assertEquals(routingName, divertControl.getRoutingName());
assertTrue(divertControl.isExclusive());
assertNull(divertControl.getFilter());
assertNull(divertControl.getTransformerClassName());
String[] divertNames = serverControl.getDivertNames();
assertEquals(1, divertNames.length);
assertEquals(name, divertNames[0]);
// check that a message sent to the address is diverted exclusively
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession();
String updatedDivertQueue = RandomUtil.randomString();
String divertQueue = RandomUtil.randomString();
String queue = RandomUtil.randomString();
if (legacyCreateQueue) {
session.createQueue(updatedForwardingAddress, RoutingType.ANYCAST, updatedDivertQueue);
session.createQueue(forwardingAddress, RoutingType.ANYCAST, divertQueue);
session.createQueue(address, RoutingType.ANYCAST, queue);
} else {
session.createQueue(new QueueConfiguration(updatedDivertQueue).setAddress(updatedForwardingAddress).setRoutingType(RoutingType.ANYCAST).setDurable(false));
session.createQueue(new QueueConfiguration(divertQueue).setAddress(forwardingAddress).setRoutingType(RoutingType.ANYCAST).setDurable(false));
session.createQueue(new QueueConfiguration(queue).setAddress(address).setRoutingType(RoutingType.ANYCAST).setDurable(false));
}
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
String text = RandomUtil.randomString();
message.putStringProperty("prop", text);
producer.send(message);
ClientConsumer consumer = session.createConsumer(queue);
ClientConsumer divertedConsumer = session.createConsumer(divertQueue);
ClientConsumer updatedDivertedConsumer = session.createConsumer(updatedDivertQueue);
session.start();
assertNull(consumer.receiveImmediate());
message = divertedConsumer.receive(5000);
assertNotNull(message);
assertEquals(text, message.getStringProperty("prop"));
assertNull(updatedDivertedConsumer.receiveImmediate());
serverControl.updateDivert(name.toString(), updatedForwardingAddress, null, null, null, ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
checkResource(ObjectNameBuilder.DEFAULT.getDivertObjectName(name, address));
divertControl = ManagementControlHelper.createDivertControl(name.toString(), address, mbeanServer);
assertEquals(name.toString(), divertControl.getUniqueName());
assertEquals(address, divertControl.getAddress());
assertEquals(updatedForwardingAddress, divertControl.getForwardingAddress());
assertEquals(routingName, divertControl.getRoutingName());
assertTrue(divertControl.isExclusive());
assertNull(divertControl.getFilter());
assertNull(divertControl.getTransformerClassName());
divertNames = serverControl.getDivertNames();
assertEquals(1, divertNames.length);
assertEquals(name, divertNames[0]);
// check that a message is no longer exclusively diverted
message = session.createMessage(false);
String text2 = RandomUtil.randomString();
message.putStringProperty("prop", text2);
producer.send(message);
assertNull(consumer.receiveImmediate());
assertNull(divertedConsumer.receiveImmediate());
message = updatedDivertedConsumer.receive(5000);
assertNotNull(message);
assertEquals(text2, message.getStringProperty("prop"));
consumer.close();
divertedConsumer.close();
updatedDivertedConsumer.close();
session.deleteQueue(queue);
session.deleteQueue(divertQueue);
session.deleteQueue(updatedDivertQueue);
session.close();
locator.close();
}
@Test @Test
public void testCreateAndDestroyBridge() throws Exception { public void testCreateAndDestroyBridge() throws Exception {
String name = RandomUtil.randomString(); String name = RandomUtil.randomString();

View File

@ -1238,6 +1238,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerPropertiesAsJSON, routingType); proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerPropertiesAsJSON, routingType);
} }
@Override
public void updateDivert(String name,
String forwardingAddress,
String filterString,
String transformerClassName,
Map<String, String> transformerProperties,
String routingType) throws Exception {
proxy.invokeOperation("updateDivert", name, forwardingAddress, filterString, transformerClassName, transformerProperties, routingType);
}
@Override @Override
public void destroyDivert(String name) throws Exception { public void destroyDivert(String name) throws Exception {
proxy.invokeOperation("destroyDivert", name); proxy.invokeOperation("destroyDivert", name);