diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/BeanSupport.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/BeanSupport.java new file mode 100644 index 0000000000..4985c651e1 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/BeanSupport.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.uri; + +import java.beans.PropertyDescriptor; +import java.io.UnsupportedEncodingException; +import java.net.URI; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.beanutils.BeanUtilsBean; +import org.apache.commons.beanutils.Converter; + +public class BeanSupport { + + private static final BeanUtilsBean beanUtils = new BeanUtilsBean(); + + static { + // This is to customize the BeanUtils to use Fluent Proeprties as well + beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores()); + } + + public static void registerConverter(Converter converter, Class type) { + synchronized (beanUtils) { + beanUtils.getConvertUtils().register(converter, type); + } + } + + public static

P copyData(P source, P target) throws Exception { + synchronized (beanUtils) { + beanUtils.copyProperties(source, target); + } + return target; + } + + public static

P setData(URI uri, P obj, Map query) throws Exception { + synchronized (beanUtils) { + beanUtils.setProperty(obj, "host", uri.getHost()); + beanUtils.setProperty(obj, "port", uri.getPort()); + beanUtils.setProperty(obj, "userInfo", uri.getUserInfo()); + beanUtils.populate(obj, query); + } + return obj; + } + + public static

P setData( P obj, Map data) throws Exception { + synchronized (beanUtils) { + beanUtils.populate(obj, data); + } + return obj; + } + + public static void setData(URI uri, + HashMap properties, + Set allowableProperties, + Map query, + Map extraProps) { + if (allowableProperties.contains("host")) { + properties.put("host", "" + uri.getHost()); + } + if (allowableProperties.contains("port")) { + properties.put("port", "" + uri.getPort()); + } + if (allowableProperties.contains("userInfo")) { + properties.put("userInfo", "" + uri.getUserInfo()); + } + for (Map.Entry entry : query.entrySet()) { + if (allowableProperties.contains(entry.getKey())) { + properties.put(entry.getKey(), entry.getValue()); + } + else { + extraProps.put(entry.getKey(), entry.getValue()); + } + } + } + + public static String getData(List ignored, Object... beans) throws Exception { + StringBuilder sb = new StringBuilder(); + boolean empty = true; + synchronized (beanUtils) { + for (Object bean : beans) { + if (bean != null) { + PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean); + for (PropertyDescriptor descriptor : descriptors) { + if (descriptor.getReadMethod() != null && isWriteable(descriptor, ignored)) { + String value = beanUtils.getProperty(bean, descriptor.getName()); + if (value != null) { + if (!empty) { + sb.append("&"); + } + empty = false; + sb.append(descriptor.getName()).append("=").append(encodeURI(value)); + } + } + } + } + } + } + return sb.toString(); + } + + private static boolean isWriteable(PropertyDescriptor descriptor, List ignored) { + if (ignored != null && ignored.contains(descriptor.getName())) { + return false; + } + Class type = descriptor.getPropertyType(); + return (type == Double.class) || + (type == double.class) || + (type == Long.class) || + (type == long.class) || + (type == Integer.class) || + (type == int.class) || + (type == Float.class) || + (type == float.class) || + (type == Boolean.class) || + (type == boolean.class) || + (type == String.class); + } + + + public static String decodeURI(String value) throws UnsupportedEncodingException { + return URLDecoder.decode(value, "UTF-8"); + } + + public static String encodeURI(String value) throws UnsupportedEncodingException { + return URLEncoder.encode(value, "UTF-8"); + } + +} diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java index 25ce8e9f9b..3120292bcc 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/uri/URISchema.java @@ -16,19 +16,11 @@ */ package org.apache.activemq.artemis.utils.uri; -import java.beans.PropertyDescriptor; import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; -import java.net.URLDecoder; -import java.net.URLEncoder; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; - -import org.apache.commons.beanutils.BeanUtilsBean; -import org.apache.commons.beanutils.Converter; public abstract class URISchema { @@ -39,7 +31,7 @@ public abstract class URISchema { } public void populateObject(URI uri, T bean) throws Exception { - setData(uri, bean, parseQuery(uri.getQuery(), null)); + BeanSupport.setData(uri, bean, parseQuery(uri.getQuery(), null)); } public URI newURI(T bean) throws Exception { @@ -97,38 +89,17 @@ public abstract class URISchema { protected abstract T internalNewObject(URI uri, Map query, P param) throws Exception; - /** This is the default implementation. - * Sub classes are should provide a proper implementation for their schemas. */ + /** + * This is the default implementation. + * Sub classes are should provide a proper implementation for their schemas. + */ protected URI internalNewURI(T bean) throws Exception { - String query = URISchema.getData(null, bean); + String query = BeanSupport.getData(null, bean); - return new URI(getSchemaName(), - null, - "//", query, null); + return new URI(getSchemaName(), null, "//", query, null); } - private static final BeanUtilsBean beanUtils = new BeanUtilsBean(); - - public static void registerConverter(Converter converter, Class type) { - synchronized (beanUtils) { - beanUtils.getConvertUtils().register(converter, type); - } - } - - public static String decodeURI(String value) throws UnsupportedEncodingException { - return URLDecoder.decode(value, "UTF-8"); - } - - public static String encodeURI(String value) throws UnsupportedEncodingException { - return URLEncoder.encode(value, "UTF-8"); - } - - static { - // This is to customize the BeanUtils to use Fluent Proeprties as well - beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores()); - } - public static Map parseQuery(String uri, Map propertyOverrides) throws URISyntaxException { try { @@ -138,8 +109,8 @@ public abstract class URISchema { for (int i = 0; i < parameters.length; i++) { int p = parameters[i].indexOf("="); if (p >= 0) { - String name = decodeURI(parameters[i].substring(0, p)); - String value = decodeURI(parameters[i].substring(p + 1)); + String name = BeanSupport.decodeURI(parameters[i].substring(0, p)); + String value = BeanSupport.decodeURI(parameters[i].substring(p + 1)); rc.put(name, value); } else { @@ -171,84 +142,4 @@ public abstract class URISchema { return buffer.toString(); } - - protected static

P copyData(P source, P target) throws Exception { - synchronized (beanUtils) { - beanUtils.copyProperties(source, target); - } - return target; - } - - protected static

P setData(URI uri, P obj, Map query) throws Exception { - synchronized (beanUtils) { - beanUtils.setProperty(obj, "host", uri.getHost()); - beanUtils.setProperty(obj, "port", uri.getPort()); - beanUtils.setProperty(obj, "userInfo", uri.getUserInfo()); - beanUtils.populate(obj, query); - } - return obj; - } - - public static void setData(URI uri, - HashMap properties, - Set allowableProperties, - Map query) { - if (allowableProperties.contains("host")) { - properties.put("host", "" + uri.getHost()); - } - if (allowableProperties.contains("port")) { - properties.put("port", "" + uri.getPort()); - } - if (allowableProperties.contains("userInfo")) { - properties.put("userInfo", "" + uri.getUserInfo()); - } - for (Map.Entry entry : query.entrySet()) { - if (allowableProperties.contains(entry.getKey())) { - properties.put(entry.getKey(), entry.getValue()); - } - } - } - - public static String getData(List ignored, Object... beans) throws Exception { - StringBuilder sb = new StringBuilder(); - boolean empty = true; - synchronized (beanUtils) { - for (Object bean : beans) { - if (bean != null) { - PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean); - for (PropertyDescriptor descriptor : descriptors) { - if (descriptor.getReadMethod() != null && isWriteable(descriptor, ignored)) { - String value = beanUtils.getProperty(bean, descriptor.getName()); - if (value != null) { - if (!empty) { - sb.append("&"); - } - empty = false; - sb.append(descriptor.getName()).append("=").append(encodeURI(value)); - } - } - } - } - } - } - return sb.toString(); - } - - private static boolean isWriteable(PropertyDescriptor descriptor, List ignored) { - if (ignored != null && ignored.contains(descriptor.getName())) { - return false; - } - Class type = descriptor.getPropertyType(); - return (type == Double.class) || - (type == double.class) || - (type == Long.class) || - (type == long.class) || - (type == Integer.class) || - (type == int.class) || - (type == Float.class) || - (type == float.class) || - (type == Boolean.class) || - (type == boolean.class) || - (type == String.class); - } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java index 183d586f62..71a783344b 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/URIParserTest.java @@ -18,10 +18,13 @@ package org.apache.activemq.artemis.utils; import java.net.URI; +import java.util.HashMap; import java.util.Map; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.URIFactory; import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.activemq.artemis.utils.uri.URISupport; import org.junit.Assert; import org.junit.Test; @@ -99,6 +102,30 @@ public class URIParserTest { Assert.assertEquals("something", fruit.getFluentName()); } + @Test + public void testQueryConversion() throws Exception { + Map query = new HashMap(); + String queryString = URISupport.createQueryString(query); + System.out.println("queryString1: " + queryString); + Assert.assertTrue(queryString.isEmpty()); + + query.put("key1", "value1"); + queryString = URISupport.createQueryString(query); + System.out.println("queryString2: " + queryString); + Assert.assertEquals("key1=value1", queryString); + + query.put("key2", "value2"); + queryString = URISupport.createQueryString(query); + System.out.println("queryString3: " + queryString); + Assert.assertEquals("key1=value1&key2=value2", queryString); + + query.put("key3", "value3"); + queryString = URISupport.createQueryString(query); + System.out.println("queryString4: " + queryString); + Assert.assertEquals("key1=value1&key2=value2&key3=value3", queryString); + + } + class FruitParser extends URIFactory { FruitParser() { @@ -116,7 +143,7 @@ public class URIParserTest { @Override public FruitBase internalNewObject(URI uri, Map query, String fruitName) throws Exception { - return setData(uri, new Fruit(getSchemaName()), query); + return BeanSupport.setData(uri, new Fruit(getSchemaName()), query); } } @@ -130,7 +157,7 @@ public class URIParserTest { @Override public FruitBase internalNewObject(URI uri, Map query, String fruitName) throws Exception { - return setData(uri, new FruitBase(getSchemaName()), query); + return BeanSupport.setData(uri, new FruitBase(getSchemaName()), query); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index 6f91537421..deceeeaad5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable { private Map params; + private Map extraProps; + private static final byte TYPE_BOOLEAN = 0; private static final byte TYPE_INT = 1; @@ -93,6 +95,19 @@ public class TransportConfiguration implements Serializable { * @param name The name of this TransportConfiguration */ public TransportConfiguration(final String className, final Map params, final String name) { + this(className, params, name, null); + } + + /** + * Creates a TransportConfiguration with a specific name providing the class name of the {@link org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory} + * and any parameters needed. + * + * @param className The class name of the ConnectorFactory + * @param params The parameters needed by the ConnectorFactory + * @param name The name of this TransportConfiguration + * @param extraProps The extra properties that specific to protocols + */ + public TransportConfiguration(final String className, final Map params, final String name, final Map extraProps) { factoryClassName = className; if (params == null || params.isEmpty()) { @@ -103,6 +118,7 @@ public class TransportConfiguration implements Serializable { } this.name = name; + this.extraProps = extraProps; } public TransportConfiguration newTransportConfig(String newName) { @@ -156,6 +172,9 @@ public class TransportConfiguration implements Serializable { return params; } + public Map getExtraParams() { + return extraProps; + } @Override public int hashCode() { @@ -249,10 +268,52 @@ public class TransportConfiguration implements Serializable { first = false; } + if (extraProps != null) { + for (Map.Entry entry : extraProps.entrySet()) { + if (!first) { + str.append("&"); + } + + String key = entry.getKey(); + String val = entry.getValue() == null ? "null" : entry.getValue().toString(); + + str.append(replaceWildcardChars(key)).append('=').append(replaceWildcardChars(val)); + + first = false; + } + } } return str.toString(); } + private void encodeMap(final ActiveMQBuffer buffer, final Map map) { + for (Map.Entry entry : map.entrySet()) { + buffer.writeString(entry.getKey()); + + Object val = entry.getValue(); + + if (val instanceof Boolean) { + buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN); + buffer.writeBoolean((Boolean) val); + } + else if (val instanceof Integer) { + buffer.writeByte(TransportConfiguration.TYPE_INT); + buffer.writeInt((Integer) val); + } + else if (val instanceof Long) { + buffer.writeByte(TransportConfiguration.TYPE_LONG); + buffer.writeLong((Long) val); + } + else if (val instanceof String) { + buffer.writeByte(TransportConfiguration.TYPE_STRING); + buffer.writeString((String) val); + } + else { + throw ActiveMQClientMessageBundle.BUNDLE.invalidEncodeType(val); + } + } + } + /** * Encodes this TransportConfiguration into a buffer. *

@@ -267,31 +328,10 @@ public class TransportConfiguration implements Serializable { buffer.writeInt(params == null ? 0 : params.size()); if (params != null) { - for (Map.Entry entry : params.entrySet()) { - buffer.writeString(entry.getKey()); - - Object val = entry.getValue(); - - if (val instanceof Boolean) { - buffer.writeByte(TransportConfiguration.TYPE_BOOLEAN); - buffer.writeBoolean((Boolean) val); - } - else if (val instanceof Integer) { - buffer.writeByte(TransportConfiguration.TYPE_INT); - buffer.writeInt((Integer) val); - } - else if (val instanceof Long) { - buffer.writeByte(TransportConfiguration.TYPE_LONG); - buffer.writeLong((Long) val); - } - else if (val instanceof String) { - buffer.writeByte(TransportConfiguration.TYPE_STRING); - buffer.writeString((String) val); - } - else { - throw ActiveMQClientMessageBundle.BUNDLE.invalidEncodeType(val); - } - } + encodeMap(buffer, params); + } + if (extraProps != null) { + encodeMap(buffer, extraProps); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 755d93be97..0803782e9e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -53,9 +53,9 @@ import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -67,7 +67,7 @@ import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.UUIDGenerator; -public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener { +public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener { // Constants // ------------------------------------------------------------------------------------ @@ -350,7 +350,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index d2233d102b..d963d1d698 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -25,8 +25,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; /** * Common handler implementation for client and server side handler. @@ -37,13 +37,13 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; volatile boolean active; protected ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, - final ConnectionLifeCycleListener listener) { + final BaseConnectionLifeCycleListener listener) { this.group = group; this.handler = handler; this.listener = listener; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 6608b54f8a..45334eeb92 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -37,8 +37,8 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.IPV6Util; @@ -53,7 +53,7 @@ public class NettyConnection implements Connection { private boolean closed; - private final ConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private final boolean batchingEnabled; @@ -79,7 +79,7 @@ public class NettyConnection implements Connection { public NettyConnection(final Map configuration, final Channel channel, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver) { this.configuration = configuration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 55435f568c..355202f5ed 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -94,10 +94,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtoco import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.FutureLatch; @@ -151,7 +152,7 @@ public class NettyConnector extends AbstractConnector { private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED; @@ -231,7 +232,7 @@ public class NettyConnector extends AbstractConnector { // Public -------------------------------------------------------- public NettyConnector(final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool) { @@ -240,7 +241,7 @@ public class NettyConnector extends AbstractConnector { public NettyConnector(final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, @@ -681,7 +682,7 @@ public class NettyConnector extends AbstractConnector { // No acceptor on a client connection Listener connectionListener = new Listener(); NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false); - connectionListener.connectionCreated(null, conn, protocolManager.getName()); + connectionListener.connectionCreated(null, conn, protocolManager); return conn; } else { @@ -709,7 +710,7 @@ public class NettyConnector extends AbstractConnector { ActiveMQClientChannelHandler(final ChannelGroup group, final BufferHandler handler, - final ConnectionLifeCycleListener listener) { + final ClientConnectionLifeCycleListener listener) { super(group, handler, listener); } } @@ -899,12 +900,12 @@ public class NettyConnector extends AbstractConnector { } } - private class Listener implements ConnectionLifeCycleListener { + private class Listener implements ClientConnectionLifeCycleListener { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { if (connections.putIfAbsent(connection.getID(), connection) != null) { throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java index a7c5f0e334..a0648df58c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnectorFactory.java @@ -21,8 +21,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; @@ -31,7 +31,7 @@ public class NettyConnectorFactory implements ConnectorFactory { @Override public Connector createConnector(final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ClientConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java new file mode 100644 index 0000000000..77a8c59f68 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/BaseConnectionLifeCycleListener.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.spi.core.remoting; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; + +/** + * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events. + */ +public interface BaseConnectionLifeCycleListener { + + /** + * This method is used both by client connector creation and server connection creation through + * acceptors. On the client side the {@code component} parameter is normally passed as + * {@code null}. + *

+ * Leaving this method here and adding a different one at + * {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the + * activemq-server and activemq-client packages while avoiding to pull too much into activemq-core. + * The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the + * usage of it. + * + * @param component This will probably be an {@code Acceptor} and only used on the server side. + * @param connection the connection that has been created + * @param protocol the messaging protocol type this connection uses + */ + void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolClass protocol); + + /** + * Called when a connection is destroyed. + * + * @param connectionID the connection being destroyed. + */ + void connectionDestroyed(Object connectionID); + + /** + * Called when an error occurs on the connection. + * + * @param connectionID the id of the connection. + * @param me the exception. + */ + void connectionException(Object connectionID, ActiveMQException me); + + void connectionReadyForWrites(Object connectionID, boolean ready); +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientConnectionLifeCycleListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientConnectionLifeCycleListener.java new file mode 100644 index 0000000000..deb36cd72d --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ClientConnectionLifeCycleListener.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.spi.core.remoting; + +public interface ClientConnectionLifeCycleListener extends BaseConnectionLifeCycleListener { + +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectionLifeCycleListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectionLifeCycleListener.java index b5d7d975fe..f70222701a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectionLifeCycleListener.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectionLifeCycleListener.java @@ -16,45 +16,11 @@ */ package org.apache.activemq.artemis.spi.core.remoting; -import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.server.ActiveMQComponent; - /** * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events. + * @deprecated use {@link ClientConnectionLifeCycleListener} instead. */ -public interface ConnectionLifeCycleListener { +@Deprecated +public interface ConnectionLifeCycleListener extends BaseConnectionLifeCycleListener { - /** - * This method is used both by client connector creation and server connection creation through - * acceptors. On the client side the {@code component} parameter is normally passed as - * {@code null}. - *

- * Leaving this method here and adding a different one at - * {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the - * activemq-server and activemq-client packages while avoiding to pull too much into activemq-core. - * The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the - * usage of it. - * - * @param component This will probably be an {@code Acceptor} and only used on the server side. - * @param connection the connection that has been created - * @param protocol the messaging protocol type this connection uses - */ - void connectionCreated(ActiveMQComponent component, Connection connection, String protocol); - - /** - * Called when a connection is destroyed. - * - * @param connectionID the connection being destroyed. - */ - void connectionDestroyed(Object connectionID); - - /** - * Called when an error occurs on the connection. - * - * @param connectionID the id of the connection. - * @param me the exception. - */ - void connectionException(Object connectionID, ActiveMQException me); - - void connectionReadyForWrites(Object connectionID, boolean ready); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectorFactory.java index 3d7c7b30d6..e709f7806d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectorFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ConnectorFactory.java @@ -42,7 +42,7 @@ public interface ConnectorFactory extends TransportConfigurationHelper { */ Connector createConnector(Map configuration, BufferHandler handler, - ConnectionLifeCycleListener listener, + ClientConnectionLifeCycleListener listener, Executor closeExecutor, Executor threadPool, ScheduledExecutorService scheduledThreadPool, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java index 309e3e40b9..e962a5d211 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.SchemaConstants; public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema { @@ -60,10 +61,13 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat String factoryName) throws URISyntaxException { HashMap props = new HashMap<>(); - setData(uri, props, allowableProperties, query); + Map extraProps = new HashMap<>(); + BeanSupport.setData(uri, props, allowableProperties, query, extraProps); List transportConfigurations = new ArrayList<>(); - transportConfigurations.add(new TransportConfiguration(factoryName, props, name)); + TransportConfiguration config = new TransportConfiguration(factoryName, props, name, extraProps); + + transportConfigurations.add(config); String connectors = uri.getFragment(); if (connectors != null && !connectors.trim().isEmpty()) { @@ -71,9 +75,10 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat for (String s : split) { URI extraUri = new URI(s); HashMap newProps = new HashMap<>(); - setData(extraUri, newProps, allowableProperties, query); - setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null)); - transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString())); + extraProps = new HashMap<>(); + BeanSupport.setData(extraUri, newProps, allowableProperties, query, extraProps); + BeanSupport.setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null), extraProps); + transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), extraProps)); } } return transportConfigurations; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java index e1a5f2b14d..d0693d45f1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/AbstractServerLocatorSchema.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.uri.schema.serverLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.URISchema; import java.net.URI; @@ -25,6 +26,6 @@ import java.util.Map; public abstract class AbstractServerLocatorSchema extends URISchema { protected ConnectionOptions newConnectionOptions(URI uri, Map query) throws Exception { - return setData(uri, new ConnectionOptions(), query); + return BeanSupport.setData(uri, new ConnectionOptions(), query); } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java index ace312aca0..2060ea946f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/InVMServerLocatorSchema.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import java.net.URI; @@ -37,7 +38,7 @@ public class InVMServerLocatorSchema extends AbstractServerLocatorSchema { protected ServerLocator internalNewObject(URI uri, Map query, String name) throws Exception { TransportConfiguration tc = InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory"); ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc); - return setData(uri, factory, query); + return BeanSupport.setData(uri, factory, query); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java index 73a1b942c0..c8ee3054db 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/JGroupsServerLocatorSchema.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.SchemaConstants; import java.io.NotSerializableException; @@ -63,7 +64,7 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema { else { throw new NotSerializableException(endpoint + "not serializable"); } - String query = getData(null, bean, dgc, endpoint); + String query = BeanSupport.getData(null, bean, dgc, endpoint); dgc.setBroadcastEndpointFactory(endpoint); return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null); } @@ -79,11 +80,11 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema { endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority()); } - setData(uri, endpointFactory, query); + BeanSupport.setData(uri, endpointFactory, query); DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName(name).setBroadcastEndpointFactory(endpointFactory); - setData(uri, dcConfig, query); + BeanSupport.setData(uri, dcConfig, query); return dcConfig; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java index d141ee60df..4a2e2aacd5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/TCPServerLocatorSchema.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.utils.IPV6Util; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.SchemaConstants; public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { @@ -52,7 +53,7 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { @Override protected URI internalNewURI(ServerLocator bean) throws Exception { - String query = getData(null, bean); + String query = BeanSupport.getData(null, bean); TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations(); return getURI(query, staticConnectors); } @@ -122,9 +123,9 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { else { empty = false; } - cb.append(encodeURI(entry.getKey())); + cb.append(BeanSupport.encodeURI(entry.getKey())); cb.append("="); - cb.append(encodeURI(entry.getValue().toString())); + cb.append(BeanSupport.encodeURI(entry.getValue().toString())); } } return cb.toString(); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java index a21e1a9053..3498804367 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/serverLocator/UDPServerLocatorSchema.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.SchemaConstants; public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { @@ -60,7 +61,7 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration(); UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); dgc.setBroadcastEndpointFactory(endpoint); - String query = getData(IGNORED, bean, dgc, endpoint); + String query = BeanSupport.getData(IGNORED, bean, dgc, endpoint); return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null); } @@ -71,11 +72,11 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { String name) throws Exception { UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory().setGroupAddress(host).setGroupPort(port); - setData(uri, endpointFactoryConfiguration, query); + BeanSupport.setData(uri, endpointFactoryConfiguration, query); - DiscoveryGroupConfiguration dgc = setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration); + DiscoveryGroupConfiguration dgc = BeanSupport.setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration); - setData(uri, dgc, query); + BeanSupport.setData(uri, dgc, query); return dgc; } } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/AbstractCFSchema.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/AbstractCFSchema.java index b93d2eae4b..9b407dbed8 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/AbstractCFSchema.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/uri/AbstractCFSchema.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.URISchema; public abstract class AbstractCFSchema extends URISchema { @@ -32,7 +33,7 @@ public abstract class AbstractCFSchema extends URISchema - * This is a convenience method. - * - * @param name - */ + + public EmbeddedJMS setConfiguration(Configuration configuration) { + super.setConfiguration(configuration); + return this; + } + + /** + * Lookup in the registry for registered object, i.e. a ConnectionFactory. + *

+ * This is a convenience method. + * + * @param name + */ public Object lookup(String name) { return serverManager.getRegistry().lookup(name); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java index c11e05b508..e677563255 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManagerFactory.java @@ -22,10 +22,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.osgi.service.component.annotations.Component; import java.util.Collections; import java.util.List; +import java.util.Map; @Component(service = ProtocolManagerFactory.class) public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory { @@ -38,9 +40,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory @Override public ProtocolManager createProtocolManager(ActiveMQServer server, - List incomingInterceptors, - List outgoingInterceptors) { - return new ProtonProtocolManager(this, server); + final Map parameters, + List incomingInterceptors, + List outgoingInterceptors) throws Exception { + return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters); } @Override diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java index deeb191e2f..9fd851ff0e 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManagerFactory.java @@ -17,7 +17,9 @@ package org.apache.activemq.artemis.core.protocol.hornetq; import java.util.List; +import java.util.Map; +import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -36,11 +38,17 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory { @Override public ProtocolManager createProtocolManager(final ActiveMQServer server, - final List incomingInterceptors, - List outgoingInterceptors) { - incomingInterceptors.add(new HQPropertiesConversionInterceptor(true)); - outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false)); - return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); + final Map parameters, + final List incomingInterceptors, + List outgoingInterceptors) { + + List hqIncoming = filterInterceptors(incomingInterceptors); + List hqOutgoing = filterInterceptors(outgoingInterceptors); + + hqIncoming.add(new HQPropertiesConversionInterceptor(true)); + hqOutgoing.add(new HQPropertiesConversionInterceptor(false)); + + return new HornetQProtocolManager(this, server, hqIncoming, hqOutgoing); } @Override diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java index 5bb34b5fd5..982723f89b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java @@ -18,14 +18,16 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.List; +import java.util.Map; +import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) -public class MQTTProtocolManagerFactory implements ProtocolManagerFactory { +public class MQTTProtocolManagerFactory implements ProtocolManagerFactory { public static final String MQTT_PROTOCOL_NAME = "MQTT"; @@ -35,8 +37,9 @@ public class MQTTProtocolManagerFactory implements ProtocolManagerFactory { @Override public ProtocolManager createProtocolManager(ActiveMQServer server, - List incomingInterceptors, - List outgoingInterceptors) { + final Map parameters, + List incomingInterceptors, + List outgoingInterceptors) { return new MQTTProtocolManager(server); } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java index 6b3076dd9d..772ce8b81e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; @@ -25,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) @@ -38,9 +40,10 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto @Override public ProtocolManager createProtocolManager(final ActiveMQServer server, - final List incomingInterceptors, - List outgoingInterceptors) { - return new OpenWireProtocolManager(this, server); + Map parameters, + final List incomingInterceptors, + List outgoingInterceptors) throws Exception { + return BeanSupport.setData(new OpenWireProtocolManager(this, server), parameters); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index a388f50375..3e642c60cd 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -62,7 +62,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { - private AMQServerSession coreSession; private ConnectionInfo connInfo; private SessionInfo sessInfo; diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java index ff9cccadeb..c3fab5dc50 100644 --- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java +++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/util/ProtonServerMessage.java @@ -29,7 +29,6 @@ import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.MessageError; -import org.apache.qpid.proton.message.MessageFormat; import org.apache.qpid.proton.message.ProtonJMessage; /** @@ -454,36 +453,6 @@ public class ProtonServerMessage implements ProtonJMessage { return 0; } - @Override - public void load(Object data) { - - } - - @Override - public Object save() { - return null; - } - - @Override - public String toAMQPFormat(Object value) { - return null; - } - - @Override - public Object parseAMQPFormat(String value) { - return null; - } - - @Override - public void setMessageFormat(MessageFormat format) { - - } - - @Override - public MessageFormat getMessageFormat() { - return null; - } - @Override public void clear() { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManagerFactory.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManagerFactory.java index 2d41e03bb2..f7d5d4ab0e 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManagerFactory.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManagerFactory.java @@ -17,12 +17,14 @@ package org.apache.activemq.artemis.core.protocol.stomp; import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.osgi.service.component.annotations.Component; @Component(service = ProtocolManagerFactory.class) @@ -36,14 +38,15 @@ public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory< @Override public ProtocolManager createProtocolManager(final ActiveMQServer server, - final List incomingInterceptors, - List outgoingInterceptors) { - return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); + final Map parameters, + final List incomingInterceptors, + List outgoingInterceptors) throws Exception { + return BeanSupport.setData(new StompProtocolManager(this, server, filterInterceptors(incomingInterceptors), filterInterceptors(outgoingInterceptors)), parameters); } @Override public List filterInterceptors(List interceptors) { - return filterInterceptors(StompFrameInterceptor.class, interceptors); + return internalFilterInterceptors(StompFrameInterceptor.class, interceptors); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index 8f965ce5b8..fed7a7542c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -84,6 +84,10 @@ public class ProtocolHandler { } } + public ProtocolManager getProtocol(String name) { + return this.protocolMap.get(name); + } + class ProtocolDecoder extends ByteToMessageDecoder { private final boolean http; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java index 2a07606472..7fed5347f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManagerFactory.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; @@ -24,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; +import org.apache.activemq.artemis.utils.uri.BeanSupport; public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory { @@ -41,9 +43,10 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory incomingInterceptors, - List outgoingInterceptors) { - return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); + Map parameters, + final List incomingInterceptors, + List outgoingInterceptors) throws Exception { + return BeanSupport.setData(new CoreProtocolManager(this, server, filterInterceptors(incomingInterceptors), filterInterceptors(outgoingInterceptors)), parameters); } @Override @@ -51,7 +54,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory protocolMap; + + + public AbstractAcceptor(Map protocolMap) { + this.protocolMap = protocolMap; + } + /** + * This will update the list of interceptors for each ProtocolManager inside the acceptor. + * */ + public void updateInterceptors(List incomingInterceptors, List outgoingInterceptors) { + for (ProtocolManager manager : protocolMap.values()) { + manager.updateInterceptors(incomingInterceptors, outgoingInterceptors); + } + } + +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java index 795a711e4f..2967b2a3ef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptor.java @@ -25,28 +25,29 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.TypedProperties; -public final class InVMAcceptor implements Acceptor { +public final class InVMAcceptor extends AbstractAcceptor { private final int id; private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final ServerConnectionLifeCycleListener listener; private final ConcurrentMap connections = new ConcurrentHashMap<>(); @@ -72,8 +73,10 @@ public final class InVMAcceptor implements Acceptor { final ClusterConnection clusterConnection, final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ServerConnectionLifeCycleListener listener, + final Map protocolMap, final Executor threadPool) { + super(protocolMap); this.name = name; @@ -219,7 +222,7 @@ public final class InVMAcceptor implements Acceptor { InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal); - connectionListener.connectionCreated(this, inVMConnection, ActiveMQClient.DEFAULT_CORE_PROTOCOL); + connectionListener.connectionCreated(this, inVMConnection, protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL)); } public void disconnect(final String connectionID) { @@ -249,7 +252,7 @@ public final class InVMAcceptor implements Acceptor { this.defaultActiveMQPrincipal = defaultActiveMQPrincipal; } - private class Listener implements ConnectionLifeCycleListener { + private class Listener implements ServerConnectionLifeCycleListener { //private static Listener instance = new Listener(); private final InVMConnector connector; @@ -261,7 +264,7 @@ public final class InVMAcceptor implements Acceptor { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ProtocolManager protocol) { if (connections.putIfAbsent((String) connection.getID(), connection) != null) { throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java index e28ee3a3eb..883b116ec7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMAcceptorFactory.java @@ -25,7 +25,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class InVMAcceptorFactory implements AcceptorFactory { @@ -34,10 +34,10 @@ public class InVMAcceptorFactory implements AcceptorFactory { final ClusterConnection clusterConnection, final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ServerConnectionLifeCycleListener listener, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, - final Map protocolHandler) { - return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, threadPool); + final Map protocolMap) { + return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, protocolMap, threadPool); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 2af8a16737..70d62895d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -31,9 +31,9 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.UUIDGenerator; @@ -43,7 +43,7 @@ public class InVMConnection implements Connection { private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private final String id; @@ -64,7 +64,7 @@ public class InVMConnection implements Connection { public InVMConnection(final int serverID, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor executor) { this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor); } @@ -72,7 +72,7 @@ public class InVMConnection implements Connection { public InVMConnection(final int serverID, final String id, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor executor) { this(serverID, id, handler, listener, executor, null); } @@ -80,7 +80,7 @@ public class InVMConnection implements Connection { public InVMConnection(final int serverID, final String id, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor executor, final ActiveMQPrincipal defaultActiveMQPrincipal) { this.serverID = serverID; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java index 33a314746f..c1fab773d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnector.java @@ -29,7 +29,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; @@ -72,7 +74,7 @@ public class InVMConnector extends AbstractConnector { private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private final InVMAcceptor acceptor; @@ -86,7 +88,7 @@ public class InVMConnector extends AbstractConnector { public InVMConnector(final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ClientConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, ClientProtocolManager protocolManager) { @@ -181,11 +183,11 @@ public class InVMConnector extends AbstractConnector { // This may be an injection point for mocks on tests protected Connection internalCreateConnection(final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ClientConnectionLifeCycleListener listener, final Executor serverExecutor) { // No acceptor on a client connection InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor); - listener.connectionCreated(null, inVMConnection, protocolManager.getName()); + listener.connectionCreated(null, inVMConnection, protocolManager); return inVMConnection; } @@ -195,17 +197,23 @@ public class InVMConnector extends AbstractConnector { return id == serverId; } - private class Listener implements ConnectionLifeCycleListener { + private class Listener implements ClientConnectionLifeCycleListener { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { if (connections.putIfAbsent((String) connection.getID(), connection) != null) { throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); } - listener.connectionCreated(component, connection, protocol); + if (listener instanceof ConnectionLifeCycleListener) { + listener.connectionCreated(component, connection, protocol.getName()); + } + else { + listener.connectionCreated(component, connection, protocol); + } + } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java index 77ca86b353..112e3f462e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnectorFactory.java @@ -21,8 +21,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; @@ -31,7 +31,7 @@ public class InVMConnectorFactory implements ConnectorFactory { @Override public Connector createConnector(final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ClientConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 345981e852..a5067c612a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -62,6 +62,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.protocol.ProtocolHandler; +import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -71,18 +72,17 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.TypedProperties; /** - * A Netty TCP Acceptor that supports SSL + * A Netty TCP Acceptor that is embedding Netty. */ -public class NettyAcceptor implements Acceptor { +public class NettyAcceptor extends AbstractAcceptor { static { // Disable resource leak detection for performance reasons by default @@ -107,7 +107,7 @@ public class NettyAcceptor implements Acceptor { private final BufferHandler handler; - private final ConnectionLifeCycleListener listener; + private final ServerConnectionLifeCycleListener listener; private final boolean sslEnabled; @@ -173,9 +173,11 @@ public class NettyAcceptor implements Acceptor { final ClusterConnection clusterConnection, final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ServerConnectionLifeCycleListener listener, final ScheduledExecutorService scheduledThreadPool, final Map protocolMap) { + super(protocolMap); + this.name = name; this.clusterConnection = clusterConnection; @@ -604,7 +606,7 @@ public class NettyAcceptor implements Acceptor { ActiveMQServerChannelHandler(final ChannelGroup group, final BufferHandler handler, - final ConnectionLifeCycleListener listener) { + final ServerConnectionLifeCycleListener listener) { super(group, handler, listener); } @@ -618,7 +620,7 @@ public class NettyAcceptor implements Acceptor { NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver); - connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol); + connectionListener.connectionCreated(NettyAcceptor.this, nc, protocolHandler.getProtocol(protocol)); SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); if (sslHandler != null) { @@ -648,12 +650,12 @@ public class NettyAcceptor implements Acceptor { } } - private class Listener implements ConnectionLifeCycleListener { + private class Listener implements ServerConnectionLifeCycleListener { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ProtocolManager protocol) { if (connections.putIfAbsent(connection.getID(), (NettyServerConnection) connection) != null) { throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java index 880522f764..5628a7f34f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptorFactory.java @@ -25,7 +25,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class NettyAcceptorFactory implements AcceptorFactory { @@ -34,7 +34,7 @@ public class NettyAcceptorFactory implements AcceptorFactory { final ClusterConnection connection, final Map configuration, final BufferHandler handler, - final ConnectionLifeCycleListener listener, + final ServerConnectionLifeCycleListener listener, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, final Map protocolMap) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java index 31957c41b5..29e39d5bfe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyServerConnection.java @@ -21,13 +21,13 @@ import java.util.Map; import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class NettyServerConnection extends NettyConnection { public NettyServerConnection(Map configuration, Channel channel, - ConnectionLifeCycleListener listener, + ServerConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver) { super(configuration, channel, listener, batchingEnabled, directDeliver); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 4b83657934..03fadd7589 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -65,12 +65,12 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ReusableLatch; -public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener { +public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener { // Constants ----------------------------------------------------- private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -107,7 +107,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private final ClusterManager clusterManager; - private final Map protocolMap = new ConcurrentHashMap(); + private final Map protocolMap = new ConcurrentHashMap(); private ActiveMQPrincipal defaultInvmSecurityPrincipal; @@ -144,7 +144,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle this.flushExecutor = flushExecutor; ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName()); - this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors))); +// this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors))); + this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory); if (config.isResolveProtocols()) { resolveProtocols(server, this.getClass().getClassLoader()); @@ -157,9 +158,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle if (protocolManagerFactories != null) { for (ProtocolManagerFactory protocolManagerFactory : protocolManagerFactories) { String[] protocols = protocolManagerFactory.getProtocols(); - for (String protocol : protocols) { - ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, protocolManagerFactory.getModuleName()); - protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); + for (String protocolName : protocols) { + ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocolName, protocolManagerFactory.getModuleName()); + // protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); + protocolMap.put(protocolName, protocolManagerFactory); } } } @@ -172,7 +174,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle String[] protocols = next.getProtocols(); for (String protocol : protocols) { ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName()); - protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), next.filterInterceptors(outgoingInterceptors))); + // protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), next.filterInterceptors(outgoingInterceptors))); + protocolMap.put(protocol, next); } } } @@ -210,45 +213,35 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle try { AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); - Map supportedProtocols = new ConcurrentHashMap(); + Map selectedProtocolFactories = new ConcurrentHashMap(); @SuppressWarnings("deprecation") String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); - if (protocol != null) { ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); - ProtocolManager protocolManager = protocolMap.get(protocol); - - if (protocolManager == null) { - ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocol, info.toString()); - } - else { - supportedProtocols.put(protocol, protocolManager); - } + locateProtocols(protocol, info, selectedProtocolFactories); } String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); if (protocols != null) { - String[] actualProtocols = protocols.split(","); - - if (actualProtocols != null) { - for (String actualProtocol : actualProtocols) { - ProtocolManager protocolManager = protocolMap.get(actualProtocol); - - if (protocolManager == null) { - ActiveMQServerLogger.LOGGER.noProtocolManagerFound(actualProtocol, info.toString()); - } - else { - supportedProtocols.put(actualProtocol, protocolManager); - } - } - } + locateProtocols(protocols, info, selectedProtocolFactories); } ClusterConnection clusterConnection = lookupClusterConnection(info); - Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, supportedProtocols.isEmpty() ? protocolMap : supportedProtocols); + // If empty: we get the default list + if (selectedProtocolFactories.isEmpty()) { + selectedProtocolFactories = protocolMap; + } + + Map selectedProtocols = new ConcurrentHashMap(); + for (Map.Entry entry: selectedProtocolFactories.entrySet()) { + selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); + } + + + Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); @@ -280,6 +273,25 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle started = true; } + private void locateProtocols(String protocolList, + Object transportConfig, + Map protocolMap) { + String[] protocolsSplit = protocolList.split(","); + + if (protocolsSplit != null) { + for (String protocolItem : protocolsSplit) { + ProtocolManagerFactory protocolManagerFactory = protocolMap.get(protocolItem); + + if (protocolManagerFactory == null) { + ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocolItem, transportConfig.toString()); + } + else { + protocolMap.put(protocolItem, protocolManagerFactory); + } + } + } + } + @Override public synchronized void startAcceptors() throws Exception { if (isStarted()) { @@ -469,25 +481,19 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle // ConnectionLifeCycleListener implementation ----------------------------------- - private ProtocolManager getProtocolManager(String protocol) { + private ProtocolManagerFactory getProtocolManager(String protocol) { return protocolMap.get(protocol); } @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ProtocolManager protocol) { if (server == null) { throw new IllegalStateException("Unable to create connection, server hasn't finished starting up"); } - ProtocolManager pmgr = this.getProtocolManager(protocol.toString()); - - if (pmgr == null) { - throw ActiveMQMessageBundle.BUNDLE.unknownProtocol(protocol); - } - - ConnectionEntry entry = pmgr.createConnectionEntry((Acceptor) component, connection); + ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection); if (isTrace) { ActiveMQServerLogger.LOGGER.trace("Connection created " + connection); @@ -720,10 +726,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } protected void updateProtocols() { - for (ProtocolManager protocolManager : this.protocolMap.values()) { - protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors); + for (Acceptor acceptor : this.acceptors.values()) { + acceptor.updateInterceptors(incomingInterceptors, outgoingInterceptors); } - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java index d9a8688787..cb48b49dda 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/MessageLoadBalancingType.java @@ -16,7 +16,7 @@ */ package org.apache.activemq.artemis.core.server.cluster.impl; -import org.apache.activemq.artemis.utils.uri.URISchema; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.commons.beanutils.Converter; public enum MessageLoadBalancingType { @@ -24,7 +24,7 @@ public enum MessageLoadBalancingType { static { // for URI support on ClusterConnection - URISchema.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class); + BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class); } static class MessageLoadBalancingTypeConverter implements Converter { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java index 6c4eb8bfa1..6a61065504 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java @@ -34,7 +34,7 @@ public abstract class AbstractProtocolManagerFactory

* @param listIn * @return */ - protected List

filterInterceptors(Class

type, List listIn) { + protected List

internalFilterInterceptors(Class

type, List listIn) { if (listIn == null) { return Collections.emptyList(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java index e7c1d0700c..d3b1b2e5d1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.spi.core.protocol; import java.util.List; +import java.util.Map; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -34,8 +35,9 @@ public interface ProtocolManagerFactory

{ * @return */ ProtocolManager createProtocolManager(ActiveMQServer server, - List

incomingInterceptors, - List

outgoingInterceptors); + Map parameters, + List incomingInterceptors, + List outgoingInterceptors) throws Exception; /** * This should get the entire list and only return the ones this factory can deal with * diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java index dbd261801b..b4c195282a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Acceptor.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.spi.core.remoting; +import java.util.List; import java.util.Map; +import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; @@ -38,6 +40,11 @@ public interface Acceptor extends ActiveMQComponent { */ void pause(); + /** + * This will update the list of interceptors for each ProtocolManager inside the acceptor. + * */ + void updateInterceptors(List incomingInterceptors, List outgoingInterceptors); + /** * @return the cluster connection associated with this Acceptor */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java index 1580d18870..4390d4f0c5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/AcceptorFactory.java @@ -46,7 +46,7 @@ public interface AcceptorFactory { ClusterConnection clusterConnection, Map configuration, BufferHandler handler, - ConnectionLifeCycleListener listener, + ServerConnectionLifeCycleListener listener, Executor threadPool, ScheduledExecutorService scheduledThreadPool, Map protocolMap); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java index 1d8d5b0781..7e3c8fdb04 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ServerConnectionLifeCycleListener.java @@ -16,15 +16,7 @@ */ package org.apache.activemq.artemis.spi.core.remoting; -public interface ServerConnectionLifeCycleListener extends ConnectionLifeCycleListener { +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; - /** - * This method is used both by client connector creation and server connection creation through acceptors. - * the acceptor will be set to null on client operations - * - * @param acceptor The acceptor here will be always null on a client connection created event. - * @param connection the connection that has been created - * @param protocol the protocol to use - */ - void connectionCreated(Acceptor acceptor, Connection connection, String protocol); +public interface ServerConnectionLifeCycleListener extends BaseConnectionLifeCycleListener { } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java index f7769240c6..b465be43a2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionMulticastSchema.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Map; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.URISupport; public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSchema { @@ -40,7 +41,7 @@ public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSch else { bean.setDiscoveryGroupName(uri.getHost()); Map parameters = URISupport.parseParameters(uri); - setData(uri, bean, parameters); + BeanSupport.setData(uri, bean, parameters); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java index f81aa580f9..0f62d264a8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/uri/schemas/clusterConnection/ClusterConnectionStaticSchema.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.Map; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.utils.uri.BeanSupport; import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISupport; @@ -50,7 +51,7 @@ public class ClusterConnectionStaticSchema extends URISchema configs = parser.newObject(new URI("tcp://localhost:8080?tcpSendBufferSize=1048576&tcpReceiveBufferSize=1048576&protocols=openwire&banana=x"), "test"); + + for (TransportConfiguration config : configs) { + System.out.println("config:" + config); + Assert.assertTrue(config.getExtraParams().get("banana").equals("x")); + } + } +} diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java index 771d89cd3f..3df2ecfd74 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/uri/ClusterConnectionConfigurationTest.java @@ -41,7 +41,6 @@ public class ClusterConnectionConfigurationTest { ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null); Assert.assertEquals(132, configuration.getMinLargeMessageSize()); - Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType()); Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length); Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); diff --git a/examples/protocols/amqp/queue/pom.xml b/examples/protocols/amqp/queue/pom.xml index d4691f3815..5666e451df 100644 --- a/examples/protocols/amqp/queue/pom.xml +++ b/examples/protocols/amqp/queue/pom.xml @@ -39,7 +39,7 @@ under the License. org.apache.qpid qpid-jms-client - 0.5.0 + 0.7.0 diff --git a/pom.xml b/pom.xml index 353ce17d49..3c5b93d3d3 100644 --- a/pom.xml +++ b/pom.xml @@ -81,7 +81,7 @@ ${project.version}(${activemq.version.incrementingVersion}) 3.0.14.Final - 0.10 + 0.12.0 5.12.0 1.10 10.11.1.1 diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index a444965edc..6c38b7c044 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -188,7 +188,7 @@ org.apache.qpid qpid-jms-client - 0.5.0 + 0.7.0 org.apache.qpid diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java index ac2ff3c78b..260fcfeaad 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java @@ -26,10 +26,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; @@ -48,7 +49,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase { } }; - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ServerConnectionLifeCycleListener listener = new ServerConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { @@ -61,7 +62,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase { @Override public void connectionCreated(ActiveMQComponent component, final Connection connection, - final String protocol) { + final ProtocolManager protocol) { } @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java index f0ea5db1a5..3b1b0253ba 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java @@ -25,13 +25,14 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -72,7 +73,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase { }; Map params = new HashMap<>(); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ServerConnectionLifeCycleListener listener = new ServerConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { @@ -85,7 +86,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ProtocolManager protocol) { } @Override diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java index 838c829aa6..1c15c692d3 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/server/impl/fake/FakeAcceptorFactory.java @@ -16,10 +16,12 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.server.impl.fake; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.management.NotificationService; @@ -27,7 +29,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; public class FakeAcceptorFactory implements AcceptorFactory { @@ -38,7 +40,7 @@ public class FakeAcceptorFactory implements AcceptorFactory { ClusterConnection clusterConnection, Map configuration, BufferHandler handler, - ConnectionLifeCycleListener listener, + ServerConnectionLifeCycleListener listener, Executor threadPool, ScheduledExecutorService scheduledThreadPool, Map protocolMap) { @@ -57,6 +59,11 @@ public class FakeAcceptorFactory implements AcceptorFactory { } + @Override + public void updateInterceptors(List incomingInterceptors, + List outgoingInterceptors) { + } + @Override public ClusterConnection getClusterConnection() { return null;