This closes #322

This commit is contained in:
Clebert Suconic 2016-01-19 17:42:37 -05:00
commit 2d4654ca26
62 changed files with 2679 additions and 225 deletions

View File

@ -59,6 +59,10 @@ public class URIFactory<T, P> {
return schemaFactory.newObject(uri, param);
}
public T newObject(String uri, P param) throws Exception {
return newObject(new URI(uri), param);
}
public void populateObject(URI uri, T bean) throws Exception {
URISchema<T, P> schemaFactory = schemas.get(uri.getScheme());
@ -69,6 +73,11 @@ public class URIFactory<T, P> {
schemaFactory.populateObject(uri, bean);
}
public void populateObject(String uri, T bean) throws Exception {
populateObject(new URI(uri), bean);
}
public URI createSchema(String scheme, T bean) throws Exception {
URISchema<T, P> schemaFactory = schemas.get(scheme);

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.Converter;
public abstract class URISchema<T, P> {
@ -99,6 +100,12 @@ public abstract class URISchema<T, P> {
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
public static void registerConverter(Converter converter, Class type) {
synchronized (beanUtils) {
beanUtils.getConvertUtils().register(converter, type);
}
}
static {
// This is to customize the BeanUtils to use Fluent Proeprties as well
beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores());

View File

@ -0,0 +1,560 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.uri;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Utility class that provides methods for parsing URI's
*
* This class can be used to split composite URI's into their component parts and is used to extract any
* URI options from each URI in order to set specific properties on Beans.
*
* (copied from activemq 5)
*/
public class URISupport {
/**
* A composite URI can be split into one or more CompositeData object which each represent the
* individual URIs that comprise the composite one.
*/
public static class CompositeData {
private String host;
private String scheme;
private String path;
private URI[] components;
private Map<String, String> parameters;
private String fragment;
public URI[] getComponents() {
return components;
}
public String getFragment() {
return fragment;
}
public Map<String, String> getParameters() {
return parameters;
}
public String getScheme() {
return scheme;
}
public String getPath() {
return path;
}
public String getHost() {
return host;
}
public URI toURI() throws URISyntaxException {
StringBuffer sb = new StringBuffer();
if (scheme != null) {
sb.append(scheme);
sb.append(':');
}
if (host != null && host.length() != 0) {
sb.append(host);
}
else {
sb.append('(');
for (int i = 0; i < components.length; i++) {
if (i != 0) {
sb.append(',');
}
sb.append(components[i].toString());
}
sb.append(')');
}
if (path != null) {
sb.append('/');
sb.append(path);
}
if (!parameters.isEmpty()) {
sb.append("?");
sb.append(createQueryString(parameters));
}
if (fragment != null) {
sb.append("#");
sb.append(fragment);
}
return new URI(sb.toString());
}
}
/**
* Give a URI break off any URI options and store them in a Key / Value Mapping.
*
* @param uri The URI whose query should be extracted and processed.
* @return A Mapping of the URI options.
* @throws java.net.URISyntaxException
*/
public static Map<String, String> parseQuery(String uri) throws URISyntaxException {
try {
uri = uri.substring(uri.lastIndexOf("?") + 1); // get only the relevant part of the query
Map<String, String> rc = new HashMap<String, String>();
if (uri != null && !uri.isEmpty()) {
parseParameters(rc, uri.split("&"));
parseParameters(rc, uri.split(";"));
}
return rc;
}
catch (UnsupportedEncodingException e) {
throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
}
}
private static void parseParameters(Map<String, String> rc,
String[] parameters) throws UnsupportedEncodingException {
for (int i = 0; i < parameters.length; i++) {
int p = parameters[i].indexOf("=");
if (p >= 0) {
String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
rc.put(name, value);
}
else {
rc.put(parameters[i], null);
}
}
}
/**
* Given a URI parse and extract any URI query options and return them as a Key / Value mapping.
*
* This method differs from the {@link parseQuery} method in that it handles composite URI types and
* will extract the URI options from the outermost composite URI.
*
* @param uri The URI whose query should be extracted and processed.
* @return A Mapping of the URI options.
* @throws java.net.URISyntaxException
*/
public static Map<String, String> parseParameters(URI uri) throws URISyntaxException {
if (!isCompositeURI(uri)) {
return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?"));
}
else {
CompositeData data = URISupport.parseComposite(uri);
Map<String, String> parameters = new HashMap<String, String>();
parameters.putAll(data.getParameters());
if (parameters.isEmpty()) {
parameters = emptyMap();
}
return parameters;
}
}
/**
* Given a Key / Value mapping create and append a URI query value that represents the mapped entries, return the
* newly updated URI that contains the value of the given URI and the appended query value.
*
* @param uri The source URI that will have the Map entries appended as a URI query value.
* @param queryParameters The Key / Value mapping that will be transformed into a URI query string.
* @return A new URI value that combines the given URI and the constructed query string.
* @throws java.net.URISyntaxException
*/
public static URI applyParameters(URI uri, Map<String, String> queryParameters) throws URISyntaxException {
return applyParameters(uri, queryParameters, "");
}
/**
* Given a Key / Value mapping create and append a URI query value that represents the mapped entries, return the
* newly updated URI that contains the value of the given URI and the appended query value. Each entry in the query
* string is prefixed by the supplied optionPrefix string.
*
* @param uri The source URI that will have the Map entries appended as a URI query value.
* @param queryParameters The Key / Value mapping that will be transformed into a URI query string.
* @param optionPrefix A string value that when not null or empty is used to prefix each query option key.
* @return A new URI value that combines the given URI and the constructed query string.
* @throws java.net.URISyntaxException
*/
public static URI applyParameters(URI uri,
Map<String, String> queryParameters,
String optionPrefix) throws URISyntaxException {
if (queryParameters != null && !queryParameters.isEmpty()) {
StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer();
for (Map.Entry<String, String> param : queryParameters.entrySet()) {
if (param.getKey().startsWith(optionPrefix)) {
if (newQuery.length() != 0) {
newQuery.append('&');
}
final String key = param.getKey().substring(optionPrefix.length());
newQuery.append(key).append('=').append(param.getValue());
}
}
uri = createURIWithQuery(uri, newQuery.toString());
}
return uri;
}
@SuppressWarnings("unchecked")
private static Map<String, String> emptyMap() {
return Collections.EMPTY_MAP;
}
/**
* Removes any URI query from the given uri and return a new URI that does not contain the query portion.
*
* @param uri The URI whose query value is to be removed.
* @return a new URI that does not contain a query value.
* @throws java.net.URISyntaxException
*/
public static URI removeQuery(URI uri) throws URISyntaxException {
return createURIWithQuery(uri, null);
}
/**
* Creates a URI with the given query, removing an previous query value from the given URI.
*
* @param uri The source URI whose existing query is replaced with the newly supplied one.
* @param query The new URI query string that should be appended to the given URI.
* @return a new URI that is a combination of the original URI and the given query string.
* @throws java.net.URISyntaxException
*/
public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException {
String schemeSpecificPart = uri.getRawSchemeSpecificPart();
// strip existing query if any
int questionMark = schemeSpecificPart.lastIndexOf("?");
// make sure question mark is not within parentheses
if (questionMark < schemeSpecificPart.lastIndexOf(")")) {
questionMark = -1;
}
if (questionMark > 0) {
schemeSpecificPart = schemeSpecificPart.substring(0, questionMark);
}
if (query != null && query.length() > 0) {
schemeSpecificPart += "?" + query;
}
return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment());
}
/**
* Given a composite URI, parse the individual URI elements contained within that URI and return
* a CompsoteData instance that contains the parsed URI values.
*
* @param uri The target URI that should be parsed.
* @return a new CompsiteData instance representing the parsed composite URI.
* @throws java.net.URISyntaxException
*/
public static CompositeData parseComposite(URI uri) throws URISyntaxException {
CompositeData rc = new CompositeData();
rc.scheme = uri.getScheme();
String ssp = stripPrefix(uri.getRawSchemeSpecificPart().trim(), "//").trim();
parseComposite(uri, rc, ssp);
rc.fragment = uri.getFragment();
return rc;
}
/**
* Examine a URI and determine if it is a Composite type or not.
*
* @param uri The URI that is to be examined.
* @return true if the given URI is a Compsote type.
*/
public static boolean isCompositeURI(URI uri) {
String ssp = stripPrefix(uri.getRawSchemeSpecificPart().trim(), "//").trim();
if (ssp.indexOf('(') == 0 && checkParenthesis(ssp)) {
return true;
}
return false;
}
/**
* Given a string and a position in that string of an open parend, find the matching close parend.
*
* @param str The string to be searched for a matching parend.
* @param first The index in the string of the opening parend whose close value is to be searched.
* @return the index in the string where the closing parend is located.
* @throws java.net.URISyntaxException fi the string does not contain a matching parend.
*/
public static int indexOfParenthesisMatch(String str, int first) throws URISyntaxException {
int index = -1;
if (first < 0 || first > str.length()) {
throw new IllegalArgumentException("Invalid position for first parenthesis: " + first);
}
if (str.charAt(first) != '(') {
throw new IllegalArgumentException("character at indicated position is not a parenthesis");
}
int depth = 1;
char[] array = str.toCharArray();
for (index = first + 1; index < array.length; ++index) {
char current = array[index];
if (current == '(') {
depth++;
}
else if (current == ')') {
if (--depth == 0) {
break;
}
}
}
if (depth != 0) {
throw new URISyntaxException(str, "URI did not contain a matching parenthesis.");
}
return index;
}
/**
* Given a composite URI and a CompositeData instance and the scheme specific part extracted from the source URI,
* parse the composite URI and populate the CompositeData object with the results. The source URI is used only
* for logging as the ssp should have already been extracted from it and passed here.
*
* @param uri The original source URI whose ssp is parsed into the composite data.
* @param rc The CompsositeData instance that will be populated from the given ssp.
* @param ssp The scheme specific part from the original string that is a composite or one or more URIs.
* @throws java.net.URISyntaxException
*/
private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException {
String componentString;
String params;
if (!checkParenthesis(ssp)) {
throw new URISyntaxException(uri.toString(), "Not a matching number of '(' and ')' parenthesis");
}
int p;
int initialParen = ssp.indexOf("(");
if (initialParen == 0) {
rc.host = ssp.substring(0, initialParen);
p = rc.host.indexOf("/");
if (p >= 0) {
rc.path = rc.host.substring(p);
rc.host = rc.host.substring(0, p);
}
p = indexOfParenthesisMatch(ssp, initialParen);
componentString = ssp.substring(initialParen + 1, p);
params = ssp.substring(p + 1).trim();
}
else {
componentString = ssp;
params = "";
}
String[] components = splitComponents(componentString);
rc.components = new URI[components.length];
for (int i = 0; i < components.length; i++) {
rc.components[i] = new URI(components[i].trim());
}
p = params.indexOf("?");
if (p >= 0) {
if (p > 0) {
rc.path = stripPrefix(params.substring(0, p), "/");
}
rc.parameters = parseQuery(params.substring(p + 1));
}
else {
if (params.length() > 0) {
rc.path = stripPrefix(params, "/");
}
rc.parameters = emptyMap();
}
}
/**
* Given the inner portion of a composite URI, split and return each inner URI as a string
* element in a new String array.
*
* @param str The inner URI elements of a composite URI string.
* @return an array containing each inner URI from the composite one.
*/
private static String[] splitComponents(String str) {
List<String> l = new ArrayList<String>();
int last = 0;
int depth = 0;
char[] chars = str.toCharArray();
for (int i = 0; i < chars.length; i++) {
switch (chars[i]) {
case '(':
depth++;
break;
case ')':
depth--;
break;
case ',':
if (depth == 0) {
String s = str.substring(last, i);
l.add(s);
last = i + 1;
}
break;
default:
}
}
String s = str.substring(last);
if (s.length() != 0) {
l.add(s);
}
String[] rc = new String[l.size()];
l.toArray(rc);
return rc;
}
/**
* String the given prefix from the target string and return the result.
*
* @param value The string that should be trimmed of the given prefix if present.
* @param prefix The prefix to remove from the target string.
* @return either the original string or a new string minus the supplied prefix if present.
*/
public static String stripPrefix(String value, String prefix) {
if (value.startsWith(prefix)) {
return value.substring(prefix.length());
}
return value;
}
/**
* Strip a URI of its scheme element.
*
* @param uri The URI whose scheme value should be stripped.
* @return The stripped URI value.
* @throws java.net.URISyntaxException
*/
public static URI stripScheme(URI uri) throws URISyntaxException {
return new URI(stripPrefix(uri.getSchemeSpecificPart().trim(), "//"));
}
/**
* Given a key / value mapping, create and return a URI formatted query string that is valid and
* can be appended to a URI.
*
* @param options The Mapping that will create the new Query string.
* @return a URI formatted query string.
* @throws java.net.URISyntaxException
*/
public static String createQueryString(Map<String, ? extends Object> options) throws URISyntaxException {
try {
if (options.size() > 0) {
StringBuffer rc = new StringBuffer();
boolean first = true;
for (String key : options.keySet()) {
if (first) {
first = false;
}
else {
rc.append("&");
}
String value = (String) options.get(key);
rc.append(URLEncoder.encode(key, "UTF-8"));
rc.append("=");
rc.append(URLEncoder.encode(value, "UTF-8"));
}
return rc.toString();
}
else {
return "";
}
}
catch (UnsupportedEncodingException e) {
throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
}
}
/**
* Creates a URI from the original URI and the remaining parameters.
*
* When the query options of a URI are applied to certain objects the used portion of the query options needs
* to be removed and replaced with those that remain so that other parts of the code can attempt to apply the
* remainder or give an error is unknown values were given. This method is used to update a URI with those
* remainder values.
*
* @param originalURI The URI whose current parameters are remove and replaced with the given remainder value.
* @param params The URI params that should be used to replace the current ones in the target.
* @return a new URI that matches the original one but has its query options replaced with the given ones.
* @throws java.net.URISyntaxException
*/
public static URI createRemainingURI(URI originalURI, Map<String, String> params) throws URISyntaxException {
String s = createQueryString(params);
if (s.length() == 0) {
s = null;
}
return createURIWithQuery(originalURI, s);
}
/**
* Given a URI value create and return a new URI that matches the target one but with the scheme value
* supplied to this method.
*
* @param bindAddr The URI whose scheme value should be altered.
* @param scheme The new scheme value to use for the returned URI.
* @return a new URI that is a copy of the original except that its scheme matches the supplied one.
* @throws java.net.URISyntaxException
*/
public static URI changeScheme(URI bindAddr, String scheme) throws URISyntaxException {
return new URI(scheme, bindAddr.getUserInfo(), bindAddr.getHost(), bindAddr.getPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment());
}
/**
* Examine the supplied string and ensure that all parends appear as matching pairs.
*
* @param str The target string to examine.
* @return true if the target string has valid parend pairings.
*/
public static boolean checkParenthesis(String str) {
boolean result = true;
if (str != null) {
int open = 0;
int closed = 0;
int i = 0;
while ((i = str.indexOf('(', i)) >= 0) {
i++;
open++;
}
i = 0;
while ((i = str.indexOf(')', i)) >= 0) {
i++;
closed++;
}
result = open == closed;
}
return result;
}
}

View File

@ -17,14 +17,14 @@
package org.apache.activemq.artemis.utils;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.artemis.utils.uri.URIFactory;
import org.apache.activemq.artemis.utils.uri.URISchema;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.Map;
public class URIParserTest {
/**

View File

@ -160,6 +160,9 @@ public final class ActiveMQDefaultConfiguration {
// the name of the address that consumers bind to receive management notifications
private static SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("activemq.notifications");
// The default address used for clustering
private static String DEFAULT_CLUSTER_ADDRESS = "jms";
// Cluster username. It applies to all cluster configurations.
private static String DEFAULT_CLUSTER_USER = "ACTIVEMQ.CLUSTER.ADMIN.USER";
@ -499,6 +502,11 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS;
}
/** The default Cluster address for the Cluster connection*/
public static String getDefaultClusterAddress() {
return DEFAULT_CLUSTER_ADDRESS;
}
/**
* Cluster username. It applies to all cluster configurations.
*/

View File

@ -92,4 +92,6 @@ public interface TopologyMember {
*/
boolean isMember(TransportConfiguration configuration);
String toURI();
}

View File

@ -19,7 +19,11 @@ package org.apache.activemq.artemis.core.client.impl;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import java.util.Map;
public final class TopologyMemberImpl implements TopologyMember {
@ -117,6 +121,15 @@ public final class TopologyMemberImpl implements TopologyMember {
}
}
@Override
public String toURI() {
TransportConfiguration liveConnector = getLive();
Map<String, Object> props = liveConnector.getParams();
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", props);
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, props);
return "tcp://" + host + ":" + port;
}
@Override
public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";

View File

@ -16,12 +16,14 @@
*/
package org.apache.activemq.artemis.uri;
import java.util.List;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.URIFactory;
import java.util.List;
public class ConnectorTransportConfigurationParser extends URIFactory<List<TransportConfiguration>, String> {
public ConnectorTransportConfigurationParser() {

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema;
import org.apache.activemq.artemis.utils.uri.URIFactory;
public class ServerLocatorParser extends URIFactory<ServerLocator, String> {

View File

@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.connector;
import java.util.List;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.utils.uri.URISchema;
import java.util.List;
public abstract class AbstractTransportConfigurationSchema extends URISchema<List<TransportConfiguration>, String> {
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.connector;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.connector;
import java.net.URI;
import java.net.URISyntaxException;
@ -27,7 +27,6 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema {
@ -61,7 +60,7 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
String factoryName) throws URISyntaxException {
HashMap<String, Object> props = new HashMap<>();
URISchema.setData(uri, props, allowableProperties, query);
setData(uri, props, allowableProperties, query);
List<TransportConfiguration> transportConfigurations = new ArrayList<>();
transportConfigurations.add(new TransportConfiguration(factoryName, props, name));
@ -72,8 +71,8 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
for (String s : split) {
URI extraUri = new URI(s);
HashMap<String, Object> newProps = new HashMap<>();
URISchema.setData(extraUri, newProps, allowableProperties, query);
URISchema.setData(extraUri, newProps, allowableProperties, URISchema.parseQuery(extraUri.getQuery(), null));
setData(extraUri, newProps, allowableProperties, query);
setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null));
transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString()));
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.serverLocator;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.URISchema;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.serverLocator;
/**
* This will represent all the possible options you could setup on URLs

View File

@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.serverLocator;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
import java.net.URI;
import java.net.URISyntaxException;
@ -37,7 +37,7 @@ public class InVMServerLocatorSchema extends AbstractServerLocatorSchema {
protected ServerLocator internalNewObject(URI uri, Map<String, String> query, String name) throws Exception {
TransportConfiguration tc = InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory");
ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc);
return URISchema.setData(uri, factory, query);
return setData(uri, factory, query);
}
@Override

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.serverLocator;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schema.serverLocator;
import java.net.URI;
import java.util.List;
@ -25,9 +25,9 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.IPV6Util;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
public class TCPServerLocatorSchema extends AbstractServerLocatorSchema {
@Override
@ -52,7 +52,7 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema {
@Override
protected URI internalNewURI(ServerLocator bean) throws Exception {
String query = URISchema.getData(null, bean);
String query = getData(null, bean);
TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations();
return getURI(query, staticConnectors);
}

View File

@ -14,23 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
package org.apache.activemq.artemis.uri.schema.serverLocator;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
protected static List<String> IGNORED = new ArrayList<>();
public static List<String> IGNORED = new ArrayList<>();
static {
IGNORED.add("localBindAddress");
@ -61,7 +60,7 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
dgc.setBroadcastEndpointFactory(endpoint);
String query = URISchema.getData(IGNORED, bean, dgc, endpoint);
String query = getData(IGNORED, bean, dgc, endpoint);
return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
}
@ -72,11 +71,11 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
String name) throws Exception {
UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory().setGroupAddress(host).setGroupPort(port);
URISchema.setData(uri, endpointFactoryConfiguration, query);
setData(uri, endpointFactoryConfiguration, query);
DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration);
DiscoveryGroupConfiguration dgc = setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration);
URISchema.setData(uri, dgc, query);
setData(uri, dgc, query);
return dgc;
}
}

View File

@ -16,17 +16,17 @@
*/
package org.apache.activemq.artemis.utils;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
public class ConfigurationHelper {
public static String getStringProperty(final String propName, final String def, final Map<String, Object> props) {
public static String getStringProperty(final String propName, final String def, final Map<String, ?> props) {
if (props == null) {
return def;
}
@ -46,7 +46,7 @@ public class ConfigurationHelper {
}
}
public static int getIntProperty(final String propName, final int def, final Map<String, Object> props) {
public static int getIntProperty(final String propName, final int def, final Map<String, ?> props) {
if (props == null) {
return def;
}
@ -71,7 +71,7 @@ public class ConfigurationHelper {
}
}
public static long getLongProperty(final String propName, final long def, final Map<String, Object> props) {
public static long getLongProperty(final String propName, final long def, final Map<String, ?> props) {
if (props == null) {
return def;
}
@ -97,7 +97,7 @@ public class ConfigurationHelper {
}
}
public static boolean getBooleanProperty(final String propName, final boolean def, final Map<String, Object> props) {
public static boolean getBooleanProperty(final String propName, final boolean def, final Map<String, ?> props) {
if (props == null) {
return def;
}
@ -160,7 +160,7 @@ public class ConfigurationHelper {
public static String getPasswordProperty(final String propName,
final String def,
final Map<String, Object> props,
final Map<String, ?> props,
String defaultMaskPassword,
String defaultPasswordCodec) {
if (props == null) {
@ -201,4 +201,17 @@ public class ConfigurationHelper {
}
}
public static double getDoubleProperty(String name, double def, Map<String, ?> props) {
if (props == null) {
return def;
}
Object prop = props.get(name);
if (prop == null) {
return def;
}
else {
String value = prop.toString();
return Double.parseDouble(value);
}
}
}

View File

@ -18,6 +18,8 @@ package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import java.net.URI;

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.uri.schema.serverLocator.ConnectionOptions;
/**
* This will represent all the possible options you could setup on URLs

View File

@ -21,6 +21,8 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema;
import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;

View File

@ -55,8 +55,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
*
* @param registry
*/
public void setRegistry(BindingRegistry registry) {
public EmbeddedJMS setRegistry(BindingRegistry registry) {
this.registry = registry;
return this;
}
/**
@ -64,8 +65,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
*
* @param jmsConfiguration
*/
public void setJmsConfiguration(JMSConfiguration jmsConfiguration) {
public EmbeddedJMS setJmsConfiguration(JMSConfiguration jmsConfiguration) {
this.jmsConfiguration = jmsConfiguration;
return this;
}
/**
@ -73,8 +75,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
*
* @param context
*/
public void setContext(Context context) {
public EmbeddedJMS setContext(Context context) {
this.context = context;
return this;
}
/**
@ -89,7 +92,7 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
}
@Override
public void start() throws Exception {
public EmbeddedJMS start() throws Exception {
super.initStart();
if (jmsConfiguration != null) {
serverManager = new JMSServerManagerImpl(activeMQServer, jmsConfiguration);
@ -116,11 +119,14 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
}
serverManager.setRegistry(registry);
serverManager.start();
return this;
}
@Override
public void stop() throws Exception {
public EmbeddedJMS stop() throws Exception {
serverManager.stop();
return this;
}
}

View File

@ -16,21 +16,29 @@
*/
package org.apache.activemq.artemis.core.config;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import java.io.Serializable;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.uri.ClusterConnectionConfigurationParser;
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
import org.apache.activemq.artemis.utils.uri.URISupport;
public final class ClusterConnectionConfiguration implements Serializable {
private static final long serialVersionUID = 8948303813427795935L;
private String name;
private String address;
private String address = ActiveMQDefaultConfiguration.getDefaultClusterAddress();
private String connectorName;
@ -56,6 +64,8 @@ public final class ClusterConnectionConfiguration implements Serializable {
private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration.getDefaultClusterMessageLoadBalancingType());
private URISupport.CompositeData compositeMembers;
private List<String> staticConnectors = Collections.emptyList();
private String discoveryGroupName = null;
@ -75,6 +85,11 @@ public final class ClusterConnectionConfiguration implements Serializable {
public ClusterConnectionConfiguration() {
}
public ClusterConnectionConfiguration(URI uri) throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
parser.populateObject(uri, this);
}
public String getName() {
return name;
}
@ -93,6 +108,15 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
public ClusterConnectionConfiguration setCompositeMembers(URISupport.CompositeData members) {
this.compositeMembers = members;
return this;
}
public URISupport.CompositeData getCompositeMembers() {
return compositeMembers;
}
/**
* @return the clientFailureCheckPeriod
*/
@ -334,6 +358,72 @@ public final class ClusterConnectionConfiguration implements Serializable {
return this;
}
/**
* This method will match the configuration and return the proper TransportConfiguration for the Configuration
*/
public TransportConfiguration[] getTransportConfigurations(Configuration configuration) throws Exception {
if (getCompositeMembers() != null) {
ConnectorTransportConfigurationParser connectorTransportConfigurationParser = new ConnectorTransportConfigurationParser();
URI[] members = getCompositeMembers().getComponents();
List<TransportConfiguration> list = new LinkedList<>();
for (int i = 0; i < members.length; i++) {
list.addAll(connectorTransportConfigurationParser.newObject(members[i], null));
}
return list.toArray(new TransportConfiguration[list.size()]);
}
else {
return configuration.getTransportConfigurations(staticConnectors);
}
}
/**
* This method will return the proper discovery configuration from the main configuration
*/
public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(Configuration configuration) {
if (discoveryGroupName != null) {
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(discoveryGroupName);
if (dg == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoDiscoveryGroup(discoveryGroupName);
return null;
}
return dg;
}
else {
return null;
}
}
public TransportConfiguration getTransportConfiguration(Configuration configuration) {
TransportConfiguration connector = configuration.getConnectorConfigurations().get(getConnectorName());
if (connector == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoConnector(connectorName);
return null;
}
return connector;
}
public boolean validateConfiguration() {
if (getName() == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNotUnique();
return false;
}
if (getAddress() == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoForwardAddress();
return false;
}
return true;
}
@Override
public int hashCode() {
final int prime = 31;
@ -440,4 +530,33 @@ public final class ClusterConnectionConfiguration implements Serializable {
return false;
return true;
}
@Override
public String toString() {
return "ClusterConnectionConfiguration{" +
"name='" + name + '\'' +
", address='" + address + '\'' +
", connectorName='" + connectorName + '\'' +
", clientFailureCheckPeriod=" + clientFailureCheckPeriod +
", connectionTTL=" + connectionTTL +
", retryInterval=" + retryInterval +
", retryIntervalMultiplier=" + retryIntervalMultiplier +
", maxRetryInterval=" + maxRetryInterval +
", initialConnectAttempts=" + initialConnectAttempts +
", reconnectAttempts=" + reconnectAttempts +
", callTimeout=" + callTimeout +
", callFailoverTimeout=" + callFailoverTimeout +
", duplicateDetection=" + duplicateDetection +
", messageLoadBalancingType=" + messageLoadBalancingType +
", compositeMembers=" + compositeMembers +
", staticConnectors=" + staticConnectors +
", discoveryGroupName='" + discoveryGroupName + '\'' +
", maxHops=" + maxHops +
", confirmationWindowSize=" + confirmationWindowSize +
", allowDirectConnectionsOnly=" + allowDirectConnectionsOnly +
", minLargeMessageSize=" + minLargeMessageSize +
", clusterNotificationInterval=" + clusterNotificationInterval +
", clusterNotificationAttempts=" + clusterNotificationAttempts +
'}';
}
}

View File

@ -369,6 +369,8 @@ public interface Configuration {
Configuration addClusterConfiguration(final ClusterConnectionConfiguration config);
ClusterConnectionConfiguration addClusterConfiguration(String name, String uri) throws Exception;
Configuration clearClusterConfigurations();
/**
@ -911,6 +913,10 @@ public interface Configuration {
* */
Configuration setResolveProtocols(boolean resolveProtocols);
TransportConfiguration[] getTransportConfigurations(String ...connectorNames);
TransportConfiguration[] getTransportConfigurations(List<String> connectorNames);
/*
* @see #setResolveProtocols()
* @return whether ActiveMQ Artemis should resolve and use any Protocols available on the classpath

View File

@ -22,9 +22,12 @@ import java.io.File;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.net.URI;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@ -48,6 +51,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
@ -90,7 +94,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected String jmxDomain = ActiveMQDefaultConfiguration.getDefaultJmxDomain();
protected boolean jmxUseBrokerName = ActiveMQDefaultConfiguration.isDefaultJMXUseBrokerName();
protected boolean jmxUseBrokerName = ActiveMQDefaultConfiguration.isDefaultJMXUseBrokerName();
protected long connectionTTLOverride = ActiveMQDefaultConfiguration.getDefaultConnectionTtlOverride();
@ -491,6 +495,12 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public ClusterConnectionConfiguration addClusterConfiguration(String name, String uri) throws Exception {
ClusterConnectionConfiguration newConfig = new ClusterConnectionConfiguration(new URI(uri)).setName(name);
clusterConfigurations.add(newConfig);
return newConfig;
}
@Override
public ConfigurationImpl clearClusterConfigurations() {
clusterConfigurations.clear();
@ -685,7 +695,6 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public int getJournalMinFiles() {
return journalMinFiles;
@ -1280,6 +1289,28 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
public TransportConfiguration[] getTransportConfigurations(String... connectorNames) {
return getTransportConfigurations(Arrays.asList(connectorNames));
}
public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
for (String connectorName : connectorNames) {
TransportConfiguration connector = getConnectorConfigurations().get(connectorName);
if (connector == null) {
ActiveMQServerLogger.LOGGER.warn("bridgeNoConnector(connectorName)");
return null;
}
tcConfigs[count++] = connector;
}
return tcConfigs;
}
@Override
public boolean isResolveProtocols() {
return resolveProtocols;

View File

@ -409,6 +409,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
parseClusterConnectionConfiguration(ccNode, config);
}
NodeList ccNodesURI = e.getElementsByTagName("cluster-connection-uri");
for (int i = 0; i < ccNodesURI.getLength(); i++) {
Element ccNode = (Element) ccNodesURI.item(i);
parseClusterConnectionConfigurationURI(ccNode, config);
}
NodeList dvNodes = e.getElementsByTagName("divert");
for (int i = 0; i < dvNodes.getLength(); i++) {
@ -1236,7 +1244,18 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
}
private void parseClusterConnectionConfiguration(final Element e, final Configuration mainConfig) {
private void parseClusterConnectionConfigurationURI(final Element e, final Configuration mainConfig) throws Exception {
String name = e.getAttribute("name");
String uri = e.getAttribute("address");
ClusterConnectionConfiguration config = mainConfig.addClusterConfiguration(name, uri);
System.out.println("Adding cluster connection :: " + config);
}
private void parseClusterConnectionConfiguration(final Element e, final Configuration mainConfig) throws Exception {
String name = e.getAttribute("name");
String address = getString(e, "address", null, Validators.NOT_NULL_OR_EMPTY);

View File

@ -869,7 +869,7 @@ public interface ActiveMQServerLogger extends BasicLogger {
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222134, value = "No connector defined with name {0}. The bridge will not be deployed.",
format = Message.Format.MESSAGE_FORMAT)
void bridgeNoConnector(String name);
void noConnector(String name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222135, value = "Stopping Redistributor, Timed out waiting for tasks to complete", format = Message.Format.MESSAGE_FORMAT)

View File

@ -73,7 +73,7 @@ public class BackupManager implements ActiveMQComponent {
* configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster.
* */
@Override
public synchronized void start() {
public synchronized void start() throws Exception {
if (started)
return;
//deploy the backup connectors using the cluster configuration
@ -117,14 +117,18 @@ public class BackupManager implements ActiveMQComponent {
/*
* create the connectors using the cluster configurations
* */
private void deployBackupConnector(final ClusterConnectionConfiguration config) {
TransportConfiguration connector = ClusterConfigurationUtil.getTransportConfiguration(config, configuration);
private void deployBackupConnector(final ClusterConnectionConfiguration config) throws Exception {
if (!config.validateConfiguration()) {
return;
}
TransportConfiguration connector = config.getTransportConfiguration(configuration);
if (connector == null)
return;
if (config.getDiscoveryGroupName() != null) {
DiscoveryGroupConfiguration dg = ClusterConfigurationUtil.getDiscoveryGroupConfiguration(config, configuration);
DiscoveryGroupConfiguration dg = config.getDiscoveryGroupConfiguration(configuration);
if (dg == null)
return;
@ -134,7 +138,7 @@ public class BackupManager implements ActiveMQComponent {
backupConnectors.add(backupConnector);
}
else {
TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.getTransportConfigurations(config, configuration);
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);
StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config.getName(), connector, config.getRetryInterval(), clusterManager);

View File

@ -1,87 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import java.lang.reflect.Array;
import java.util.List;
public class ClusterConfigurationUtil {
public static TransportConfiguration getTransportConfiguration(ClusterConnectionConfiguration config,
Configuration configuration) {
if (config.getName() == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNotUnique();
return null;
}
if (config.getAddress() == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoForwardAddress();
return null;
}
TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
if (connector == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoConnector(config.getConnectorName());
return null;
}
return connector;
}
public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(ClusterConnectionConfiguration config,
Configuration configuration) {
DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName());
if (dg == null) {
ActiveMQServerLogger.LOGGER.clusterConnectionNoDiscoveryGroup(config.getDiscoveryGroupName());
return null;
}
return dg;
}
public static TransportConfiguration[] getTransportConfigurations(ClusterConnectionConfiguration config,
Configuration configuration) {
return config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors(), configuration) : null;
}
public static TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames,
Configuration configuration) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
for (String connectorName : connectorNames) {
TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName);
if (connector == null) {
ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName);
return null;
}
tcConfigs[count++] = connector;
}
return tcConfigs;
}
}

View File

@ -16,6 +16,18 @@
*/
package org.apache.activemq.artemis.core.server.cluster;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration;
@ -54,18 +66,6 @@ import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
/**
* A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s.
* <p>
@ -432,7 +432,7 @@ public final class ClusterManager implements ActiveMQComponent {
}
else {
TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.connectorNameListToArray(config.getStaticConnectors(), configuration);
TransportConfiguration[] tcConfigs = configuration.getTransportConfigurations(config.getStaticConnectors());
if (tcConfigs == null) {
ActiveMQServerLogger.LOGGER.bridgeCantFindConnectors(config.getName());
@ -581,10 +581,16 @@ public final class ClusterManager implements ActiveMQComponent {
}
private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception {
TransportConfiguration connector = ClusterConfigurationUtil.getTransportConfiguration(config, configuration);
if (connector == null)
if (!config.validateConfiguration()) {
return;
}
TransportConfiguration connector = config.getTransportConfiguration(configuration);
if (connector == null) {
return;
}
if (clusterConnections.containsKey(config.getName())) {
ActiveMQServerLogger.LOGGER.clusterConnectionAlreadyExists(config.getConnectorName());
@ -594,7 +600,7 @@ public final class ClusterManager implements ActiveMQComponent {
ClusterConnectionImpl clusterConnection;
if (config.getDiscoveryGroupName() != null) {
DiscoveryGroupConfiguration dg = ClusterConfigurationUtil.getDiscoveryGroupConfiguration(config, configuration);
DiscoveryGroupConfiguration dg = config.getDiscoveryGroupConfiguration(configuration);
if (dg == null)
return;
@ -611,7 +617,7 @@ public final class ClusterManager implements ActiveMQComponent {
clusterController.addClusterConnection(clusterConnection.getName(), dg, config);
}
else {
TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.getTransportConfigurations(config, configuration);
TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration);
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -24,12 +28,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ScaleDownPolicy {
@ -126,20 +124,6 @@ public class ScaleDownPolicy {
private static TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames,
ActiveMQServer activeMQServer) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
for (String connectorName : connectorNames) {
TransportConfiguration connector = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
if (connector == null) {
ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName);
return null;
}
tcConfigs[count++] = connector;
}
return tcConfigs;
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
}
}

View File

@ -16,9 +16,25 @@
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
import org.apache.activemq.artemis.utils.uri.URISchema;
import org.apache.commons.beanutils.Converter;
public enum MessageLoadBalancingType {
OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND");
static {
// for URI support on ClusterConnection
URISchema.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
}
static class MessageLoadBalancingTypeConverter implements Converter {
@Override
public <T> T convert(Class<T> type, Object value) {
return type.cast(MessageLoadBalancingType.getType(value.toString()));
}
}
private String type;
MessageLoadBalancingType(final String type) {
@ -28,4 +44,19 @@ public enum MessageLoadBalancingType {
public String getType() {
return type;
}
public static MessageLoadBalancingType getType(String string) {
if (string.equals(OFF.getType())) {
return MessageLoadBalancingType.OFF;
}
else if (string.equals(STRICT.getType())) {
return MessageLoadBalancingType.STRICT;
}
else if (string.equals(ON_DEMAND.getType())) {
return MessageLoadBalancingType.ON_DEMAND;
}
else {
return null;
}
}
}

View File

@ -18,10 +18,13 @@ package org.apache.activemq.artemis.core.server.embedded;
import javax.management.MBeanServer;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
@ -42,8 +45,9 @@ public class EmbeddedActiveMQ {
*
* @param filename
*/
public void setConfigResourcePath(String filename) {
public EmbeddedActiveMQ setConfigResourcePath(String filename) {
configResourcePath = filename;
return this;
}
/**
@ -51,8 +55,30 @@ public class EmbeddedActiveMQ {
*
* @param securityManager
*/
public void setSecurityManager(ActiveMQSecurityManager securityManager) {
public EmbeddedActiveMQ setSecurityManager(ActiveMQSecurityManager securityManager) {
this.securityManager = securityManager;
return this;
}
/**
* It will iterate the cluster connections until you have at least the number of expected servers
* @param timeWait Time to wait on each iteration
* @param unit unit of time to wait
* @param iterations number of iterations
* @param servers number of minimal servers
* @return
*/
public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception {
for (int i = 0; i < iterations; i++) {
for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) {
if (connection.getTopology().getMembers().size() == servers) {
return true;
}
Thread.sleep(unit.toMillis(timeWait));
}
}
return false;
}
/**
@ -60,8 +86,9 @@ public class EmbeddedActiveMQ {
*
* @param mbeanServer
*/
public void setMbeanServer(MBeanServer mbeanServer) {
public EmbeddedActiveMQ setMbeanServer(MBeanServer mbeanServer) {
this.mbeanServer = mbeanServer;
return this;
}
/**
@ -70,18 +97,19 @@ public class EmbeddedActiveMQ {
*
* @param configuration
*/
public void setConfiguration(Configuration configuration) {
public EmbeddedActiveMQ setConfiguration(Configuration configuration) {
this.configuration = configuration;
return this;
}
public ActiveMQServer getActiveMQServer() {
return activeMQServer;
}
public void start() throws Exception {
public EmbeddedActiveMQ start() throws Exception {
initStart();
activeMQServer.start();
return this;
}
protected void initStart() throws Exception {
@ -105,7 +133,8 @@ public class EmbeddedActiveMQ {
}
}
public void stop() throws Exception {
public EmbeddedActiveMQ stop() throws Exception {
activeMQServer.stop();
return this;
}
}

View File

@ -49,7 +49,6 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import java.lang.reflect.Array;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -392,20 +391,6 @@ public class SharedNothingLiveActivation extends LiveActivation {
}
private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames) {
TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
int count = 0;
for (String connectorName : connectorNames) {
TransportConfiguration connector = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName);
if (connector == null) {
ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName);
return null;
}
tcConfigs[count++] = connector;
}
return tcConfigs;
return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames);
}
}

View File

@ -16,12 +16,14 @@
*/
package org.apache.activemq.artemis.uri;
import java.util.List;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.uri.schemas.acceptor.InVMAcceptorTransportConfigurationSchema;
import org.apache.activemq.artemis.uri.schemas.acceptor.TCPAcceptorTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.URIFactory;
import java.util.List;
public class AcceptorTransportConfigurationParser extends URIFactory<List<TransportConfiguration>, String> {
public AcceptorTransportConfigurationParser() {

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.uri.schemas.clusterConnection.ClusterConnectionMulticastSchema;
import org.apache.activemq.artemis.uri.schemas.clusterConnection.ClusterConnectionStaticSchema;
import org.apache.activemq.artemis.utils.uri.URIFactory;
public class ClusterConnectionConfigurationParser extends URIFactory<ClusterConnectionConfiguration, String> {
public ClusterConnectionConfigurationParser() {
registerSchema(new ClusterConnectionStaticSchema());
registerSchema(new ClusterConnectionMulticastSchema());
}
}

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schemas.acceptor;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
public class InVMAcceptorTransportConfigurationSchema extends InVMTransportConfigurationSchema {

View File

@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
package org.apache.activemq.artemis.uri.schemas.acceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import java.net.URI;
import java.util.Set;

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri.schemas.clusterConnection;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.utils.uri.URISupport;
public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSchema {
// nothing different ATM. This is like a placeholder for future changes
@Override
public String getSchemaName() {
return "multicast";
}
@Override
public void populateObject(URI uri, ClusterConnectionConfiguration bean) throws Exception {
if (URISupport.isCompositeURI(uri)) {
super.populateObject(uri, bean);
}
else {
bean.setDiscoveryGroupName(uri.getHost());
Map<String, String> parameters = URISupport.parseParameters(uri);
setData(uri, bean, parameters);
}
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri.schemas.clusterConnection;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.utils.uri.URISchema;
import org.apache.activemq.artemis.utils.uri.URISupport;
public class ClusterConnectionStaticSchema extends URISchema<ClusterConnectionConfiguration, String> {
@Override
public String getSchemaName() {
return "static";
}
@Override
protected ClusterConnectionConfiguration internalNewObject(URI uri,
Map<String, String> query,
String param) throws Exception {
ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration();
populateObject(uri, configuration);
return configuration;
}
@Override
public void populateObject(URI uri, ClusterConnectionConfiguration bean) throws Exception {
URISupport.CompositeData compositeData = URISupport.parseComposite(uri);
bean.setCompositeMembers(compositeData);
setData(uri, bean, compositeData.getParameters());
}
@Override
protected URI internalNewURI(ClusterConnectionConfiguration bean) throws Exception {
return null;
}
}

View File

@ -458,18 +458,12 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="cluster-connections" maxOccurs="1" minOccurs="0">
<xsd:element name="cluster-connections" type="clusterConnectionChoiceType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
a list of cluster connections
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="cluster-connection" type="cluster-connectionType" maxOccurs="unbounded"
minOccurs="0"/>
</xsd:sequence>
</xsd:complexType>
</xsd:element>
<xsd:element name="grouping-handler" type="groupingHandlerType" maxOccurs="1" minOccurs="0">
@ -1181,6 +1175,33 @@
<!-- CLUSTER CONNECTION CONFIGURATION -->
<xsd:complexType name="clusterConnectionChoiceType">
<xsd:sequence>
<xsd:choice maxOccurs="unbounded">
<xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/>
<xsd:element name="cluster-connection" type="cluster-connectionType">
</xsd:element>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="cluster-connectionUriType">
<xsd:attribute name="address" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
uri of the cluster connection
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="name" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
name of the cluster connection
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="cluster-connectionType">
<xsd:sequence>
<xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="1">
@ -1388,6 +1409,31 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="uri" type="xsd:string" use="optional">
<xsd:annotation>
<xsd:documentation>
The URI for the cluster connection options
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<xsd:complexType name="cluster-connection-uri-type">
<xsd:attribute name="name" type="xsd:ID" use="required">
<xsd:annotation>
<xsd:documentation>
unique name for this cluster connection
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="uri" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
The URI for the cluster connection options
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- DIVERT CONFIGURATION TYPE -->

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
import org.junit.Assert;
import org.junit.Test;
public class FileConfigurationParserTest extends ActiveMQTestBase {
@ -69,6 +70,23 @@ public class FileConfigurationParserTest extends ActiveMQTestBase {
deploymentManager.readConfiguration();
}
@Test
public void testParsingClusterConnectionURIs() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
String configStr = firstPart + "<cluster-connections>\n" +
" <cluster-connection-uri name=\"my-cluster\" address=\"multicast://my-discovery-group?messageLoadBalancingType=STRICT;retryInterval=333;connectorName=netty-connector;maxHops=1\"/>\n" +
"</cluster-connections>\n" + lastPart;
ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
Configuration config = parser.parseMainConfig(input);
Assert.assertEquals(1, config.getClusterConfigurations().size());
Assert.assertEquals("my-discovery-group", config.getClusterConfigurations().get(0).getDiscoveryGroupName());
Assert.assertEquals(333, config.getClusterConfigurations().get(0).getRetryInterval());
}
@Test
public void testParsingDefaultServerConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();

View File

@ -499,13 +499,22 @@ public abstract class ActiveMQTestBase extends Assert {
}
}
private static int failedGCCalls = 0;
public static void forceGC() {
if (failedGCCalls >= 10) {
log.info("ignoring forceGC call since it seems System.gc is not working anyways");
return;
}
log.info("#test forceGC");
CountDownLatch finalized = new CountDownLatch(1);
WeakReference<DumbReference> dumbReference = new WeakReference<>(new DumbReference(finalized));
long timeout = System.currentTimeMillis() + 1000;
// A loop that will wait GC, using the minimal time as possible
while (!(dumbReference.get() == null && finalized.getCount() == 0)) {
while (!(dumbReference.get() == null && finalized.getCount() == 0) && System.currentTimeMillis() < timeout) {
System.gc();
System.runFinalization();
try {
@ -514,7 +523,16 @@ public abstract class ActiveMQTestBase extends Assert {
catch (InterruptedException e) {
}
}
log.info("#test forceGC Done");
if (dumbReference.get() != null) {
failedGCCalls++;
log.info("It seems that GC is disabled at your VM");
}
else {
// a success would reset the count
failedGCCalls = 0;
}
log.info("#test forceGC Done ");
}
public static void forceGC(final Reference<?> ref, final long timeout) {

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
import java.net.URI;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.junit.Assert;
import org.junit.Test;
public class ClusterConnectionConfigurationTest {
@Test
public void testClusterConnectionStatic() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("static:(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;s&messageLoadBalancingType=OFF"), null);
Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType());
Assert.assertEquals(132, configuration.getMinLargeMessageSize());
Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString());
Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString());
}
@Test
public void testClusterConnectionStatic2() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null);
Assert.assertEquals(132, configuration.getMinLargeMessageSize());
Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType());
Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length);
Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString());
Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString());
}
@Test
public void testClusterConnectionStaticOnConstrcutor() throws Exception {
ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration(new URI("static:(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132"));
Assert.assertEquals(132, configuration.getMinLargeMessageSize());
Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString());
Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString());
}
@Test
public void testClusterConnectionMulticast() throws Exception {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("multicast://myGroup?minLargeMessageSize=132"), null);
Assert.assertEquals("myGroup", configuration.getDiscoveryGroupName());
Assert.assertEquals(132, configuration.getMinLargeMessageSize());
}
}

View File

@ -0,0 +1,244 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>broker-clustered</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>clustered-static-discovery-uri</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Clustered Static Discovery Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create2</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server2</instance>
<configuration>${basedir}/target/classes/activemq/server2</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create3</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server3</instance>
<configuration>${basedir}/target/classes/activemq/server3</configuration>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<spawn>true</spawn>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:61617</testURI>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>start2</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server2</location>
<testURI>tcp://localhost:61618</testURI>
<args>
<param>run</param>
</args>
<name>server2</name>
</configuration>
</execution>
<execution>
<id>start3</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server3</location>
<testURI>tcp://localhost:61619</testURI>
<args>
<param>run</param>
</args>
<name>server3</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.StaticClusteredQueueExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop2</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server2</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop3</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server3</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>clustered-static-discovery-uri</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,58 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS Load Balanced Static Clustered Queue Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>JMS Load Balanced Static Clustered Queue Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster
from a <em>static</em> list of nodes.</p>
<p>We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.</p>
<p>We then send some messages via the producer, and we verify that <b>both</b> consumers receive the sent messages
in a round-robin fashion.</p>
<p>In other words, ActiveMQ Artemis <b>load balances</b> the sent messages across all consumers on the cluster</p>
<p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use
JNDI, these could be instantiated directly.</p>
<p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
and to load balance the messages between the nodes.</p>
<pre class="prettyprint">
<code>&lt;cluster-connection name="my-cluster"&gt;
&lt;address&gt;jms&lt;/address&gt;
&lt;connector-ref>netty-connector&lt;/connector-ref>
&lt;retry-interval&gt;500&lt;/retry-interval&gt;
&lt;use-duplicate-detection&gt;true&lt;/use-duplicate-detection&gt;
&lt;message-load-balancing&gt;STRICT&lt;/message-load-balancing&gt;
&lt;max-hops&gt;1&lt;/max-hops&gt;
&lt;static-connectors>
&lt;connector-ref>server1-connector&lt;/connector-ref>
&lt;/static-connectors>
&lt;/cluster-connection&gt;
</code>
</pre>
<p>For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering
section of the user manual.</p>
</body>
</html>

View File

@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.util.ServerUtil;
/**
* A simple example that demonstrates server side load-balancing of messages between the queue instances on different
* nodes of the cluster. The cluster is created from a static list of nodes.
*/
public class StaticClusteredQueueExample {
public static void main(final String[] args) throws Exception {
Connection initialConnection = null;
Connection connection0 = null;
Connection connection1 = null;
Connection connection2 = null;
Connection connection3 = null;
try {
// Step 2. Use direct instantiation (or JNDI if you like)
Queue queue = ActiveMQJMSClient.createQueue("exampleQueue");
// Step 3. new JMS Connection Factory object from JNDI on server 3
ConnectionFactory cf0 = new ActiveMQConnectionFactory("tcp://localhost:61619");
//grab an initial connection and wait, in reality you wouldn't do it this way but since we want to ensure an
// equal load balance we do this and then create 4 connections round robined
initialConnection = cf0.createConnection();
Thread.sleep(2000);
// Step 6. We create a JMS Connection connection0 which is a connection to server 0
connection0 = cf0.createConnection();
// Step 7. We create a JMS Connection connection1 which is a connection to server 1
connection1 = cf0.createConnection();
// Step 6. We create a JMS Connection connection0 which is a connection to server 0
connection2 = cf0.createConnection();
// Step 7. We create a JMS Connection connection1 which is a connection to server 1
connection3 = cf0.createConnection();
// Step 8. We create a JMS Session on server 0
Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 9. We create a JMS Session on server 1
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 8. We create a JMS Session on server 0
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 9. We create a JMS Session on server 1
Session session3 = connection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 10. We start the connections to ensure delivery occurs on them
connection0.start();
connection1.start();
connection2.start();
connection3.start();
// Step 11. We create JMS MessageConsumer objects on server 0 and server 1
MessageConsumer consumer0 = session0.createConsumer(queue);
MessageConsumer consumer1 = session1.createConsumer(queue);
MessageConsumer consumer2 = session2.createConsumer(queue);
MessageConsumer consumer3 = session3.createConsumer(queue);
Thread.sleep(2000);
// Step 12. We create a JMS MessageProducer object on server 3
MessageProducer producer = session3.createProducer(queue);
// Step 13. We send some messages to server 0
final int numMessages = 20;
for (int i = 0; i < numMessages; i++) {
TextMessage message = session0.createTextMessage("This is text message " + i);
producer.send(message);
System.out.println("Sent message: " + message.getText());
}
Thread.sleep(2000);
// Step 14. We now consume those messages on *both* server 0 and server 1.
// We note the messages have been distributed between servers in a round robin fashion
// JMS Queues implement point-to-point message where each message is only ever consumed by a
// maximum of one consumer
int con0Node = ServerUtil.getServer(connection0);
int con1Node = ServerUtil.getServer(connection1);
int con2Node = ServerUtil.getServer(connection2);
int con3Node = ServerUtil.getServer(connection3);
if (con0Node + con1Node + con2Node + con3Node != 6) {
throw new IllegalStateException();
}
for (int i = 0; i < numMessages; i += 4) {
TextMessage message0 = (TextMessage) consumer0.receive(5000);
System.out.println("Got message: " + message0.getText() + " from node " + con0Node);
TextMessage message1 = (TextMessage) consumer1.receive(5000);
System.out.println("Got message: " + message1.getText() + " from node " + con1Node);
TextMessage message2 = (TextMessage) consumer2.receive(5000);
System.out.println("Got message: " + message2.getText() + " from node " + con2Node);
TextMessage message3 = (TextMessage) consumer3.receive(5000);
System.out.println("Got message: " + message3.getText() + " from node " + con3Node);
}
}
finally {
// Step 15. Be sure to close our resources!
if (initialConnection != null) {
initialConnection.close();
}
if (connection0 != null) {
connection0.close();
}
if (connection1 != null) {
connection1.close();
}
if (connection2 != null) {
connection2.close();
}
if (connection3 != null) {
connection3.close();
}
}
}
}

View File

@ -0,0 +1,70 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the queue used by the example-->
<queue name="exampleQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="static://(tcp://localhost:61617)?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"/>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -0,0 +1,70 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the queue used by the example-->
<queue name="exampleQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61617</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
</acceptors>
<!-- Clustering configuration -->
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="static://(tcp://localhost:61616)?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"/>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -0,0 +1,66 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the queue used by the example-->
<queue name="exampleQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61618</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61618</acceptor>
</acceptors>
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="static://(tcp://localhost:61616)?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"/>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -0,0 +1,67 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the queue used by the example-->
<queue name="exampleQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61619</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61619</acceptor>
</acceptors>
<!-- Clustering configuration -->
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="static://(tcp://localhost:61616)?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1"/>
</cluster-connections>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.exampleQueue">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -0,0 +1,156 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>broker-clustered</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>
<artifactId>clustered-topic-uri</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ Artemis JMS Clustered Topic Example</name>
<properties>
<activemq.basedir>${project.basedir}/../../../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-maven-plugin</artifactId>
<executions>
<execution>
<id>create0</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server0</instance>
<configuration>${basedir}/target/classes/activemq/server0</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>create1</id>
<goals>
<goal>create</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<instance>${basedir}/target/server1</instance>
<configuration>${basedir}/target/classes/activemq/server1</configuration>
<!-- this makes it easier in certain envs -->
<javaOptions>-Djava.net.preferIPv4Stack=true</javaOptions>
</configuration>
</execution>
<execution>
<id>start0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server0</location>
<testURI>tcp://localhost:61616</testURI>
<args>
<param>run</param>
</args>
<name>server0</name>
</configuration>
</execution>
<execution>
<id>start1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<spawn>true</spawn>
<location>${basedir}/target/server1</location>
<testURI>tcp://localhost:61617</testURI>
<args>
<param>run</param>
</args>
<name>server1</name>
</configuration>
</execution>
<execution>
<id>runClient</id>
<goals>
<goal>runClient</goal>
</goals>
<configuration>
<clientClass>org.apache.activemq.artemis.jms.example.ClusteredTopicExample</clientClass>
</configuration>
</execution>
<execution>
<id>stop0</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server0</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
<execution>
<id>stop1</id>
<goals>
<goal>cli</goal>
</goals>
<configuration>
<ignore>${noServer}</ignore>
<location>${basedir}/target/server1</location>
<args>
<param>stop</param>
</args>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.clustered</groupId>
<artifactId>clustered-topic-uri</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,46 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<html>
<head>
<title>ActiveMQ Artemis JMS Clustered Topic Example</title>
<link rel="stylesheet" type="text/css" href="../../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../../common/prettify.css" />
<script type="text/javascript" src="../../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>JMS Clustered Topic Example</h1>
<pre>To run the example, simply type <b>mvn verify</b> from this directory, <br>or <b>mvn -PnoServer verify</b> if you want to start and create the server manually.</pre>
<p>This example demonstrates a JMS Topic deployed on two different nodes. The two nodes are configured to form a cluster.</p>
<p>We then create a subscriber on the topic on each node, and we create a producer on only one of the nodes.</p>
<p>We then send some messages via the producer, and we verify that <b>both</b> subscribers receive all the
sent messages.</p>
<p>A JMS Topic is an example of <b>publish-subscribe</b> messaging where all subscribers receive all the
messages sent to the topic (assuming they have no message selectors).</p>
<p>This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use
JNDI, these could be instantiated directly.
<p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes
and to load balance the messages between the nodes.</p>
<p>This example differes from different-topic as it will use an URI to define the cluster connection.</p>
<pre class="prettyprint"><code>&lt;cluster-connection-uri name="my-cluster" address="uri="multicast://my-discovery-group?messageLoadBalancingType=STRICT;retryInterval=500;connectorName=netty-connector;maxHops=1"/&gt;</code></pre>
<p>For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering
section of the user manual.</p>
</body>
</html>

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.jms.example;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
/**
* A simple example that shows a JMS Topic clustered across two nodes of a cluster.
* Messages are sent on one node and received by consumers on both nodes.
*/
public class ClusteredTopicExample {
public static void main(final String[] args) throws Exception {
Connection connection0 = null;
Connection connection1 = null;
InitialContext ic0 = null;
InitialContext ic1 = null;
try {
// Step 1. Instantiate topic
Topic topic = ActiveMQJMSClient.createTopic("exampleTopic");
// Step 2. Look-up a JMS Connection Factory object from JNDI on server 0
ConnectionFactory cf0 = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Step 3. Look-up a JMS Connection Factory object from JNDI on server 1
ConnectionFactory cf1 = new ActiveMQConnectionFactory("tcp://localhost:61617");
// Step 4. We create a JMS Connection connection0 which is a connection to server 0
connection0 = cf0.createConnection();
// Step 5. We create a JMS Connection connection1 which is a connection to server 1
connection1 = cf1.createConnection();
// Step 6. We create a JMS Session on server 0
Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 7. We create a JMS Session on server 1
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Step 8. We start the connections to ensure delivery occurs on them
connection0.start();
connection1.start();
// Step 9. We create JMS MessageConsumer objects on server 0 and server 1
MessageConsumer consumer0 = session0.createConsumer(topic);
MessageConsumer consumer1 = session1.createConsumer(topic);
Thread.sleep(1000);
// Step 10. We create a JMS MessageProducer object on server 0
MessageProducer producer = session0.createProducer(topic);
// Step 11. We send some messages to server 0
final int numMessages = 10;
for (int i = 0; i < numMessages; i++) {
TextMessage message = session0.createTextMessage("This is text message " + i);
producer.send(message);
System.out.println("Sent message: " + message.getText());
}
// Step 12. We now consume those messages on *both* server 0 and server 1.
// We note that all messages have been consumed by *both* consumers.
// JMS Topics implement *publish-subscribe* messaging where all consumers get a copy of all messages
for (int i = 0; i < numMessages; i++) {
TextMessage message0 = (TextMessage) consumer0.receive(5000);
System.out.println("Got message: " + message0.getText() + " from node 0");
TextMessage message1 = (TextMessage) consumer1.receive(5000);
System.out.println("Got message: " + message1.getText() + " from node 1");
}
}
finally {
// Step 15. Be sure to close our JMS resources!
if (connection0 != null) {
connection0.close();
}
if (connection1 != null) {
connection1.close();
}
if (ic0 != null) {
ic0.close();
}
if (ic1 != null) {
ic1.close();
}
}
}
}

View File

@ -0,0 +1,89 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the topic used by the example-->
<topic name="exampleTopic"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<!-- Clustering configuration -->
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="multicast://my-discovery-group?messageLoadBalancingType=STRICT;retryInterval=500;connectorName=netty-connector;maxHops=1"/>
</cluster-connections>
<!-- other configuration -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.topic.exampleTopic">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -0,0 +1,87 @@
<?xml version='1.0'?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:activemq"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<!--the topic used by the example-->
<topic name="exampleTopic"/>
</jms>
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://localhost:61617</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61617</acceptor>
</acceptors>
<!-- Clustering configuration -->
<broadcast-groups>
<broadcast-group name="my-broadcast-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<broadcast-period>100</broadcast-period>
<connector-ref>netty-connector</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>${udp-address:231.7.7.7}</group-address>
<group-port>9876</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<cluster-connections>
<cluster-connection-uri name="my-cluster" address="multicast://my-discovery-group?messageLoadBalancingType=STRICT;retryInterval=500;connectorName=netty-connector;maxHops=1"/>
</cluster-connections>
<!-- other configuration -->
<security-settings>
<!--security for example queue-->
<security-setting match="jms.topic.exampleTopic">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</core>
</configuration>

View File

@ -54,7 +54,9 @@ under the License.
<module>clustered-queue</module>
<module>clustered-static-oneway</module>
<module>clustered-static-discovery</module>
<module>clustered-static-discovery-uri</module>
<module>clustered-topic</module>
<module>clustered-topic-uri</module>
<module>queue-message-redistribution</module>
<module>symmetric-cluster</module>
</modules>
@ -69,7 +71,9 @@ under the License.
<module>clustered-queue</module>
<module>clustered-static-oneway</module>
<module>clustered-static-discovery</module>
<module>clustered-static-discovery-uri</module>
<module>clustered-topic</module>
<module>clustered-topic-uri</module>
<module>queue-message-redistribution</module>
<module>symmetric-cluster</module>
</modules>

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -1691,6 +1692,23 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
config.getClusterConfigurations().add(clusterConf);
}
protected void setupClusterConnection(final String name,
final String uri,
int server) throws Exception {
ActiveMQServer serverFrom = servers[server];
if (serverFrom == null) {
throw new IllegalStateException("No server at node " + server);
}
ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration(new URI(uri)).setName(name);
serverFrom.getConfiguration().addClusterConfiguration(configuration);
}
protected void setupClusterConnection(final String name,
final String address,
final MessageLoadBalancingType messageLoadBalancingType,

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.junit.Before;
import org.junit.Test;
/**
* A SymmetricClusterTest
*
* Most of the cases are covered in OneWayTwoNodeClusterTest - we don't duplicate them all here
*/
public class URISimpleClusterTest extends ClusterTestBase {
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
setupServers();
}
protected boolean isNetty() {
return true;
}
@Test
public void testBasicRoundRobin() throws Exception {
setupCluster(MessageLoadBalancingType.ON_DEMAND);
startServers();
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
createQueue(2, "queues.testaddress", "queue0", null, false);
createQueue(3, "queues.testaddress", "queue0", null, false);
createQueue(4, "queues.testaddress", "queue0", null, false);
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
addConsumer(2, 2, "queue0", null);
addConsumer(3, 3, "queue0", null);
addConsumer(4, 4, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
waitForBindings(2, "queues.testaddress", 1, 1, true);
waitForBindings(3, "queues.testaddress", 1, 1, true);
waitForBindings(4, "queues.testaddress", 1, 1, true);
waitForBindings(0, "queues.testaddress", 4, 4, false);
waitForBindings(1, "queues.testaddress", 4, 4, false);
waitForBindings(2, "queues.testaddress", 4, 4, false);
waitForBindings(3, "queues.testaddress", 4, 4, false);
waitForBindings(4, "queues.testaddress", 4, 4, false);
send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
verifyNotReceive(0, 1, 2, 3, 4);
}
protected static String generateURI(int serverID) {
return "tcp://127.0.0.1:" + (org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + serverID);
}
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
for (int i = 0; i < 5; i++) {
servers[i].getConfiguration().addConnectorConfiguration("netty-connector", generateURI(i));
}
setupClusterConnection("cluster", "static://(" + generateURI(1) + "," + generateURI(2) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 0);
setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(2) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 1);
setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 2);
setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(2) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 3);
setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(2) + "," + generateURI(3) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 4);
}
protected void setupServers() throws Exception {
setupServer(0, isFileStorage(), isNetty());
setupServer(1, isFileStorage(), isNetty());
setupServer(2, isFileStorage(), isNetty());
setupServer(3, isFileStorage(), isNetty());
setupServer(4, isFileStorage(), isNetty());
}
protected void startServers() throws Exception {
startServers(0, 1, 2, 3, 4);
}
protected void stopServers() throws Exception {
closeAllConsumers();
closeAllSessionFactories();
closeAllServerLocatorsFactories();
stopServers(0, 1, 2, 3, 4);
}
@Override
protected boolean isFileStorage() {
return false;
}
}