This closes #393

This commit is contained in:
jbertram 2016-02-18 10:14:27 -06:00
commit 42b20ebf51
60 changed files with 687 additions and 406 deletions

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.uri;
import java.beans.PropertyDescriptor;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.Converter;
public class BeanSupport {
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
static {
// This is to customize the BeanUtils to use Fluent Proeprties as well
beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores());
}
public static void registerConverter(Converter converter, Class type) {
synchronized (beanUtils) {
beanUtils.getConvertUtils().register(converter, type);
}
}
public static <P> P copyData(P source, P target) throws Exception {
synchronized (beanUtils) {
beanUtils.copyProperties(source, target);
}
return target;
}
public static <P> P setData(URI uri, P obj, Map<String, String> query) throws Exception {
synchronized (beanUtils) {
beanUtils.setProperty(obj, "host", uri.getHost());
beanUtils.setProperty(obj, "port", uri.getPort());
beanUtils.setProperty(obj, "userInfo", uri.getUserInfo());
beanUtils.populate(obj, query);
}
return obj;
}
public static <P> P setData( P obj, Map<String, Object> data) throws Exception {
synchronized (beanUtils) {
beanUtils.populate(obj, data);
}
return obj;
}
public static void setData(URI uri,
HashMap<String, Object> properties,
Set<String> allowableProperties,
Map<String, String> query,
Map<String, Object> extraProps) {
if (allowableProperties.contains("host")) {
properties.put("host", "" + uri.getHost());
}
if (allowableProperties.contains("port")) {
properties.put("port", "" + uri.getPort());
}
if (allowableProperties.contains("userInfo")) {
properties.put("userInfo", "" + uri.getUserInfo());
}
for (Map.Entry<String, String> entry : query.entrySet()) {
if (allowableProperties.contains(entry.getKey())) {
properties.put(entry.getKey(), entry.getValue());
}
else {
extraProps.put(entry.getKey(), entry.getValue());
}
}
}
public static String getData(List<String> ignored, Object... beans) throws Exception {
StringBuilder sb = new StringBuilder();
boolean empty = true;
synchronized (beanUtils) {
for (Object bean : beans) {
if (bean != null) {
PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean);
for (PropertyDescriptor descriptor : descriptors) {
if (descriptor.getReadMethod() != null && isWriteable(descriptor, ignored)) {
String value = beanUtils.getProperty(bean, descriptor.getName());
if (value != null) {
if (!empty) {
sb.append("&");
}
empty = false;
sb.append(descriptor.getName()).append("=").append(encodeURI(value));
}
}
}
}
}
}
return sb.toString();
}
private static boolean isWriteable(PropertyDescriptor descriptor, List<String> ignored) {
if (ignored != null && ignored.contains(descriptor.getName())) {
return false;
}
Class<?> type = descriptor.getPropertyType();
return (type == Double.class) ||
(type == double.class) ||
(type == Long.class) ||
(type == long.class) ||
(type == Integer.class) ||
(type == int.class) ||
(type == Float.class) ||
(type == float.class) ||
(type == Boolean.class) ||
(type == boolean.class) ||
(type == String.class);
}
public static String decodeURI(String value) throws UnsupportedEncodingException {
return URLDecoder.decode(value, "UTF-8");
}
public static String encodeURI(String value) throws UnsupportedEncodingException {
return URLEncoder.encode(value, "UTF-8");
}
}

View File

@ -16,19 +16,11 @@
*/ */
package org.apache.activemq.artemis.utils.uri; package org.apache.activemq.artemis.utils.uri;
import java.beans.PropertyDescriptor;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.Converter;
public abstract class URISchema<T, P> { public abstract class URISchema<T, P> {
@ -39,7 +31,7 @@ public abstract class URISchema<T, P> {
} }
public void populateObject(URI uri, T bean) throws Exception { public void populateObject(URI uri, T bean) throws Exception {
setData(uri, bean, parseQuery(uri.getQuery(), null)); BeanSupport.setData(uri, bean, parseQuery(uri.getQuery(), null));
} }
public URI newURI(T bean) throws Exception { public URI newURI(T bean) throws Exception {
@ -97,38 +89,17 @@ public abstract class URISchema<T, P> {
protected abstract T internalNewObject(URI uri, Map<String, String> query, P param) throws Exception; protected abstract T internalNewObject(URI uri, Map<String, String> query, P param) throws Exception;
/** This is the default implementation. /**
* Sub classes are should provide a proper implementation for their schemas. */ * This is the default implementation.
* Sub classes are should provide a proper implementation for their schemas.
*/
protected URI internalNewURI(T bean) throws Exception { protected URI internalNewURI(T bean) throws Exception {
String query = URISchema.getData(null, bean); String query = BeanSupport.getData(null, bean);
return new URI(getSchemaName(), return new URI(getSchemaName(), null, "//", query, null);
null,
"//", query, null);
} }
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
public static void registerConverter(Converter converter, Class type) {
synchronized (beanUtils) {
beanUtils.getConvertUtils().register(converter, type);
}
}
public static String decodeURI(String value) throws UnsupportedEncodingException {
return URLDecoder.decode(value, "UTF-8");
}
public static String encodeURI(String value) throws UnsupportedEncodingException {
return URLEncoder.encode(value, "UTF-8");
}
static {
// This is to customize the BeanUtils to use Fluent Proeprties as well
beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospectorWithIgnores());
}
public static Map<String, String> parseQuery(String uri, public static Map<String, String> parseQuery(String uri,
Map<String, String> propertyOverrides) throws URISyntaxException { Map<String, String> propertyOverrides) throws URISyntaxException {
try { try {
@ -138,8 +109,8 @@ public abstract class URISchema<T, P> {
for (int i = 0; i < parameters.length; i++) { for (int i = 0; i < parameters.length; i++) {
int p = parameters[i].indexOf("="); int p = parameters[i].indexOf("=");
if (p >= 0) { if (p >= 0) {
String name = decodeURI(parameters[i].substring(0, p)); String name = BeanSupport.decodeURI(parameters[i].substring(0, p));
String value = decodeURI(parameters[i].substring(p + 1)); String value = BeanSupport.decodeURI(parameters[i].substring(p + 1));
rc.put(name, value); rc.put(name, value);
} }
else { else {
@ -171,84 +142,4 @@ public abstract class URISchema<T, P> {
return buffer.toString(); return buffer.toString();
} }
protected static <P> P copyData(P source, P target) throws Exception {
synchronized (beanUtils) {
beanUtils.copyProperties(source, target);
}
return target;
}
protected static <P> P setData(URI uri, P obj, Map<String, String> query) throws Exception {
synchronized (beanUtils) {
beanUtils.setProperty(obj, "host", uri.getHost());
beanUtils.setProperty(obj, "port", uri.getPort());
beanUtils.setProperty(obj, "userInfo", uri.getUserInfo());
beanUtils.populate(obj, query);
}
return obj;
}
public static void setData(URI uri,
HashMap<String, Object> properties,
Set<String> allowableProperties,
Map<String, String> query) {
if (allowableProperties.contains("host")) {
properties.put("host", "" + uri.getHost());
}
if (allowableProperties.contains("port")) {
properties.put("port", "" + uri.getPort());
}
if (allowableProperties.contains("userInfo")) {
properties.put("userInfo", "" + uri.getUserInfo());
}
for (Map.Entry<String, String> entry : query.entrySet()) {
if (allowableProperties.contains(entry.getKey())) {
properties.put(entry.getKey(), entry.getValue());
}
}
}
public static String getData(List<String> ignored, Object... beans) throws Exception {
StringBuilder sb = new StringBuilder();
boolean empty = true;
synchronized (beanUtils) {
for (Object bean : beans) {
if (bean != null) {
PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean);
for (PropertyDescriptor descriptor : descriptors) {
if (descriptor.getReadMethod() != null && isWriteable(descriptor, ignored)) {
String value = beanUtils.getProperty(bean, descriptor.getName());
if (value != null) {
if (!empty) {
sb.append("&");
}
empty = false;
sb.append(descriptor.getName()).append("=").append(encodeURI(value));
}
}
}
}
}
}
return sb.toString();
}
private static boolean isWriteable(PropertyDescriptor descriptor, List<String> ignored) {
if (ignored != null && ignored.contains(descriptor.getName())) {
return false;
}
Class<?> type = descriptor.getPropertyType();
return (type == Double.class) ||
(type == double.class) ||
(type == Long.class) ||
(type == long.class) ||
(type == Integer.class) ||
(type == int.class) ||
(type == Float.class) ||
(type == float.class) ||
(type == Boolean.class) ||
(type == boolean.class) ||
(type == String.class);
}
} }

View File

@ -18,10 +18,13 @@
package org.apache.activemq.artemis.utils; package org.apache.activemq.artemis.utils;
import java.net.URI; import java.net.URI;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.URIFactory; import org.apache.activemq.artemis.utils.uri.URIFactory;
import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISchema;
import org.apache.activemq.artemis.utils.uri.URISupport;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -99,6 +102,30 @@ public class URIParserTest {
Assert.assertEquals("something", fruit.getFluentName()); Assert.assertEquals("something", fruit.getFluentName());
} }
@Test
public void testQueryConversion() throws Exception {
Map<String, String> query = new HashMap<String, String>();
String queryString = URISupport.createQueryString(query);
System.out.println("queryString1: " + queryString);
Assert.assertTrue(queryString.isEmpty());
query.put("key1", "value1");
queryString = URISupport.createQueryString(query);
System.out.println("queryString2: " + queryString);
Assert.assertEquals("key1=value1", queryString);
query.put("key2", "value2");
queryString = URISupport.createQueryString(query);
System.out.println("queryString3: " + queryString);
Assert.assertEquals("key1=value1&key2=value2", queryString);
query.put("key3", "value3");
queryString = URISupport.createQueryString(query);
System.out.println("queryString4: " + queryString);
Assert.assertEquals("key1=value1&key2=value2&key3=value3", queryString);
}
class FruitParser extends URIFactory<FruitBase, String> { class FruitParser extends URIFactory<FruitBase, String> {
FruitParser() { FruitParser() {
@ -116,7 +143,7 @@ public class URIParserTest {
@Override @Override
public FruitBase internalNewObject(URI uri, Map<String, String> query, String fruitName) throws Exception { public FruitBase internalNewObject(URI uri, Map<String, String> query, String fruitName) throws Exception {
return setData(uri, new Fruit(getSchemaName()), query); return BeanSupport.setData(uri, new Fruit(getSchemaName()), query);
} }
} }
@ -130,7 +157,7 @@ public class URIParserTest {
@Override @Override
public FruitBase internalNewObject(URI uri, Map<String, String> query, String fruitName) throws Exception { public FruitBase internalNewObject(URI uri, Map<String, String> query, String fruitName) throws Exception {
return setData(uri, new FruitBase(getSchemaName()), query); return BeanSupport.setData(uri, new FruitBase(getSchemaName()), query);
} }
} }

View File

@ -51,6 +51,8 @@ public class TransportConfiguration implements Serializable {
private Map<String, Object> params; private Map<String, Object> params;
private Map<String, Object> extraProps;
private static final byte TYPE_BOOLEAN = 0; private static final byte TYPE_BOOLEAN = 0;
private static final byte TYPE_INT = 1; private static final byte TYPE_INT = 1;
@ -93,6 +95,19 @@ public class TransportConfiguration implements Serializable {
* @param name The name of this TransportConfiguration * @param name The name of this TransportConfiguration
*/ */
public TransportConfiguration(final String className, final Map<String, Object> params, final String name) { public TransportConfiguration(final String className, final Map<String, Object> params, final String name) {
this(className, params, name, null);
}
/**
* Creates a TransportConfiguration with a specific name providing the class name of the {@link org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory}
* and any parameters needed.
*
* @param className The class name of the ConnectorFactory
* @param params The parameters needed by the ConnectorFactory
* @param name The name of this TransportConfiguration
* @param extraProps The extra properties that specific to protocols
*/
public TransportConfiguration(final String className, final Map<String, Object> params, final String name, final Map<String, Object> extraProps) {
factoryClassName = className; factoryClassName = className;
if (params == null || params.isEmpty()) { if (params == null || params.isEmpty()) {
@ -103,6 +118,7 @@ public class TransportConfiguration implements Serializable {
} }
this.name = name; this.name = name;
this.extraProps = extraProps;
} }
public TransportConfiguration newTransportConfig(String newName) { public TransportConfiguration newTransportConfig(String newName) {
@ -156,6 +172,9 @@ public class TransportConfiguration implements Serializable {
return params; return params;
} }
public Map<String, Object> getExtraParams() {
return extraProps;
}
@Override @Override
public int hashCode() { public int hashCode() {
@ -249,25 +268,26 @@ public class TransportConfiguration implements Serializable {
first = false; first = false;
} }
if (extraProps != null) {
for (Map.Entry<String, Object> entry : extraProps.entrySet()) {
if (!first) {
str.append("&");
}
String key = entry.getKey();
String val = entry.getValue() == null ? "null" : entry.getValue().toString();
str.append(replaceWildcardChars(key)).append('=').append(replaceWildcardChars(val));
first = false;
}
}
} }
return str.toString(); return str.toString();
} }
/** private void encodeMap(final ActiveMQBuffer buffer, final Map<String, Object> map) {
* Encodes this TransportConfiguration into a buffer. for (Map.Entry<String, Object> entry : map.entrySet()) {
* <p>
* Note that this is only used internally ActiveMQ Artemis.
*
* @param buffer the buffer to encode into
*/
public void encode(final ActiveMQBuffer buffer) {
buffer.writeString(name);
buffer.writeString(factoryClassName);
buffer.writeInt(params == null ? 0 : params.size());
if (params != null) {
for (Map.Entry<String, Object> entry : params.entrySet()) {
buffer.writeString(entry.getKey()); buffer.writeString(entry.getKey());
Object val = entry.getValue(); Object val = entry.getValue();
@ -293,6 +313,26 @@ public class TransportConfiguration implements Serializable {
} }
} }
} }
/**
* Encodes this TransportConfiguration into a buffer.
* <p>
* Note that this is only used internally ActiveMQ Artemis.
*
* @param buffer the buffer to encode into
*/
public void encode(final ActiveMQBuffer buffer) {
buffer.writeString(name);
buffer.writeString(factoryClassName);
buffer.writeInt(params == null ? 0 : params.size());
if (params != null) {
encodeMap(buffer, params);
}
if (extraProps != null) {
encodeMap(buffer, extraProps);
}
} }
/** /**

View File

@ -53,9 +53,9 @@ import org.apache.activemq.artemis.core.remoting.impl.TransportConfigurationUtil
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
@ -67,7 +67,7 @@ import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ConnectionLifeCycleListener { public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
// Constants // Constants
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
@ -350,7 +350,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ClientProtocolManager protocol) {
} }
@Override @Override

View File

@ -25,8 +25,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
/** /**
* Common handler implementation for client and server side handler. * Common handler implementation for client and server side handler.
@ -37,13 +37,13 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final BaseConnectionLifeCycleListener listener;
volatile boolean active; volatile boolean active;
protected ActiveMQChannelHandler(final ChannelGroup group, protected ActiveMQChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener) { final BaseConnectionLifeCycleListener listener) {
this.group = group; this.group = group;
this.handler = handler; this.handler = handler;
this.listener = listener; this.listener = listener;

View File

@ -37,8 +37,8 @@ import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.IPV6Util; import org.apache.activemq.artemis.utils.IPV6Util;
@ -53,7 +53,7 @@ public class NettyConnection implements Connection {
private boolean closed; private boolean closed;
private final ConnectionLifeCycleListener listener; private final BaseConnectionLifeCycleListener listener;
private final boolean batchingEnabled; private final boolean batchingEnabled;
@ -79,7 +79,7 @@ public class NettyConnection implements Connection {
public NettyConnection(final Map<String, Object> configuration, public NettyConnection(final Map<String, Object> configuration,
final Channel channel, final Channel channel,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
boolean batchingEnabled, boolean batchingEnabled,
boolean directDeliver) { boolean directDeliver) {
this.configuration = configuration; this.configuration = configuration;

View File

@ -94,10 +94,11 @@ import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtoco
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector; import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
@ -151,7 +152,7 @@ public class NettyConnector extends AbstractConnector {
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final BaseConnectionLifeCycleListener listener;
private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED; private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED;
@ -231,7 +232,7 @@ public class NettyConnector extends AbstractConnector {
// Public -------------------------------------------------------- // Public --------------------------------------------------------
public NettyConnector(final Map<String, Object> configuration, public NettyConnector(final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
final Executor closeExecutor, final Executor closeExecutor,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool) { final ScheduledExecutorService scheduledThreadPool) {
@ -240,7 +241,7 @@ public class NettyConnector extends AbstractConnector {
public NettyConnector(final Map<String, Object> configuration, public NettyConnector(final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
final Executor closeExecutor, final Executor closeExecutor,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
@ -681,7 +682,7 @@ public class NettyConnector extends AbstractConnector {
// No acceptor on a client connection // No acceptor on a client connection
Listener connectionListener = new Listener(); Listener connectionListener = new Listener();
NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false); NettyConnection conn = new NettyConnection(configuration, ch, connectionListener, !httpEnabled && batchDelay > 0, false);
connectionListener.connectionCreated(null, conn, protocolManager.getName()); connectionListener.connectionCreated(null, conn, protocolManager);
return conn; return conn;
} }
else { else {
@ -709,7 +710,7 @@ public class NettyConnector extends AbstractConnector {
ActiveMQClientChannelHandler(final ChannelGroup group, ActiveMQClientChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener) { final ClientConnectionLifeCycleListener listener) {
super(group, handler, listener); super(group, handler, listener);
} }
} }
@ -899,12 +900,12 @@ public class NettyConnector extends AbstractConnector {
} }
} }
private class Listener implements ConnectionLifeCycleListener { private class Listener implements ClientConnectionLifeCycleListener {
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ClientProtocolManager protocol) {
if (connections.putIfAbsent(connection.getID(), connection) != null) { if (connections.putIfAbsent(connection.getID(), connection) != null) {
throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
} }

View File

@ -21,8 +21,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
@ -31,7 +31,7 @@ public class NettyConnectorFactory implements ConnectorFactory {
@Override @Override
public Connector createConnector(final Map<String, Object> configuration, public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ClientConnectionLifeCycleListener listener,
final Executor closeExecutor, final Executor closeExecutor,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
/**
* A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
*/
public interface BaseConnectionLifeCycleListener<ProtocolClass> {
/**
* This method is used both by client connector creation and server connection creation through
* acceptors. On the client side the {@code component} parameter is normally passed as
* {@code null}.
* <p>
* Leaving this method here and adding a different one at
* {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the
* activemq-server and activemq-client packages while avoiding to pull too much into activemq-core.
* The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the
* usage of it.
*
* @param component This will probably be an {@code Acceptor} and only used on the server side.
* @param connection the connection that has been created
* @param protocol the messaging protocol type this connection uses
*/
void connectionCreated(ActiveMQComponent component, Connection connection, ProtocolClass protocol);
/**
* Called when a connection is destroyed.
*
* @param connectionID the connection being destroyed.
*/
void connectionDestroyed(Object connectionID);
/**
* Called when an error occurs on the connection.
*
* @param connectionID the id of the connection.
* @param me the exception.
*/
void connectionException(Object connectionID, ActiveMQException me);
void connectionReadyForWrites(Object connectionID, boolean ready);
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.spi.core.remoting;
public interface ClientConnectionLifeCycleListener extends BaseConnectionLifeCycleListener<ClientProtocolManager> {
}

View File

@ -16,45 +16,11 @@
*/ */
package org.apache.activemq.artemis.spi.core.remoting; package org.apache.activemq.artemis.spi.core.remoting;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
/** /**
* A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events. * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
* @deprecated use {@link ClientConnectionLifeCycleListener} instead.
*/ */
public interface ConnectionLifeCycleListener { @Deprecated
public interface ConnectionLifeCycleListener extends BaseConnectionLifeCycleListener<String> {
/**
* This method is used both by client connector creation and server connection creation through
* acceptors. On the client side the {@code component} parameter is normally passed as
* {@code null}.
* <p>
* Leaving this method here and adding a different one at
* {@code ServerConnectionLifeCycleListener} is a compromise for a reasonable split between the
* activemq-server and activemq-client packages while avoiding to pull too much into activemq-core.
* The pivotal point keeping us from removing the method is {@link ConnectorFactory} and the
* usage of it.
*
* @param component This will probably be an {@code Acceptor} and only used on the server side.
* @param connection the connection that has been created
* @param protocol the messaging protocol type this connection uses
*/
void connectionCreated(ActiveMQComponent component, Connection connection, String protocol);
/**
* Called when a connection is destroyed.
*
* @param connectionID the connection being destroyed.
*/
void connectionDestroyed(Object connectionID);
/**
* Called when an error occurs on the connection.
*
* @param connectionID the id of the connection.
* @param me the exception.
*/
void connectionException(Object connectionID, ActiveMQException me);
void connectionReadyForWrites(Object connectionID, boolean ready);
} }

View File

@ -42,7 +42,7 @@ public interface ConnectorFactory extends TransportConfigurationHelper {
*/ */
Connector createConnector(Map<String, Object> configuration, Connector createConnector(Map<String, Object> configuration,
BufferHandler handler, BufferHandler handler,
ConnectionLifeCycleListener listener, ClientConnectionLifeCycleListener listener,
Executor closeExecutor, Executor closeExecutor,
Executor threadPool, Executor threadPool,
ScheduledExecutorService scheduledThreadPool, ScheduledExecutorService scheduledThreadPool,

View File

@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema { public class TCPTransportConfigurationSchema extends AbstractTransportConfigurationSchema {
@ -60,10 +61,13 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
String factoryName) throws URISyntaxException { String factoryName) throws URISyntaxException {
HashMap<String, Object> props = new HashMap<>(); HashMap<String, Object> props = new HashMap<>();
setData(uri, props, allowableProperties, query); Map<String, Object> extraProps = new HashMap<>();
BeanSupport.setData(uri, props, allowableProperties, query, extraProps);
List<TransportConfiguration> transportConfigurations = new ArrayList<>(); List<TransportConfiguration> transportConfigurations = new ArrayList<>();
transportConfigurations.add(new TransportConfiguration(factoryName, props, name)); TransportConfiguration config = new TransportConfiguration(factoryName, props, name, extraProps);
transportConfigurations.add(config);
String connectors = uri.getFragment(); String connectors = uri.getFragment();
if (connectors != null && !connectors.trim().isEmpty()) { if (connectors != null && !connectors.trim().isEmpty()) {
@ -71,9 +75,10 @@ public class TCPTransportConfigurationSchema extends AbstractTransportConfigurat
for (String s : split) { for (String s : split) {
URI extraUri = new URI(s); URI extraUri = new URI(s);
HashMap<String, Object> newProps = new HashMap<>(); HashMap<String, Object> newProps = new HashMap<>();
setData(extraUri, newProps, allowableProperties, query); extraProps = new HashMap<>();
setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null)); BeanSupport.setData(extraUri, newProps, allowableProperties, query, extraProps);
transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString())); BeanSupport.setData(extraUri, newProps, allowableProperties, parseQuery(extraUri.getQuery(), null), extraProps);
transportConfigurations.add(new TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(), extraProps));
} }
} }
return transportConfigurations; return transportConfigurations;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.uri.schema.serverLocator; package org.apache.activemq.artemis.uri.schema.serverLocator;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISchema;
import java.net.URI; import java.net.URI;
@ -25,6 +26,6 @@ import java.util.Map;
public abstract class AbstractServerLocatorSchema extends URISchema<ServerLocator, String> { public abstract class AbstractServerLocatorSchema extends URISchema<ServerLocator, String> {
protected ConnectionOptions newConnectionOptions(URI uri, Map<String, String> query) throws Exception { protected ConnectionOptions newConnectionOptions(URI uri, Map<String, String> query) throws Exception {
return setData(uri, new ConnectionOptions(), query); return BeanSupport.setData(uri, new ConnectionOptions(), query);
} }
} }

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import java.net.URI; import java.net.URI;
@ -37,7 +38,7 @@ public class InVMServerLocatorSchema extends AbstractServerLocatorSchema {
protected ServerLocator internalNewObject(URI uri, Map<String, String> query, String name) throws Exception { protected ServerLocator internalNewObject(URI uri, Map<String, String> query, String name) throws Exception {
TransportConfiguration tc = InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory"); TransportConfiguration tc = InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory");
ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc); ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc);
return setData(uri, factory, query); return BeanSupport.setData(uri, factory, query);
} }
@Override @Override

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import java.io.NotSerializableException; import java.io.NotSerializableException;
@ -63,7 +64,7 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema {
else { else {
throw new NotSerializableException(endpoint + "not serializable"); throw new NotSerializableException(endpoint + "not serializable");
} }
String query = getData(null, bean, dgc, endpoint); String query = BeanSupport.getData(null, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint); dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null); return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
} }
@ -79,11 +80,11 @@ public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema {
endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority()); endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority());
} }
setData(uri, endpointFactory, query); BeanSupport.setData(uri, endpointFactory, query);
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName(name).setBroadcastEndpointFactory(endpointFactory); DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setName(name).setBroadcastEndpointFactory(endpointFactory);
setData(uri, dcConfig, query); BeanSupport.setData(uri, dcConfig, query);
return dcConfig; return dcConfig;
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.IPV6Util; import org.apache.activemq.artemis.utils.IPV6Util;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
public class TCPServerLocatorSchema extends AbstractServerLocatorSchema { public class TCPServerLocatorSchema extends AbstractServerLocatorSchema {
@ -52,7 +53,7 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema {
@Override @Override
protected URI internalNewURI(ServerLocator bean) throws Exception { protected URI internalNewURI(ServerLocator bean) throws Exception {
String query = getData(null, bean); String query = BeanSupport.getData(null, bean);
TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations(); TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations();
return getURI(query, staticConnectors); return getURI(query, staticConnectors);
} }
@ -122,9 +123,9 @@ public class TCPServerLocatorSchema extends AbstractServerLocatorSchema {
else { else {
empty = false; empty = false;
} }
cb.append(encodeURI(entry.getKey())); cb.append(BeanSupport.encodeURI(entry.getKey()));
cb.append("="); cb.append("=");
cb.append(encodeURI(entry.getValue().toString())); cb.append(BeanSupport.encodeURI(entry.getValue().toString()));
} }
} }
return cb.toString(); return cb.toString();

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory; import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
public class UDPServerLocatorSchema extends AbstractServerLocatorSchema { public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
@ -60,7 +61,7 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration(); DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
dgc.setBroadcastEndpointFactory(endpoint); dgc.setBroadcastEndpointFactory(endpoint);
String query = getData(IGNORED, bean, dgc, endpoint); String query = BeanSupport.getData(IGNORED, bean, dgc, endpoint);
return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null); return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
} }
@ -71,11 +72,11 @@ public class UDPServerLocatorSchema extends AbstractServerLocatorSchema {
String name) throws Exception { String name) throws Exception {
UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory().setGroupAddress(host).setGroupPort(port); UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory().setGroupAddress(host).setGroupPort(port);
setData(uri, endpointFactoryConfiguration, query); BeanSupport.setData(uri, endpointFactoryConfiguration, query);
DiscoveryGroupConfiguration dgc = setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration); DiscoveryGroupConfiguration dgc = BeanSupport.setData(uri, new DiscoveryGroupConfiguration(), query).setName(name).setBroadcastEndpointFactory(endpointFactoryConfiguration);
setData(uri, dgc, query); BeanSupport.setData(uri, dgc, query);
return dgc; return dgc;
} }
} }

View File

@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISchema;
public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFactory, String> { public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFactory, String> {
@ -32,7 +33,7 @@ public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFacto
if (JMSConnectionOptions.convertCFType(type) == null) { if (JMSConnectionOptions.convertCFType(type) == null) {
ActiveMQClientLogger.LOGGER.invalidCFType(type, uri.toString()); ActiveMQClientLogger.LOGGER.invalidCFType(type, uri.toString());
} }
return setData(uri, new JMSConnectionOptions(), query); return BeanSupport.setData(uri, new JMSConnectionOptions(), query);
} }
} }

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema; import org.apache.activemq.artemis.uri.schema.serverLocator.InVMServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema; import org.apache.activemq.artemis.uri.schema.connector.InVMTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import java.net.URI; import java.net.URI;
@ -38,7 +39,7 @@ public class InVMSchema extends AbstractCFSchema {
String name) throws Exception { String name) throws Exception {
JMSConnectionOptions options = newConectionOptions(uri, query); JMSConnectionOptions options = newConectionOptions(uri, query);
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory")); ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), InVMTransportConfigurationSchema.createTransportConfiguration(uri, query, name, "org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory"));
return setData(uri, factory, query); return BeanSupport.setData(uri, factory, query);
} }
@Override @Override

View File

@ -28,8 +28,8 @@ import org.apache.activemq.artemis.api.core.JGroupsPropertiesBroadcastEndpointFa
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema; import org.apache.activemq.artemis.uri.schema.serverLocator.JGroupsServerLocatorSchema;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
public class JGroupsSchema extends AbstractCFSchema { public class JGroupsSchema extends AbstractCFSchema {
@ -53,7 +53,7 @@ public class JGroupsSchema extends AbstractCFSchema {
else { else {
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum()); factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
} }
return URISchema.setData(uri, factory, query); return BeanSupport.setData(uri, factory, query);
} }
@Override @Override
@ -70,7 +70,7 @@ public class JGroupsSchema extends AbstractCFSchema {
else { else {
throw new NotSerializableException(endpoint + "not serializable"); throw new NotSerializableException(endpoint + "not serializable");
} }
String query = URISchema.getData(null, bean, dgc, endpoint); String query = BeanSupport.getData(null, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint); dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null); return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
} }

View File

@ -23,8 +23,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema; import org.apache.activemq.artemis.uri.schema.serverLocator.TCPServerLocatorSchema;
import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema; import org.apache.activemq.artemis.uri.schema.connector.TCPTransportConfigurationSchema;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
@ -58,12 +58,12 @@ public class TCPSchema extends AbstractCFSchema {
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), tcs); factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), tcs);
} }
return URISchema.setData(uri, factory, query); return BeanSupport.setData(uri, factory, query);
} }
@Override @Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception { protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception {
String query = URISchema.getData(null, bean); String query = BeanSupport.getData(null, bean);
TransportConfiguration[] staticConnectors = bean.getStaticConnectors(); TransportConfiguration[] staticConnectors = bean.getStaticConnectors();
return TCPServerLocatorSchema.getURI(query, staticConnectors); return TCPServerLocatorSchema.getURI(query, staticConnectors);
} }

View File

@ -25,8 +25,8 @@ import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema; import org.apache.activemq.artemis.uri.schema.serverLocator.UDPServerLocatorSchema;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.SchemaConstants; import org.apache.activemq.artemis.utils.uri.SchemaConstants;
import org.apache.activemq.artemis.utils.uri.URISchema;
public class UDPSchema extends AbstractCFSchema { public class UDPSchema extends AbstractCFSchema {
@ -50,14 +50,14 @@ public class UDPSchema extends AbstractCFSchema {
else { else {
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum()); factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
} }
return URISchema.setData(uri, factory, query); return BeanSupport.setData(uri, factory, query);
} }
@Override @Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception { protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception {
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration(); DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory(); UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
String query = URISchema.getData(UDPServerLocatorSchema.IGNORED, bean, dgc, endpoint); String query = BeanSupport.getData(UDPServerLocatorSchema.IGNORED, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint); dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null); return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.server.embedded;
import javax.naming.Context; import javax.naming.Context;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.registry.MapBindingRegistry; import org.apache.activemq.artemis.core.registry.MapBindingRegistry;
@ -80,6 +81,12 @@ public class EmbeddedJMS extends EmbeddedActiveMQ {
return this; return this;
} }
public EmbeddedJMS setConfiguration(Configuration configuration) {
super.setConfiguration(configuration);
return this;
}
/** /**
* Lookup in the registry for registered object, i.e. a ConnectionFactory. * Lookup in the registry for registered object, i.e. a ConnectionFactory.
* <p> * <p>

View File

@ -22,10 +22,12 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
@Component(service = ProtocolManagerFactory.class) @Component(service = ProtocolManagerFactory.class)
public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> { public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
@ -38,9 +40,10 @@ public class ProtonProtocolManagerFactory extends AbstractProtocolManagerFactory
@Override @Override
public ProtocolManager createProtocolManager(ActiveMQServer server, public ProtocolManager createProtocolManager(ActiveMQServer server,
List<Interceptor> incomingInterceptors, final Map<String, Object> parameters,
List<Interceptor> outgoingInterceptors) { List<BaseInterceptor> incomingInterceptors,
return new ProtonProtocolManager(this, server); List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new ProtonProtocolManager(this, server), parameters);
} }
@Override @Override

View File

@ -17,7 +17,9 @@
package org.apache.activemq.artemis.core.protocol.hornetq; package org.apache.activemq.artemis.core.protocol.hornetq;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory; import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -36,11 +38,17 @@ public class HornetQProtocolManagerFactory extends CoreProtocolManagerFactory {
@Override @Override
public ProtocolManager createProtocolManager(final ActiveMQServer server, public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors, final Map<String, Object> parameters,
List<Interceptor> outgoingInterceptors) { final List<BaseInterceptor> incomingInterceptors,
incomingInterceptors.add(new HQPropertiesConversionInterceptor(true)); List<BaseInterceptor> outgoingInterceptors) {
outgoingInterceptors.add(new HQPropertiesConversionInterceptor(false));
return new HornetQProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); List<Interceptor> hqIncoming = filterInterceptors(incomingInterceptors);
List<Interceptor> hqOutgoing = filterInterceptors(outgoingInterceptors);
hqIncoming.add(new HQPropertiesConversionInterceptor(true));
hqOutgoing.add(new HQPropertiesConversionInterceptor(false));
return new HornetQProtocolManager(this, server, hqIncoming, hqOutgoing);
} }
@Override @Override

View File

@ -18,14 +18,16 @@
package org.apache.activemq.artemis.core.protocol.mqtt; package org.apache.activemq.artemis.core.protocol.mqtt;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class) @Component(service = ProtocolManagerFactory.class)
public class MQTTProtocolManagerFactory implements ProtocolManagerFactory { public class MQTTProtocolManagerFactory implements ProtocolManagerFactory<BaseInterceptor> {
public static final String MQTT_PROTOCOL_NAME = "MQTT"; public static final String MQTT_PROTOCOL_NAME = "MQTT";
@ -35,8 +37,9 @@ public class MQTTProtocolManagerFactory implements ProtocolManagerFactory {
@Override @Override
public ProtocolManager createProtocolManager(ActiveMQServer server, public ProtocolManager createProtocolManager(ActiveMQServer server,
List incomingInterceptors, final Map<String, Object> parameters,
List outgoingInterceptors) { List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
return new MQTTProtocolManager(server); return new MQTTProtocolManager(server);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
@ -25,6 +26,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class) @Component(service = ProtocolManagerFactory.class)
@ -38,9 +40,10 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
@Override @Override
public ProtocolManager createProtocolManager(final ActiveMQServer server, public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors, Map<String, Object> parameters,
List<Interceptor> outgoingInterceptors) { final List<BaseInterceptor> incomingInterceptors,
return new OpenWireProtocolManager(this, server); List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new OpenWireProtocolManager(this, server), parameters);
} }
@Override @Override

View File

@ -62,7 +62,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback { public class AMQSession implements SessionCallback {
private AMQServerSession coreSession; private AMQServerSession coreSession;
private ConnectionInfo connInfo; private ConnectionInfo connInfo;
private SessionInfo sessInfo; private SessionInfo sessInfo;

View File

@ -29,7 +29,6 @@ import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.message.MessageError; import org.apache.qpid.proton.message.MessageError;
import org.apache.qpid.proton.message.MessageFormat;
import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.ProtonJMessage;
/** /**
@ -454,36 +453,6 @@ public class ProtonServerMessage implements ProtonJMessage {
return 0; return 0;
} }
@Override
public void load(Object data) {
}
@Override
public Object save() {
return null;
}
@Override
public String toAMQPFormat(Object value) {
return null;
}
@Override
public Object parseAMQPFormat(String value) {
return null;
}
@Override
public void setMessageFormat(MessageFormat format) {
}
@Override
public MessageFormat getMessageFormat() {
return null;
}
@Override @Override
public void clear() { public void clear() {

View File

@ -17,12 +17,14 @@
package org.apache.activemq.artemis.core.protocol.stomp; package org.apache.activemq.artemis.core.protocol.stomp;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class) @Component(service = ProtocolManagerFactory.class)
@ -36,14 +38,15 @@ public class StompProtocolManagerFactory extends AbstractProtocolManagerFactory<
@Override @Override
public ProtocolManager createProtocolManager(final ActiveMQServer server, public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<StompFrameInterceptor> incomingInterceptors, final Map<String, Object> parameters,
List<StompFrameInterceptor> outgoingInterceptors) { final List<BaseInterceptor> incomingInterceptors,
return new StompProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new StompProtocolManager(this, server, filterInterceptors(incomingInterceptors), filterInterceptors(outgoingInterceptors)), parameters);
} }
@Override @Override
public List<StompFrameInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) { public List<StompFrameInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
return filterInterceptors(StompFrameInterceptor.class, interceptors); return internalFilterInterceptors(StompFrameInterceptor.class, interceptors);
} }
@Override @Override

View File

@ -84,6 +84,10 @@ public class ProtocolHandler {
} }
} }
public ProtocolManager getProtocol(String name) {
return this.protocolMap.get(name);
}
class ProtocolDecoder extends ByteToMessageDecoder { class ProtocolDecoder extends ByteToMessageDecoder {
private final boolean http; private final boolean http;

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.core.impl; package org.apache.activemq.artemis.core.protocol.core.impl;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Interceptor;
@ -24,6 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> { public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
@ -41,9 +43,10 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
*/ */
@Override @Override
public ProtocolManager createProtocolManager(final ActiveMQServer server, public ProtocolManager createProtocolManager(final ActiveMQServer server,
final List<Interceptor> incomingInterceptors, Map<String, Object> parameters,
List<Interceptor> outgoingInterceptors) { final List<BaseInterceptor> incomingInterceptors,
return new CoreProtocolManager(this, server, incomingInterceptors, outgoingInterceptors); List<BaseInterceptor> outgoingInterceptors) throws Exception {
return BeanSupport.setData(new CoreProtocolManager(this, server, filterInterceptors(incomingInterceptors), filterInterceptors(outgoingInterceptors)), parameters);
} }
@Override @Override
@ -51,7 +54,7 @@ public class CoreProtocolManagerFactory extends AbstractProtocolManagerFactory<I
// This is using this tool method // This is using this tool method
// it wouldn't be possible to write a generic method without this class parameter // it wouldn't be possible to write a generic method without this class parameter
// and I didn't want to bloat the cllaers for this // and I didn't want to bloat the cllaers for this
return filterInterceptors(Interceptor.class, interceptors); return internalFilterInterceptors(Interceptor.class, interceptors);
} }
@Override @Override

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.remoting.impl;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
public abstract class AbstractAcceptor implements Acceptor {
protected final Map<String, ProtocolManager> protocolMap;
public AbstractAcceptor(Map<String, ProtocolManager> protocolMap) {
this.protocolMap = protocolMap;
}
/**
* This will update the list of interceptors for each ProtocolManager inside the acceptor.
* */
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) {
for (ProtocolManager manager : protocolMap.values()) {
manager.updateInterceptors(incomingInterceptors, outgoingInterceptors);
}
}
}

View File

@ -25,28 +25,29 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.TypedProperties;
public final class InVMAcceptor implements Acceptor { public final class InVMAcceptor extends AbstractAcceptor {
private final int id; private final int id;
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final ServerConnectionLifeCycleListener listener;
private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMap<>(); private final ConcurrentMap<String, Connection> connections = new ConcurrentHashMap<>();
@ -72,8 +73,10 @@ public final class InVMAcceptor implements Acceptor {
final ClusterConnection clusterConnection, final ClusterConnection clusterConnection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ServerConnectionLifeCycleListener listener,
final Map<String, ProtocolManager> protocolMap,
final Executor threadPool) { final Executor threadPool) {
super(protocolMap);
this.name = name; this.name = name;
@ -219,7 +222,7 @@ public final class InVMAcceptor implements Acceptor {
InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal); InVMConnection inVMConnection = new InVMConnection(id, connectionID, remoteHandler, connectionListener, clientExecutor, defaultActiveMQPrincipal);
connectionListener.connectionCreated(this, inVMConnection, ActiveMQClient.DEFAULT_CORE_PROTOCOL); connectionListener.connectionCreated(this, inVMConnection, protocolMap.get(ActiveMQClient.DEFAULT_CORE_PROTOCOL));
} }
public void disconnect(final String connectionID) { public void disconnect(final String connectionID) {
@ -249,7 +252,7 @@ public final class InVMAcceptor implements Acceptor {
this.defaultActiveMQPrincipal = defaultActiveMQPrincipal; this.defaultActiveMQPrincipal = defaultActiveMQPrincipal;
} }
private class Listener implements ConnectionLifeCycleListener { private class Listener implements ServerConnectionLifeCycleListener {
//private static Listener instance = new Listener(); //private static Listener instance = new Listener();
private final InVMConnector connector; private final InVMConnector connector;
@ -261,7 +264,7 @@ public final class InVMAcceptor implements Acceptor {
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ProtocolManager protocol) {
if (connections.putIfAbsent((String) connection.getID(), connection) != null) { if (connections.putIfAbsent((String) connection.getID(), connection) != null) {
throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID());
} }

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class InVMAcceptorFactory implements AcceptorFactory { public class InVMAcceptorFactory implements AcceptorFactory {
@ -34,10 +34,10 @@ public class InVMAcceptorFactory implements AcceptorFactory {
final ClusterConnection clusterConnection, final ClusterConnection clusterConnection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ServerConnectionLifeCycleListener listener,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final Map<String, ProtocolManager> protocolHandler) { final Map<String, ProtocolManager> protocolMap) {
return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, threadPool); return new InVMAcceptor(name, clusterConnection, configuration, handler, listener, protocolMap, threadPool);
} }
} }

View File

@ -31,9 +31,9 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
@ -43,7 +43,7 @@ public class InVMConnection implements Connection {
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final BaseConnectionLifeCycleListener listener;
private final String id; private final String id;
@ -64,7 +64,7 @@ public class InVMConnection implements Connection {
public InVMConnection(final int serverID, public InVMConnection(final int serverID,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
final Executor executor) { final Executor executor) {
this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor); this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener, executor);
} }
@ -72,7 +72,7 @@ public class InVMConnection implements Connection {
public InVMConnection(final int serverID, public InVMConnection(final int serverID,
final String id, final String id,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
final Executor executor) { final Executor executor) {
this(serverID, id, handler, listener, executor, null); this(serverID, id, handler, listener, executor, null);
} }
@ -80,7 +80,7 @@ public class InVMConnection implements Connection {
public InVMConnection(final int serverID, public InVMConnection(final int serverID,
final String id, final String id,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final BaseConnectionLifeCycleListener listener,
final Executor executor, final Executor executor,
final ActiveMQPrincipal defaultActiveMQPrincipal) { final ActiveMQPrincipal defaultActiveMQPrincipal) {
this.serverID = serverID; this.serverID = serverID;

View File

@ -29,7 +29,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector; import org.apache.activemq.artemis.spi.core.remoting.AbstractConnector;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
@ -72,7 +74,7 @@ public class InVMConnector extends AbstractConnector {
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final BaseConnectionLifeCycleListener listener;
private final InVMAcceptor acceptor; private final InVMAcceptor acceptor;
@ -86,7 +88,7 @@ public class InVMConnector extends AbstractConnector {
public InVMConnector(final Map<String, Object> configuration, public InVMConnector(final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ClientConnectionLifeCycleListener listener,
final Executor closeExecutor, final Executor closeExecutor,
final Executor threadPool, final Executor threadPool,
ClientProtocolManager protocolManager) { ClientProtocolManager protocolManager) {
@ -181,11 +183,11 @@ public class InVMConnector extends AbstractConnector {
// This may be an injection point for mocks on tests // This may be an injection point for mocks on tests
protected Connection internalCreateConnection(final BufferHandler handler, protected Connection internalCreateConnection(final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ClientConnectionLifeCycleListener listener,
final Executor serverExecutor) { final Executor serverExecutor) {
// No acceptor on a client connection // No acceptor on a client connection
InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor); InVMConnection inVMConnection = new InVMConnection(id, handler, listener, serverExecutor);
listener.connectionCreated(null, inVMConnection, protocolManager.getName()); listener.connectionCreated(null, inVMConnection, protocolManager);
return inVMConnection; return inVMConnection;
} }
@ -195,19 +197,25 @@ public class InVMConnector extends AbstractConnector {
return id == serverId; return id == serverId;
} }
private class Listener implements ConnectionLifeCycleListener { private class Listener implements ClientConnectionLifeCycleListener {
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ClientProtocolManager protocol) {
if (connections.putIfAbsent((String) connection.getID(), connection) != null) { if (connections.putIfAbsent((String) connection.getID(), connection) != null) {
throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID());
} }
if (listener instanceof ConnectionLifeCycleListener) {
listener.connectionCreated(component, connection, protocol.getName());
}
else {
listener.connectionCreated(component, connection, protocol); listener.connectionCreated(component, connection, protocol);
} }
}
@Override @Override
public void connectionDestroyed(final Object connectionID) { public void connectionDestroyed(final Object connectionID) {
if (connections.remove(connectionID) != null) { if (connections.remove(connectionID) != null) {

View File

@ -21,8 +21,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.Connector; import org.apache.activemq.artemis.spi.core.remoting.Connector;
import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory; import org.apache.activemq.artemis.spi.core.remoting.ConnectorFactory;
@ -31,7 +31,7 @@ public class InVMConnectorFactory implements ConnectorFactory {
@Override @Override
public Connector createConnector(final Map<String, Object> configuration, public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ClientConnectionLifeCycleListener listener,
final Executor closeExecutor, final Executor closeExecutor,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,

View File

@ -62,6 +62,7 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.protocol.ProtocolHandler; import org.apache.activemq.artemis.core.protocol.ProtocolHandler;
import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
@ -71,18 +72,17 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.TypedProperties;
/** /**
* A Netty TCP Acceptor that supports SSL * A Netty TCP Acceptor that is embedding Netty.
*/ */
public class NettyAcceptor implements Acceptor { public class NettyAcceptor extends AbstractAcceptor {
static { static {
// Disable resource leak detection for performance reasons by default // Disable resource leak detection for performance reasons by default
@ -107,7 +107,7 @@ public class NettyAcceptor implements Acceptor {
private final BufferHandler handler; private final BufferHandler handler;
private final ConnectionLifeCycleListener listener; private final ServerConnectionLifeCycleListener listener;
private final boolean sslEnabled; private final boolean sslEnabled;
@ -173,9 +173,11 @@ public class NettyAcceptor implements Acceptor {
final ClusterConnection clusterConnection, final ClusterConnection clusterConnection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ServerConnectionLifeCycleListener listener,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final Map<String, ProtocolManager> protocolMap) { final Map<String, ProtocolManager> protocolMap) {
super(protocolMap);
this.name = name; this.name = name;
this.clusterConnection = clusterConnection; this.clusterConnection = clusterConnection;
@ -604,7 +606,7 @@ public class NettyAcceptor implements Acceptor {
ActiveMQServerChannelHandler(final ChannelGroup group, ActiveMQServerChannelHandler(final ChannelGroup group,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener) { final ServerConnectionLifeCycleListener listener) {
super(group, handler, listener); super(group, handler, listener);
} }
@ -618,7 +620,7 @@ public class NettyAcceptor implements Acceptor {
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver); NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver);
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocol); connectionListener.connectionCreated(NettyAcceptor.this, nc, protocolHandler.getProtocol(protocol));
SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
if (sslHandler != null) { if (sslHandler != null) {
@ -648,12 +650,12 @@ public class NettyAcceptor implements Acceptor {
} }
} }
private class Listener implements ConnectionLifeCycleListener { private class Listener implements ServerConnectionLifeCycleListener {
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ProtocolManager protocol) {
if (connections.putIfAbsent(connection.getID(), (NettyServerConnection) connection) != null) { if (connections.putIfAbsent(connection.getID(), (NettyServerConnection) connection) != null) {
throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID()); throw ActiveMQMessageBundle.BUNDLE.connectionExists(connection.getID());
} }

View File

@ -25,7 +25,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class NettyAcceptorFactory implements AcceptorFactory { public class NettyAcceptorFactory implements AcceptorFactory {
@ -34,7 +34,7 @@ public class NettyAcceptorFactory implements AcceptorFactory {
final ClusterConnection connection, final ClusterConnection connection,
final Map<String, Object> configuration, final Map<String, Object> configuration,
final BufferHandler handler, final BufferHandler handler,
final ConnectionLifeCycleListener listener, final ServerConnectionLifeCycleListener listener,
final Executor threadPool, final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool, final ScheduledExecutorService scheduledThreadPool,
final Map<String, ProtocolManager> protocolMap) { final Map<String, ProtocolManager> protocolMap) {

View File

@ -21,13 +21,13 @@ import java.util.Map;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class NettyServerConnection extends NettyConnection { public class NettyServerConnection extends NettyConnection {
public NettyServerConnection(Map<String, Object> configuration, public NettyServerConnection(Map<String, Object> configuration,
Channel channel, Channel channel,
ConnectionLifeCycleListener listener, ServerConnectionLifeCycleListener listener,
boolean batchingEnabled, boolean batchingEnabled,
boolean directDeliver) { boolean directDeliver) {
super(configuration, channel, listener, batchingEnabled, directDeliver); super(configuration, channel, listener, batchingEnabled, directDeliver);

View File

@ -65,12 +65,12 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycleListener { public class RemotingServiceImpl implements RemotingService, ServerConnectionLifeCycleListener {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -107,7 +107,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
private final ClusterManager clusterManager; private final ClusterManager clusterManager;
private final Map<String, ProtocolManager> protocolMap = new ConcurrentHashMap(); private final Map<String, ProtocolManagerFactory> protocolMap = new ConcurrentHashMap();
private ActiveMQPrincipal defaultInvmSecurityPrincipal; private ActiveMQPrincipal defaultInvmSecurityPrincipal;
@ -144,7 +144,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
this.flushExecutor = flushExecutor; this.flushExecutor = flushExecutor;
ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName()); ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors))); // this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory);
if (config.isResolveProtocols()) { if (config.isResolveProtocols()) {
resolveProtocols(server, this.getClass().getClassLoader()); resolveProtocols(server, this.getClass().getClassLoader());
@ -157,9 +158,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
if (protocolManagerFactories != null) { if (protocolManagerFactories != null) {
for (ProtocolManagerFactory protocolManagerFactory : protocolManagerFactories) { for (ProtocolManagerFactory protocolManagerFactory : protocolManagerFactories) {
String[] protocols = protocolManagerFactory.getProtocols(); String[] protocols = protocolManagerFactory.getProtocols();
for (String protocol : protocols) { for (String protocolName : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, protocolManagerFactory.getModuleName()); ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocolName, protocolManagerFactory.getModuleName());
protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors)); // protocolMap.put(protocol, protocolManagerFactory.createProtocolManager(server, incomingInterceptors, outgoingInterceptors));
protocolMap.put(protocolName, protocolManagerFactory);
} }
} }
} }
@ -172,7 +174,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
String[] protocols = next.getProtocols(); String[] protocols = next.getProtocols();
for (String protocol : protocols) { for (String protocol : protocols) {
ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName()); ActiveMQServerLogger.LOGGER.addingProtocolSupport(protocol, next.getModuleName());
protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), next.filterInterceptors(outgoingInterceptors))); // protocolMap.put(protocol, next.createProtocolManager(server, next.filterInterceptors(incomingInterceptors), next.filterInterceptors(outgoingInterceptors)));
protocolMap.put(protocol, next);
} }
} }
} }
@ -210,45 +213,35 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
try { try {
AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName());
Map<String, ProtocolManager> supportedProtocols = new ConcurrentHashMap(); Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap();
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams());
if (protocol != null) { if (protocol != null) {
ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol();
ProtocolManager protocolManager = protocolMap.get(protocol); locateProtocols(protocol, info, selectedProtocolFactories);
if (protocolManager == null) {
ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocol, info.toString());
}
else {
supportedProtocols.put(protocol, protocolManager);
}
} }
String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams());
if (protocols != null) { if (protocols != null) {
String[] actualProtocols = protocols.split(","); locateProtocols(protocols, info, selectedProtocolFactories);
if (actualProtocols != null) {
for (String actualProtocol : actualProtocols) {
ProtocolManager protocolManager = protocolMap.get(actualProtocol);
if (protocolManager == null) {
ActiveMQServerLogger.LOGGER.noProtocolManagerFound(actualProtocol, info.toString());
}
else {
supportedProtocols.put(actualProtocol, protocolManager);
}
}
}
} }
ClusterConnection clusterConnection = lookupClusterConnection(info); ClusterConnection clusterConnection = lookupClusterConnection(info);
Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, supportedProtocols.isEmpty() ? protocolMap : supportedProtocols); // If empty: we get the default list
if (selectedProtocolFactories.isEmpty()) {
selectedProtocolFactories = protocolMap;
}
Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap();
for (Map.Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) {
selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors));
}
Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols);
if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) {
acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal);
@ -280,6 +273,25 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
started = true; started = true;
} }
private void locateProtocols(String protocolList,
Object transportConfig,
Map<String, ProtocolManagerFactory> protocolMap) {
String[] protocolsSplit = protocolList.split(",");
if (protocolsSplit != null) {
for (String protocolItem : protocolsSplit) {
ProtocolManagerFactory protocolManagerFactory = protocolMap.get(protocolItem);
if (protocolManagerFactory == null) {
ActiveMQServerLogger.LOGGER.noProtocolManagerFound(protocolItem, transportConfig.toString());
}
else {
protocolMap.put(protocolItem, protocolManagerFactory);
}
}
}
}
@Override @Override
public synchronized void startAcceptors() throws Exception { public synchronized void startAcceptors() throws Exception {
if (isStarted()) { if (isStarted()) {
@ -469,25 +481,19 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
// ConnectionLifeCycleListener implementation ----------------------------------- // ConnectionLifeCycleListener implementation -----------------------------------
private ProtocolManager getProtocolManager(String protocol) { private ProtocolManagerFactory getProtocolManager(String protocol) {
return protocolMap.get(protocol); return protocolMap.get(protocol);
} }
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ProtocolManager protocol) {
if (server == null) { if (server == null) {
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up"); throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
} }
ProtocolManager pmgr = this.getProtocolManager(protocol.toString()); ConnectionEntry entry = protocol.createConnectionEntry((Acceptor) component, connection);
if (pmgr == null) {
throw ActiveMQMessageBundle.BUNDLE.unknownProtocol(protocol);
}
ConnectionEntry entry = pmgr.createConnectionEntry((Acceptor) component, connection);
if (isTrace) { if (isTrace) {
ActiveMQServerLogger.LOGGER.trace("Connection created " + connection); ActiveMQServerLogger.LOGGER.trace("Connection created " + connection);
@ -720,10 +726,9 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
} }
protected void updateProtocols() { protected void updateProtocols() {
for (ProtocolManager<?> protocolManager : this.protocolMap.values()) { for (Acceptor acceptor : this.acceptors.values()) {
protocolManager.updateInterceptors(incomingInterceptors, outgoingInterceptors); acceptor.updateInterceptors(incomingInterceptors, outgoingInterceptors);
} }
} }
} }

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.activemq.artemis.core.server.cluster.impl; package org.apache.activemq.artemis.core.server.cluster.impl;
import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.commons.beanutils.Converter; import org.apache.commons.beanutils.Converter;
public enum MessageLoadBalancingType { public enum MessageLoadBalancingType {
@ -24,7 +24,7 @@ public enum MessageLoadBalancingType {
static { static {
// for URI support on ClusterConnection // for URI support on ClusterConnection
URISchema.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class); BeanSupport.registerConverter(new MessageLoadBalancingTypeConverter(), MessageLoadBalancingType.class);
} }
static class MessageLoadBalancingTypeConverter implements Converter { static class MessageLoadBalancingTypeConverter implements Converter {

View File

@ -34,7 +34,7 @@ public abstract class AbstractProtocolManagerFactory<P extends BaseInterceptor>
* @param listIn * @param listIn
* @return * @return
*/ */
protected List<P> filterInterceptors(Class<P> type, List<? extends BaseInterceptor> listIn) { protected List<P> internalFilterInterceptors(Class<P> type, List<? extends BaseInterceptor> listIn) {
if (listIn == null) { if (listIn == null) {
return Collections.emptyList(); return Collections.emptyList();
} }

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.spi.core.protocol; package org.apache.activemq.artemis.spi.core.protocol;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -34,8 +35,9 @@ public interface ProtocolManagerFactory<P extends BaseInterceptor> {
* @return * @return
*/ */
ProtocolManager createProtocolManager(ActiveMQServer server, ProtocolManager createProtocolManager(ActiveMQServer server,
List<P> incomingInterceptors, Map<String, Object> parameters,
List<P> outgoingInterceptors); List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) throws Exception;
/** /**
* This should get the entire list and only return the ones this factory can deal with * * This should get the entire list and only return the ones this factory can deal with *

View File

@ -16,8 +16,10 @@
*/ */
package org.apache.activemq.artemis.spi.core.remoting; package org.apache.activemq.artemis.spi.core.remoting;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -38,6 +40,11 @@ public interface Acceptor extends ActiveMQComponent {
*/ */
void pause(); void pause();
/**
* This will update the list of interceptors for each ProtocolManager inside the acceptor.
* */
void updateInterceptors(List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors);
/** /**
* @return the cluster connection associated with this Acceptor * @return the cluster connection associated with this Acceptor
*/ */

View File

@ -46,7 +46,7 @@ public interface AcceptorFactory {
ClusterConnection clusterConnection, ClusterConnection clusterConnection,
Map<String, Object> configuration, Map<String, Object> configuration,
BufferHandler handler, BufferHandler handler,
ConnectionLifeCycleListener listener, ServerConnectionLifeCycleListener listener,
Executor threadPool, Executor threadPool,
ScheduledExecutorService scheduledThreadPool, ScheduledExecutorService scheduledThreadPool,
Map<String, ProtocolManager> protocolMap); Map<String, ProtocolManager> protocolMap);

View File

@ -16,15 +16,7 @@
*/ */
package org.apache.activemq.artemis.spi.core.remoting; package org.apache.activemq.artemis.spi.core.remoting;
public interface ServerConnectionLifeCycleListener extends ConnectionLifeCycleListener { import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
/** public interface ServerConnectionLifeCycleListener extends BaseConnectionLifeCycleListener<ProtocolManager> {
* This method is used both by client connector creation and server connection creation through acceptors.
* the acceptor will be set to null on client operations
*
* @param acceptor The acceptor here will be always null on a client connection created event.
* @param connection the connection that has been created
* @param protocol the protocol to use
*/
void connectionCreated(Acceptor acceptor, Connection connection, String protocol);
} }

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.URISupport; import org.apache.activemq.artemis.utils.uri.URISupport;
public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSchema { public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSchema {
@ -40,7 +41,7 @@ public class ClusterConnectionMulticastSchema extends ClusterConnectionStaticSch
else { else {
bean.setDiscoveryGroupName(uri.getHost()); bean.setDiscoveryGroupName(uri.getHost());
Map<String, String> parameters = URISupport.parseParameters(uri); Map<String, String> parameters = URISupport.parseParameters(uri);
setData(uri, bean, parameters); BeanSupport.setData(uri, bean, parameters);
} }
} }

View File

@ -21,6 +21,7 @@ import java.net.URI;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISchema;
import org.apache.activemq.artemis.utils.uri.URISupport; import org.apache.activemq.artemis.utils.uri.URISupport;
@ -50,7 +51,7 @@ public class ClusterConnectionStaticSchema extends URISchema<ClusterConnectionCo
URISupport.CompositeData compositeData = URISupport.parseComposite(uri); URISupport.CompositeData compositeData = URISupport.parseComposite(uri);
bean.setCompositeMembers(compositeData); bean.setCompositeMembers(compositeData);
setData(uri, bean, compositeData.getParameters()); BeanSupport.setData(uri, bean, compositeData.getParameters());
} }

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.uri;
import java.net.URI;
import java.util.List;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.junit.Assert;
import org.junit.Test;
public class AcceptorParserTest {
@Test
public void testAcceptor() throws Exception {
AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser();
List<TransportConfiguration> configs = parser.newObject(new URI("tcp://localhost:8080?tcpSendBufferSize=1048576&tcpReceiveBufferSize=1048576&protocols=openwire&banana=x"), "test");
for (TransportConfiguration config : configs) {
System.out.println("config:" + config);
Assert.assertTrue(config.getExtraParams().get("banana").equals("x"));
}
}
}

View File

@ -41,7 +41,6 @@ public class ClusterConnectionConfigurationTest {
ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser(); ClusterConnectionConfigurationParser parser = new ClusterConnectionConfigurationParser();
ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null); ClusterConnectionConfiguration configuration = parser.newObject(new URI("static://(tcp://localhost:6556,tcp://localhost:6557)?minLargeMessageSize=132;messageLoadBalancingType=OFF"), null);
Assert.assertEquals(132, configuration.getMinLargeMessageSize()); Assert.assertEquals(132, configuration.getMinLargeMessageSize());
Assert.assertEquals(MessageLoadBalancingType.OFF, configuration.getMessageLoadBalancingType());
Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length); Assert.assertEquals(2, configuration.getCompositeMembers().getComponents().length);
Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString()); Assert.assertEquals("tcp://localhost:6556", configuration.getCompositeMembers().getComponents()[0].toString());
Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString()); Assert.assertEquals("tcp://localhost:6557", configuration.getCompositeMembers().getComponents()[1].toString());

View File

@ -39,7 +39,7 @@ under the License.
<dependency> <dependency>
<groupId>org.apache.qpid</groupId> <groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId> <artifactId>qpid-jms-client</artifactId>
<version>0.5.0</version> <version>0.7.0</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -81,7 +81,7 @@
${project.version}(${activemq.version.incrementingVersion}) ${project.version}(${activemq.version.incrementingVersion})
</ActiveMQ-Version> </ActiveMQ-Version>
<resteasy.version>3.0.14.Final</resteasy.version> <resteasy.version>3.0.14.Final</resteasy.version>
<proton.version>0.10</proton.version> <proton.version>0.12.0</proton.version>
<activemq5-version>5.12.0</activemq5-version> <activemq5-version>5.12.0</activemq5-version>
<fuse.mqtt.client.version>1.10</fuse.mqtt.client.version> <fuse.mqtt.client.version>1.10</fuse.mqtt.client.version>
<apache.derby.version>10.11.1.1</apache.derby.version> <apache.derby.version>10.11.1.1</apache.derby.version>

View File

@ -188,7 +188,7 @@
<dependency> <dependency>
<groupId>org.apache.qpid</groupId> <groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId> <artifactId>qpid-jms-client</artifactId>
<version>0.5.0</version> <version>0.7.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.qpid</groupId> <groupId>org.apache.qpid</groupId>

View File

@ -26,10 +26,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -48,7 +49,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
} }
}; };
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { ServerConnectionLifeCycleListener listener = new ServerConnectionLifeCycleListener() {
@Override @Override
public void connectionException(final Object connectionID, final ActiveMQException me) { public void connectionException(final Object connectionID, final ActiveMQException me) {
@ -61,7 +62,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
@Override @Override
public void connectionCreated(ActiveMQComponent component, public void connectionCreated(ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ProtocolManager protocol) {
} }
@Override @Override

View File

@ -25,13 +25,14 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -72,7 +73,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
}; };
Map<String, Object> params = new HashMap<>(); Map<String, Object> params = new HashMap<>();
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { ServerConnectionLifeCycleListener listener = new ServerConnectionLifeCycleListener() {
@Override @Override
public void connectionException(final Object connectionID, final ActiveMQException me) { public void connectionException(final Object connectionID, final ActiveMQException me) {
@ -85,7 +86,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
@Override @Override
public void connectionCreated(final ActiveMQComponent component, public void connectionCreated(final ActiveMQComponent component,
final Connection connection, final Connection connection,
final String protocol) { final ProtocolManager protocol) {
} }
@Override @Override

View File

@ -16,10 +16,12 @@
*/ */
package org.apache.activemq.artemis.tests.unit.core.remoting.server.impl.fake; package org.apache.activemq.artemis.tests.unit.core.remoting.server.impl.fake;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
@ -27,7 +29,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener;
public class FakeAcceptorFactory implements AcceptorFactory { public class FakeAcceptorFactory implements AcceptorFactory {
@ -38,7 +40,7 @@ public class FakeAcceptorFactory implements AcceptorFactory {
ClusterConnection clusterConnection, ClusterConnection clusterConnection,
Map<String, Object> configuration, Map<String, Object> configuration,
BufferHandler handler, BufferHandler handler,
ConnectionLifeCycleListener listener, ServerConnectionLifeCycleListener listener,
Executor threadPool, Executor threadPool,
ScheduledExecutorService scheduledThreadPool, ScheduledExecutorService scheduledThreadPool,
Map<String, ProtocolManager> protocolMap) { Map<String, ProtocolManager> protocolMap) {
@ -57,6 +59,11 @@ public class FakeAcceptorFactory implements AcceptorFactory {
} }
@Override
public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
}
@Override @Override
public ClusterConnection getClusterConnection() { public ClusterConnection getClusterConnection() {
return null; return null;