ACTIVEMQ6-7 - Improve Serialization on Connection Factory

https://issues.apache.org/jira/browse/ACTIVEMQ6-7
This commit is contained in:
Clebert Suconic 2014-12-12 14:01:48 -05:00 committed by Andy Taylor
parent 33addb4086
commit b24d72900b
25 changed files with 979 additions and 143 deletions

View File

@ -45,6 +45,10 @@
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.utils.uri;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author clebertsuconic
*/
public class URIFactory<T>
{
private URI defaultURI;
private final Map<String, URISchema<T>> schemas = new ConcurrentHashMap<>();
public URI getDefaultURI()
{
return defaultURI;
}
public void setDefaultURI(URI uri)
{
this.defaultURI = uri;
}
public void registerSchema(URISchema<T> schemaFactory)
{
schemas.put(schemaFactory.getSchemaName(), schemaFactory);
schemaFactory.setFactory(this);
}
public void removeSchema(final String schemaName)
{
schemas.remove(schemaName);
}
public T newObject(URI uri) throws Exception
{
URISchema<T> schemaFactory = schemas.get(uri.getScheme());
if (schemaFactory == null)
{
throw new NullPointerException("Schema " + uri.getScheme() + " not found");
}
return schemaFactory.newObject(uri);
}
}

View File

@ -0,0 +1,188 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.utils.uri;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
/**
* @author clebertsuconic
*/
public abstract class URISchema<T>
{
public abstract String getSchemaName();
public T newObject(URI uri) throws Exception
{
return newObject(uri, null);
}
private URIFactory<T> parentFactory;
void setFactory(URIFactory<T> factory)
{
this.parentFactory = parentFactory;
}
protected URIFactory<T> getFactory()
{
return parentFactory;
}
protected String getHost(URI uri)
{
URI defaultFactory = getDefaultURI();
if (defaultFactory != null && uri.getHost() == null && defaultFactory.getScheme().equals(uri.getScheme()))
{
uri = defaultFactory;
}
return uri.getHost();
}
protected URI getDefaultURI()
{
URIFactory<T> factory = getFactory();
if (factory == null)
{
return null;
}
else
{
return factory.getDefaultURI();
}
}
protected int getPort(URI uri)
{
URI defaultFactory = getDefaultURI();
if (defaultFactory != null && uri.getPort() < 0 && defaultFactory.getScheme().equals(uri.getScheme()))
{
uri = defaultFactory;
}
return uri.getPort();
}
/**
* It will create a new Object for the URI selected schema.
* the propertyOverrides is used to replace whatever was defined on the URL string
* @param uri
* @param propertyOverrides
* @return
* @throws Exception
*/
public T newObject(URI uri, Map<String, String> propertyOverrides) throws Exception
{
return internalNewObject(uri, parseQuery(uri.getQuery(), propertyOverrides));
}
protected abstract T internalNewObject(URI uri, Map<String, String> query) throws Exception;
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
static
{
// This is to customize the BeanUtils to use Fluent Proeprties as well
beanUtils.getPropertyUtils().addBeanIntrospector(new FluentPropertyBeanIntrospector());
}
public static Map<String, String> parseQuery(String uri, Map<String, String> propertyOverrides) throws URISyntaxException
{
try
{
Map<String, String> rc = new HashMap<String, String>();
if (uri != null && !uri.isEmpty())
{
String[] parameters = uri.split("&");
for (int i = 0; i < parameters.length; i++)
{
int p = parameters[i].indexOf("=");
if (p >= 0)
{
String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
rc.put(name, value);
}
else
{
rc.put(parameters[i], null);
}
}
}
if (propertyOverrides != null)
{
for (Map.Entry<String, String> entry: propertyOverrides.entrySet())
{
rc.put(entry.getKey(), entry.getValue());
}
}
return rc;
}
catch (UnsupportedEncodingException e)
{
throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
}
}
protected String printQuery(Map<String, String> query)
{
StringBuffer buffer = new StringBuffer();
for (Map.Entry<String, String> entry : query.entrySet())
{
buffer.append(entry.getKey() + "=" + entry.getValue());
buffer.append("\n");
}
return buffer.toString();
}
protected static <P> P copyData(P source, P target) throws Exception
{
synchronized (beanUtils)
{
beanUtils.copyProperties(source, target);
}
return target;
}
protected static <P> P setData(URI uri, P obj, Map<String, String> query) throws Exception
{
synchronized (beanUtils)
{
beanUtils.setProperty(obj, "host", uri.getHost());
beanUtils.setProperty(obj, "port", uri.getPort());
beanUtils.setProperty(obj, "userInfo", uri.getUserInfo());
beanUtils.populate(obj, query);
}
return obj;
}
}

View File

@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.utils;
import org.apache.activemq.utils.uri.URIFactory;
import org.apache.activemq.utils.uri.URISchema;
import org.junit.Assert;
import org.junit.Test;
import java.net.URI;
import java.util.Map;
/**
* @author clebertsuconic
*/
public class URIParserTest
{
/**
* this is just a simple test to validate the model
*
* @throws Throwable
*/
@Test
public void testSchemaFruit() throws Throwable
{
FruitParser parser = new FruitParser();
Fruit fruit = (Fruit)parser.newObject(new URI("fruit://some:guy@fair-market:3030?color=green&fluentName=something"));
Assert.assertEquals("fruit", fruit.getName());
Assert.assertEquals(3030, fruit.getPort());
Assert.assertEquals("fair-market", fruit.getHost());
Assert.assertEquals("some:guy", fruit.getUserInfo());
Assert.assertEquals("green", fruit.getColor());
Assert.assertEquals("something", fruit.getFluentName());
}
/**
* Even thought there's no host Poperty on FruitBase.. this should still work fine without throwing any exceptions
*
* @throws Throwable
*/
@Test
public void testSchemaNoHosPropertyt() throws Throwable
{
FruitParser parser = new FruitParser();
FruitBase fruit = parser.newObject(new URI("base://some:guy@fair-market:3030?color=green&fluentName=something"));
Assert.assertEquals("base", fruit.getName());
Assert.assertEquals("green", fruit.getColor());
Assert.assertEquals("something", fruit.getFluentName());
}
/**
* Even thought there's no host Poperty on FruitBase.. this should still work fine without throwing any exceptions
*
* @throws Throwable
*/
@Test
public void testSchemaNoHostOnURL() throws Throwable
{
FruitParser parser = new FruitParser();
Fruit fruit = (Fruit)parser.newObject(new URI("fruit://some:guy@port?color=green&fluentName=something"));
System.out.println("fruit:" + fruit);
Assert.assertEquals("fruit", fruit.getName());
Assert.assertEquals("green", fruit.getColor());
Assert.assertEquals("something", fruit.getFluentName());
}
class FruitParser extends URIFactory<FruitBase>
{
FruitParser()
{
this.registerSchema(new FruitSchema());
this.registerSchema(new FruitBaseSchema());
}
}
class FruitSchema extends URISchema<FruitBase>
{
@Override
public String getSchemaName()
{
return "fruit";
}
@Override
public FruitBase internalNewObject(URI uri, Map<String, String> query) throws Exception
{
return setData(uri, new Fruit(getSchemaName()), query);
}
}
class FruitBaseSchema extends URISchema<FruitBase>
{
@Override
public String getSchemaName()
{
return "base";
}
@Override
public FruitBase internalNewObject(URI uri, Map<String, String> query) throws Exception
{
return setData(uri, new FruitBase(getSchemaName()), query);
}
}
public static class FruitBase
{
final String name;
String fluentName;
String color;
FruitBase(final String name)
{
this.name = name;
}
public String getName()
{
return name;
}
public String getColor()
{
return color;
}
public void setColor(String color)
{
this.color = color;
}
public String getFluentName()
{
return fluentName;
}
public FruitBase setFluentName(String name)
{
this.fluentName = name;
return this;
}
@Override
public String toString()
{
return "FruitBase{" +
"name='" + name + '\'' +
", fluentName='" + fluentName + '\'' +
", color='" + color + '\'' +
'}';
}
}
public static class Fruit extends FruitBase
{
public Fruit(String name)
{
super(name);
}
String host;
int port;
String userInfo;
public void setHost(String host)
{
this.host = host;
}
public String getHost()
{
return host;
}
public void setPort(int port)
{
this.port = port;
}
public int getPort()
{
return port;
}
public void setUserInfo(String userInfo)
{
this.userInfo = userInfo;
}
public String getUserInfo()
{
return userInfo;
}
@Override
public String toString()
{
return "Fruit{" +
"host='" + host + '\'' +
", port=" + port +
", userInfo='" + userInfo + '\'' +
"super=" + super.toString() + '}';
}
}
}

View File

@ -416,4 +416,8 @@ public interface ActiveMQClientLogger extends BasicLogger
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214024, value = "HTTP upgrade not supported by remote acceptor")
void httpUpgradeNotSupportedByRemoteAcceptor();
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214025, value = "Invalid type {0}, Using default connection factory at {1}", format = Message.Format.MESSAGE_FORMAT)
void invalidCFType(String type, String uri);
}

View File

@ -60,7 +60,6 @@ import org.apache.activemq.spi.core.remoting.TopologyResponseHandler;
import org.apache.activemq.spi.core.remoting.SessionContext;
import org.apache.activemq.utils.ClassloadingUtil;
import org.apache.activemq.utils.ConcurrentHashSet;
import org.apache.activemq.utils.ConfigurationHelper;
import org.apache.activemq.utils.ConfirmationWindowWarning;
import org.apache.activemq.utils.ExecutorFactory;
import org.apache.activemq.utils.OrderedExecutorFactory;
@ -1265,19 +1264,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private void checkTransportKeys(final ConnectorFactory factory, final TransportConfiguration tc)
{
if (tc.getParams() != null)
{
Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), tc.getParams().keySet());
if (!invalid.isEmpty())
{
String msg = "The following keys are invalid for configuring a connector: " +
ConfigurationHelper.stringSetToCommaListString(invalid);
throw new IllegalStateException(msg);
}
}
}
/**

View File

@ -163,66 +163,66 @@ public class NettyConnector extends AbstractConnector
private final ConnectionLifeCycleListener listener;
private final boolean sslEnabled;
private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED;
private final boolean httpEnabled;
private boolean httpEnabled;
private final long httpMaxClientIdleTime;
private long httpMaxClientIdleTime;
private final long httpClientIdleScanPeriod;
private long httpClientIdleScanPeriod;
private final boolean httpRequiresSessionId;
private boolean httpRequiresSessionId;
// if true, after the connection, the connector will send
// a HTTP GET request (+ Upgrade: activemq-remoting) that
// will be handled by the server's http server.
private final boolean httpUpgradeEnabled;
private boolean httpUpgradeEnabled;
private final boolean useServlet;
private boolean useServlet;
private final String host;
private String host;
private final int port;
private int port;
private final String localAddress;
private String localAddress;
private final int localPort;
private int localPort;
private final String keyStoreProvider;
private String keyStoreProvider;
private final String keyStorePath;
private String keyStorePath;
private final String keyStorePassword;
private String keyStorePassword;
private final String trustStoreProvider;
private String trustStoreProvider;
private final String trustStorePath;
private String trustStorePath;
private final String trustStorePassword;
private String trustStorePassword;
private final String enabledCipherSuites;
private String enabledCipherSuites;
private final String enabledProtocols;
private String enabledProtocols;
private final boolean tcpNoDelay;
private boolean tcpNoDelay;
private final int tcpSendBufferSize;
private int tcpSendBufferSize;
private final int tcpReceiveBufferSize;
private int tcpReceiveBufferSize;
private final long batchDelay;
private long batchDelay;
private final ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<Object, Connection>();
private final String servletPath;
private String servletPath;
private final int nioRemotingThreads;
private int nioRemotingThreads;
private final boolean useNioGlobalWorkerPool;
private boolean useNioGlobalWorkerPool;
private final ScheduledExecutorService scheduledThreadPool;
private ScheduledExecutorService scheduledThreadPool;
private final Executor closeExecutor;
private Executor closeExecutor;
private BatchFlusher flusher;

View File

@ -17,7 +17,6 @@
package org.apache.activemq.core.remoting.impl.netty;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -46,11 +45,6 @@ public class NettyConnectorFactory implements ConnectorFactory
return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
{
return TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
}
@Override
public boolean isReliable()
{

View File

@ -27,36 +27,37 @@ import org.apache.activemq.api.config.ActiveMQDefaultConfiguration;
* A TransportConstants
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author ClebertSuconic
*/
public class TransportConstants
{
public static final String SSL_ENABLED_PROP_NAME = "ssl-enabled";
public static final String SSL_ENABLED_PROP_NAME = "sslEnabled";
public static final String HTTP_ENABLED_PROP_NAME = "http-enabled";
public static final String HTTP_ENABLED_PROP_NAME = "httpEnabled";
public static final String HTTP_CLIENT_IDLE_PROP_NAME = "http-client-idle-time";
public static final String HTTP_CLIENT_IDLE_PROP_NAME = "httpClientIdleTime";
public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "http-client-idle-scan-period";
public static final String HTTP_CLIENT_IDLE_SCAN_PERIOD = "httpClientIdleScanPeriod";
public static final String HTTP_RESPONSE_TIME_PROP_NAME = "http-response-time";
public static final String HTTP_RESPONSE_TIME_PROP_NAME = "httpResponseTime";
public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "http-server-scan-period";
public static final String HTTP_SERVER_SCAN_PERIOD_PROP_NAME = "httpServerScanPeriod";
public static final String HTTP_REQUIRES_SESSION_ID = "http-requires-session-id";
public static final String HTTP_REQUIRES_SESSION_ID = "httpRequiresSessionId";
public static final String HTTP_UPGRADE_ENABLED_PROP_NAME = "http-upgrade-enabled";
public static final String HTTP_UPGRADE_ENABLED_PROP_NAME = "httpUpgradeEnabled";
public static final String HTTP_UPGRADE_ENDPOINT_PROP_NAME = "http-upgrade-endpoint";
public static final String HTTP_UPGRADE_ENDPOINT_PROP_NAME = "httpPpgradeEndpoint";
public static final String USE_SERVLET_PROP_NAME = "use-servlet";
public static final String USE_SERVLET_PROP_NAME = "useServlet";
public static final String SERVLET_PATH = "servlet-path";
public static final String SERVLET_PATH = "servletPath";
public static final String USE_NIO_PROP_NAME = "use-nio";
public static final String USE_NIO_PROP_NAME = "useNio";
public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "use-nio-global-worker-pool";
public static final String USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME = "useNioGlobalWorkerPool";
public static final String USE_INVM_PROP_NAME = "use-invm";
public static final String USE_INVM_PROP_NAME = "useInvm";
public static final String PROTOCOL_PROP_NAME = "protocol";
@ -66,27 +67,27 @@ public class TransportConstants
public static final String PORT_PROP_NAME = "port";
public static final String LOCAL_ADDRESS_PROP_NAME = "local-address";
public static final String LOCAL_ADDRESS_PROP_NAME = "localAddress";
public static final String LOCAL_PORT_PROP_NAME = "local-port";
public static final String LOCAL_PORT_PROP_NAME = "localPort";
public static final String KEYSTORE_PROVIDER_PROP_NAME = "key-store-provider";
public static final String KEYSTORE_PROVIDER_PROP_NAME = "keyStoreProvider";
public static final String KEYSTORE_PATH_PROP_NAME = "key-store-path";
public static final String KEYSTORE_PATH_PROP_NAME = "keyStorePath";
public static final String KEYSTORE_PASSWORD_PROP_NAME = "key-store-password";
public static final String KEYSTORE_PASSWORD_PROP_NAME = "keyStorePassword";
public static final String TRUSTSTORE_PROVIDER_PROP_NAME = "trust-store-provider";
public static final String TRUSTSTORE_PROVIDER_PROP_NAME = "trustStoreProvider";
public static final String TRUSTSTORE_PATH_PROP_NAME = "trust-store-path";
public static final String TRUSTSTORE_PATH_PROP_NAME = "trustStorePath";
public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "trust-store-password";
public static final String TRUSTSTORE_PASSWORD_PROP_NAME = "trustStorePassword";
public static final String ENABLED_CIPHER_SUITES_PROP_NAME = "enabled-cipher-suites";
public static final String ENABLED_CIPHER_SUITES_PROP_NAME = "enabledCipherSuites";
public static final String ENABLED_PROTOCOLS_PROP_NAME = "enabled-protocols";
public static final String ENABLED_PROTOCOLS_PROP_NAME = "enabledProtocols";
public static final String NEED_CLIENT_AUTH_PROP_NAME = "need-client-auth";
public static final String NEED_CLIENT_AUTH_PROP_NAME = "needClientAuth";
public static final String BACKLOG_PROP_NAME = "backlog";
@ -102,21 +103,21 @@ public class TransportConstants
* @see <a href="http://docs.oracle.com/javase/6/docs/technotes/guides/net/socketOpt.html">Oracle
* doc on tcpNoDelay</a>
*/
public static final String TCP_NODELAY_PROPNAME = "tcp-no-delay";
public static final String TCP_NODELAY_PROPNAME = "tcpNoDelay";
public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "tcp-send-buffer-size";
public static final String TCP_SENDBUFFER_SIZE_PROPNAME = "tcpSendBufferSize";
public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "tcp-receive-buffer-size";
public static final String TCP_RECEIVEBUFFER_SIZE_PROPNAME = "tcpReceiveBufferSize";
public static final String NIO_REMOTING_THREADS_PROPNAME = "nio-remoting-threads";
public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads";
public static final String BATCH_DELAY = "batch-delay";
public static final String BATCH_DELAY = "batchDelay";
public static final String DIRECT_DELIVER = "direct-deliver";
public static final String DIRECT_DELIVER = "directDeliver";
public static final String CLUSTER_CONNECTION = "cluster-connection";
public static final String CLUSTER_CONNECTION = "clusterConnection";
public static final String STOMP_CONSUMERS_CREDIT = "stomp-consumer-credits";
public static final String STOMP_CONSUMERS_CREDIT = "stompConsumerCredits";
public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K

View File

@ -17,7 +17,6 @@
package org.apache.activemq.spi.core.remoting;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@ -51,15 +50,6 @@ public interface ConnectorFactory extends TransportConfigurationHelper
ScheduledExecutorService scheduledThreadPool,
ClientProtocolManager protocolManager);
/**
* Returns the allowable properties for this connector.
* <p>
* This will differ between different connector implementations.
*
* @return the allowable properties.
*/
Set<String> getAllowableProperties();
/**
* Indicates if connectors from this factory are reliable or not. If a connector is reliable then connection
* monitoring (i.e. pings/pongs) will be disabled.

View File

@ -50,6 +50,13 @@
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>

View File

@ -60,8 +60,8 @@ import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
public class ActiveMQInitialContextFactory implements InitialContextFactory
{
public static final String CONNECTION_FACTORY_NAMES = "connectionFactoryNames";
public static final String REFRESH_TIMEOUT = "refresh-timeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discovery-initial-wait-timeout";
public static final String REFRESH_TIMEOUT = "refreshTimeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discoveryInitialWaitTimeout";
private static final String[] DEFAULT_CONNECTION_FACTORY_NAMES = {"ConnectionFactory", "XAConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"};
public static final String TCP_SCHEME = "tcp";
@ -340,7 +340,7 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory
else if (providerURI.getScheme().equals(VM_SCHEME))
{
Map inVmTransportConfig = new HashMap();
inVmTransportConfig.put("server-id", providerURI.getHost());
inVmTransportConfig.put("serverId", providerURI.getHost());
TransportConfiguration tc = new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory", inVmTransportConfig);
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), tc);
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.core.client.ActiveMQClientLogger;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.URISchema;
/**
* @author clebertsuconic
*/
public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFactory>
{
protected ConnectionOptions newConectionOptions(URI uri, Map<String, String> query) throws Exception
{
String type = query.get("type");
// We do this check here to guarantee proper logging
if (ConnectionOptions.convertCFType(type) == null)
{
ActiveMQClientLogger.LOGGER.invalidCFType(type, uri.toString());
}
return setData(uri, new ConnectionOptions(), query);
}
}

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.URIFactory;
/**
* @author clebertsuconic
*/
public class ConnectionFactoryParser extends URIFactory<ActiveMQConnectionFactory>
{
public ConnectionFactoryParser()
{
registerSchema(new UDPSchema());
registerSchema(new JGroupsSchema());
}
}

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.jms.JMSFactoryType;
/**
* This will represent all the possible options you could setup on URLs
* When parsing the URL this will serve as an intermediate object
* And it could also be a pl
* @author clebertsuconic
*/
public class ConnectionOptions
{
private boolean ha;
private JMSFactoryType factoryType = JMSFactoryType.CF;
private String host;
private int port;
public ConnectionOptions setHost(String host)
{
this.host = host;
return this;
}
public String getHost()
{
return host;
}
public ConnectionOptions setPort(int port)
{
this.port = port;
return this;
}
public int getPort()
{
return port;
}
public boolean isHa()
{
return ha;
}
public void setHa(boolean ha)
{
this.ha = ha;
}
public JMSFactoryType getFactoryTypeEnum()
{
return factoryType;
}
public String getType()
{
return factoryType.toString();
}
public void setType(final String type)
{
this.factoryType = convertCFType(type);
if (factoryType == null)
{
factoryType = JMSFactoryType.CF;
}
}
public static JMSFactoryType convertCFType(String type)
{
try
{
if (type == null)
{
return null;
}
else
{
return Enum.valueOf(JMSFactoryType.class, type);
}
}
catch (Exception e)
{
return null;
}
}
@Override
public String toString()
{
return "ConnectionOptions{" +
"ha=" + ha +
", factoryType=" + factoryType +
'}';
}
}

View File

@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.URISchema;
/**
* @author clebertsuconic
*/
public class JGroupsSchema extends AbstractCFSchema
{
@Override
public String getSchemaName()
{
return "jgroups";
}
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConectionOptions(uri, query);
System.out.println("authority = " + uri.getAuthority() + " path = " + uri.getPath());
JGroupsBroadcastGroupConfiguration jgroupsConfig = new JGroupsBroadcastGroupConfiguration(uri.getAuthority(), uri.getPath() != null ? uri.getPath() : UUID.randomUUID().toString());
URISchema.setData(uri, jgroupsConfig, query);
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setBroadcastEndpointFactoryConfiguration(jgroupsConfig);
URISchema.setData(uri, dcConfig, query);
if (options.isHa())
{
return ActiveMQJMSClient.createConnectionFactoryWithHA(dcConfig, options.getFactoryTypeEnum());
}
else
{
return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
}
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import java.io.PrintStream;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.URISchema;
/**
* @author clebertsuconic
*/
public class UDPSchema extends AbstractCFSchema
{
@Override
public String getSchemaName()
{
return "udp";
}
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConectionOptions(uri, query);
DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query)
.setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
.setGroupAddress(getHost(uri))
.setGroupPort(getPort(uri))
.setLocalBindAddress(query.containsKey(TransportConstants.LOCAL_ADDRESS_PROP_NAME) ? (String) query.get(TransportConstants.LOCAL_ADDRESS_PROP_NAME) : null)
.setLocalBindPort(query.containsKey(TransportConstants.LOCAL_PORT_PROP_NAME) ? Integer.parseInt((String) query.get(TransportConstants.LOCAL_PORT_PROP_NAME)) : -1));
if (options.isHa())
{
return ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, options.getFactoryTypeEnum());
}
else
{
return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
}
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import javax.jms.ConnectionFactory;
import javax.jms.XAQueueConnectionFactory;
import java.net.URI;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
/**
* @author clebertsuconic
*/
public class ConnectionFactoryURITest
{
ConnectionFactoryParser parser = new ConnectionFactoryParser();
@Test
public void testUDP() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("udp://localhost:3030?ha=true&type=QUEUE_XA_CF"));
Assert.assertTrue(factory instanceof XAQueueConnectionFactory);
}
@Test
public void testInvalidCFType() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("udp://localhost:3030?ha=true&type=QUEUE_XA_CFInvalid"));
Assert.assertTrue(factory instanceof ConnectionFactory);
}
@Test
public void testJGroups() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("jgroups://test.xml?test=33"));
// Assert.assertTrue(factory instanceof ConnectionFactory);
}
}

View File

@ -47,10 +47,4 @@ public class InVMAcceptorFactory implements AcceptorFactory
{
return new InVMAcceptor(clusterConnection, configuration, handler, listener, threadPool);
}
public Set<String> getAllowableProperties()
{
return TransportConstants.ALLOWABLE_ACCEPTOR_KEYS;
}
}

View File

@ -48,11 +48,6 @@ public class InVMConnectorFactory implements ConnectorFactory
return connector;
}
public Set<String> getAllowableProperties()
{
return TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
}
@Override
public boolean isReliable()
{

View File

@ -27,36 +27,15 @@ import org.apache.activemq.api.config.ActiveMQDefaultConfiguration;
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a>
* @author ClebertSuconic
*
*/
public final class TransportConstants
{
public static final String SERVER_ID_PROP_NAME = "server-id";
public static final String SERVER_ID_PROP_NAME = "serverId";
public static final int DEFAULT_SERVER_ID = 0;
public static final Set<String> ALLOWABLE_CONNECTOR_KEYS;
public static final Set<String> ALLOWABLE_ACCEPTOR_KEYS;
static
{
Set<String> allowableAcceptorKeys = new HashSet<String>();
allowableAcceptorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
allowableAcceptorKeys.add(org.apache.activemq.core.remoting.impl.netty.TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Set<String> allowableConnectorKeys = new HashSet<String>();
allowableConnectorKeys.add(TransportConstants.SERVER_ID_PROP_NAME);
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword());
allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec());
ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys);
}
private TransportConstants()
{
// Utility class

View File

@ -46,9 +46,4 @@ public class NettyAcceptorFactory implements AcceptorFactory
{
return new NettyAcceptor(name, connection, configuration, handler, listener, scheduledThreadPool, protocolMap);
}
public Set<String> getAllowableProperties()
{
return TransportConstants.ALLOWABLE_ACCEPTOR_KEYS;
}
}

View File

@ -56,12 +56,4 @@ public interface AcceptorFactory
ScheduledExecutorService scheduledThreadPool,
Map<String, ProtocolManager> protocolMap);
/**
* Returns the allowable properties for this acceptor.
* <p/>
* This will differ between different acceptor implementations.
*
* @return the allowable properties.
*/
Set<String> getAllowableProperties();
}

View File

@ -410,6 +410,13 @@
<version>2.0.0</version>
<!-- License: Apache 2.0 -->
</dependency>
<!-- for URL reflection. Using Populate on URI Factory at activemq-commons -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@ -463,7 +463,7 @@ public class SimpleJNDIClientTest extends UnitTestCase
{
Hashtable props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "jgroups://test-jgroups-file_ping.xml");
props.put(Context.PROVIDER_URL, "jgroups://test-jgroups-file_ping.xml/");
Context ctx = new InitialContext(props);
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) ctx.lookup("ConnectionFactory");