ARTEMIS-1446 - Support Transformer configuration by properties

Update Tranformer to be able to handle initiation via propertiers (map<string, string>)
Update Configuration to have more specific transfromer configuration type, and to take properties.
Support back compatibility.
Add AddHeadersTransformer which is a main use case, and can act as example also.
Update Control's to expose new property configuration
Add test cases
Update examples for new transformer config style
This commit is contained in:
Michael Andre Pearce 2017-09-27 20:01:29 +01:00 committed by Clebert Suconic
parent c94ca2d43a
commit 4db8cd54ca
35 changed files with 864 additions and 78 deletions

View File

@ -248,10 +248,10 @@ public final class JsonUtil {
return array.build();
}
public static JsonObject toJsonObject(Map<String, Object> map) {
public static JsonObject toJsonObject(Map<String, ?> map) {
JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder();
if (map != null) {
for (Map.Entry<String, Object> entry : map.entrySet()) {
for (Map.Entry<String, ?> entry : map.entrySet()) {
addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder);
}
}
@ -266,6 +266,14 @@ public final class JsonUtil {
return Json.createReader(new StringReader(jsonString)).readObject();
}
public static Map<String, String> readJsonProperties(String jsonString) {
Map<String, String> properties = new HashMap<>();
if (jsonString != null) {
JsonUtil.readJsonObject(jsonString).forEach((k, v) -> properties.put(k, v.toString()));
}
return properties;
}
public static Object convertJsonValue(Object jsonValue, Class desiredType) {
if (jsonValue instanceof JsonNumber) {
JsonNumber number = (JsonNumber) jsonValue;

View File

@ -1028,6 +1028,28 @@ public interface ActiveMQServerControl {
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
@Parameter(name = "address", desc = "Address to divert from") String address,
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "transformerProperties", desc = "Configuration properties of the divert's transformer") Map<String, String> transformerProperties,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
@Parameter(name = "address", desc = "Address to divert from") String address,
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON,
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;
@ -1054,6 +1076,48 @@ public interface ActiveMQServerControl {
@Parameter(name = "user", desc = "User name") String user,
@Parameter(name = "password", desc = "User password") String password) throws Exception;
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,
@Parameter(name = "forwardingAddress", desc = "Forwarding address") String forwardingAddress,
@Parameter(name = "filterString", desc = "Filter of the bridge") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the bridge transformer") String transformerClassName,
@Parameter(name = "transformerProperties", desc = "Configuration properties of the bridge transformer") Map<String, String> transformerProperties,
@Parameter(name = "retryInterval", desc = "Connection retry interval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier", desc = "Connection retry interval multiplier") double retryIntervalMultiplier,
@Parameter(name = "initialConnectAttempts", desc = "Number of initial connection attempts") int initialConnectAttempts,
@Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
@Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
@Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
@Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
@Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
@Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
@Parameter(name = "useDiscoveryGroup", desc = "use discovery group") boolean useDiscoveryGroup,
@Parameter(name = "ha", desc = "Is it using HA") boolean ha,
@Parameter(name = "user", desc = "User name") String user,
@Parameter(name = "password", desc = "User password") String password) throws Exception;
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,
@Parameter(name = "forwardingAddress", desc = "Forwarding address") String forwardingAddress,
@Parameter(name = "filterString", desc = "Filter of the bridge") String filterString,
@Parameter(name = "transformerClassName", desc = "Class name of the bridge transformer") String transformerClassName,
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the bridge transformer in JSON form") String transformerPropertiesAsJSON,
@Parameter(name = "retryInterval", desc = "Connection retry interval") long retryInterval,
@Parameter(name = "retryIntervalMultiplier", desc = "Connection retry interval multiplier") double retryIntervalMultiplier,
@Parameter(name = "initialConnectAttempts", desc = "Number of initial connection attempts") int initialConnectAttempts,
@Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
@Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
@Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
@Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
@Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
@Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
@Parameter(name = "useDiscoveryGroup", desc = "use discovery group") boolean useDiscoveryGroup,
@Parameter(name = "ha", desc = "Is it using HA") boolean ha,
@Parameter(name = "user", desc = "User name") String user,
@Parameter(name = "password", desc = "User password") String password) throws Exception;
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core.management;
import java.util.Map;
/**
* A BridgeControl is used to manage a Bridge.
*/
@ -51,6 +53,18 @@ public interface BridgeControl extends ActiveMQComponentControl {
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this bridge")
String getTransformerClassName();
/**
* Returns a map of the properties configured for the transformer.
*/
@Attribute(desc = "map of key, value pairs used to configure the transformer in JSON form")
String getTransformerPropertiesAsJSON() throws Exception;
/**
* Returns a map of the properties configured for the transformer.
*/
@Attribute(desc = "map of key, value pairs used to configure the transformer")
Map<String, String> getTransformerProperties() throws Exception;
/**
* Returns any list of static connectors used by this bridge
*/

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core.management;
import java.util.Map;
/**
* A DivertControl is used to manage a divert.
*/
@ -66,6 +68,18 @@ public interface DivertControl {
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert")
String getTransformerClassName();
/**
* Returns a map of the properties configured for the transformer.
*/
@Attribute(desc = "map of key, value pairs used to configure the transformer in JSON form")
String getTransformerPropertiesAsJSON();
/**
* Returns a map of the properties configured for the transformer.
*/
@Attribute(desc = "map of key, value pairs used to configure the transformer")
Map<String, String> getTransformerProperties() throws Exception;
/**
* Returns the routing type used by this divert.
*/

View File

@ -40,7 +40,7 @@ public final class BridgeConfiguration implements Serializable {
private boolean ha = false;
private String transformerClassName = null;
private TransformerConfiguration transformerConfiguration = null;
private long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
@ -150,15 +150,15 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
public String getTransformerClassName() {
return transformerClassName;
public TransformerConfiguration getTransformerConfiguration() {
return transformerConfiguration;
}
/**
* @param transformerClassName the transformerClassName to set
* @param transformerConfiguration the transformerConfiguration to set
*/
public BridgeConfiguration setTransformerClassName(final String transformerClassName) {
this.transformerClassName = transformerClassName;
public BridgeConfiguration setTransformerConfiguration(final TransformerConfiguration transformerConfiguration) {
this.transformerConfiguration = transformerConfiguration;
return this;
}
@ -373,7 +373,7 @@ public final class BridgeConfiguration implements Serializable {
temp = Double.doubleToLongBits(retryIntervalMultiplier);
result = prime * result + (int) (temp ^ (temp >>> 32));
result = prime * result + ((staticConnectors == null) ? 0 : staticConnectors.hashCode());
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
result = prime * result + ((user == null) ? 0 : user.hashCode());
return result;
@ -447,10 +447,10 @@ public final class BridgeConfiguration implements Serializable {
return false;
} else if (!staticConnectors.equals(other.staticConnectors))
return false;
if (transformerClassName == null) {
if (other.transformerClassName != null)
if (transformerConfiguration == null) {
if (other.transformerConfiguration != null)
return false;
} else if (!transformerClassName.equals(other.transformerClassName))
} else if (!transformerConfiguration.equals(other.transformerConfiguration))
return false;
if (useDuplicateDetection != other.useDuplicateDetection)
return false;

View File

@ -38,7 +38,7 @@ public class DivertConfiguration implements Serializable {
private String filterString = null;
private String transformerClassName = null;
private TransformerConfiguration transformerConfiguration = null;
private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
@ -69,8 +69,8 @@ public class DivertConfiguration implements Serializable {
return filterString;
}
public String getTransformerClassName() {
return transformerClassName;
public TransformerConfiguration getTransformerConfiguration() {
return transformerConfiguration;
}
public DivertConfigurationRoutingType getRoutingType() {
@ -130,10 +130,10 @@ public class DivertConfiguration implements Serializable {
}
/**
* @param transformerClassName the transformerClassName to set
* @param transformerConfiguration the transformerConfiguration to set
*/
public DivertConfiguration setTransformerClassName(final String transformerClassName) {
this.transformerClassName = transformerClassName;
public DivertConfiguration setTransformerConfiguration(final TransformerConfiguration transformerConfiguration) {
this.transformerConfiguration = transformerConfiguration;
return this;
}
@ -155,7 +155,7 @@ public class DivertConfiguration implements Serializable {
result = prime * result + ((forwardingAddress == null) ? 0 : forwardingAddress.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
return result;
}
@ -196,10 +196,10 @@ public class DivertConfiguration implements Serializable {
return false;
} else if (!routingName.equals(other.routingName))
return false;
if (transformerClassName == null) {
if (other.transformerClassName != null)
if (transformerConfiguration == null) {
if (other.transformerConfiguration != null)
return false;
} else if (!transformerClassName.equals(other.transformerClassName))
} else if (!transformerConfiguration.equals(other.transformerConfiguration))
return false;
if (routingType == null) {
if (other.routingType != null)

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
public final class TransformerConfiguration implements Serializable {
private static final long serialVersionUID = -1057244274380572226L;
private String className = null;
private Map<String, String> properties = new HashMap<>();
public TransformerConfiguration() {
}
public String getClassName() {
return className;
}
/**
* @param className the class name to set
*/
public TransformerConfiguration setClassName(final String className) {
this.className = className;
return this;
}
public Map<String, String> getProperties() {
return properties;
}
/**
* @param properties the properties to set
*/
public TransformerConfiguration setProperties(final Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
return this;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((className == null) ? 0 : className.hashCode());
result = prime * result + ((properties == null) ? 0 : properties.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
TransformerConfiguration other = (TransformerConfiguration) obj;
if (className == null) {
if (other.className != null)
return false;
} else if (!className.equals(other.className))
return false;
if (properties == null) {
if (other.properties != null)
return false;
} else if (!properties.equals(other.properties))
return false;
return true;
}
}

View File

@ -31,6 +31,7 @@ import java.util.Set;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@ -1626,6 +1627,27 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
mainConfiguration.setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString(name)).setType(type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ? GroupingHandlerConfiguration.TYPE.LOCAL : GroupingHandlerConfiguration.TYPE.REMOTE).setAddress(new SimpleString(address)).setTimeout(timeout).setGroupTimeout(groupTimeout).setReaperPeriod(reaperPeriod));
}
private TransformerConfiguration getTransformerConfiguration(final Node node) {
Element element = (Element) node;
String className = getString(element, "class-name", null, Validators.NO_CHECK);
Map<String, String> properties = new HashMap<>();
NodeList children = element.getChildNodes();
for (int j = 0; j < children.getLength(); j++) {
Node child = children.item(j);
if (child.getNodeName().equals("property")) {
String key = getAttributeValue(child, "key");
String value = getAttributeValue(child, "value");
properties.put(key, value);
}
}
return new TransformerConfiguration().setClassName(className).setProperties(properties);
}
private TransformerConfiguration getTransformerConfiguration(final String transformerClassName) {
return new TransformerConfiguration().setClassName(transformerClassName).setProperties(Collections.EMPTY_MAP);
}
private void parseBridgeConfiguration(final Element brNode, final Configuration mainConfig) throws Exception {
String name = brNode.getAttribute("name");
@ -1684,6 +1706,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
boolean ha = getBoolean(brNode, "ha", false);
TransformerConfiguration transformerConfiguration = null;
String filterString = null;
List<String> staticConnectorNames = new ArrayList<>();
@ -1701,10 +1725,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
} else if (child.getNodeName().equals("static-connectors")) {
getStaticConnectors(staticConnectorNames, child);
} else if (child.getNodeName().equals("transformer")) {
transformerConfiguration = getTransformerConfiguration(child);
}
}
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
if (transformerConfiguration == null && transformerClassName != null) {
transformerConfiguration = getTransformerConfiguration(transformerClassName);
}
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
if (!staticConnectorNames.isEmpty()) {
config.setStaticConnectors(staticConnectorNames);
@ -1742,6 +1772,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
TransformerConfiguration transformerConfiguration = null;
String filterString = null;
NodeList children = e.getChildNodes();
@ -1751,16 +1783,22 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
if (child.getNodeName().equals("filter")) {
filterString = getAttributeValue(child, "string");
} else if (child.getNodeName().equals("transformer")) {
transformerConfiguration = getTransformerConfiguration(child);
}
}
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType);
if (transformerConfiguration == null && transformerClassName != null) {
transformerConfiguration = getTransformerConfiguration(transformerClassName);
}
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(routingType);
mainConfig.getDivertConfigurations().add(config);
}
/**
* @param node
* @param e
* @return
*/
protected void parseWildcardConfiguration(final Element e, final Configuration mainConfig) {

View File

@ -48,6 +48,7 @@ import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -64,6 +65,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.management.impl.view.AddressView;
import org.apache.activemq.artemis.core.management.impl.view.ConnectionView;
@ -2262,11 +2264,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final String filterString,
final String transformerClassName,
final String routingType) throws Exception {
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, (String) null, routingType);
}
@Override
public void createDivert(final String name,
final String routingName,
final String address,
final String forwardingAddress,
final boolean exclusive,
final String filterString,
final String transformerClassName,
final String transformerPropertiesAsJSON,
final String routingType) throws Exception {
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, JsonUtil.readJsonProperties(transformerPropertiesAsJSON), routingType);
}
@Override
public void createDivert(final String name,
final String routingName,
final String address,
final String forwardingAddress,
final boolean exclusive,
final String filterString,
final String transformerClassName,
final Map<String, String> transformerProperties,
final String routingType) throws Exception {
checkStarted();
clearIO();
try {
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName).setProperties(transformerProperties);
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
server.deployDivert(config);
} finally {
blockOnIO();
@ -2323,12 +2352,95 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
final boolean ha,
final String user,
final String password) throws Exception {
createBridge(name,
queueName,
forwardingAddress,
filterString,
transformerClassName,
(String) null,
retryInterval,
retryIntervalMultiplier,
initialConnectAttempts,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
producerWindowSize,
clientFailureCheckPeriod,
staticConnectorsOrDiscoveryGroup,
useDiscoveryGroup,
ha,
user,
password);
}
@Override
public void createBridge(final String name,
final String queueName,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
final String transformerPropertiesAsJSON,
final long retryInterval,
final double retryIntervalMultiplier,
final int initialConnectAttempts,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final int producerWindowSize,
final long clientFailureCheckPeriod,
final String staticConnectorsOrDiscoveryGroup,
boolean useDiscoveryGroup,
final boolean ha,
final String user,
final String password) throws Exception {
createBridge(name,
queueName,
forwardingAddress,
filterString,
transformerClassName,
JsonUtil.readJsonProperties(transformerPropertiesAsJSON),
retryInterval,
retryIntervalMultiplier,
initialConnectAttempts,
reconnectAttempts,
useDuplicateDetection,
confirmationWindowSize,
producerWindowSize,
clientFailureCheckPeriod,
staticConnectorsOrDiscoveryGroup,
useDiscoveryGroup,
ha,
user,
password);
}
@Override
public void createBridge(final String name,
final String queueName,
final String forwardingAddress,
final String filterString,
final String transformerClassName,
final Map<String, String> transformerProperties,
final long retryInterval,
final double retryIntervalMultiplier,
final int initialConnectAttempts,
final int reconnectAttempts,
final boolean useDuplicateDetection,
final int confirmationWindowSize,
final int producerWindowSize,
final long clientFailureCheckPeriod,
final String staticConnectorsOrDiscoveryGroup,
boolean useDiscoveryGroup,
final boolean ha,
final String user,
final String password) throws Exception {
checkStarted();
clearIO();
try {
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName).setProperties(transformerProperties);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
@ -2365,7 +2477,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName);
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);

View File

@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -144,7 +146,22 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
public String getTransformerClassName() {
clearIO();
try {
return configuration.getTransformerClassName();
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
} finally {
blockOnIO();
}
}
@Override
public String getTransformerPropertiesAsJSON() {
return JsonUtil.toJsonObject(getTransformerProperties()).toString();
}
@Override
public Map<String, String> getTransformerProperties() {
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
} finally {
blockOnIO();
}

View File

@ -19,6 +19,9 @@ package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.Map;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -92,7 +95,22 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
public String getTransformerClassName() {
clearIO();
try {
return configuration.getTransformerClassName();
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
} finally {
blockOnIO();
}
}
@Override
public String getTransformerPropertiesAsJSON() {
return JsonUtil.toJsonObject(getTransformerProperties()).toString();
}
@Override
public Map<String, String> getTransformerProperties() {
clearIO();
try {
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
} finally {
blockOnIO();
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
public interface Divert extends Bindable {

View File

@ -24,7 +24,8 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
/**
@ -87,24 +88,24 @@ public interface ServiceRegistry {
List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames);
/**
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a divert
* Get an instance of org.apache.activemq.artemis.core.server.transformer.Transformer for a divert
*
* @param name the name of divert for which the transformer will be used
* @param className the fully qualified name of the transformer implementation (can be null)
* @param transformerConfiguration the transformer configuration
* @return
*/
Transformer getDivertTransformer(String name, String className);
Transformer getDivertTransformer(String name, TransformerConfiguration transformerConfiguration);
void addDivertTransformer(String name, Transformer transformer);
/**
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a bridge
* Get an instance of org.apache.activemq.artemis.core.server.transformer.Transformer for a bridge
*
* @param name the name of bridge for which the transformer will be used
* @param className the fully qualified name of the transformer implementation (can be null)
* @param transformerConfiguration the transformer configuration
* @return
*/
Transformer getBridgeTransformer(String name, String className);
Transformer getBridgeTransformer(String name, TransformerConfiguration transformerConfiguration);
void addBridgeTransformer(String name, Transformer transformer);

View File

@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.ha.HAManager;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
@ -395,7 +396,7 @@ public final class ClusterManager implements ActiveMQComponent {
return;
}
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerClassName());
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerConfiguration());
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));

View File

@ -16,9 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.Message;
public interface Transformer {
Message transform(Message message);
/**
* This is for back compatibility with package move.
*/
public interface Transformer extends org.apache.activemq.artemis.core.server.transformer.Transformer {
}

View File

@ -53,7 +53,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;

View File

@ -45,7 +45,7 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;

View File

@ -136,7 +136,7 @@ import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
@ -2051,7 +2051,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString sAddress = new SimpleString(config.getAddress());
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerClassName());
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerConfiguration());
Filter filter = FilterImpl.createFilter(config.getFilterString());

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
/**

View File

@ -30,10 +30,11 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
@ -153,11 +154,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
}
@Override
public Transformer getDivertTransformer(String name, String className) {
public Transformer getDivertTransformer(String name, TransformerConfiguration transformerConfiguration) {
Transformer transformer = divertTransformers.get(name);
if (transformer == null && className != null) {
transformer = instantiateTransformer(className);
if (transformer == null && transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
transformer = instantiateTransformer(transformerConfiguration);
addDivertTransformer(name, transformer);
}
@ -180,11 +181,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
}
@Override
public Transformer getBridgeTransformer(String name, String className) {
public Transformer getBridgeTransformer(String name, TransformerConfiguration transformerConfiguration) {
Transformer transformer = bridgeTransformers.get(name);
if (transformer == null && className != null) {
transformer = instantiateTransformer(className);
if (transformer == null && transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
transformer = instantiateTransformer(transformerConfiguration);
addBridgeTransformer(name, transformer);
}
@ -218,14 +219,15 @@ public class ServiceRegistryImpl implements ServiceRegistry {
});
}
private Transformer instantiateTransformer(final String className) {
private Transformer instantiateTransformer(final TransformerConfiguration transformerConfiguration) {
Transformer transformer = null;
if (className != null) {
if (transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
try {
transformer = loadClass(className);
transformer = loadClass(transformerConfiguration.getClassName());
transformer.init(Collections.unmodifiableMap(transformerConfiguration.getProperties()));
} catch (Exception e) {
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerConfiguration.getClassName());
}
}
return transformer;

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.transformer;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
public class AddHeadersTransformer implements Transformer {
private Map<SimpleString, SimpleString> headers = new HashMap<>();
@Override
public void init(Map<String, String> properties) {
properties.forEach((k,v) -> headers.put(SimpleString.toSimpleString(k), SimpleString.toSimpleString(v)));
}
@Override
public Message transform(Message message) {
headers.forEach(message::putStringProperty);
return message;
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.transformer;
import java.util.Map;
import org.apache.activemq.artemis.api.core.Message;
public interface Transformer {
default void init(Map<String, String> properties) { }
Message transform(Message message);
}

View File

@ -1262,6 +1262,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
optional transformer configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:string" default="102400" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -1418,6 +1426,44 @@
</xsd:attribute>
</xsd:complexType>
<!-- TRANSFORMER CONFIGURATION -->
<xsd:complexType name="transformerType">
<xsd:sequence>
<xsd:element name="class-name" type="xsd:string" maxOccurs="1" minOccurs="1">
<xsd:annotation>
<xsd:documentation>
optional name of transformer class
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="property" type="transformerProperty" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
properties to configure the transformer class
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="transformerProperty">
<xsd:attribute name="key" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
key for the property
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="value" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
value for the property
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- CLUSTER CONNECTION CONFIGURATION -->
@ -1704,6 +1750,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
optional transformer configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="exclusive" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -193,7 +193,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(12999, ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupPort());
Assert.assertEquals(23456, dc.getRefreshTimeout());
Assert.assertEquals(2, conf.getDivertConfigurations().size());
Assert.assertEquals(3, conf.getDivertConfigurations().size());
for (DivertConfiguration dic : conf.getDivertConfigurations()) {
if (dic.getName().equals("divert1")) {
Assert.assertEquals("divert1", dic.getName());
@ -201,20 +201,25 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals("address1", dic.getAddress());
Assert.assertEquals("forwarding-address1", dic.getForwardingAddress());
Assert.assertEquals("speed > 88", dic.getFilterString());
Assert.assertEquals("org.foo.Transformer", dic.getTransformerClassName());
Assert.assertEquals("org.foo.Transformer", dic.getTransformerConfiguration().getClassName());
Assert.assertEquals(true, dic.isExclusive());
} else {
} else if (dic.getName().equals("divert2")) {
Assert.assertEquals("divert2", dic.getName());
Assert.assertEquals("routing-name2", dic.getRoutingName());
Assert.assertEquals("address2", dic.getAddress());
Assert.assertEquals("forwarding-address2", dic.getForwardingAddress());
Assert.assertEquals("speed < 88", dic.getFilterString());
Assert.assertEquals("org.foo.Transformer2", dic.getTransformerClassName());
Assert.assertEquals("org.foo.Transformer2", dic.getTransformerConfiguration().getClassName());
Assert.assertEquals(false, dic.isExclusive());
} else {
Assert.assertEquals("divert3", dic.getName());
Assert.assertEquals("org.foo.DivertTransformer3", dic.getTransformerConfiguration().getClassName());
Assert.assertEquals("divertTransformerValue1", dic.getTransformerConfiguration().getProperties().get("divertTransformerKey1"));
Assert.assertEquals("divertTransformerValue2", dic.getTransformerConfiguration().getProperties().get("divertTransformerKey2"));
}
}
Assert.assertEquals(2, conf.getBridgeConfigurations().size());
Assert.assertEquals(3, conf.getBridgeConfigurations().size());
for (BridgeConfiguration bc : conf.getBridgeConfigurations()) {
if (bc.getName().equals("bridge1")) {
Assert.assertEquals("bridge1", bc.getName());
@ -224,7 +229,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals("connection time-to-live", 370, bc.getConnectionTTL());
Assert.assertEquals("bridge-forwarding-address1", bc.getForwardingAddress());
Assert.assertEquals("sku > 1", bc.getFilterString());
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerClassName());
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerConfiguration().getClassName());
Assert.assertEquals(3, bc.getRetryInterval());
Assert.assertEquals(0.2, bc.getRetryIntervalMultiplier(), 0.0001);
assertEquals("max retry interval", 10002, bc.getMaxRetryInterval());
@ -234,15 +239,21 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(null, bc.getDiscoveryGroupName());
Assert.assertEquals(444, bc.getProducerWindowSize());
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
} else {
} else if (bc.getName().equals("bridge2")) {
Assert.assertEquals("bridge2", bc.getName());
Assert.assertEquals("queue2", bc.getQueueName());
Assert.assertEquals("bridge-forwarding-address2", bc.getForwardingAddress());
Assert.assertEquals(null, bc.getFilterString());
Assert.assertEquals(null, bc.getTransformerClassName());
Assert.assertEquals(null, bc.getTransformerConfiguration());
Assert.assertEquals(null, bc.getStaticConnectors());
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
Assert.assertEquals(568320, bc.getProducerWindowSize());
} else {
Assert.assertEquals("bridge3", bc.getName());
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
Assert.assertEquals("bridgeTransformerValue1", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey1"));
Assert.assertEquals("bridgeTransformerValue2", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey2"));
}
}

View File

@ -125,6 +125,18 @@
<transformer-class-name>org.foo.Transformer2</transformer-class-name>
<exclusive>false</exclusive>
</divert>
<divert name="divert3">
<routing-name>routing-name2</routing-name>
<address>address2</address>
<forwarding-address>forwarding-address2</forwarding-address>
<filter string="speed &lt; 88"/>
<transformer>
<class-name>org.foo.DivertTransformer3</class-name>
<property key="divertTransformerKey1" value="divertTransformerValue1"/>
<property key="divertTransformerKey2" value="divertTransformerValue2"/>
</transformer>
<exclusive>false</exclusive>
</divert>
</diverts>
<amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
<queues>
@ -166,6 +178,17 @@
<producer-window-size>555k</producer-window-size>
<discovery-group-ref discovery-group-name="dg1"/>
</bridge>
<bridge name="bridge3">
<queue-name>queue3</queue-name>
<forwarding-address>bridge-forwarding-address2</forwarding-address>
<transformer>
<class-name>org.foo.BridgeTransformer3</class-name>
<property key="bridgeTransformerKey1" value="bridgeTransformerValue1"/>
<property key="bridgeTransformerKey2" value="bridgeTransformerValue2"/>
</transformer>
<producer-window-size>555k</producer-window-size>
<discovery-group-ref discovery-group-name="dg1"/>
</bridge>
</bridges>
<ha-policy>
<!--only one of the following-->

View File

@ -1091,6 +1091,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
optional transformer configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:int" default="102400" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
@ -1246,6 +1254,44 @@
</xsd:attribute>
</xsd:complexType>
<!-- TRANSFORMER CONFIGURATION -->
<xsd:complexType name="transformerType">
<xsd:sequence>
<xsd:element name="class-name" type="xsd:string" maxOccurs="1" minOccurs="1">
<xsd:annotation>
<xsd:documentation>
optional name of transformer class
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="property" type="transformerProperty" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
properties to configure the transformer class
</xsd:documentation>
</xsd:annotation>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="transformerProperty">
<xsd:attribute name="key" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
key for the property
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="value" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
value for the property
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- CLUSTER CONNECTION CONFIGURATION -->
@ -1530,6 +1576,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
optional transformer configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="exclusive" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -92,7 +92,7 @@ Let's take a look at all the parameters in turn:
- `transformer-class-name`. An optional transformer-class-name can be
specified. This is the name of a user-defined class which implements
the `org.apache.activemq.artemis.core.server.cluster.Transformer` interface.
the `org.apache.activemq.artemis.core.server.transformer.Transformer` interface.
If this is specified then the transformer's `transform()` method
will be invoked with the message before it is forwarded. This gives

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.jms.example;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
public class HatColourChangeTransformer implements Transformer {

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.jms.example;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
public class AddForwardingTimeTransformer implements Transformer {

View File

@ -62,8 +62,9 @@ under the License.
<address>priceUpdates</address>
<forwarding-address>priceForwarding</forwarding-address>
<filter string="office='New York'"/>
<transformer-class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer
</transformer-class-name>
<transformer>
<class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer</class-name>
</transformer>
<exclusive>true</exclusive>
</divert>
</diverts>

View File

@ -51,6 +51,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -69,7 +70,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
@ -948,7 +950,7 @@ public class BridgeTest extends ActiveMQTestBase {
internaltestWithTransformer(true);
}
public void internaltestWithTransformer(final boolean useFiles) throws Exception {
private void internaltestWithTransformer(final boolean useFiles) throws Exception {
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
@ -971,7 +973,7 @@ public class BridgeTest extends ActiveMQTestBase {
ArrayList<String> staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerClassName(SimpleTransformer.class.getName()).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(new TransformerConfiguration().setClassName(SimpleTransformer.class.getName())).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
@ -1051,6 +1053,114 @@ public class BridgeTest extends ActiveMQTestBase {
}
}
@Test
public void testWithTransformerProperties() throws Exception {
final String propKey = "bridged";
final String propValue = "true";
TransformerConfiguration transformerConfiguration = new TransformerConfiguration().setClassName(AddHeadersTransformer.class.getName());
transformerConfiguration.getProperties().put(propKey, propValue);
Map<String, Object> server0Params = new HashMap<>();
server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
Map<String, Object> server1Params = new HashMap<>();
addTargetParameters(server1Params);
server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
final String testAddress = "testAddress";
final String queueName0 = "queue0";
final String forwardAddress = "forwardAddress";
final String queueName1 = "queue1";
Map<String, TransportConfiguration> connectors = new HashMap<>();
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
connectors.put(server1tc.getName(), server1tc);
server0.getConfiguration().setConnectorConfigurations(connectors);
ArrayList<String> staticConnectors = new ArrayList<>();
staticConnectors.add(server1tc.getName());
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(transformerConfiguration).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
bridgeConfigs.add(bridgeConfiguration);
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
queueConfigs0.add(queueConfig0);
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
queueConfigs1.add(queueConfig1);
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
server1.start();
server0.start();
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
ClientSession session0 = sf0.createSession(false, true, true);
ClientSession session1 = sf1.createSession(false, true, true);
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
ClientConsumer consumer1 = session1.createConsumer(queueName1);
session1.start();
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
ClientMessage message = session0.createMessage(true);
message.getBodyBuffer().writeString("doo be doo be doo be doo");
producer0.send(message);
}
for (int i = 0; i < numMessages; i++) {
ClientMessage message = consumer1.receive(200);
Assert.assertNotNull(message);
String messagePropVal = message.getStringProperty(propKey);
Assert.assertEquals(propValue, messagePropVal);
String sval = message.getBodyBuffer().readString();
Assert.assertEquals("doo be doo be doo be doo", sval);
message.acknowledge();
}
Assert.assertNull(consumer1.receiveImmediate());
session0.close();
session1.close();
sf0.close();
sf1.close();
if (server0.getConfiguration().isPersistenceEnabled()) {
assertEquals(0, loadQueues(server0).size());
}
}
@Test
public void testSawtoothLoad() throws Exception {
Map<String, Object> server0Params = new HashMap<>();

View File

@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.cluster.Transformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;

View File

@ -797,6 +797,32 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType);
}
@Override
public void createDivert(String name,
String routingName,
String address,
String forwardingAddress,
boolean exclusive,
String filterString,
String transformerClassName,
Map<String, String> transformerProperties,
String routingType) throws Exception {
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerProperties, routingType);
}
@Override
public void createDivert(String name,
String routingName,
String address,
String forwardingAddress,
boolean exclusive,
String filterString,
String transformerClassName,
String transformerPropertiesAsJSON,
String routingType) throws Exception {
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerPropertiesAsJSON, routingType);
}
@Override
public void destroyDivert(String name) throws Exception {
proxy.invokeOperation("destroyDivert", name);
@ -869,6 +895,52 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
}
@Override
public void createBridge(String name,
String queueName,
String forwardingAddress,
String filterString,
String transformerClassName,
Map<String, String> transformerProperties,
long retryInterval,
double retryIntervalMultiplier,
int initialConnectAttempts,
int reconnectAttempts,
boolean useDuplicateDetection,
int confirmationWindowSize,
int producerWindowSize,
long clientFailureCheckPeriod,
String connectorNames,
boolean useDiscovery,
boolean ha,
String user,
String password) throws Exception {
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, transformerProperties, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
}
@Override
public void createBridge(String name,
String queueName,
String forwardingAddress,
String filterString,
String transformerClassName,
String transformerPropertiesAsJSON,
long retryInterval,
double retryIntervalMultiplier,
int initialConnectAttempts,
int reconnectAttempts,
boolean useDuplicateDetection,
int confirmationWindowSize,
int producerWindowSize,
long clientFailureCheckPeriod,
String connectorNames,
boolean useDiscovery,
boolean ha,
String user,
String password) throws Exception {
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, transformerPropertiesAsJSON, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
}
@Override
public void createBridge(String name,
String queueName,

View File

@ -61,7 +61,9 @@ public class DivertControlTest extends ManagementTestBase {
Assert.assertEquals(divertConfig.getForwardingAddress(), divertControl.getForwardingAddress());
Assert.assertEquals(divertConfig.getTransformerClassName(), divertControl.getTransformerClassName());
Assert.assertEquals(divertConfig.getTransformerConfiguration().getClassName(), divertControl.getTransformerClassName());
Assert.assertEquals(divertConfig.getTransformerConfiguration().getProperties(), divertControl.getTransformerProperties());
}
// Package protected ---------------------------------------------

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import java.util.Map;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
@ -61,6 +63,16 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
return (String) proxy.retrieveAttributeValue("transformerClassName");
}
@Override
public String getTransformerPropertiesAsJSON() {
return (String) proxy.retrieveAttributeValue("transformerPropertiesAsJSON");
}
@Override
public Map<String, String> getTransformerProperties() {
return (Map<String, String>) proxy.retrieveAttributeValue("transformerProperties");
}
@Override
public String getRoutingType() {
return (String) proxy.retrieveAttributeValue("routingType");