This closes #1569
This commit is contained in:
commit
8886ec2924
|
@ -248,10 +248,10 @@ public final class JsonUtil {
|
|||
return array.build();
|
||||
}
|
||||
|
||||
public static JsonObject toJsonObject(Map<String, Object> map) {
|
||||
public static JsonObject toJsonObject(Map<String, ?> map) {
|
||||
JsonObjectBuilder jsonObjectBuilder = JsonLoader.createObjectBuilder();
|
||||
if (map != null) {
|
||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||
for (Map.Entry<String, ?> entry : map.entrySet()) {
|
||||
addToObject(entry.getKey(), entry.getValue(), jsonObjectBuilder);
|
||||
}
|
||||
}
|
||||
|
@ -266,6 +266,14 @@ public final class JsonUtil {
|
|||
return Json.createReader(new StringReader(jsonString)).readObject();
|
||||
}
|
||||
|
||||
public static Map<String, String> readJsonProperties(String jsonString) {
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
if (jsonString != null) {
|
||||
JsonUtil.readJsonObject(jsonString).forEach((k, v) -> properties.put(k, v.toString()));
|
||||
}
|
||||
return properties;
|
||||
}
|
||||
|
||||
public static Object convertJsonValue(Object jsonValue, Class desiredType) {
|
||||
if (jsonValue instanceof JsonNumber) {
|
||||
JsonNumber number = (JsonNumber) jsonValue;
|
||||
|
|
|
@ -1028,6 +1028,28 @@ public interface ActiveMQServerControl {
|
|||
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
|
||||
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
|
||||
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
|
||||
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
|
||||
@Parameter(name = "address", desc = "Address to divert from") String address,
|
||||
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
|
||||
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
|
||||
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
|
||||
@Parameter(name = "transformerProperties", desc = "Configuration properties of the divert's transformer") Map<String, String> transformerProperties,
|
||||
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION)
|
||||
void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name,
|
||||
@Parameter(name = "routingName", desc = "Routing name of the divert") String routingName,
|
||||
@Parameter(name = "address", desc = "Address to divert from") String address,
|
||||
@Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress,
|
||||
@Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive,
|
||||
@Parameter(name = "filterString", desc = "Filter of the divert") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName,
|
||||
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the divert's transformer in JSON form") String transformerPropertiesAsJSON,
|
||||
@Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception;
|
||||
|
||||
@Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION)
|
||||
void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception;
|
||||
|
||||
|
@ -1054,6 +1076,48 @@ public interface ActiveMQServerControl {
|
|||
@Parameter(name = "user", desc = "User name") String user,
|
||||
@Parameter(name = "password", desc = "User password") String password) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
|
||||
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
|
||||
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,
|
||||
@Parameter(name = "forwardingAddress", desc = "Forwarding address") String forwardingAddress,
|
||||
@Parameter(name = "filterString", desc = "Filter of the bridge") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the bridge transformer") String transformerClassName,
|
||||
@Parameter(name = "transformerProperties", desc = "Configuration properties of the bridge transformer") Map<String, String> transformerProperties,
|
||||
@Parameter(name = "retryInterval", desc = "Connection retry interval") long retryInterval,
|
||||
@Parameter(name = "retryIntervalMultiplier", desc = "Connection retry interval multiplier") double retryIntervalMultiplier,
|
||||
@Parameter(name = "initialConnectAttempts", desc = "Number of initial connection attempts") int initialConnectAttempts,
|
||||
@Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
|
||||
@Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
|
||||
@Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
|
||||
@Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
|
||||
@Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
|
||||
@Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
|
||||
@Parameter(name = "useDiscoveryGroup", desc = "use discovery group") boolean useDiscoveryGroup,
|
||||
@Parameter(name = "ha", desc = "Is it using HA") boolean ha,
|
||||
@Parameter(name = "user", desc = "User name") String user,
|
||||
@Parameter(name = "password", desc = "User password") String password) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
|
||||
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
|
||||
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,
|
||||
@Parameter(name = "forwardingAddress", desc = "Forwarding address") String forwardingAddress,
|
||||
@Parameter(name = "filterString", desc = "Filter of the bridge") String filterString,
|
||||
@Parameter(name = "transformerClassName", desc = "Class name of the bridge transformer") String transformerClassName,
|
||||
@Parameter(name = "transformerPropertiesAsJSON", desc = "Configuration properties of the bridge transformer in JSON form") String transformerPropertiesAsJSON,
|
||||
@Parameter(name = "retryInterval", desc = "Connection retry interval") long retryInterval,
|
||||
@Parameter(name = "retryIntervalMultiplier", desc = "Connection retry interval multiplier") double retryIntervalMultiplier,
|
||||
@Parameter(name = "initialConnectAttempts", desc = "Number of initial connection attempts") int initialConnectAttempts,
|
||||
@Parameter(name = "reconnectAttempts", desc = "Number of reconnection attempts") int reconnectAttempts,
|
||||
@Parameter(name = "useDuplicateDetection", desc = "Use duplicate detection") boolean useDuplicateDetection,
|
||||
@Parameter(name = "confirmationWindowSize", desc = "Confirmation window size") int confirmationWindowSize,
|
||||
@Parameter(name = "producerWindowSize", desc = "Producer window size") int producerWindowSize,
|
||||
@Parameter(name = "clientFailureCheckPeriod", desc = "Period to check client failure") long clientFailureCheckPeriod,
|
||||
@Parameter(name = "staticConnectorNames", desc = "comma separated list of connector names or name of discovery group if 'useDiscoveryGroup' is set to true") String connectorNames,
|
||||
@Parameter(name = "useDiscoveryGroup", desc = "use discovery group") boolean useDiscoveryGroup,
|
||||
@Parameter(name = "ha", desc = "Is it using HA") boolean ha,
|
||||
@Parameter(name = "user", desc = "User name") String user,
|
||||
@Parameter(name = "password", desc = "User password") String password) throws Exception;
|
||||
|
||||
@Operation(desc = "Create a Bridge", impact = MBeanOperationInfo.ACTION)
|
||||
void createBridge(@Parameter(name = "name", desc = "Name of the bridge") String name,
|
||||
@Parameter(name = "queueName", desc = "Name of the source queue") String queueName,
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core.management;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A BridgeControl is used to manage a Bridge.
|
||||
*/
|
||||
|
@ -51,6 +53,18 @@ public interface BridgeControl extends ActiveMQComponentControl {
|
|||
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this bridge")
|
||||
String getTransformerClassName();
|
||||
|
||||
/**
|
||||
* Returns a map of the properties configured for the transformer.
|
||||
*/
|
||||
@Attribute(desc = "map of key, value pairs used to configure the transformer in JSON form")
|
||||
String getTransformerPropertiesAsJSON() throws Exception;
|
||||
|
||||
/**
|
||||
* Returns a map of the properties configured for the transformer.
|
||||
*/
|
||||
@Attribute(desc = "map of key, value pairs used to configure the transformer")
|
||||
Map<String, String> getTransformerProperties() throws Exception;
|
||||
|
||||
/**
|
||||
* Returns any list of static connectors used by this bridge
|
||||
*/
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.api.core.management;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A DivertControl is used to manage a divert.
|
||||
*/
|
||||
|
@ -66,6 +68,18 @@ public interface DivertControl {
|
|||
@Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert")
|
||||
String getTransformerClassName();
|
||||
|
||||
/**
|
||||
* Returns a map of the properties configured for the transformer.
|
||||
*/
|
||||
@Attribute(desc = "map of key, value pairs used to configure the transformer in JSON form")
|
||||
String getTransformerPropertiesAsJSON();
|
||||
|
||||
/**
|
||||
* Returns a map of the properties configured for the transformer.
|
||||
*/
|
||||
@Attribute(desc = "map of key, value pairs used to configure the transformer")
|
||||
Map<String, String> getTransformerProperties() throws Exception;
|
||||
|
||||
/**
|
||||
* Returns the routing type used by this divert.
|
||||
*/
|
||||
|
|
|
@ -40,7 +40,7 @@ public final class BridgeConfiguration implements Serializable {
|
|||
|
||||
private boolean ha = false;
|
||||
|
||||
private String transformerClassName = null;
|
||||
private TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
private long retryInterval = ActiveMQClient.DEFAULT_RETRY_INTERVAL;
|
||||
|
||||
|
@ -150,15 +150,15 @@ public final class BridgeConfiguration implements Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
public String getTransformerClassName() {
|
||||
return transformerClassName;
|
||||
public TransformerConfiguration getTransformerConfiguration() {
|
||||
return transformerConfiguration;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param transformerClassName the transformerClassName to set
|
||||
* @param transformerConfiguration the transformerConfiguration to set
|
||||
*/
|
||||
public BridgeConfiguration setTransformerClassName(final String transformerClassName) {
|
||||
this.transformerClassName = transformerClassName;
|
||||
public BridgeConfiguration setTransformerConfiguration(final TransformerConfiguration transformerConfiguration) {
|
||||
this.transformerConfiguration = transformerConfiguration;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -373,7 +373,7 @@ public final class BridgeConfiguration implements Serializable {
|
|||
temp = Double.doubleToLongBits(retryIntervalMultiplier);
|
||||
result = prime * result + (int) (temp ^ (temp >>> 32));
|
||||
result = prime * result + ((staticConnectors == null) ? 0 : staticConnectors.hashCode());
|
||||
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
|
||||
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
|
||||
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
|
||||
result = prime * result + ((user == null) ? 0 : user.hashCode());
|
||||
return result;
|
||||
|
@ -447,10 +447,10 @@ public final class BridgeConfiguration implements Serializable {
|
|||
return false;
|
||||
} else if (!staticConnectors.equals(other.staticConnectors))
|
||||
return false;
|
||||
if (transformerClassName == null) {
|
||||
if (other.transformerClassName != null)
|
||||
if (transformerConfiguration == null) {
|
||||
if (other.transformerConfiguration != null)
|
||||
return false;
|
||||
} else if (!transformerClassName.equals(other.transformerClassName))
|
||||
} else if (!transformerConfiguration.equals(other.transformerConfiguration))
|
||||
return false;
|
||||
if (useDuplicateDetection != other.useDuplicateDetection)
|
||||
return false;
|
||||
|
|
|
@ -38,7 +38,7 @@ public class DivertConfiguration implements Serializable {
|
|||
|
||||
private String filterString = null;
|
||||
|
||||
private String transformerClassName = null;
|
||||
private TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
private DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType());
|
||||
|
||||
|
@ -69,8 +69,8 @@ public class DivertConfiguration implements Serializable {
|
|||
return filterString;
|
||||
}
|
||||
|
||||
public String getTransformerClassName() {
|
||||
return transformerClassName;
|
||||
public TransformerConfiguration getTransformerConfiguration() {
|
||||
return transformerConfiguration;
|
||||
}
|
||||
|
||||
public DivertConfigurationRoutingType getRoutingType() {
|
||||
|
@ -130,10 +130,10 @@ public class DivertConfiguration implements Serializable {
|
|||
}
|
||||
|
||||
/**
|
||||
* @param transformerClassName the transformerClassName to set
|
||||
* @param transformerConfiguration the transformerConfiguration to set
|
||||
*/
|
||||
public DivertConfiguration setTransformerClassName(final String transformerClassName) {
|
||||
this.transformerClassName = transformerClassName;
|
||||
public DivertConfiguration setTransformerConfiguration(final TransformerConfiguration transformerConfiguration) {
|
||||
this.transformerConfiguration = transformerConfiguration;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ public class DivertConfiguration implements Serializable {
|
|||
result = prime * result + ((forwardingAddress == null) ? 0 : forwardingAddress.hashCode());
|
||||
result = prime * result + ((name == null) ? 0 : name.hashCode());
|
||||
result = prime * result + ((routingName == null) ? 0 : routingName.hashCode());
|
||||
result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode());
|
||||
result = prime * result + ((transformerConfiguration == null) ? 0 : transformerConfiguration.hashCode());
|
||||
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
@ -196,10 +196,10 @@ public class DivertConfiguration implements Serializable {
|
|||
return false;
|
||||
} else if (!routingName.equals(other.routingName))
|
||||
return false;
|
||||
if (transformerClassName == null) {
|
||||
if (other.transformerClassName != null)
|
||||
if (transformerConfiguration == null) {
|
||||
if (other.transformerConfiguration != null)
|
||||
return false;
|
||||
} else if (!transformerClassName.equals(other.transformerClassName))
|
||||
} else if (!transformerConfiguration.equals(other.transformerConfiguration))
|
||||
return false;
|
||||
if (routingType == null) {
|
||||
if (other.routingType != null)
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.config;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public final class TransformerConfiguration implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -1057244274380572226L;
|
||||
|
||||
private String className = null;
|
||||
|
||||
private Map<String, String> properties = new HashMap<>();
|
||||
|
||||
public TransformerConfiguration() {
|
||||
}
|
||||
|
||||
public String getClassName() {
|
||||
return className;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param className the class name to set
|
||||
*/
|
||||
public TransformerConfiguration setClassName(final String className) {
|
||||
this.className = className;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Map<String, String> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param properties the properties to set
|
||||
*/
|
||||
public TransformerConfiguration setProperties(final Map<String, String> properties) {
|
||||
if (properties != null) {
|
||||
this.properties.putAll(properties);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
result = prime * result + ((className == null) ? 0 : className.hashCode());
|
||||
result = prime * result + ((properties == null) ? 0 : properties.hashCode());
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
TransformerConfiguration other = (TransformerConfiguration) obj;
|
||||
if (className == null) {
|
||||
if (other.className != null)
|
||||
return false;
|
||||
} else if (!className.equals(other.className))
|
||||
return false;
|
||||
if (properties == null) {
|
||||
if (other.properties != null)
|
||||
return false;
|
||||
} else if (!properties.equals(other.properties))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
|
@ -31,6 +31,7 @@ import java.util.Set;
|
|||
|
||||
import org.apache.activemq.artemis.ArtemisConstants;
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
|
||||
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
|
||||
|
@ -1626,6 +1627,27 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
mainConfiguration.setGroupingHandlerConfiguration(new GroupingHandlerConfiguration().setName(new SimpleString(name)).setType(type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType()) ? GroupingHandlerConfiguration.TYPE.LOCAL : GroupingHandlerConfiguration.TYPE.REMOTE).setAddress(new SimpleString(address)).setTimeout(timeout).setGroupTimeout(groupTimeout).setReaperPeriod(reaperPeriod));
|
||||
}
|
||||
|
||||
private TransformerConfiguration getTransformerConfiguration(final Node node) {
|
||||
Element element = (Element) node;
|
||||
String className = getString(element, "class-name", null, Validators.NO_CHECK);
|
||||
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
NodeList children = element.getChildNodes();
|
||||
for (int j = 0; j < children.getLength(); j++) {
|
||||
Node child = children.item(j);
|
||||
if (child.getNodeName().equals("property")) {
|
||||
String key = getAttributeValue(child, "key");
|
||||
String value = getAttributeValue(child, "value");
|
||||
properties.put(key, value);
|
||||
}
|
||||
}
|
||||
return new TransformerConfiguration().setClassName(className).setProperties(properties);
|
||||
}
|
||||
|
||||
private TransformerConfiguration getTransformerConfiguration(final String transformerClassName) {
|
||||
return new TransformerConfiguration().setClassName(transformerClassName).setProperties(Collections.EMPTY_MAP);
|
||||
}
|
||||
|
||||
private void parseBridgeConfiguration(final Element brNode, final Configuration mainConfig) throws Exception {
|
||||
String name = brNode.getAttribute("name");
|
||||
|
||||
|
@ -1684,6 +1706,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
boolean ha = getBoolean(brNode, "ha", false);
|
||||
|
||||
TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
String filterString = null;
|
||||
|
||||
List<String> staticConnectorNames = new ArrayList<>();
|
||||
|
@ -1701,10 +1725,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
discoveryGroupName = child.getAttributes().getNamedItem("discovery-group-name").getNodeValue();
|
||||
} else if (child.getNodeName().equals("static-connectors")) {
|
||||
getStaticConnectors(staticConnectorNames, child);
|
||||
} else if (child.getNodeName().equals("transformer")) {
|
||||
transformerConfiguration = getTransformerConfiguration(child);
|
||||
}
|
||||
}
|
||||
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
if (transformerConfiguration == null && transformerClassName != null) {
|
||||
transformerConfiguration = getTransformerConfiguration(transformerClassName);
|
||||
}
|
||||
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setMinLargeMessageSize(minLargeMessageSize).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setRetryInterval(retryInterval).setMaxRetryInterval(maxRetryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
|
||||
if (!staticConnectorNames.isEmpty()) {
|
||||
config.setStaticConnectors(staticConnectorNames);
|
||||
|
@ -1742,6 +1772,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
DivertConfigurationRoutingType routingType = DivertConfigurationRoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.DIVERT_ROUTING_TYPE));
|
||||
|
||||
TransformerConfiguration transformerConfiguration = null;
|
||||
|
||||
String filterString = null;
|
||||
|
||||
NodeList children = e.getChildNodes();
|
||||
|
@ -1751,16 +1783,22 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
|
||||
if (child.getNodeName().equals("filter")) {
|
||||
filterString = getAttributeValue(child, "string");
|
||||
} else if (child.getNodeName().equals("transformer")) {
|
||||
transformerConfiguration = getTransformerConfiguration(child);
|
||||
}
|
||||
}
|
||||
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType);
|
||||
if (transformerConfiguration == null && transformerClassName != null) {
|
||||
transformerConfiguration = getTransformerConfiguration(transformerClassName);
|
||||
}
|
||||
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(routingType);
|
||||
|
||||
mainConfig.getDivertConfigurations().add(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param node
|
||||
* @param e
|
||||
* @return
|
||||
*/
|
||||
protected void parseWildcardConfiguration(final Element e, final Configuration mainConfig) {
|
||||
|
|
|
@ -48,6 +48,7 @@ import java.util.stream.Collectors;
|
|||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||
|
@ -64,6 +65,7 @@ import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
|||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.management.impl.view.AddressView;
|
||||
import org.apache.activemq.artemis.core.management.impl.view.ConnectionView;
|
||||
|
@ -2262,11 +2264,38 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final String routingType) throws Exception {
|
||||
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, (String) null, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(final String name,
|
||||
final String routingName,
|
||||
final String address,
|
||||
final String forwardingAddress,
|
||||
final boolean exclusive,
|
||||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final String transformerPropertiesAsJSON,
|
||||
final String routingType) throws Exception {
|
||||
createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, JsonUtil.readJsonProperties(transformerPropertiesAsJSON), routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(final String name,
|
||||
final String routingName,
|
||||
final String address,
|
||||
final String forwardingAddress,
|
||||
final boolean exclusive,
|
||||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final Map<String, String> transformerProperties,
|
||||
final String routingType) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
try {
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
|
||||
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName).setProperties(transformerProperties);
|
||||
DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setRoutingType(DivertConfigurationRoutingType.valueOf(routingType));
|
||||
server.deployDivert(config);
|
||||
} finally {
|
||||
blockOnIO();
|
||||
|
@ -2323,12 +2352,95 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
final boolean ha,
|
||||
final String user,
|
||||
final String password) throws Exception {
|
||||
createBridge(name,
|
||||
queueName,
|
||||
forwardingAddress,
|
||||
filterString,
|
||||
transformerClassName,
|
||||
(String) null,
|
||||
retryInterval,
|
||||
retryIntervalMultiplier,
|
||||
initialConnectAttempts,
|
||||
reconnectAttempts,
|
||||
useDuplicateDetection,
|
||||
confirmationWindowSize,
|
||||
producerWindowSize,
|
||||
clientFailureCheckPeriod,
|
||||
staticConnectorsOrDiscoveryGroup,
|
||||
useDiscoveryGroup,
|
||||
ha,
|
||||
user,
|
||||
password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBridge(final String name,
|
||||
final String queueName,
|
||||
final String forwardingAddress,
|
||||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final String transformerPropertiesAsJSON,
|
||||
final long retryInterval,
|
||||
final double retryIntervalMultiplier,
|
||||
final int initialConnectAttempts,
|
||||
final int reconnectAttempts,
|
||||
final boolean useDuplicateDetection,
|
||||
final int confirmationWindowSize,
|
||||
final int producerWindowSize,
|
||||
final long clientFailureCheckPeriod,
|
||||
final String staticConnectorsOrDiscoveryGroup,
|
||||
boolean useDiscoveryGroup,
|
||||
final boolean ha,
|
||||
final String user,
|
||||
final String password) throws Exception {
|
||||
createBridge(name,
|
||||
queueName,
|
||||
forwardingAddress,
|
||||
filterString,
|
||||
transformerClassName,
|
||||
JsonUtil.readJsonProperties(transformerPropertiesAsJSON),
|
||||
retryInterval,
|
||||
retryIntervalMultiplier,
|
||||
initialConnectAttempts,
|
||||
reconnectAttempts,
|
||||
useDuplicateDetection,
|
||||
confirmationWindowSize,
|
||||
producerWindowSize,
|
||||
clientFailureCheckPeriod,
|
||||
staticConnectorsOrDiscoveryGroup,
|
||||
useDiscoveryGroup,
|
||||
ha,
|
||||
user,
|
||||
password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBridge(final String name,
|
||||
final String queueName,
|
||||
final String forwardingAddress,
|
||||
final String filterString,
|
||||
final String transformerClassName,
|
||||
final Map<String, String> transformerProperties,
|
||||
final long retryInterval,
|
||||
final double retryIntervalMultiplier,
|
||||
final int initialConnectAttempts,
|
||||
final int reconnectAttempts,
|
||||
final boolean useDuplicateDetection,
|
||||
final int confirmationWindowSize,
|
||||
final int producerWindowSize,
|
||||
final long clientFailureCheckPeriod,
|
||||
final String staticConnectorsOrDiscoveryGroup,
|
||||
boolean useDiscoveryGroup,
|
||||
final boolean ha,
|
||||
final String user,
|
||||
final String password) throws Exception {
|
||||
checkStarted();
|
||||
|
||||
clearIO();
|
||||
|
||||
try {
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName).setProperties(transformerProperties);
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
|
||||
if (useDiscoveryGroup) {
|
||||
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
||||
|
@ -2365,7 +2477,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
|
|||
clearIO();
|
||||
|
||||
try {
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerClassName(transformerClassName).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
TransformerConfiguration transformerConfiguration = transformerClassName == null ? null : new TransformerConfiguration().setClassName(transformerClassName);
|
||||
BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateDetection).setConfirmationWindowSize(confirmationWindowSize).setHA(ha).setUser(user).setPassword(password);
|
||||
|
||||
if (useDiscoveryGroup) {
|
||||
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
|
||||
|
|
|
@ -19,7 +19,9 @@ package org.apache.activemq.artemis.core.management.impl;
|
|||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.management.BridgeControl;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
|
@ -144,7 +146,22 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
|
|||
public String getTransformerClassName() {
|
||||
clearIO();
|
||||
try {
|
||||
return configuration.getTransformerClassName();
|
||||
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerPropertiesAsJSON() {
|
||||
return JsonUtil.toJsonObject(getTransformerProperties()).toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTransformerProperties() {
|
||||
clearIO();
|
||||
try {
|
||||
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
|
|
@ -19,6 +19,9 @@ package org.apache.activemq.artemis.core.management.impl;
|
|||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.management.DivertControl;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
|
@ -92,7 +95,22 @@ public class DivertControlImpl extends AbstractControl implements DivertControl
|
|||
public String getTransformerClassName() {
|
||||
clearIO();
|
||||
try {
|
||||
return configuration.getTransformerClassName();
|
||||
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getClassName();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerPropertiesAsJSON() {
|
||||
return JsonUtil.toJsonObject(getTransformerProperties()).toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTransformerProperties() {
|
||||
clearIO();
|
||||
try {
|
||||
return configuration.getTransformerConfiguration() == null ? null : configuration.getTransformerConfiguration().getProperties();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.filter.Filter;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
|
||||
public interface Divert extends Bindable {
|
||||
|
||||
|
|
|
@ -24,7 +24,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||
|
||||
/**
|
||||
|
@ -87,24 +88,24 @@ public interface ServiceRegistry {
|
|||
List<BaseInterceptor> getOutgoingInterceptors(List<String> classNames);
|
||||
|
||||
/**
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a divert
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.transformer.Transformer for a divert
|
||||
*
|
||||
* @param name the name of divert for which the transformer will be used
|
||||
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||
* @param transformerConfiguration the transformer configuration
|
||||
* @return
|
||||
*/
|
||||
Transformer getDivertTransformer(String name, String className);
|
||||
Transformer getDivertTransformer(String name, TransformerConfiguration transformerConfiguration);
|
||||
|
||||
void addDivertTransformer(String name, Transformer transformer);
|
||||
|
||||
/**
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.cluster.Transformer for a bridge
|
||||
* Get an instance of org.apache.activemq.artemis.core.server.transformer.Transformer for a bridge
|
||||
*
|
||||
* @param name the name of bridge for which the transformer will be used
|
||||
* @param className the fully qualified name of the transformer implementation (can be null)
|
||||
* @param transformerConfiguration the transformer configuration
|
||||
* @return
|
||||
*/
|
||||
Transformer getBridgeTransformer(String name, String className);
|
||||
Transformer getBridgeTransformer(String name, TransformerConfiguration transformerConfiguration);
|
||||
|
||||
void addBridgeTransformer(String name, Transformer transformer);
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||
import org.apache.activemq.artemis.core.server.NodeManager;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.HAManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BroadcastGroupImpl;
|
||||
|
@ -395,7 +396,7 @@ public final class ClusterManager implements ActiveMQComponent {
|
|||
return;
|
||||
}
|
||||
|
||||
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerClassName());
|
||||
Transformer transformer = server.getServiceRegistry().getBridgeTransformer(config.getName(), config.getTransformerConfiguration());
|
||||
|
||||
Binding binding = postOffice.getBinding(new SimpleString(config.getQueueName()));
|
||||
|
||||
|
|
|
@ -16,9 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.server.cluster;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
||||
public interface Transformer {
|
||||
|
||||
Message transform(Message message);
|
||||
/**
|
||||
* This is for back compatibility with package move.
|
||||
*/
|
||||
public interface Transformer extends org.apache.activemq.artemis.core.server.transformer.Transformer {
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
|||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
|
|
|
@ -45,7 +45,7 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
|
|||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.UUID;
|
||||
import org.apache.activemq.artemis.utils.UUIDGenerator;
|
||||
|
|
|
@ -136,7 +136,7 @@ import org.apache.activemq.artemis.core.server.ServiceComponent;
|
|||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
|
||||
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
|
||||
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
|
||||
|
@ -2051,7 +2051,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
SimpleString sAddress = new SimpleString(config.getAddress());
|
||||
|
||||
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerClassName());
|
||||
Transformer transformer = getServiceRegistry().getDivertTransformer(config.getName(), config.getTransformerConfiguration());
|
||||
|
||||
Filter filter = FilterImpl.createFilter(config.getFilterString());
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.apache.activemq.artemis.core.server.Divert;
|
|||
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
|
||||
import org.apache.activemq.artemis.core.server.RoutingContext;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
|
|
|
@ -30,10 +30,11 @@ import java.util.concurrent.ScheduledExecutorService;
|
|||
import org.apache.activemq.artemis.api.core.BaseInterceptor;
|
||||
import org.apache.activemq.artemis.api.core.Pair;
|
||||
import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||
import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
|
||||
import org.apache.activemq.artemis.core.server.ServiceRegistry;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
|
||||
import org.apache.activemq.artemis.utils.ClassloadingUtil;
|
||||
|
||||
|
@ -153,11 +154,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Transformer getDivertTransformer(String name, String className) {
|
||||
public Transformer getDivertTransformer(String name, TransformerConfiguration transformerConfiguration) {
|
||||
Transformer transformer = divertTransformers.get(name);
|
||||
|
||||
if (transformer == null && className != null) {
|
||||
transformer = instantiateTransformer(className);
|
||||
if (transformer == null && transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
|
||||
transformer = instantiateTransformer(transformerConfiguration);
|
||||
addDivertTransformer(name, transformer);
|
||||
}
|
||||
|
||||
|
@ -180,11 +181,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Transformer getBridgeTransformer(String name, String className) {
|
||||
public Transformer getBridgeTransformer(String name, TransformerConfiguration transformerConfiguration) {
|
||||
Transformer transformer = bridgeTransformers.get(name);
|
||||
|
||||
if (transformer == null && className != null) {
|
||||
transformer = instantiateTransformer(className);
|
||||
if (transformer == null && transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
|
||||
transformer = instantiateTransformer(transformerConfiguration);
|
||||
addBridgeTransformer(name, transformer);
|
||||
}
|
||||
|
||||
|
@ -218,14 +219,15 @@ public class ServiceRegistryImpl implements ServiceRegistry {
|
|||
});
|
||||
}
|
||||
|
||||
private Transformer instantiateTransformer(final String className) {
|
||||
private Transformer instantiateTransformer(final TransformerConfiguration transformerConfiguration) {
|
||||
Transformer transformer = null;
|
||||
|
||||
if (className != null) {
|
||||
if (transformerConfiguration != null && transformerConfiguration.getClassName() != null) {
|
||||
try {
|
||||
transformer = loadClass(className);
|
||||
transformer = loadClass(transformerConfiguration.getClassName());
|
||||
transformer.init(Collections.unmodifiableMap(transformerConfiguration.getProperties()));
|
||||
} catch (Exception e) {
|
||||
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
|
||||
throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, transformerConfiguration.getClassName());
|
||||
}
|
||||
}
|
||||
return transformer;
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.server.transformer;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
||||
public class AddHeadersTransformer implements Transformer {
|
||||
|
||||
private Map<SimpleString, SimpleString> headers = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public void init(Map<String, String> properties) {
|
||||
properties.forEach((k,v) -> headers.put(SimpleString.toSimpleString(k), SimpleString.toSimpleString(v)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message transform(Message message) {
|
||||
headers.forEach(message::putStringProperty);
|
||||
return message;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.server.transformer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
|
||||
public interface Transformer {
|
||||
|
||||
default void init(Map<String, String> properties) { }
|
||||
|
||||
Message transform(Message message);
|
||||
}
|
|
@ -1262,6 +1262,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional transformer configuration
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="min-large-message-size" type="xsd:string" default="102400" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
@ -1418,6 +1426,44 @@
|
|||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
|
||||
<!-- TRANSFORMER CONFIGURATION -->
|
||||
<xsd:complexType name="transformerType">
|
||||
<xsd:sequence>
|
||||
<xsd:element name="class-name" type="xsd:string" maxOccurs="1" minOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional name of transformer class
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="property" type="transformerProperty" maxOccurs="unbounded" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
properties to configure the transformer class
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:sequence>
|
||||
</xsd:complexType>
|
||||
|
||||
|
||||
<xsd:complexType name="transformerProperty">
|
||||
<xsd:attribute name="key" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
key for the property
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="value" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
value for the property
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
|
||||
|
||||
<!-- CLUSTER CONNECTION CONFIGURATION -->
|
||||
|
||||
|
@ -1704,6 +1750,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional transformer configuration
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="exclusive" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -193,7 +193,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(12999, ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupPort());
|
||||
Assert.assertEquals(23456, dc.getRefreshTimeout());
|
||||
|
||||
Assert.assertEquals(2, conf.getDivertConfigurations().size());
|
||||
Assert.assertEquals(3, conf.getDivertConfigurations().size());
|
||||
for (DivertConfiguration dic : conf.getDivertConfigurations()) {
|
||||
if (dic.getName().equals("divert1")) {
|
||||
Assert.assertEquals("divert1", dic.getName());
|
||||
|
@ -201,20 +201,25 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals("address1", dic.getAddress());
|
||||
Assert.assertEquals("forwarding-address1", dic.getForwardingAddress());
|
||||
Assert.assertEquals("speed > 88", dic.getFilterString());
|
||||
Assert.assertEquals("org.foo.Transformer", dic.getTransformerClassName());
|
||||
Assert.assertEquals("org.foo.Transformer", dic.getTransformerConfiguration().getClassName());
|
||||
Assert.assertEquals(true, dic.isExclusive());
|
||||
} else {
|
||||
} else if (dic.getName().equals("divert2")) {
|
||||
Assert.assertEquals("divert2", dic.getName());
|
||||
Assert.assertEquals("routing-name2", dic.getRoutingName());
|
||||
Assert.assertEquals("address2", dic.getAddress());
|
||||
Assert.assertEquals("forwarding-address2", dic.getForwardingAddress());
|
||||
Assert.assertEquals("speed < 88", dic.getFilterString());
|
||||
Assert.assertEquals("org.foo.Transformer2", dic.getTransformerClassName());
|
||||
Assert.assertEquals("org.foo.Transformer2", dic.getTransformerConfiguration().getClassName());
|
||||
Assert.assertEquals(false, dic.isExclusive());
|
||||
} else {
|
||||
Assert.assertEquals("divert3", dic.getName());
|
||||
Assert.assertEquals("org.foo.DivertTransformer3", dic.getTransformerConfiguration().getClassName());
|
||||
Assert.assertEquals("divertTransformerValue1", dic.getTransformerConfiguration().getProperties().get("divertTransformerKey1"));
|
||||
Assert.assertEquals("divertTransformerValue2", dic.getTransformerConfiguration().getProperties().get("divertTransformerKey2"));
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, conf.getBridgeConfigurations().size());
|
||||
Assert.assertEquals(3, conf.getBridgeConfigurations().size());
|
||||
for (BridgeConfiguration bc : conf.getBridgeConfigurations()) {
|
||||
if (bc.getName().equals("bridge1")) {
|
||||
Assert.assertEquals("bridge1", bc.getName());
|
||||
|
@ -224,7 +229,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals("connection time-to-live", 370, bc.getConnectionTTL());
|
||||
Assert.assertEquals("bridge-forwarding-address1", bc.getForwardingAddress());
|
||||
Assert.assertEquals("sku > 1", bc.getFilterString());
|
||||
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerClassName());
|
||||
Assert.assertEquals("org.foo.BridgeTransformer", bc.getTransformerConfiguration().getClassName());
|
||||
Assert.assertEquals(3, bc.getRetryInterval());
|
||||
Assert.assertEquals(0.2, bc.getRetryIntervalMultiplier(), 0.0001);
|
||||
assertEquals("max retry interval", 10002, bc.getMaxRetryInterval());
|
||||
|
@ -234,15 +239,21 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
Assert.assertEquals(null, bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(444, bc.getProducerWindowSize());
|
||||
Assert.assertEquals(1073741824, bc.getConfirmationWindowSize());
|
||||
} else {
|
||||
} else if (bc.getName().equals("bridge2")) {
|
||||
Assert.assertEquals("bridge2", bc.getName());
|
||||
Assert.assertEquals("queue2", bc.getQueueName());
|
||||
Assert.assertEquals("bridge-forwarding-address2", bc.getForwardingAddress());
|
||||
Assert.assertEquals(null, bc.getFilterString());
|
||||
Assert.assertEquals(null, bc.getTransformerClassName());
|
||||
Assert.assertEquals(null, bc.getTransformerConfiguration());
|
||||
Assert.assertEquals(null, bc.getStaticConnectors());
|
||||
Assert.assertEquals("dg1", bc.getDiscoveryGroupName());
|
||||
Assert.assertEquals(568320, bc.getProducerWindowSize());
|
||||
} else {
|
||||
Assert.assertEquals("bridge3", bc.getName());
|
||||
Assert.assertEquals("org.foo.BridgeTransformer3", bc.getTransformerConfiguration().getClassName());
|
||||
Assert.assertEquals("bridgeTransformerValue1", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey1"));
|
||||
Assert.assertEquals("bridgeTransformerValue2", bc.getTransformerConfiguration().getProperties().get("bridgeTransformerKey2"));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -125,6 +125,18 @@
|
|||
<transformer-class-name>org.foo.Transformer2</transformer-class-name>
|
||||
<exclusive>false</exclusive>
|
||||
</divert>
|
||||
<divert name="divert3">
|
||||
<routing-name>routing-name2</routing-name>
|
||||
<address>address2</address>
|
||||
<forwarding-address>forwarding-address2</forwarding-address>
|
||||
<filter string="speed < 88"/>
|
||||
<transformer>
|
||||
<class-name>org.foo.DivertTransformer3</class-name>
|
||||
<property key="divertTransformerKey1" value="divertTransformerValue1"/>
|
||||
<property key="divertTransformerKey2" value="divertTransformerValue2"/>
|
||||
</transformer>
|
||||
<exclusive>false</exclusive>
|
||||
</divert>
|
||||
</diverts>
|
||||
<amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
|
||||
<queues>
|
||||
|
@ -166,6 +178,17 @@
|
|||
<producer-window-size>555k</producer-window-size>
|
||||
<discovery-group-ref discovery-group-name="dg1"/>
|
||||
</bridge>
|
||||
<bridge name="bridge3">
|
||||
<queue-name>queue3</queue-name>
|
||||
<forwarding-address>bridge-forwarding-address2</forwarding-address>
|
||||
<transformer>
|
||||
<class-name>org.foo.BridgeTransformer3</class-name>
|
||||
<property key="bridgeTransformerKey1" value="bridgeTransformerValue1"/>
|
||||
<property key="bridgeTransformerKey2" value="bridgeTransformerValue2"/>
|
||||
</transformer>
|
||||
<producer-window-size>555k</producer-window-size>
|
||||
<discovery-group-ref discovery-group-name="dg1"/>
|
||||
</bridge>
|
||||
</bridges>
|
||||
<ha-policy>
|
||||
<!--only one of the following-->
|
||||
|
|
|
@ -1091,6 +1091,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional transformer configuration
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="min-large-message-size" type="xsd:int" default="102400" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
@ -1246,6 +1254,44 @@
|
|||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
|
||||
<!-- TRANSFORMER CONFIGURATION -->
|
||||
<xsd:complexType name="transformerType">
|
||||
<xsd:sequence>
|
||||
<xsd:element name="class-name" type="xsd:string" maxOccurs="1" minOccurs="1">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional name of transformer class
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
<xsd:element name="property" type="transformerProperty" maxOccurs="unbounded" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
properties to configure the transformer class
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
</xsd:sequence>
|
||||
</xsd:complexType>
|
||||
|
||||
|
||||
<xsd:complexType name="transformerProperty">
|
||||
<xsd:attribute name="key" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
key for the property
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="value" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
value for the property
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
|
||||
|
||||
<!-- CLUSTER CONNECTION CONFIGURATION -->
|
||||
|
||||
|
@ -1530,6 +1576,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="transformer" type="transformerType" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
optional transformer configuration
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="exclusive" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -92,7 +92,7 @@ Let's take a look at all the parameters in turn:
|
|||
|
||||
- `transformer-class-name`. An optional transformer-class-name can be
|
||||
specified. This is the name of a user-defined class which implements
|
||||
the `org.apache.activemq.artemis.core.server.cluster.Transformer` interface.
|
||||
the `org.apache.activemq.artemis.core.server.transformer.Transformer` interface.
|
||||
|
||||
If this is specified then the transformer's `transform()` method
|
||||
will be invoked with the message before it is forwarded. This gives
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq.artemis.jms.example;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
|
||||
public class HatColourChangeTransformer implements Transformer {
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.activemq.artemis.jms.example;
|
|||
|
||||
import org.apache.activemq.artemis.api.core.Message;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
|
||||
public class AddForwardingTimeTransformer implements Transformer {
|
||||
|
||||
|
|
|
@ -62,8 +62,9 @@ under the License.
|
|||
<address>priceUpdates</address>
|
||||
<forwarding-address>priceForwarding</forwarding-address>
|
||||
<filter string="office='New York'"/>
|
||||
<transformer-class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer
|
||||
</transformer-class-name>
|
||||
<transformer>
|
||||
<class-name>org.apache.activemq.artemis.jms.example.AddForwardingTimeTransformer</class-name>
|
||||
</transformer>
|
||||
<exclusive>true</exclusive>
|
||||
</divert>
|
||||
</diverts>
|
||||
|
|
|
@ -51,6 +51,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
|||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
|
||||
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
|
||||
|
@ -69,7 +70,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
|||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||
|
@ -948,7 +950,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
internaltestWithTransformer(true);
|
||||
}
|
||||
|
||||
public void internaltestWithTransformer(final boolean useFiles) throws Exception {
|
||||
private void internaltestWithTransformer(final boolean useFiles) throws Exception {
|
||||
Map<String, Object> server0Params = new HashMap<>();
|
||||
server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
|
||||
|
||||
|
@ -971,7 +973,7 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
ArrayList<String> staticConnectors = new ArrayList<>();
|
||||
staticConnectors.add(server1tc.getName());
|
||||
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerClassName(SimpleTransformer.class.getName()).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(new TransformerConfiguration().setClassName(SimpleTransformer.class.getName())).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
|
@ -1051,6 +1053,114 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithTransformerProperties() throws Exception {
|
||||
final String propKey = "bridged";
|
||||
final String propValue = "true";
|
||||
|
||||
|
||||
TransformerConfiguration transformerConfiguration = new TransformerConfiguration().setClassName(AddHeadersTransformer.class.getName());
|
||||
transformerConfiguration.getProperties().put(propKey, propValue);
|
||||
|
||||
Map<String, Object> server0Params = new HashMap<>();
|
||||
server0 = createClusteredServerWithParams(isNetty(), 0, false, server0Params);
|
||||
|
||||
Map<String, Object> server1Params = new HashMap<>();
|
||||
addTargetParameters(server1Params);
|
||||
server1 = createClusteredServerWithParams(isNetty(), 1, false, server1Params);
|
||||
|
||||
final String testAddress = "testAddress";
|
||||
final String queueName0 = "queue0";
|
||||
final String forwardAddress = "forwardAddress";
|
||||
final String queueName1 = "queue1";
|
||||
|
||||
Map<String, TransportConfiguration> connectors = new HashMap<>();
|
||||
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params);
|
||||
TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
|
||||
connectors.put(server1tc.getName(), server1tc);
|
||||
|
||||
server0.getConfiguration().setConnectorConfigurations(connectors);
|
||||
|
||||
ArrayList<String> staticConnectors = new ArrayList<>();
|
||||
staticConnectors.add(server1tc.getName());
|
||||
|
||||
BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName("bridge1").setQueueName(queueName0).setForwardingAddress(forwardAddress).setTransformerConfiguration(transformerConfiguration).setRetryInterval(1000).setReconnectAttemptsOnSameNode(-1).setUseDuplicateDetection(false).setConfirmationWindowSize(1024).setStaticConnectors(staticConnectors);
|
||||
|
||||
List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
|
||||
bridgeConfigs.add(bridgeConfiguration);
|
||||
server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
|
||||
|
||||
CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration().setAddress(testAddress).setName(queueName0);
|
||||
List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<>();
|
||||
queueConfigs0.add(queueConfig0);
|
||||
server0.getConfiguration().setQueueConfigurations(queueConfigs0);
|
||||
|
||||
CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration().setAddress(forwardAddress).setName(queueName1);
|
||||
List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<>();
|
||||
queueConfigs1.add(queueConfig1);
|
||||
server1.getConfiguration().setQueueConfigurations(queueConfigs1);
|
||||
|
||||
server1.start();
|
||||
server0.start();
|
||||
|
||||
locator = addServerLocator(ActiveMQClient.createServerLocatorWithoutHA(server0tc, server1tc));
|
||||
ClientSessionFactory sf0 = locator.createSessionFactory(server0tc);
|
||||
|
||||
ClientSessionFactory sf1 = locator.createSessionFactory(server1tc);
|
||||
|
||||
ClientSession session0 = sf0.createSession(false, true, true);
|
||||
|
||||
ClientSession session1 = sf1.createSession(false, true, true);
|
||||
|
||||
ClientProducer producer0 = session0.createProducer(new SimpleString(testAddress));
|
||||
|
||||
ClientConsumer consumer1 = session1.createConsumer(queueName1);
|
||||
|
||||
session1.start();
|
||||
|
||||
final int numMessages = 10;
|
||||
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = session0.createMessage(true);
|
||||
|
||||
message.getBodyBuffer().writeString("doo be doo be doo be doo");
|
||||
|
||||
producer0.send(message);
|
||||
}
|
||||
|
||||
for (int i = 0; i < numMessages; i++) {
|
||||
ClientMessage message = consumer1.receive(200);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
String messagePropVal = message.getStringProperty(propKey);
|
||||
|
||||
Assert.assertEquals(propValue, messagePropVal);
|
||||
|
||||
String sval = message.getBodyBuffer().readString();
|
||||
|
||||
Assert.assertEquals("doo be doo be doo be doo", sval);
|
||||
|
||||
message.acknowledge();
|
||||
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer1.receiveImmediate());
|
||||
|
||||
session0.close();
|
||||
|
||||
session1.close();
|
||||
|
||||
sf0.close();
|
||||
|
||||
sf1.close();
|
||||
|
||||
if (server0.getConfiguration().isPersistenceEnabled()) {
|
||||
assertEquals(0, loadQueues(server0).size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSawtoothLoad() throws Exception {
|
||||
Map<String, Object> server0Params = new HashMap<>();
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
|||
import org.apache.activemq.artemis.core.server.Divert;
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.cluster.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
|
|
@ -797,6 +797,32 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(String name,
|
||||
String routingName,
|
||||
String address,
|
||||
String forwardingAddress,
|
||||
boolean exclusive,
|
||||
String filterString,
|
||||
String transformerClassName,
|
||||
Map<String, String> transformerProperties,
|
||||
String routingType) throws Exception {
|
||||
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerProperties, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createDivert(String name,
|
||||
String routingName,
|
||||
String address,
|
||||
String forwardingAddress,
|
||||
boolean exclusive,
|
||||
String filterString,
|
||||
String transformerClassName,
|
||||
String transformerPropertiesAsJSON,
|
||||
String routingType) throws Exception {
|
||||
proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, transformerPropertiesAsJSON, routingType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroyDivert(String name) throws Exception {
|
||||
proxy.invokeOperation("destroyDivert", name);
|
||||
|
@ -869,6 +895,52 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
|
|||
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBridge(String name,
|
||||
String queueName,
|
||||
String forwardingAddress,
|
||||
String filterString,
|
||||
String transformerClassName,
|
||||
Map<String, String> transformerProperties,
|
||||
long retryInterval,
|
||||
double retryIntervalMultiplier,
|
||||
int initialConnectAttempts,
|
||||
int reconnectAttempts,
|
||||
boolean useDuplicateDetection,
|
||||
int confirmationWindowSize,
|
||||
int producerWindowSize,
|
||||
long clientFailureCheckPeriod,
|
||||
String connectorNames,
|
||||
boolean useDiscovery,
|
||||
boolean ha,
|
||||
String user,
|
||||
String password) throws Exception {
|
||||
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, transformerProperties, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBridge(String name,
|
||||
String queueName,
|
||||
String forwardingAddress,
|
||||
String filterString,
|
||||
String transformerClassName,
|
||||
String transformerPropertiesAsJSON,
|
||||
long retryInterval,
|
||||
double retryIntervalMultiplier,
|
||||
int initialConnectAttempts,
|
||||
int reconnectAttempts,
|
||||
boolean useDuplicateDetection,
|
||||
int confirmationWindowSize,
|
||||
int producerWindowSize,
|
||||
long clientFailureCheckPeriod,
|
||||
String connectorNames,
|
||||
boolean useDiscovery,
|
||||
boolean ha,
|
||||
String user,
|
||||
String password) throws Exception {
|
||||
proxy.invokeOperation("createBridge", name, queueName, forwardingAddress, filterString, transformerClassName, transformerPropertiesAsJSON, retryInterval, retryIntervalMultiplier, initialConnectAttempts, reconnectAttempts, useDuplicateDetection, confirmationWindowSize, producerWindowSize, clientFailureCheckPeriod, connectorNames, useDiscovery, ha, user, password);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBridge(String name,
|
||||
String queueName,
|
||||
|
|
|
@ -61,7 +61,9 @@ public class DivertControlTest extends ManagementTestBase {
|
|||
|
||||
Assert.assertEquals(divertConfig.getForwardingAddress(), divertControl.getForwardingAddress());
|
||||
|
||||
Assert.assertEquals(divertConfig.getTransformerClassName(), divertControl.getTransformerClassName());
|
||||
Assert.assertEquals(divertConfig.getTransformerConfiguration().getClassName(), divertControl.getTransformerClassName());
|
||||
|
||||
Assert.assertEquals(divertConfig.getTransformerConfiguration().getProperties(), divertControl.getTransformerProperties());
|
||||
}
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.management.DivertControl;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
|
||||
|
@ -61,6 +63,16 @@ public class DivertControlUsingCoreTest extends DivertControlTest {
|
|||
return (String) proxy.retrieveAttributeValue("transformerClassName");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTransformerPropertiesAsJSON() {
|
||||
return (String) proxy.retrieveAttributeValue("transformerPropertiesAsJSON");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTransformerProperties() {
|
||||
return (Map<String, String>) proxy.retrieveAttributeValue("transformerProperties");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRoutingType() {
|
||||
return (String) proxy.retrieveAttributeValue("routingType");
|
||||
|
|
Loading…
Reference in New Issue