From 3f23c9916e694abeed281cd03ba476e183ecf866 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 14 Jan 2016 18:07:12 -0500 Subject: [PATCH] ARTEMIS-347 - supporting URIs on the cluster connection --- .../artemis/utils/uri/URIFactory.java | 9 + .../activemq/artemis/utils/uri/URISchema.java | 7 + .../artemis/utils/uri/URISupport.java | 560 ++++++++++++++++++ .../activemq/artemis/utils/URIParserTest.java | 6 +- .../config/ActiveMQDefaultConfiguration.java | 8 + .../api/core/client/TopologyMember.java | 2 + .../core/client/impl/TopologyMemberImpl.java | 13 + ...ConnectorTransportConfigurationParser.java | 6 +- .../artemis/uri/ServerLocatorParser.java | 4 + .../AbstractTransportConfigurationSchema.java | 6 +- .../InVMTransportConfigurationSchema.java | 2 +- .../TCPTransportConfigurationSchema.java | 9 +- .../AbstractServerLocatorSchema.java | 2 +- .../serverLocator}/ConnectionOptions.java | 2 +- .../InVMServerLocatorSchema.java | 6 +- .../JGroupsServerLocatorSchema.java | 2 +- .../TCPServerLocatorSchema.java | 6 +- .../UDPServerLocatorSchema.java | 25 +- .../artemis/utils/ConfigurationHelper.java | 31 +- .../activemq/artemis/uri/InVMSchema.java | 2 + .../activemq/artemis/uri/JGroupsSchema.java | 1 + .../artemis/uri/JMSConnectionOptions.java | 1 + .../activemq/artemis/uri/TCPSchema.java | 2 + .../activemq/artemis/uri/UDPSchema.java | 1 + .../jms/server/embedded/EmbeddedJMS.java | 16 +- .../ClusterConnectionConfiguration.java | 129 +++- .../artemis/core/config/Configuration.java | 6 + .../core/config/impl/ConfigurationImpl.java | 35 +- .../impl/FileConfigurationParser.java | 21 +- .../core/server/ActiveMQServerLogger.java | 2 +- .../core/server/cluster/BackupManager.java | 14 +- .../cluster/ClusterConfigurationUtil.java | 87 --- .../core/server/cluster/ClusterManager.java | 40 +- .../server/cluster/ha/ScaleDownPolicy.java | 26 +- .../impl/MessageLoadBalancingType.java | 31 + .../server/embedded/EmbeddedActiveMQ.java | 43 +- .../impl/SharedNothingLiveActivation.java | 17 +- .../AcceptorTransportConfigurationParser.java | 6 +- .../ClusterConnectionConfigurationParser.java | 31 + ...MAcceptorTransportConfigurationSchema.java | 3 +- ...PAcceptorTransportConfigurationSchema.java | 3 +- .../ClusterConnectionMulticastSchema.java | 48 ++ .../ClusterConnectionStaticSchema.java | 61 ++ .../schema/artemis-configuration.xsd | 60 +- .../impl/FileConfigurationParserTest.java | 18 + .../ClusterConnectionConfigurationTest.java | 65 ++ .../clustered-static-discovery-uri/pom.xml | 244 ++++++++ .../readme.html | 58 ++ .../example/StaticClusteredQueueExample.java | 173 ++++++ .../resources/activemq/server0/broker.xml | 70 +++ .../resources/activemq/server1/broker.xml | 70 +++ .../resources/activemq/server2/broker.xml | 66 +++ .../resources/activemq/server3/broker.xml | 67 +++ .../clustered/clustered-topic-uri/pom.xml | 156 +++++ .../clustered/clustered-topic-uri/readme.html | 46 ++ .../jms/example/ClusteredTopicExample.java | 129 ++++ .../resources/activemq/server0/broker.xml | 89 +++ .../resources/activemq/server1/broker.xml | 87 +++ examples/features/clustered/pom.xml | 4 + .../cluster/distribution/ClusterTestBase.java | 18 + .../distribution/URISimpleClusterTest.java | 130 ++++ 61 files changed, 2659 insertions(+), 223 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISupport.java rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/connector}/AbstractTransportConfigurationSchema.java (94%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/connector}/InVMTransportConfigurationSchema.java (98%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/connector}/TCPTransportConfigurationSchema.java (90%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/AbstractServerLocatorSchema.java (95%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/ConnectionOptions.java (96%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/InVMServerLocatorSchema.java (92%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/JGroupsServerLocatorSchema.java (98%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/TCPServerLocatorSchema.java (96%) rename artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/{ => schema/serverLocator}/UDPServerLocatorSchema.java (85%) delete mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConfigurationUtil.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationParser.java rename artemis-server/src/main/java/org/apache/activemq/artemis/uri/{ => schemas/acceptor}/InVMAcceptorTransportConfigurationSchema.java (88%) rename artemis-server/src/main/java/org/apache/activemq/artemis/uri/{ => schemas/acceptor}/TCPAcceptorTransportConfigurationSchema.java (89%) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java create mode 100644 examples/features/clustered/clustered-static-discovery-uri/pom.xml create mode 100644 examples/features/clustered/clustered-static-discovery-uri/readme.html create mode 100644 examples/features/clustered/clustered-static-discovery-uri/src/main/java/org/apache/activemq/artemis/jms/example/StaticClusteredQueueExample.java create mode 100644 examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server0/broker.xml create mode 100644 examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server1/broker.xml create mode 100644 examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server2/broker.xml create mode 100644 examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server3/broker.xml create mode 100644 examples/features/clustered/clustered-topic-uri/pom.xml create mode 100644 examples/features/clustered/clustered-topic-uri/readme.html create mode 100644 examples/features/clustered/clustered-topic-uri/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredTopicExample.java create mode 100644 examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server0/broker.xml create mode 100644 examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server1/broker.xml create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/URISimpleClusterTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java index dae6faaf67..ca31e8f574 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URIFactory.java @@ -59,6 +59,10 @@ public class URIFactory { return schemaFactory.newObject(uri, param); } + public T newObject(String uri, P param) throws Exception { + return newObject(new URI(uri), param); + } + public void populateObject(URI uri, T bean) throws Exception { URISchema schemaFactory = schemas.get(uri.getScheme()); @@ -69,6 +73,11 @@ public class URIFactory { schemaFactory.populateObject(uri, bean); } + + public void populateObject(String uri, T bean) throws Exception { + populateObject(new URI(uri), bean); + } + public URI createSchema(String scheme, T bean) throws Exception { URISchema schemaFactory = schemas.get(scheme); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java index 9ee206b560..0d6940e543 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.Converter; public abstract class URISchema { @@ -99,6 +100,12 @@ public abstract class URISchema { private static final BeanUtilsBean beanUtils = new BeanUtilsBean(); + public static void registerConverter(Converter converter, Class type) { + synchronized (beanUtils) { + beanUtils.getConvertUtils().register(converter, type); + } + } + static { // This is to customize the BeanUtils to use Fluent Proeprties as well beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores()); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISupport.java new file mode 100644 index 0000000000..4b21b6c45c --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISupport.java @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.uri; + +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Utility class that provides methods for parsing URI's + * + * This class can be used to split composite URI's into their component parts and is used to extract any + * URI options from each URI in order to set specific properties on Beans. + * + * (copied from activemq 5) + */ +public class URISupport { + + /** + * A composite URI can be split into one or more CompositeData object which each represent the + * individual URIs that comprise the composite one. + */ + public static class CompositeData { + + private String host; + private String scheme; + private String path; + private URI[] components; + private Map parameters; + private String fragment; + + public URI[] getComponents() { + return components; + } + + public String getFragment() { + return fragment; + } + + public Map getParameters() { + return parameters; + } + + public String getScheme() { + return scheme; + } + + public String getPath() { + return path; + } + + public String getHost() { + return host; + } + + public URI toURI() throws URISyntaxException { + StringBuffer sb = new StringBuffer(); + if (scheme != null) { + sb.append(scheme); + sb.append(':'); + } + + if (host != null && host.length() != 0) { + sb.append(host); + } + else { + sb.append('('); + for (int i = 0; i < components.length; i++) { + if (i != 0) { + sb.append(','); + } + sb.append(components[i].toString()); + } + sb.append(')'); + } + + if (path != null) { + sb.append('/'); + sb.append(path); + } + if (!parameters.isEmpty()) { + sb.append("?"); + sb.append(createQueryString(parameters)); + } + if (fragment != null) { + sb.append("#"); + sb.append(fragment); + } + return new URI(sb.toString()); + } + } + + /** + * Give a URI break off any URI options and store them in a Key / Value Mapping. + * + * @param uri The URI whose query should be extracted and processed. + * @return A Mapping of the URI options. + * @throws java.net.URISyntaxException + */ + public static Map parseQuery(String uri) throws URISyntaxException { + try { + uri = uri.substring(uri.lastIndexOf("?") + 1); // get only the relevant part of the query + Map rc = new HashMap(); + if (uri != null && !uri.isEmpty()) { + parseParameters(rc, uri.split("&")); + parseParameters(rc, uri.split(";")); + } + return rc; + } + catch (UnsupportedEncodingException e) { + throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e); + } + } + + private static void parseParameters(Map rc, + String[] parameters) throws UnsupportedEncodingException { + for (int i = 0; i < parameters.length; i++) { + int p = parameters[i].indexOf("="); + if (p >= 0) { + String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8"); + String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8"); + rc.put(name, value); + } + else { + rc.put(parameters[i], null); + } + } + } + + /** + * Given a URI parse and extract any URI query options and return them as a Key / Value mapping. + * + * This method differs from the {@link parseQuery} method in that it handles composite URI types and + * will extract the URI options from the outermost composite URI. + * + * @param uri The URI whose query should be extracted and processed. + * @return A Mapping of the URI options. + * @throws java.net.URISyntaxException + */ + public static Map parseParameters(URI uri) throws URISyntaxException { + if (!isCompositeURI(uri)) { + return uri.getQuery() == null ? emptyMap() : parseQuery(stripPrefix(uri.getQuery(), "?")); + } + else { + CompositeData data = URISupport.parseComposite(uri); + Map parameters = new HashMap(); + parameters.putAll(data.getParameters()); + if (parameters.isEmpty()) { + parameters = emptyMap(); + } + + return parameters; + } + } + + /** + * Given a Key / Value mapping create and append a URI query value that represents the mapped entries, return the + * newly updated URI that contains the value of the given URI and the appended query value. + * + * @param uri The source URI that will have the Map entries appended as a URI query value. + * @param queryParameters The Key / Value mapping that will be transformed into a URI query string. + * @return A new URI value that combines the given URI and the constructed query string. + * @throws java.net.URISyntaxException + */ + public static URI applyParameters(URI uri, Map queryParameters) throws URISyntaxException { + return applyParameters(uri, queryParameters, ""); + } + + /** + * Given a Key / Value mapping create and append a URI query value that represents the mapped entries, return the + * newly updated URI that contains the value of the given URI and the appended query value. Each entry in the query + * string is prefixed by the supplied optionPrefix string. + * + * @param uri The source URI that will have the Map entries appended as a URI query value. + * @param queryParameters The Key / Value mapping that will be transformed into a URI query string. + * @param optionPrefix A string value that when not null or empty is used to prefix each query option key. + * @return A new URI value that combines the given URI and the constructed query string. + * @throws java.net.URISyntaxException + */ + public static URI applyParameters(URI uri, + Map queryParameters, + String optionPrefix) throws URISyntaxException { + if (queryParameters != null && !queryParameters.isEmpty()) { + StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer(); + for (Map.Entry param : queryParameters.entrySet()) { + if (param.getKey().startsWith(optionPrefix)) { + if (newQuery.length() != 0) { + newQuery.append('&'); + } + final String key = param.getKey().substring(optionPrefix.length()); + newQuery.append(key).append('=').append(param.getValue()); + } + } + uri = createURIWithQuery(uri, newQuery.toString()); + } + return uri; + } + + @SuppressWarnings("unchecked") + private static Map emptyMap() { + return Collections.EMPTY_MAP; + } + + /** + * Removes any URI query from the given uri and return a new URI that does not contain the query portion. + * + * @param uri The URI whose query value is to be removed. + * @return a new URI that does not contain a query value. + * @throws java.net.URISyntaxException + */ + public static URI removeQuery(URI uri) throws URISyntaxException { + return createURIWithQuery(uri, null); + } + + /** + * Creates a URI with the given query, removing an previous query value from the given URI. + * + * @param uri The source URI whose existing query is replaced with the newly supplied one. + * @param query The new URI query string that should be appended to the given URI. + * @return a new URI that is a combination of the original URI and the given query string. + * @throws java.net.URISyntaxException + */ + public static URI createURIWithQuery(URI uri, String query) throws URISyntaxException { + String schemeSpecificPart = uri.getRawSchemeSpecificPart(); + // strip existing query if any + int questionMark = schemeSpecificPart.lastIndexOf("?"); + // make sure question mark is not within parentheses + if (questionMark < schemeSpecificPart.lastIndexOf(")")) { + questionMark = -1; + } + if (questionMark > 0) { + schemeSpecificPart = schemeSpecificPart.substring(0, questionMark); + } + if (query != null && query.length() > 0) { + schemeSpecificPart += "?" + query; + } + return new URI(uri.getScheme(), schemeSpecificPart, uri.getFragment()); + } + + /** + * Given a composite URI, parse the individual URI elements contained within that URI and return + * a CompsoteData instance that contains the parsed URI values. + * + * @param uri The target URI that should be parsed. + * @return a new CompsiteData instance representing the parsed composite URI. + * @throws java.net.URISyntaxException + */ + public static CompositeData parseComposite(URI uri) throws URISyntaxException { + + CompositeData rc = new CompositeData(); + rc.scheme = uri.getScheme(); + String ssp = stripPrefix(uri.getRawSchemeSpecificPart().trim(), "//").trim(); + + parseComposite(uri, rc, ssp); + + rc.fragment = uri.getFragment(); + return rc; + } + + /** + * Examine a URI and determine if it is a Composite type or not. + * + * @param uri The URI that is to be examined. + * @return true if the given URI is a Compsote type. + */ + public static boolean isCompositeURI(URI uri) { + String ssp = stripPrefix(uri.getRawSchemeSpecificPart().trim(), "//").trim(); + + if (ssp.indexOf('(') == 0 && checkParenthesis(ssp)) { + return true; + } + return false; + } + + /** + * Given a string and a position in that string of an open parend, find the matching close parend. + * + * @param str The string to be searched for a matching parend. + * @param first The index in the string of the opening parend whose close value is to be searched. + * @return the index in the string where the closing parend is located. + * @throws java.net.URISyntaxException fi the string does not contain a matching parend. + */ + public static int indexOfParenthesisMatch(String str, int first) throws URISyntaxException { + int index = -1; + + if (first < 0 || first > str.length()) { + throw new IllegalArgumentException("Invalid position for first parenthesis: " + first); + } + + if (str.charAt(first) != '(') { + throw new IllegalArgumentException("character at indicated position is not a parenthesis"); + } + + int depth = 1; + char[] array = str.toCharArray(); + for (index = first + 1; index < array.length; ++index) { + char current = array[index]; + if (current == '(') { + depth++; + } + else if (current == ')') { + if (--depth == 0) { + break; + } + } + } + + if (depth != 0) { + throw new URISyntaxException(str, "URI did not contain a matching parenthesis."); + } + + return index; + } + + /** + * Given a composite URI and a CompositeData instance and the scheme specific part extracted from the source URI, + * parse the composite URI and populate the CompositeData object with the results. The source URI is used only + * for logging as the ssp should have already been extracted from it and passed here. + * + * @param uri The original source URI whose ssp is parsed into the composite data. + * @param rc The CompsositeData instance that will be populated from the given ssp. + * @param ssp The scheme specific part from the original string that is a composite or one or more URIs. + * @throws java.net.URISyntaxException + */ + private static void parseComposite(URI uri, CompositeData rc, String ssp) throws URISyntaxException { + String componentString; + String params; + + if (!checkParenthesis(ssp)) { + throw new URISyntaxException(uri.toString(), "Not a matching number of '(' and ')' parenthesis"); + } + + int p; + int initialParen = ssp.indexOf("("); + if (initialParen == 0) { + + rc.host = ssp.substring(0, initialParen); + p = rc.host.indexOf("/"); + + if (p >= 0) { + rc.path = rc.host.substring(p); + rc.host = rc.host.substring(0, p); + } + + p = indexOfParenthesisMatch(ssp, initialParen); + componentString = ssp.substring(initialParen + 1, p); + params = ssp.substring(p + 1).trim(); + + } + else { + componentString = ssp; + params = ""; + } + + String[] components = splitComponents(componentString); + rc.components = new URI[components.length]; + for (int i = 0; i < components.length; i++) { + rc.components[i] = new URI(components[i].trim()); + } + + p = params.indexOf("?"); + if (p >= 0) { + if (p > 0) { + rc.path = stripPrefix(params.substring(0, p), "/"); + } + rc.parameters = parseQuery(params.substring(p + 1)); + } + else { + if (params.length() > 0) { + rc.path = stripPrefix(params, "/"); + } + rc.parameters = emptyMap(); + } + } + + /** + * Given the inner portion of a composite URI, split and return each inner URI as a string + * element in a new String array. + * + * @param str The inner URI elements of a composite URI string. + * @return an array containing each inner URI from the composite one. + */ + private static String[] splitComponents(String str) { + List l = new ArrayList(); + + int last = 0; + int depth = 0; + char[] chars = str.toCharArray(); + for (int i = 0; i < chars.length; i++) { + switch (chars[i]) { + case '(': + depth++; + break; + case ')': + depth--; + break; + case ',': + if (depth == 0) { + String s = str.substring(last, i); + l.add(s); + last = i + 1; + } + break; + default: + } + } + + String s = str.substring(last); + if (s.length() != 0) { + l.add(s); + } + + String[] rc = new String[l.size()]; + l.toArray(rc); + return rc; + } + + /** + * String the given prefix from the target string and return the result. + * + * @param value The string that should be trimmed of the given prefix if present. + * @param prefix The prefix to remove from the target string. + * @return either the original string or a new string minus the supplied prefix if present. + */ + public static String stripPrefix(String value, String prefix) { + if (value.startsWith(prefix)) { + return value.substring(prefix.length()); + } + return value; + } + + /** + * Strip a URI of its scheme element. + * + * @param uri The URI whose scheme value should be stripped. + * @return The stripped URI value. + * @throws java.net.URISyntaxException + */ + public static URI stripScheme(URI uri) throws URISyntaxException { + return new URI(stripPrefix(uri.getSchemeSpecificPart().trim(), "//")); + } + + /** + * Given a key / value mapping, create and return a URI formatted query string that is valid and + * can be appended to a URI. + * + * @param options The Mapping that will create the new Query string. + * @return a URI formatted query string. + * @throws java.net.URISyntaxException + */ + public static String createQueryString(Map options) throws URISyntaxException { + try { + if (options.size() > 0) { + StringBuffer rc = new StringBuffer(); + boolean first = true; + for (String key : options.keySet()) { + if (first) { + first = false; + } + else { + rc.append("&"); + } + String value = (String) options.get(key); + rc.append(URLEncoder.encode(key, "UTF-8")); + rc.append("="); + rc.append(URLEncoder.encode(value, "UTF-8")); + } + return rc.toString(); + } + else { + return ""; + } + } + catch (UnsupportedEncodingException e) { + throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e); + } + } + + /** + * Creates a URI from the original URI and the remaining parameters. + * + * When the query options of a URI are applied to certain objects the used portion of the query options needs + * to be removed and replaced with those that remain so that other parts of the code can attempt to apply the + * remainder or give an error is unknown values were given. This method is used to update a URI with those + * remainder values. + * + * @param originalURI The URI whose current parameters are remove and replaced with the given remainder value. + * @param params The URI params that should be used to replace the current ones in the target. + * @return a new URI that matches the original one but has its query options replaced with the given ones. + * @throws java.net.URISyntaxException + */ + public static URI createRemainingURI(URI originalURI, Map params) throws URISyntaxException { + String s = createQueryString(params); + if (s.length() == 0) { + s = null; + } + return createURIWithQuery(originalURI, s); + } + + /** + * Given a URI value create and return a new URI that matches the target one but with the scheme value + * supplied to this method. + * + * @param bindAddr The URI whose scheme value should be altered. + * @param scheme The new scheme value to use for the returned URI. + * @return a new URI that is a copy of the original except that its scheme matches the supplied one. + * @throws java.net.URISyntaxException + */ + public static URI changeScheme(URI bindAddr, String scheme) throws URISyntaxException { + return new URI(scheme, bindAddr.getUserInfo(), bindAddr.getHost(), bindAddr.getPort(), bindAddr.getPath(), bindAddr.getQuery(), bindAddr.getFragment()); + } + + /** + * Examine the supplied string and ensure that all parends appear as matching pairs. + * + * @param str The target string to examine. + * @return true if the target string has valid parend pairings. + */ + public static boolean checkParenthesis(String str) { + boolean result = true; + if (str != null) { + int open = 0; + int closed = 0; + + int i = 0; + while ((i = str.indexOf('(', i)) >= 0) { + i++; + open++; + } + i = 0; + while ((i = str.indexOf(')', i)) >= 0) { + i++; + closed++; + } + result = open == closed; + } + return result; + } +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java index 5e4849da69..f2fd107b8b 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java @@ -17,14 +17,14 @@ package org.apache.activemq.artemis.utils; +import java.net.URI; +import java.util.Map; + import org.apache.activemq.artemis.utils.uri.URIFactory; import org.apache.activemq.artemis.utils.uri.URISchema; import org.junit.Assert; import org.junit.Test; -import java.net.URI; -import java.util.Map; - public class URIParserTest { /** diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 462e59753c..b4fc4937d1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -160,6 +160,9 @@ public final class ActiveMQDefaultConfiguration { // the name of the address that consumers bind to receive management notifications private static SimpleString DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS = new SimpleString("activemq.notifications"); + // The default address used for clustering + private static String DEFAULT_CLUSTER_ADDRESS = "jms"; + // Cluster username. It applies to all cluster configurations. private static String DEFAULT_CLUSTER_USER = "ACTIVEMQ.CLUSTER.ADMIN.USER"; @@ -499,6 +502,11 @@ public final class ActiveMQDefaultConfiguration { return DEFAULT_MANAGEMENT_NOTIFICATION_ADDRESS; } + /** The default Cluster address for the Cluster connection*/ + public static String getDefaultClusterAddress() { + return DEFAULT_CLUSTER_ADDRESS; + } + /** * Cluster username. It applies to all cluster configurations. */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/TopologyMember.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/TopologyMember.java index 26de0dc2c5..4247b4328a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/TopologyMember.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/TopologyMember.java @@ -92,4 +92,6 @@ public interface TopologyMember { */ boolean isMember(TransportConfiguration configuration); + + String toURI(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index 50bb50b42e..c646059ec6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -19,7 +19,11 @@ package org.apache.activemq.artemis.core.client.impl; import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.ConfigurationHelper; + +import java.util.Map; public final class TopologyMemberImpl implements TopologyMember { @@ -117,6 +121,15 @@ public final class TopologyMemberImpl implements TopologyMember { } } + @Override + public String toURI() { + TransportConfiguration liveConnector = getLive(); + Map props = liveConnector.getParams(); + String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", props); + int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, props); + return "tcp://" + host + ":" + port; + } + @Override public String toString() { return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectorTransportConfigurationParser.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectorTransportConfigurationParser.java index 650a3b8288..627575dd95 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectorTransportConfigurationParser.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectorTransportConfigurationParser.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.artemis.uri; +import java.util.List; + import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; +import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.utils.uri.URIFactory; -import java.util.List; - public class ConnectorTransportConfigurationParser extends URIFactory, String> { public ConnectorTransportConfigurationParser() { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ServerLocatorParser.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ServerLocatorParser.java index 2bf11a4275..3d47fdd86c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ServerLocatorParser.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ServerLocatorParser.java @@ -17,6 +17,10 @@ package org.apache.activemq.artemis.uri; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema; +import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema; +import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema; +import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema; import org.apache.activemq.artemis.utils.uri.URIFactory; public class ServerLocatorParser extends URIFactory { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/AbstractTransportConfigurationSchema.java similarity index 94% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractTransportConfigurationSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/AbstractTransportConfigurationSchema.java index b76fa7fff9..a8fb1de1a7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractTransportConfigurationSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/AbstractTransportConfigurationSchema.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.connector; + +import java.util.List; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.utils.uri.URISchema; -import java.util.List; - public abstract class AbstractTransportConfigurationSchema extends URISchema, String> { } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/InVMTransportConfigurationSchema.java similarity index 98% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMTransportConfigurationSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/InVMTransportConfigurationSchema.java index b473ba428f..8e8b966248 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMTransportConfigurationSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/InVMTransportConfigurationSchema.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.connector; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.utils.uri.SchemaConstants; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java similarity index 90% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPTransportConfigurationSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java index 10fda78302..309e3e40b9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPTransportConfigurationSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.connector; import java.net.URI; import java.net.URISyntaxException; @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.utils.uri.SchemaConstants; -import org.apache.activemq.artemis.utils.uri.URISchema; public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema { @@ -61,7 +60,7 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat String factoryName) throws URISyntaxException { HashMap props = new HashMap<>(); - URISchema.setData(uri, props, allowableProperties, query); + setData(uri, props, allowableProperties, query); List transportConfigurations = new ArrayList<>(); transportConfigurations.add(new TransportConfiguration(factoryName, props, name)); @@ -72,8 +71,8 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat for (String s : split) { URI extraUri = new URI(s); HashMap newProps = new HashMap<>(); - URISchema.setData(extraUri, newProps, allowableProperties, query); - URISchema.setData(extraUri, newProps, allowableProperties, URISchema.parseQuery(extraUri.getQuery(), null)); + setData(extraUri, newProps, allowableProperties, query); + setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null)); transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString())); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java similarity index 95% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractServerLocatorSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java index 3fe97bb261..e1a5f2b14d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/AbstractServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.serverLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.utils.uri.URISchema; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectionOptions.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/ConnectionOptions.java similarity index 96% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectionOptions.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/ConnectionOptions.java index f08bfd7dde..8931dd6b49 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/ConnectionOptions.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/ConnectionOptions.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.serverLocator; /** * This will represent all the possible options you could setup on URLs diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java similarity index 92% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMServerLocatorSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java index 5bbad3798b..ace312aca0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/InVMServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java @@ -14,13 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.serverLocator; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; import org.apache.activemq.artemis.utils.uri.SchemaConstants; -import org.apache.activemq.artemis.utils.uri.URISchema; import java.net.URI; import java.net.URISyntaxException; @@ -37,7 +37,7 @@ public class InVMServerLocatorSchema extends AbstractServerLocatorSchema { protected ServerLocator internalNewObject(URI uri, Map query, String name) throws Exception { TransportConfiguration tc = InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory"); ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc); - return URISchema.setData(uri, factory, query); + return setData(uri, factory, query); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java similarity index 98% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsServerLocatorSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java index 1749d9dd18..73a1b942c0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.serverLocator; import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java similarity index 96% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPServerLocatorSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java index a7c76526b0..b330d0ea0e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/TCPServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schema.serverLocator; import java.net.URI; import java.util.List; @@ -25,9 +25,9 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.utils.IPV6Util; import org.apache.activemq.artemis.utils.uri.SchemaConstants; -import org.apache.activemq.artemis.utils.uri.URISchema; public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { @Override @@ -52,7 +52,7 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { @Override protected URI internalNewURI(ServerLocator bean) throws Exception { - String query = URISchema.getData(null, bean); + String query = getData(null, bean); TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations(); return getURI(query, staticConnectors); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/UDPServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java similarity index 85% rename from artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/UDPServerLocatorSchema.java rename to artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java index b3a85fce2d..a21e1a9053 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/UDPServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java @@ -14,23 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; - -import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.api.core.client.ServerLocator; -import org.apache.activemq.artemis.utils.uri.SchemaConstants; -import org.apache.activemq.artemis.utils.uri.URISchema; +package org.apache.activemq.artemis.uri.schema.serverLocator; import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.uri.SchemaConstants; + public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { - protected static List IGNORED = new ArrayList<>(); + public static List IGNORED = new ArrayList<>(); static { IGNORED.add("localBindAddress"); @@ -61,7 +60,7 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration(); UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); dgc.setBroadcastEndpointFactory(endpoint); - String query = URISchema.getData(IGNORED, bean, dgc, endpoint); + String query = getData(IGNORED, bean, dgc, endpoint); return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null); } @@ -72,11 +71,11 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { String name) throws Exception { UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory().setGroupAddress(host).setGroupPort(port); - URISchema.setData(uri, endpointFactoryConfiguration, query); + setData(uri, endpointFactoryConfiguration, query); - DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration); + DiscoveryGroupConfiguration dgc = setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration); - URISchema.setData(uri, dgc, query); + setData(uri, dgc, query); return dgc; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ConfigurationHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ConfigurationHelper.java index 8771451135..166c0a7a01 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ConfigurationHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/ConfigurationHelper.java @@ -16,17 +16,17 @@ */ package org.apache.activemq.artemis.utils; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; -import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; - import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; + public class ConfigurationHelper { - public static String getStringProperty(final String propName, final String def, final Map props) { + public static String getStringProperty(final String propName, final String def, final Map props) { if (props == null) { return def; } @@ -46,7 +46,7 @@ public class ConfigurationHelper { } } - public static int getIntProperty(final String propName, final int def, final Map props) { + public static int getIntProperty(final String propName, final int def, final Map props) { if (props == null) { return def; } @@ -71,7 +71,7 @@ public class ConfigurationHelper { } } - public static long getLongProperty(final String propName, final long def, final Map props) { + public static long getLongProperty(final String propName, final long def, final Map props) { if (props == null) { return def; } @@ -97,7 +97,7 @@ public class ConfigurationHelper { } } - public static boolean getBooleanProperty(final String propName, final boolean def, final Map props) { + public static boolean getBooleanProperty(final String propName, final boolean def, final Map props) { if (props == null) { return def; } @@ -160,7 +160,7 @@ public class ConfigurationHelper { public static String getPasswordProperty(final String propName, final String def, - final Map props, + final Map props, String defaultMaskPassword, String defaultPasswordCodec) { if (props == null) { @@ -201,4 +201,17 @@ public class ConfigurationHelper { } } + public static double getDoubleProperty(String name, double def, Map props) { + if (props == null) { + return def; + } + Object prop = props.get(name); + if (prop == null) { + return def; + } + else { + String value = prop.toString(); + return Double.parseDouble(value); + } + } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/InVMSchema.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/InVMSchema.java index b161786377..4e69c4d30a 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/InVMSchema.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/InVMSchema.java @@ -18,6 +18,8 @@ package org.apache.activemq.artemis.uri; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema; +import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import java.net.URI; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsSchema.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsSchema.java index a6b368037c..2874fa9b31 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsSchema.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JGroupsSchema.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.URISchema; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JMSConnectionOptions.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JMSConnectionOptions.java index 04d8e8f16c..92fb3b7686 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JMSConnectionOptions.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/JMSConnectionOptions.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.uri; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.uri.schema.serverLocator.ConnectionOptions; /** * This will represent all the possible options you could setup on URLs diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/TCPSchema.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/TCPSchema.java index 7a35a41ecd..5e9eb1464f 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/TCPSchema.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/TCPSchema.java @@ -21,6 +21,8 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema; +import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.URISchema; diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/UDPSchema.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/UDPSchema.java index 9c83755a7e..5a6ca9a0b7 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/UDPSchema.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/UDPSchema.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.URISchema; diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java index 10962f9f9c..7e235a6e36 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/server/embedded/EmbeddedJMS.java @@ -55,8 +55,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ { * * @param registry */ - public void setRegistry(BindingRegistry registry) { + public EmbeddedJMS setRegistry(BindingRegistry registry) { this.registry = registry; + return this; } /** @@ -64,8 +65,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ { * * @param jmsConfiguration */ - public void setJmsConfiguration(JMSConfiguration jmsConfiguration) { + public EmbeddedJMS setJmsConfiguration(JMSConfiguration jmsConfiguration) { this.jmsConfiguration = jmsConfiguration; + return this; } /** @@ -73,8 +75,9 @@ public class EmbeddedJMS extends EmbeddedActiveMQ { * * @param context */ - public void setContext(Context context) { + public EmbeddedJMS setContext(Context context) { this.context = context; + return this; } /** @@ -89,7 +92,7 @@ public class EmbeddedJMS extends EmbeddedActiveMQ { } @Override - public void start() throws Exception { + public EmbeddedJMS start() throws Exception { super.initStart(); if (jmsConfiguration != null) { serverManager = new JMSServerManagerImpl(activeMQServer, jmsConfiguration); @@ -116,11 +119,14 @@ public class EmbeddedJMS extends EmbeddedActiveMQ { } serverManager.setRegistry(registry); serverManager.start(); + + return this; } @Override - public void stop() throws Exception { + public EmbeddedJMS stop() throws Exception { serverManager.stop(); + return this; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java index d1256115ed..f3f2b25d3f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/ClusterConnectionConfiguration.java @@ -16,21 +16,29 @@ */ package org.apache.activemq.artemis.core.config; -import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; -import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; - import java.io.Serializable; +import java.net.URI; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.uri.ClusterConnectionConfigurationParser; +import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser; +import org.apache.activemq.artemis.utils.uri.URISupport; + public final class ClusterConnectionConfiguration implements Serializable { private static final long serialVersionUID = 8948303813427795935L; private String name; - private String address; + private String address = ActiveMQDefaultConfiguration.getDefaultClusterAddress(); private String connectorName; @@ -56,6 +64,8 @@ public final class ClusterConnectionConfiguration implements Serializable { private MessageLoadBalancingType messageLoadBalancingType = Enum.valueOf(MessageLoadBalancingType.class, ActiveMQDefaultConfiguration.getDefaultClusterMessageLoadBalancingType()); + private URISupport.CompositeData compositeMembers; + private List staticConnectors = Collections.emptyList(); private String discoveryGroupName = null; @@ -75,6 +85,11 @@ public final class ClusterConnectionConfiguration implements Serializable { public ClusterConnectionConfiguration() { } + public ClusterConnectionConfiguration(URI uri) throws Exception { + ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); + parser.populateObject(uri, this); + } + public String getName() { return name; } @@ -93,6 +108,15 @@ public final class ClusterConnectionConfiguration implements Serializable { return this; } + public ClusterConnectionConfiguration setCompositeMembers(URISupport.CompositeData members) { + this.compositeMembers = members; + return this; + } + + public URISupport.CompositeData getCompositeMembers() { + return compositeMembers; + } + /** * @return the clientFailureCheckPeriod */ @@ -334,6 +358,72 @@ public final class ClusterConnectionConfiguration implements Serializable { return this; } + /** + * This method will match the configuration and return the proper TransportConfiguration for the Configuration + */ + public TransportConfiguration[] getTransportConfigurations(Configuration configuration) throws Exception { + + if (getCompositeMembers() != null) { + ConnectorTransportConfigurationParser connectorTransportConfigurationParser = new ConnectorTransportConfigurationParser(); + + URI[] members = getCompositeMembers().getComponents(); + + List list = new LinkedList<>(); + + for (int i = 0; i < members.length; i++) { + list.addAll(connectorTransportConfigurationParser.newObject(members[i], null)); + } + + return list.toArray(new TransportConfiguration[list.size()]); + } + else { + return configuration.getTransportConfigurations(staticConnectors); + } + } + + /** + * This method will return the proper discovery configuration from the main configuration + */ + public DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(Configuration configuration) { + if (discoveryGroupName != null) { + DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(discoveryGroupName); + + if (dg == null) { + ActiveMQServerLogger.LOGGER.clusterConnectionNoDiscoveryGroup(discoveryGroupName); + return null; + } + return dg; + } + else { + return null; + } + } + + public TransportConfiguration getTransportConfiguration(Configuration configuration) { + TransportConfiguration connector = configuration.getConnectorConfigurations().get(getConnectorName()); + + if (connector == null) { + ActiveMQServerLogger.LOGGER.clusterConnectionNoConnector(connectorName); + return null; + } + return connector; + } + + public boolean validateConfiguration() { + if (getName() == null) { + ActiveMQServerLogger.LOGGER.clusterConnectionNotUnique(); + return false; + } + + if (getAddress() == null) { + ActiveMQServerLogger.LOGGER.clusterConnectionNoForwardAddress(); + + return false; + } + + return true; + } + @Override public int hashCode() { final int prime = 31; @@ -440,4 +530,33 @@ public final class ClusterConnectionConfiguration implements Serializable { return false; return true; } + + @Override + public String toString() { + return "ClusterConnectionConfiguration{" + + "name='" + name + '\'' + + ", address='" + address + '\'' + + ", connectorName='" + connectorName + '\'' + + ", clientFailureCheckPeriod=" + clientFailureCheckPeriod + + ", connectionTTL=" + connectionTTL + + ", retryInterval=" + retryInterval + + ", retryIntervalMultiplier=" + retryIntervalMultiplier + + ", maxRetryInterval=" + maxRetryInterval + + ", initialConnectAttempts=" + initialConnectAttempts + + ", reconnectAttempts=" + reconnectAttempts + + ", callTimeout=" + callTimeout + + ", callFailoverTimeout=" + callFailoverTimeout + + ", duplicateDetection=" + duplicateDetection + + ", messageLoadBalancingType=" + messageLoadBalancingType + + ", compositeMembers=" + compositeMembers + + ", staticConnectors=" + staticConnectors + + ", discoveryGroupName='" + discoveryGroupName + '\'' + + ", maxHops=" + maxHops + + ", confirmationWindowSize=" + confirmationWindowSize + + ", allowDirectConnectionsOnly=" + allowDirectConnectionsOnly + + ", minLargeMessageSize=" + minLargeMessageSize + + ", clusterNotificationInterval=" + clusterNotificationInterval + + ", clusterNotificationAttempts=" + clusterNotificationAttempts + + '}'; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java index e493769fb7..d52f53f9fc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java @@ -369,6 +369,8 @@ public interface Configuration { Configuration addClusterConfiguration(final ClusterConnectionConfiguration config); + ClusterConnectionConfiguration addClusterConfiguration(String name, String uri) throws Exception; + Configuration clearClusterConfigurations(); /** @@ -911,6 +913,10 @@ public interface Configuration { * */ Configuration setResolveProtocols(boolean resolveProtocols); + TransportConfiguration[] getTransportConfigurations(String ...connectorNames); + + TransportConfiguration[] getTransportConfigurations(List connectorNames); + /* * @see #setResolveProtocols() * @return whether ActiveMQ Artemis should resolve and use any Protocols available on the classpath diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java index 0a6582fabd..a90374ecaa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java @@ -22,9 +22,12 @@ import java.io.File; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; +import java.lang.reflect.Array; +import java.net.URI; import java.security.AccessController; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -48,6 +51,7 @@ import org.apache.activemq.artemis.core.config.StoreConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.SecuritySettingPlugin; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; @@ -90,7 +94,7 @@ public class ConfigurationImpl implements Configuration, Serializable { protected String jmxDomain = ActiveMQDefaultConfiguration.getDefaultJmxDomain(); - protected boolean jmxUseBrokerName = ActiveMQDefaultConfiguration.isDefaultJMXUseBrokerName(); + protected boolean jmxUseBrokerName = ActiveMQDefaultConfiguration.isDefaultJMXUseBrokerName(); protected long connectionTTLOverride = ActiveMQDefaultConfiguration.getDefaultConnectionTtlOverride(); @@ -491,6 +495,12 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + public ClusterConnectionConfiguration addClusterConfiguration(String name, String uri) throws Exception { + ClusterConnectionConfiguration newConfig = new ClusterConnectionConfiguration(new URI(uri)).setName(name); + clusterConfigurations.add(newConfig); + return newConfig; + } + @Override public ConfigurationImpl clearClusterConfigurations() { clusterConfigurations.clear(); @@ -685,7 +695,6 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } - @Override public int getJournalMinFiles() { return journalMinFiles; @@ -1280,6 +1289,28 @@ public class ConfigurationImpl implements Configuration, Serializable { return this; } + public TransportConfiguration[] getTransportConfigurations(String... connectorNames) { + return getTransportConfigurations(Arrays.asList(connectorNames)); + } + + public TransportConfiguration[] getTransportConfigurations(final List connectorNames) { + TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size()); + int count = 0; + for (String connectorName : connectorNames) { + TransportConfiguration connector = getConnectorConfigurations().get(connectorName); + + if (connector == null) { + ActiveMQServerLogger.LOGGER.warn("bridgeNoConnector(connectorName)"); + + return null; + } + + tcConfigs[count++] = connector; + } + + return tcConfigs; + } + @Override public boolean isResolveProtocols() { return resolveProtocols; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index e34207ee39..793abd2678 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -409,6 +409,14 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { parseClusterConnectionConfiguration(ccNode, config); } + NodeList ccNodesURI = e.getElementsByTagName("cluster-connection-uri"); + + for (int i = 0; i < ccNodesURI.getLength(); i++) { + Element ccNode = (Element) ccNodesURI.item(i); + + parseClusterConnectionConfigurationURI(ccNode, config); + } + NodeList dvNodes = e.getElementsByTagName("divert"); for (int i = 0; i < dvNodes.getLength(); i++) { @@ -1236,7 +1244,18 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } - private void parseClusterConnectionConfiguration(final Element e, final Configuration mainConfig) { + private void parseClusterConnectionConfigurationURI(final Element e, final Configuration mainConfig) throws Exception { + String name = e.getAttribute("name"); + + + String uri = e.getAttribute("address"); + + ClusterConnectionConfiguration config = mainConfig.addClusterConfiguration(name, uri); + + System.out.println("Adding cluster connection :: " + config); + } + + private void parseClusterConnectionConfiguration(final Element e, final Configuration mainConfig) throws Exception { String name = e.getAttribute("name"); String address = getString(e, "address", null, Validators.NOT_NULL_OR_EMPTY); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 5b7c39a91b..c0dfde37a5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -869,7 +869,7 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.WARN) @Message(id = 222134, value = "No connector defined with name {0}. The bridge will not be deployed.", format = Message.Format.MESSAGE_FORMAT) - void bridgeNoConnector(String name); + void noConnector(String name); @LogMessage(level = Logger.Level.WARN) @Message(id = 222135, value = "Stopping Redistributor, Timed out waiting for tasks to complete", format = Message.Format.MESSAGE_FORMAT) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java index 81a3184e59..cbb4141d76 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java @@ -73,7 +73,7 @@ public class BackupManager implements ActiveMQComponent { * configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster. * */ @Override - public synchronized void start() { + public synchronized void start() throws Exception { if (started) return; //deploy the backup connectors using the cluster configuration @@ -117,14 +117,18 @@ public class BackupManager implements ActiveMQComponent { /* * create the connectors using the cluster configurations * */ - private void deployBackupConnector(final ClusterConnectionConfiguration config) { - TransportConfiguration connector = ClusterConfigurationUtil.getTransportConfiguration(config, configuration); + private void deployBackupConnector(final ClusterConnectionConfiguration config) throws Exception { + if (!config.validateConfiguration()) { + return; + } + + TransportConfiguration connector = config.getTransportConfiguration(configuration); if (connector == null) return; if (config.getDiscoveryGroupName() != null) { - DiscoveryGroupConfiguration dg = ClusterConfigurationUtil.getDiscoveryGroupConfiguration(config, configuration); + DiscoveryGroupConfiguration dg = config.getDiscoveryGroupConfiguration(configuration); if (dg == null) return; @@ -134,7 +138,7 @@ public class BackupManager implements ActiveMQComponent { backupConnectors.add(backupConnector); } else { - TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.getTransportConfigurations(config, configuration); + TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration); StaticBackupConnector backupConnector = new StaticBackupConnector(tcConfigs, config.getName(), connector, config.getRetryInterval(), clusterManager); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConfigurationUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConfigurationUtil.java deleted file mode 100644 index 0d6b761a43..0000000000 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConfigurationUtil.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.core.server.cluster; - -import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; - -import java.lang.reflect.Array; -import java.util.List; - -public class ClusterConfigurationUtil { - - public static TransportConfiguration getTransportConfiguration(ClusterConnectionConfiguration config, - Configuration configuration) { - if (config.getName() == null) { - ActiveMQServerLogger.LOGGER.clusterConnectionNotUnique(); - - return null; - } - - if (config.getAddress() == null) { - ActiveMQServerLogger.LOGGER.clusterConnectionNoForwardAddress(); - - return null; - } - - TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName()); - - if (connector == null) { - ActiveMQServerLogger.LOGGER.clusterConnectionNoConnector(config.getConnectorName()); - return null; - } - return connector; - } - - public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(ClusterConnectionConfiguration config, - Configuration configuration) { - DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName()); - - if (dg == null) { - ActiveMQServerLogger.LOGGER.clusterConnectionNoDiscoveryGroup(config.getDiscoveryGroupName()); - return null; - } - return dg; - } - - public static TransportConfiguration[] getTransportConfigurations(ClusterConnectionConfiguration config, - Configuration configuration) { - return config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors(), configuration) : null; - } - - public static TransportConfiguration[] connectorNameListToArray(final List connectorNames, - Configuration configuration) { - TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size()); - int count = 0; - for (String connectorName : connectorNames) { - TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName); - - if (connector == null) { - ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName); - - return null; - } - - tcConfigs[count++] = connector; - } - - return tcConfigs; - } -} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java index 9bb449a9a0..a85e070d91 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterManager.java @@ -16,6 +16,18 @@ */ package org.apache.activemq.artemis.core.server.cluster; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.BroadcastGroupConfiguration; @@ -54,18 +66,6 @@ import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.FutureLatch; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; - /** * A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s. *

@@ -432,7 +432,7 @@ public final class ClusterManager implements ActiveMQComponent { } else { - TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.connectorNameListToArray(config.getStaticConnectors(), configuration); + TransportConfiguration[] tcConfigs = configuration.getTransportConfigurations(config.getStaticConnectors()); if (tcConfigs == null) { ActiveMQServerLogger.LOGGER.bridgeCantFindConnectors(config.getName()); @@ -581,10 +581,16 @@ public final class ClusterManager implements ActiveMQComponent { } private void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception { - TransportConfiguration connector = ClusterConfigurationUtil.getTransportConfiguration(config, configuration); - if (connector == null) + if (!config.validateConfiguration()) { return; + } + + TransportConfiguration connector = config.getTransportConfiguration(configuration); + + if (connector == null) { + return; + } if (clusterConnections.containsKey(config.getName())) { ActiveMQServerLogger.LOGGER.clusterConnectionAlreadyExists(config.getConnectorName()); @@ -594,7 +600,7 @@ public final class ClusterManager implements ActiveMQComponent { ClusterConnectionImpl clusterConnection; if (config.getDiscoveryGroupName() != null) { - DiscoveryGroupConfiguration dg = ClusterConfigurationUtil.getDiscoveryGroupConfiguration(config, configuration); + DiscoveryGroupConfiguration dg = config.getDiscoveryGroupConfiguration(configuration); if (dg == null) return; @@ -611,7 +617,7 @@ public final class ClusterManager implements ActiveMQComponent { clusterController.addClusterConnection(clusterConnection.getName(), dg, config); } else { - TransportConfiguration[] tcConfigs = ClusterConfigurationUtil.getTransportConfigurations(config, configuration); + TransportConfiguration[] tcConfigs = config.getTransportConfigurations(configuration); if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs)); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java index 17777d431b..1fcbd4d277 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ScaleDownPolicy.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.core.server.cluster.ha; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -24,12 +28,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; - -import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; public class ScaleDownPolicy { @@ -126,20 +124,6 @@ public class ScaleDownPolicy { private static TransportConfiguration[] connectorNameListToArray(final List connectorNames, ActiveMQServer activeMQServer) { - TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size()); - int count = 0; - for (String connectorName : connectorNames) { - TransportConfiguration connector = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName); - - if (connector == null) { - ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName); - - return null; - } - - tcConfigs[count++] = connector; - } - - return tcConfigs; + return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java index 9c578cfc91..d9a8688787 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java @@ -16,9 +16,25 @@ */ package org.apache.activemq.artemis.core.server.cluster.impl; +import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.commons.beanutils.Converter; + public enum MessageLoadBalancingType { OFF("OFF"), STRICT("STRICT"), ON_DEMAND("ON_DEMAND"); + static { + // for URI support on ClusterConnection + URISchema.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class); + } + + static class MessageLoadBalancingTypeConverter implements Converter { + + @Override + public T convert(Class type, Object value) { + return type.cast(MessageLoadBalancingType.getType(value.toString())); + } + } + private String type; MessageLoadBalancingType(final String type) { @@ -28,4 +44,19 @@ public enum MessageLoadBalancingType { public String getType() { return type; } + + public static MessageLoadBalancingType getType(String string) { + if (string.equals(OFF.getType())) { + return MessageLoadBalancingType.OFF; + } + else if (string.equals(STRICT.getType())) { + return MessageLoadBalancingType.STRICT; + } + else if (string.equals(ON_DEMAND.getType())) { + return MessageLoadBalancingType.ON_DEMAND; + } + else { + return null; + } + } } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java index fc77bdcd53..ef384e0fcc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java @@ -18,10 +18,13 @@ package org.apache.activemq.artemis.core.server.embedded; import javax.management.MBeanServer; +import java.util.concurrent.TimeUnit; + import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; @@ -42,8 +45,9 @@ public class EmbeddedActiveMQ { * * @param filename */ - public void setConfigResourcePath(String filename) { + public EmbeddedActiveMQ setConfigResourcePath(String filename) { configResourcePath = filename; + return this; } /** @@ -51,8 +55,30 @@ public class EmbeddedActiveMQ { * * @param securityManager */ - public void setSecurityManager(ActiveMQSecurityManager securityManager) { + public EmbeddedActiveMQ setSecurityManager(ActiveMQSecurityManager securityManager) { this.securityManager = securityManager; + return this; + } + + /** + * It will iterate the cluster connections until you have at least the number of expected servers + * @param timeWait Time to wait on each iteration + * @param unit unit of time to wait + * @param iterations number of iterations + * @param servers number of minimal servers + * @return + */ + public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception { + for (int i = 0; i < iterations; i++) { + for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) { + if (connection.getTopology().getMembers().size() == servers) { + return true; + } + Thread.sleep(unit.toMillis(timeWait)); + } + } + + return false; } /** @@ -60,8 +86,9 @@ public class EmbeddedActiveMQ { * * @param mbeanServer */ - public void setMbeanServer(MBeanServer mbeanServer) { + public EmbeddedActiveMQ setMbeanServer(MBeanServer mbeanServer) { this.mbeanServer = mbeanServer; + return this; } /** @@ -70,18 +97,19 @@ public class EmbeddedActiveMQ { * * @param configuration */ - public void setConfiguration(Configuration configuration) { + public EmbeddedActiveMQ setConfiguration(Configuration configuration) { this.configuration = configuration; + return this; } public ActiveMQServer getActiveMQServer() { return activeMQServer; } - public void start() throws Exception { + public EmbeddedActiveMQ start() throws Exception { initStart(); activeMQServer.start(); - + return this; } protected void initStart() throws Exception { @@ -105,7 +133,8 @@ public class EmbeddedActiveMQ { } } - public void stop() throws Exception { + public EmbeddedActiveMQ stop() throws Exception { activeMQServer.stop(); + return this; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 8803e2288f..299df00ef6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -49,7 +49,6 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; -import java.lang.reflect.Array; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -392,20 +391,6 @@ public class SharedNothingLiveActivation extends LiveActivation { } private TransportConfiguration[] connectorNameListToArray(final List connectorNames) { - TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size()); - int count = 0; - for (String connectorName : connectorNames) { - TransportConfiguration connector = activeMQServer.getConfiguration().getConnectorConfigurations().get(connectorName); - - if (connector == null) { - ActiveMQServerLogger.LOGGER.bridgeNoConnector(connectorName); - - return null; - } - - tcConfigs[count++] = connector; - } - - return tcConfigs; + return activeMQServer.getConfiguration().getTransportConfigurations(connectorNames); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/AcceptorTransportConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/AcceptorTransportConfigurationParser.java index e8f7dce8c2..42300e45c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/AcceptorTransportConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/AcceptorTransportConfigurationParser.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.artemis.uri; +import java.util.List; + import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.uri.schemas.acceptor.InVMAcceptorTransportConfigurationSchema; +import org.apache.activemq.artemis.uri.schemas.acceptor.TCPAcceptorTransportConfigurationSchema; import org.apache.activemq.artemis.utils.uri.URIFactory; -import java.util.List; - public class AcceptorTransportConfigurationParser extends URIFactory, String> { public AcceptorTransportConfigurationParser() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationParser.java new file mode 100644 index 0000000000..e6637dee3c --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationParser.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.uri; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.uri.schemas.clusterConnection.ClusterConnectionMulticastSchema; +import org.apache.activemq.artemis.uri.schemas.clusterConnection.ClusterConnectionStaticSchema; +import org.apache.activemq.artemis.utils.uri.URIFactory; + +public class ClusterConnectionConfigurationParser extends URIFactory { + + public ClusterConnectionConfigurationParser() { + registerSchema(new ClusterConnectionStaticSchema()); + registerSchema(new ClusterConnectionMulticastSchema()); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/InVMAcceptorTransportConfigurationSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/InVMAcceptorTransportConfigurationSchema.java similarity index 88% rename from artemis-server/src/main/java/org/apache/activemq/artemis/uri/InVMAcceptorTransportConfigurationSchema.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/InVMAcceptorTransportConfigurationSchema.java index 72ec88ed0c..3391f665e2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/InVMAcceptorTransportConfigurationSchema.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/InVMAcceptorTransportConfigurationSchema.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schemas.acceptor; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory; +import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; public class InVMAcceptorTransportConfigurationSchema extends InVMTransportConfigurationSchema { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/TCPAcceptorTransportConfigurationSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/TCPAcceptorTransportConfigurationSchema.java similarity index 89% rename from artemis-server/src/main/java/org/apache/activemq/artemis/uri/TCPAcceptorTransportConfigurationSchema.java rename to artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/TCPAcceptorTransportConfigurationSchema.java index ecd5ea44f8..ed92ac383f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/TCPAcceptorTransportConfigurationSchema.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/acceptor/TCPAcceptorTransportConfigurationSchema.java @@ -14,9 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.artemis.uri; +package org.apache.activemq.artemis.uri.schemas.acceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; +import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import java.net.URI; import java.util.Set; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java new file mode 100644 index 0000000000..f7769240c6 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.uri.schemas.clusterConnection; + +import java.net.URI; +import java.util.Map; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.utils.uri.URISupport; + +public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSchema { + + // nothing different ATM. This is like a placeholder for future changes + @Override + public String getSchemaName() { + return "multicast"; + } + + @Override + public void populateObject(URI uri, ClusterConnectionConfiguration bean) throws Exception { + + if (URISupport.isCompositeURI(uri)) { + super.populateObject(uri, bean); + } + else { + bean.setDiscoveryGroupName(uri.getHost()); + Map parameters = URISupport.parseParameters(uri); + setData(uri, bean, parameters); + + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java new file mode 100644 index 0000000000..f81aa580f9 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.uri.schemas.clusterConnection; + +import java.net.URI; +import java.util.Map; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.activemq.artemis.utils.uri.URISupport; + +public class ClusterConnectionStaticSchema extends URISchema { + + @Override + public String getSchemaName() { + return "static"; + } + + @Override + protected ClusterConnectionConfiguration internalNewObject(URI uri, + Map query, + String param) throws Exception { + + + ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration(); + + populateObject(uri, configuration); + + return configuration; + } + + + @Override + public void populateObject(URI uri, ClusterConnectionConfiguration bean) throws Exception { + URISupport.CompositeData compositeData = URISupport.parseComposite(uri); + bean.setCompositeMembers(compositeData); + + setData(uri, bean, compositeData.getParameters()); + } + + + @Override + protected URI internalNewURI(ClusterConnectionConfiguration bean) throws Exception { + return null; + } +} diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 8703e7bbe3..2f7145468a 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -458,18 +458,12 @@ - + a list of cluster connections - - - - - @@ -1181,6 +1175,33 @@ + + + + + + + + + + + + + + + uri of the cluster connection + + + + + + + name of the cluster connection + + + + + @@ -1388,6 +1409,31 @@ + + + + The URI for the cluster connection options + + + + + + + + + + + unique name for this cluster connection + + + + + + + The URI for the cluster connection options + + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java index 84441a2fdd..44f1c61c3e 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec; +import org.junit.Assert; import org.junit.Test; public class FileConfigurationParserTest extends ActiveMQTestBase { @@ -69,6 +70,23 @@ public class FileConfigurationParserTest extends ActiveMQTestBase { deploymentManager.readConfiguration(); } + @Test + public void testParsingClusterConnectionURIs() throws Exception { + FileConfigurationParser parser = new FileConfigurationParser(); + + String configStr = firstPart + "\n" + + " \n" + + "\n" + lastPart; + ByteArrayInputStream input = new ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)); + + Configuration config = parser.parseMainConfig(input); + + Assert.assertEquals(1, config.getClusterConfigurations().size()); + + Assert.assertEquals("my-discovery-group", config.getClusterConfigurations().get(0).getDiscoveryGroupName()); + Assert.assertEquals(333, config.getClusterConfigurations().get(0).getRetryInterval()); + } + @Test public void testParsingDefaultServerConfig() throws Exception { FileConfigurationParser parser = new FileConfigurationParser(); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java new file mode 100644 index 0000000000..771d89cd3f --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.uri; + +import java.net.URI; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.junit.Assert; +import org.junit.Test; + +public class ClusterConnectionConfigurationTest { + + @Test + public void testClusterConnectionStatic() throws Exception { + ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); + ClusterConnectionConfiguration configuration = parser.newObject(new URI("static:(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;s&messageLoadBalancingType=OFF"), null); + Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType()); + Assert.assertEquals(132, configuration.getMinLargeMessageSize()); + Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); + Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); + } + + @Test + public void testClusterConnectionStatic2() throws Exception { + ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); + ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null); + Assert.assertEquals(132, configuration.getMinLargeMessageSize()); + Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType()); + Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length); + Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); + Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); + } + + @Test + public void testClusterConnectionStaticOnConstrcutor() throws Exception { + ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration(new URI("static:(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132")); + Assert.assertEquals(132, configuration.getMinLargeMessageSize()); + Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); + Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); + } + + @Test + public void testClusterConnectionMulticast() throws Exception { + ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); + ClusterConnectionConfiguration configuration = parser.newObject(new URI("multicast://myGroup?minLargeMessageSize=132"), null); + Assert.assertEquals("myGroup", configuration.getDiscoveryGroupName()); + Assert.assertEquals(132, configuration.getMinLargeMessageSize()); + } +} diff --git a/examples/features/clustered/clustered-static-discovery-uri/pom.xml b/examples/features/clustered/clustered-static-discovery-uri/pom.xml new file mode 100644 index 0000000000..949d87fb6c --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/pom.xml @@ -0,0 +1,244 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.clustered + broker-clustered + 1.1.1-SNAPSHOT + + + clustered-static-discovery-uri + jar + ActiveMQ Artemis JMS Clustered Static Discovery Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-cli + ${project.version} + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + create2 + + create + + + ${noServer} + ${basedir}/target/server2 + ${basedir}/target/classes/activemq/server2 + + -Djava.net.preferIPv4Stack=true + + + + create3 + + create + + + ${noServer} + ${basedir}/target/server3 + ${basedir}/target/classes/activemq/server3 + + + + start0 + + cli + + + ${noServer} + true + ${basedir}/target/server0 + tcp://localhost:61616 + + run + + server0 + + + + start1 + + cli + + + true + ${noServer} + ${basedir}/target/server1 + tcp://localhost:61617 + + run + + server1 + + + + start2 + + cli + + + ${noServer} + true + ${basedir}/target/server2 + tcp://localhost:61618 + + run + + server2 + + + + start3 + + cli + + + ${noServer} + true + ${basedir}/target/server3 + tcp://localhost:61619 + + run + + server3 + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.StaticClusteredQueueExample + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + stop2 + + cli + + + ${noServer} + ${basedir}/target/server2 + + stop + + + + + stop3 + + cli + + + ${noServer} + ${basedir}/target/server3 + + stop + + + + + + + org.apache.activemq.examples.clustered + clustered-static-discovery-uri + ${project.version} + + + + + + + diff --git a/examples/features/clustered/clustered-static-discovery-uri/readme.html b/examples/features/clustered/clustered-static-discovery-uri/readme.html new file mode 100644 index 0000000000..88931b9eba --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/readme.html @@ -0,0 +1,58 @@ + + + + + ActiveMQ Artemis JMS Load Balanced Static Clustered Queue Example + + + + + +

JMS Load Balanced Static Clustered Queue Example

+
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ +

This example demonstrates a JMS queue deployed on two different nodes. The two nodes are configured to form a cluster + from a static list of nodes.

+

We then create a consumer on the queue on each node, and we create a producer on only one of the nodes.

+

We then send some messages via the producer, and we verify that both consumers receive the sent messages + in a round-robin fashion.

+

In other words, ActiveMQ Artemis load balances the sent messages across all consumers on the cluster

+

This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use + JNDI, these could be instantiated directly.

+

Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes + and to load balance the messages between the nodes.

+
+     <cluster-connection name="my-cluster">
+        <address>jms</address>
+        <connector-ref>netty-connector</connector-ref>
+        <retry-interval>500</retry-interval>
+        <use-duplicate-detection>true</use-duplicate-detection>
+        <message-load-balancing>STRICT</message-load-balancing>
+        <max-hops>1</max-hops>
+        <static-connectors>
+           <connector-ref>server1-connector</connector-ref>
+        </static-connectors>
+     </cluster-connection>
+     
+     
+

For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering + section of the user manual.

+ + diff --git a/examples/features/clustered/clustered-static-discovery-uri/src/main/java/org/apache/activemq/artemis/jms/example/StaticClusteredQueueExample.java b/examples/features/clustered/clustered-static-discovery-uri/src/main/java/org/apache/activemq/artemis/jms/example/StaticClusteredQueueExample.java new file mode 100644 index 0000000000..453fafc6a6 --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/src/main/java/org/apache/activemq/artemis/jms/example/StaticClusteredQueueExample.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.util.ServerUtil; + +/** + * A simple example that demonstrates server side load-balancing of messages between the queue instances on different + * nodes of the cluster. The cluster is created from a static list of nodes. + */ +public class StaticClusteredQueueExample { + + public static void main(final String[] args) throws Exception { + Connection initialConnection = null; + + Connection connection0 = null; + + Connection connection1 = null; + + Connection connection2 = null; + + Connection connection3 = null; + + try { + // Step 2. Use direct instantiation (or JNDI if you like) + Queue queue = ActiveMQJMSClient.createQueue("exampleQueue"); + + // Step 3. new JMS Connection Factory object from JNDI on server 3 + ConnectionFactory cf0 = new ActiveMQConnectionFactory("tcp://localhost:61619"); + + //grab an initial connection and wait, in reality you wouldn't do it this way but since we want to ensure an + // equal load balance we do this and then create 4 connections round robined + initialConnection = cf0.createConnection(); + + Thread.sleep(2000); + // Step 6. We create a JMS Connection connection0 which is a connection to server 0 + connection0 = cf0.createConnection(); + + // Step 7. We create a JMS Connection connection1 which is a connection to server 1 + connection1 = cf0.createConnection(); + + // Step 6. We create a JMS Connection connection0 which is a connection to server 0 + connection2 = cf0.createConnection(); + + // Step 7. We create a JMS Connection connection1 which is a connection to server 1 + connection3 = cf0.createConnection(); + + // Step 8. We create a JMS Session on server 0 + Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 9. We create a JMS Session on server 1 + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 8. We create a JMS Session on server 0 + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 9. We create a JMS Session on server 1 + Session session3 = connection3.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 10. We start the connections to ensure delivery occurs on them + connection0.start(); + + connection1.start(); + + connection2.start(); + + connection3.start(); + + // Step 11. We create JMS MessageConsumer objects on server 0 and server 1 + MessageConsumer consumer0 = session0.createConsumer(queue); + + MessageConsumer consumer1 = session1.createConsumer(queue); + + MessageConsumer consumer2 = session2.createConsumer(queue); + + MessageConsumer consumer3 = session3.createConsumer(queue); + + Thread.sleep(2000); + + // Step 12. We create a JMS MessageProducer object on server 3 + MessageProducer producer = session3.createProducer(queue); + + // Step 13. We send some messages to server 0 + + final int numMessages = 20; + + for (int i = 0; i < numMessages; i++) { + TextMessage message = session0.createTextMessage("This is text message " + i); + + producer.send(message); + + System.out.println("Sent message: " + message.getText()); + } + Thread.sleep(2000); + // Step 14. We now consume those messages on *both* server 0 and server 1. + // We note the messages have been distributed between servers in a round robin fashion + // JMS Queues implement point-to-point message where each message is only ever consumed by a + // maximum of one consumer + int con0Node = ServerUtil.getServer(connection0); + int con1Node = ServerUtil.getServer(connection1); + int con2Node = ServerUtil.getServer(connection2); + int con3Node = ServerUtil.getServer(connection3); + + if (con0Node + con1Node + con2Node + con3Node != 6) { + throw new IllegalStateException(); + } + for (int i = 0; i < numMessages; i += 4) { + TextMessage message0 = (TextMessage) consumer0.receive(5000); + + System.out.println("Got message: " + message0.getText() + " from node " + con0Node); + + TextMessage message1 = (TextMessage) consumer1.receive(5000); + + System.out.println("Got message: " + message1.getText() + " from node " + con1Node); + + TextMessage message2 = (TextMessage) consumer2.receive(5000); + + System.out.println("Got message: " + message2.getText() + " from node " + con2Node); + + TextMessage message3 = (TextMessage) consumer3.receive(5000); + + System.out.println("Got message: " + message3.getText() + " from node " + con3Node); + } + } + finally { + // Step 15. Be sure to close our resources! + + if (initialConnection != null) { + initialConnection.close(); + } + + if (connection0 != null) { + connection0.close(); + } + + if (connection1 != null) { + connection1.close(); + } + + if (connection2 != null) { + connection2.close(); + } + + if (connection3 != null) { + connection3.close(); + } + } + } +} diff --git a/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server0/broker.xml b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..be093ab55a --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,70 @@ + + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + + tcp://localhost:61616 + + + + + tcp://localhost:61616 + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server1/broker.xml b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..fc3bb2da1b --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,70 @@ + + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61617 + + + + + tcp://localhost:61617 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server2/broker.xml b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server2/broker.xml new file mode 100644 index 0000000000..608796f190 --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server2/broker.xml @@ -0,0 +1,66 @@ + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61618 + + + + + tcp://localhost:61618 + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server3/broker.xml b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server3/broker.xml new file mode 100644 index 0000000000..950f8cdcda --- /dev/null +++ b/examples/features/clustered/clustered-static-discovery-uri/src/main/resources/activemq/server3/broker.xml @@ -0,0 +1,67 @@ + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61619 + + + + + tcp://localhost:61619 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/clustered-topic-uri/pom.xml b/examples/features/clustered/clustered-topic-uri/pom.xml new file mode 100644 index 0000000000..5dd10e1b40 --- /dev/null +++ b/examples/features/clustered/clustered-topic-uri/pom.xml @@ -0,0 +1,156 @@ + + + + + 4.0.0 + + + org.apache.activemq.examples.clustered + broker-clustered + 1.1.1-SNAPSHOT + + + clustered-topic-uri + jar + ActiveMQ Artemis JMS Clustered Topic Example + + + ${project.basedir}/../../../.. + + + + + org.apache.activemq + artemis-jms-client + ${project.version} + + + + + + + org.apache.activemq + artemis-maven-plugin + + + create0 + + create + + + ${noServer} + ${basedir}/target/server0 + ${basedir}/target/classes/activemq/server0 + + -Djava.net.preferIPv4Stack=true + + + + create1 + + create + + + ${noServer} + ${basedir}/target/server1 + ${basedir}/target/classes/activemq/server1 + + -Djava.net.preferIPv4Stack=true + + + + start0 + + cli + + + ${noServer} + true + ${basedir}/target/server0 + tcp://localhost:61616 + + run + + server0 + + + + start1 + + cli + + + ${noServer} + true + ${basedir}/target/server1 + tcp://localhost:61617 + + run + + server1 + + + + runClient + + runClient + + + org.apache.activemq.artemis.jms.example.ClusteredTopicExample + + + + stop0 + + cli + + + ${noServer} + ${basedir}/target/server0 + + stop + + + + + stop1 + + cli + + + ${noServer} + ${basedir}/target/server1 + + stop + + + + + + + org.apache.activemq.examples.clustered + clustered-topic-uri + ${project.version} + + + + + + diff --git a/examples/features/clustered/clustered-topic-uri/readme.html b/examples/features/clustered/clustered-topic-uri/readme.html new file mode 100644 index 0000000000..e7998a28ff --- /dev/null +++ b/examples/features/clustered/clustered-topic-uri/readme.html @@ -0,0 +1,46 @@ + + + + + ActiveMQ Artemis JMS Clustered Topic Example + + + + + +

JMS Clustered Topic Example

+
To run the example, simply type mvn verify from this directory, 
or mvn -PnoServer verify if you want to start and create the server manually.
+ +

This example demonstrates a JMS Topic deployed on two different nodes. The two nodes are configured to form a cluster.

+

We then create a subscriber on the topic on each node, and we create a producer on only one of the nodes.

+

We then send some messages via the producer, and we verify that both subscribers receive all the + sent messages.

+

A JMS Topic is an example of publish-subscribe messaging where all subscribers receive all the + messages sent to the topic (assuming they have no message selectors).

+

This example uses JNDI to lookup the JMS Queue and ConnectionFactory objects. If you prefer not to use + JNDI, these could be instantiated directly. +

Here's the relevant snippet from the server configuration, which tells the server to form a cluster between the two nodes + and to load balance the messages between the nodes.

+

This example differes from different-topic as it will use an URI to define the cluster connection.

+
<cluster-connection-uri name="my-cluster" address="uri="multicast://my-discovery-group?messageLoadBalancingType=STRICT;retryInterval=500;connectorName=netty-connector;maxHops=1"/>
+

For more information on ActiveMQ Artemis load balancing, and clustering in general, please see the clustering + section of the user manual.

+ + diff --git a/examples/features/clustered/clustered-topic-uri/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredTopicExample.java b/examples/features/clustered/clustered-topic-uri/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredTopicExample.java new file mode 100644 index 0000000000..f3ce9a05d4 --- /dev/null +++ b/examples/features/clustered/clustered-topic-uri/src/main/java/org/apache/activemq/artemis/jms/example/ClusteredTopicExample.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.jms.example; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.naming.InitialContext; + +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; + +/** + * A simple example that shows a JMS Topic clustered across two nodes of a cluster. + * Messages are sent on one node and received by consumers on both nodes. + */ +public class ClusteredTopicExample { + + public static void main(final String[] args) throws Exception { + Connection connection0 = null; + + Connection connection1 = null; + + InitialContext ic0 = null; + + InitialContext ic1 = null; + + try { + + // Step 1. Instantiate topic + Topic topic = ActiveMQJMSClient.createTopic("exampleTopic"); + + // Step 2. Look-up a JMS Connection Factory object from JNDI on server 0 + ConnectionFactory cf0 = new ActiveMQConnectionFactory("tcp://localhost:61616"); + + // Step 3. Look-up a JMS Connection Factory object from JNDI on server 1 + ConnectionFactory cf1 = new ActiveMQConnectionFactory("tcp://localhost:61617"); + + // Step 4. We create a JMS Connection connection0 which is a connection to server 0 + connection0 = cf0.createConnection(); + + // Step 5. We create a JMS Connection connection1 which is a connection to server 1 + connection1 = cf1.createConnection(); + + // Step 6. We create a JMS Session on server 0 + Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 7. We create a JMS Session on server 1 + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Step 8. We start the connections to ensure delivery occurs on them + connection0.start(); + + connection1.start(); + + // Step 9. We create JMS MessageConsumer objects on server 0 and server 1 + MessageConsumer consumer0 = session0.createConsumer(topic); + + MessageConsumer consumer1 = session1.createConsumer(topic); + + Thread.sleep(1000); + + // Step 10. We create a JMS MessageProducer object on server 0 + MessageProducer producer = session0.createProducer(topic); + + // Step 11. We send some messages to server 0 + + final int numMessages = 10; + + for (int i = 0; i < numMessages; i++) { + TextMessage message = session0.createTextMessage("This is text message " + i); + + producer.send(message); + + System.out.println("Sent message: " + message.getText()); + } + + // Step 12. We now consume those messages on *both* server 0 and server 1. + // We note that all messages have been consumed by *both* consumers. + // JMS Topics implement *publish-subscribe* messaging where all consumers get a copy of all messages + + for (int i = 0; i < numMessages; i++) { + TextMessage message0 = (TextMessage) consumer0.receive(5000); + + System.out.println("Got message: " + message0.getText() + " from node 0"); + + TextMessage message1 = (TextMessage) consumer1.receive(5000); + + System.out.println("Got message: " + message1.getText() + " from node 1"); + } + } + finally { + // Step 15. Be sure to close our JMS resources! + if (connection0 != null) { + connection0.close(); + } + + if (connection1 != null) { + connection1.close(); + } + + if (ic0 != null) { + ic0.close(); + } + + if (ic1 != null) { + ic1.close(); + } + } + } +} diff --git a/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server0/broker.xml b/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server0/broker.xml new file mode 100644 index 0000000000..546e2e635a --- /dev/null +++ b/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server0/broker.xml @@ -0,0 +1,89 @@ + + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + + tcp://localhost:61616 + + + + + tcp://localhost:61616 + + + + + + + ${udp-address:231.7.7.7} + 9876 + 100 + netty-connector + + + + + + ${udp-address:231.7.7.7} + 9876 + 10000 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server1/broker.xml b/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server1/broker.xml new file mode 100644 index 0000000000..c7009f7b72 --- /dev/null +++ b/examples/features/clustered/clustered-topic-uri/src/main/resources/activemq/server1/broker.xml @@ -0,0 +1,87 @@ + + + + + + + + + + + + + ./data/bindings + + ./data/journal + + ./data/largemessages + + ./data/paging + + + + tcp://localhost:61617 + + + + + tcp://localhost:61617 + + + + + + ${udp-address:231.7.7.7} + 9876 + 100 + netty-connector + + + + + + ${udp-address:231.7.7.7} + 9876 + 10000 + + + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/features/clustered/pom.xml b/examples/features/clustered/pom.xml index c17545aec7..e6f5a35372 100644 --- a/examples/features/clustered/pom.xml +++ b/examples/features/clustered/pom.xml @@ -54,7 +54,9 @@ under the License. clustered-queue clustered-static-oneway clustered-static-discovery + clustered-static-discovery-uri clustered-topic + clustered-topic-uri queue-message-redistribution symmetric-cluster @@ -69,7 +71,9 @@ under the License. clustered-queue clustered-static-oneway clustered-static-discovery + clustered-static-discovery-uri clustered-topic + clustered-topic-uri queue-message-redistribution symmetric-cluster diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index ec174356a9..a861b23bec 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster.distribution; import java.io.File; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1691,6 +1692,23 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { config.getClusterConfigurations().add(clusterConf); } + + protected void setupClusterConnection(final String name, + final String uri, + int server) throws Exception { + ActiveMQServer serverFrom = servers[server]; + + if (serverFrom == null) { + throw new IllegalStateException("No server at node " + server); + } + + ClusterConnectionConfiguration configuration = new ClusterConnectionConfiguration(new URI(uri)).setName(name); + + serverFrom.getConfiguration().addClusterConfiguration(configuration); + + + } + protected void setupClusterConnection(final String name, final String address, final MessageLoadBalancingType messageLoadBalancingType, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/URISimpleClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/URISimpleClusterTest.java new file mode 100644 index 0000000000..cbf49cadd2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/URISimpleClusterTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.junit.Before; +import org.junit.Test; + +/** + * A SymmetricClusterTest + * + * Most of the cases are covered in OneWayTwoNodeClusterTest - we don't duplicate them all here + */ +public class URISimpleClusterTest extends ClusterTestBase { + + private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + setupServers(); + } + + protected boolean isNetty() { + return true; + } + + @Test + public void testBasicRoundRobin() throws Exception { + setupCluster(MessageLoadBalancingType.ON_DEMAND); + + startServers(); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + setupSessionFactory(2, isNetty()); + setupSessionFactory(3, isNetty()); + setupSessionFactory(4, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + createQueue(2, "queues.testaddress", "queue0", null, false); + createQueue(3, "queues.testaddress", "queue0", null, false); + createQueue(4, "queues.testaddress", "queue0", null, false); + + addConsumer(0, 0, "queue0", null); + addConsumer(1, 1, "queue0", null); + addConsumer(2, 2, "queue0", null); + addConsumer(3, 3, "queue0", null); + addConsumer(4, 4, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + waitForBindings(2, "queues.testaddress", 1, 1, true); + waitForBindings(3, "queues.testaddress", 1, 1, true); + waitForBindings(4, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 4, 4, false); + waitForBindings(1, "queues.testaddress", 4, 4, false); + waitForBindings(2, "queues.testaddress", 4, 4, false); + waitForBindings(3, "queues.testaddress", 4, 4, false); + waitForBindings(4, "queues.testaddress", 4, 4, false); + + send(0, "queues.testaddress", 10, false, null); + + verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4); + + verifyNotReceive(0, 1, 2, 3, 4); + } + + protected static String generateURI(int serverID) { + return "tcp://127.0.0.1:" + (org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + serverID); + } + + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { + for (int i = 0; i < 5; i++) { + servers[i].getConfiguration().addConnectorConfiguration("netty-connector", generateURI(i)); + } + + setupClusterConnection("cluster", "static://(" + generateURI(1) + "," + generateURI(2) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 0); + setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(2) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 1); + setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(3) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 2); + setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(2) + "," + generateURI(4) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 3); + setupClusterConnection("cluster", "static://(" + generateURI(0) + "," + generateURI(1) + "," + generateURI(2) + "," + generateURI(3) + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=" + messageLoadBalancingType.toString() + ";maxHops=1;address=queues", 4); + } + + protected void setupServers() throws Exception { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + setupServer(2, isFileStorage(), isNetty()); + setupServer(3, isFileStorage(), isNetty()); + setupServer(4, isFileStorage(), isNetty()); + } + + protected void startServers() throws Exception { + startServers(0, 1, 2, 3, 4); + } + + protected void stopServers() throws Exception { + closeAllConsumers(); + + closeAllSessionFactories(); + + closeAllServerLocatorsFactories(); + + stopServers(0, 1, 2, 3, 4); + } + + @Override + protected boolean isFileStorage() { + return false; + } +}