This commit is contained in:
Justin Bertram 2017-05-17 10:28:18 -05:00
commit 97843a023f
4 changed files with 224 additions and 41 deletions

View File

@ -59,6 +59,16 @@ public class URIFactory<T, P> {
return schemaFactory.newObject(uri, param);
}
public T newObject(URI uri, Map<String, String> overrides, P param) throws Exception {
URISchema<T, P> schemaFactory = schemas.get(uri.getScheme());
if (schemaFactory == null) {
throw new NullPointerException("Schema " + uri.getScheme() + " not found");
}
return schemaFactory.newObject(uri, overrides, param);
}
public T newObject(String uri, P param) throws Exception {
return newObject(new URI(uri), param);
}

View File

@ -26,6 +26,13 @@ import org.apache.activemq.artemis.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQXATopicConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQXAQueueConnectionFactory;
import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnectionFactory;
import javax.jms.XATopicConnectionFactory;
// XXX no javadocs
public enum JMSFactoryType {
CF {
@ -48,6 +55,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQJMSConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return ConnectionFactory.class;
}
},
QUEUE_CF {
@Override
@ -69,6 +81,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQQueueConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return QueueConnectionFactory.class;
}
},
TOPIC_CF {
@Override
@ -90,6 +107,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQTopicConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return TopicConnectionFactory.class;
}
},
XA_CF {
@Override
@ -111,6 +133,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQXAConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return XAConnectionFactory.class;
}
},
QUEUE_XA_CF {
@Override
@ -132,6 +159,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQXAQueueConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return XAQueueConnectionFactory.class;
}
},
TOPIC_XA_CF {
@Override
@ -153,6 +185,11 @@ public enum JMSFactoryType {
public ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations) {
return new ActiveMQXATopicConnectionFactory(false, transportConfigurations);
}
@Override
public Class connectionFactoryInterface() {
return XATopicConnectionFactory.class;
}
};
public int intValue() {
@ -264,5 +301,11 @@ public enum JMSFactoryType {
*/
public abstract ActiveMQConnectionFactory createConnectionFactoryWithoutHA(final TransportConfiguration... transportConfigurations);
/**
* Returns the connection factory interface that this JMSFactoryType creates.
*
* @return the javax.jms Class ConnectionFactory interface
*/
public abstract Class connectionFactoryInterface();
}

View File

@ -22,13 +22,16 @@ import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import java.util.HashMap;
import java.util.Collections;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
import org.apache.activemq.artemis.uri.JMSConnectionOptions;
import org.apache.activemq.artemis.utils.uri.URISchema;
/**
* A factory of the ActiveMQ Artemis InitialContext which contains
@ -39,8 +42,6 @@ import org.apache.activemq.artemis.uri.ConnectionFactoryParser;
*/
public class ActiveMQInitialContextFactory implements InitialContextFactory {
private static final String[] DEFAULT_CONNECTION_FACTORY_NAMES = {"ConnectionFactory", "XAConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"};
public static final String REFRESH_TIMEOUT = "refreshTimeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discoveryInitialWaitTimeout";
public static final String DYNAMIC_QUEUE_CONTEXT = "dynamicQueues";
@ -54,13 +55,31 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
// lets create a factory
Map<String, Object> data = new ConcurrentHashMap<>();
Map<String, ConnectionFactory> connectionFactories = new HashMap<>();
String providerUrl = (String) environment.get(javax.naming.Context.PROVIDER_URL);
if (providerUrl != null) {
try {
JMSFactoryType providedFactoryType = getFactoryType(providerUrl);
if (providedFactoryType == null) {
for (JMSFactoryType factoryType : JMSFactoryType.values()) {
String factoryName = factoryType.connectionFactoryInterface().getSimpleName();
data.put(factoryName, createConnectionFactory(providerUrl, Collections.singletonMap("type", factoryType.toString()), factoryName));
}
} else {
String factoryName = providedFactoryType.connectionFactoryInterface().getSimpleName();
data.put(factoryName, createConnectionFactory(providerUrl, factoryName));
}
} catch (Exception e) {
e.printStackTrace();
throw new NamingException("Invalid broker URL");
}
}
for (Map.Entry<?, ?> entry : environment.entrySet()) {
String key = entry.getKey().toString();
if (key.startsWith(connectionFactoryPrefix)) {
String jndiName = key.substring(connectionFactoryPrefix.length());
try {
connectionFactories.put(jndiName, createConnectionFactory((String) environment.get(key), jndiName));
data.put(jndiName, createConnectionFactory((String) environment.get(key), jndiName));
} catch (Exception e) {
e.printStackTrace();
throw new NamingException("Invalid broker URL");
@ -68,21 +87,6 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
}
}
if (connectionFactories.isEmpty()) {
String providerUrl = (String) environment.get(javax.naming.Context.PROVIDER_URL);
if (providerUrl != null) {
for (String factoryName : DEFAULT_CONNECTION_FACTORY_NAMES) {
try {
connectionFactories.put(factoryName, createConnectionFactory(providerUrl, factoryName));
} catch (Exception e) {
e.printStackTrace();
throw new NamingException("Invalid broker URL");
}
}
}
}
connectionFactories.forEach((name, factory) -> data.put(name, factory));
createQueues(data, environment);
createTopics(data, environment);
@ -172,4 +176,19 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
ConnectionFactoryParser parser = new ConnectionFactoryParser();
return parser.newObject(parser.expandURI(uri), name);
}
}
/**
* Factory method to create a new connection factory from the given environment, with overrides
*/
protected ConnectionFactory createConnectionFactory(String uri, Map<String, String> overrides, String name) throws Exception {
ConnectionFactoryParser parser = new ConnectionFactoryParser();
return parser.newObject(parser.expandURI(uri), overrides, name);
}
public JMSFactoryType getFactoryType(String uri) throws Exception {
ConnectionFactoryParser parser = new ConnectionFactoryParser();
Map<String, String> queryParams = URISchema.parseQuery(parser.expandURI(uri).getQuery(), null);
String type = queryParams.get("type");
return type == null ? null : JMSConnectionOptions.convertCFType(type);
}
}

View File

@ -35,7 +35,6 @@ import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@ -528,29 +527,141 @@ public class SimpleJNDIClientTest extends ActiveMQTestBase {
}
public void testContext(Context ctx, String jndiName, JMSFactoryType expectedFactoryType) {
try {
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) ctx.lookup(jndiName);
if (expectedFactoryType == null) {
Assert.fail("expected no factory, should have thrown NamingException");
} else {
Assert.assertEquals(expectedFactoryType.intValue(), connectionFactory.getFactoryType());
}
} catch (NamingException namingException) {
Assert.assertNull("NamingException should only occur when no ExpectedFactoryType, but one existed", expectedFactoryType);
}
}
@Test
public void providerURLTest() throws NamingException {
String url = "(tcp://somehost:62616,tcp://somehost:62616)?ha=true";
public void testProviderUrlDefault() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0");
Context ctx = new InitialContext(props);
Properties props = new Properties();
props.setProperty(javax.naming.Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getName());
props.setProperty(javax.naming.Context.PROVIDER_URL, url);
InitialContext context = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("ConnectionFactory");
testContext(ctx, "ConnectionFactory", JMSFactoryType.CF);
testContext(ctx, "QueueConnectionFactory", JMSFactoryType.QUEUE_CF);
testContext(ctx, "TopicConnectionFactory", JMSFactoryType.TOPIC_CF);
testContext(ctx, "XAConnectionFactory", JMSFactoryType.XA_CF);
testContext(ctx, "XAQueueConnectionFactory", JMSFactoryType.QUEUE_XA_CF);
testContext(ctx, "XATopicConnectionFactory", JMSFactoryType.TOPIC_XA_CF);
}
@Test
public void connectionFactoryProperty() throws NamingException {
String url = "(tcp://somehost:62616,tcp://somehost:62616)?ha=true";
public void testProviderUrlCF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=CF");
Context ctx = new InitialContext(props);
Properties props = new Properties();
props.setProperty(javax.naming.Context.INITIAL_CONTEXT_FACTORY, ActiveMQInitialContextFactory.class.getName());
props.setProperty(javax.naming.Context.PROVIDER_URL, url);
props.setProperty("connectionFactory.ConnectionFactory",url);
InitialContext context = new InitialContext(props);
ConnectionFactory connectionFactory = (ConnectionFactory)context.lookup("ConnectionFactory");
testContext(ctx, "ConnectionFactory", JMSFactoryType.CF);
testContext(ctx, "QueueConnectionFactory", null);
testContext(ctx, "TopicConnectionFactory", null);
testContext(ctx, "XAConnectionFactory", null);
testContext(ctx, "XAQueueConnectionFactory", null);
testContext(ctx, "XATopicConnectionFactory", null);
}
}
@Test
public void testProviderUrlXACF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=XA_CF");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", null);
testContext(ctx, "QueueConnectionFactory", null);
testContext(ctx, "TopicConnectionFactory", null);
testContext(ctx, "XAConnectionFactory", JMSFactoryType.XA_CF);
testContext(ctx, "XAQueueConnectionFactory", null);
testContext(ctx, "XATopicConnectionFactory", null);
}
@Test
public void testProviderUrlQueueCF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=QUEUE_CF");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", null);
testContext(ctx, "QueueConnectionFactory", JMSFactoryType.QUEUE_CF);
testContext(ctx, "TopicConnectionFactory", null);
testContext(ctx, "XAConnectionFactory", null);
testContext(ctx, "XAQueueConnectionFactory", null);
testContext(ctx, "XATopicConnectionFactory", null);
}
@Test
public void testProviderUrlQueueXACF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=QUEUE_XA_CF");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", null);
testContext(ctx, "QueueConnectionFactory", null);
testContext(ctx, "TopicConnectionFactory", null);
testContext(ctx, "XAConnectionFactory", null);
testContext(ctx, "XAQueueConnectionFactory", JMSFactoryType.QUEUE_XA_CF);
testContext(ctx, "XATopicConnectionFactory", null);
}
@Test
public void testProviderUrlTopicCF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=TOPIC_CF");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", null);
testContext(ctx, "QueueConnectionFactory", null);
testContext(ctx, "TopicConnectionFactory", JMSFactoryType.TOPIC_CF);
testContext(ctx, "XAConnectionFactory", null);
testContext(ctx, "XAQueueConnectionFactory", null);
testContext(ctx, "XATopicConnectionFactory", null);
}
@Test
public void testProviderUrlTopicXACF() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0?type=TOPIC_XA_CF");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", null);
testContext(ctx, "QueueConnectionFactory", null);
testContext(ctx, "TopicConnectionFactory", null);
testContext(ctx, "XAConnectionFactory", null);
testContext(ctx, "XAQueueConnectionFactory", null);
testContext(ctx, "XATopicConnectionFactory", JMSFactoryType.TOPIC_XA_CF);
}
@Test
public void testProviderUrlDefaultAndCustom() throws NamingException, JMSException {
Hashtable<String, String> props = new Hashtable<>();
props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
props.put(Context.PROVIDER_URL, "vm://0");
props.put("connectionFactory.myConnectionFactory", "vm://0");
Context ctx = new InitialContext(props);
testContext(ctx, "ConnectionFactory", JMSFactoryType.CF);
testContext(ctx, "QueueConnectionFactory", JMSFactoryType.QUEUE_CF);
testContext(ctx, "TopicConnectionFactory", JMSFactoryType.TOPIC_CF);
testContext(ctx, "XAConnectionFactory", JMSFactoryType.XA_CF);
testContext(ctx, "XAQueueConnectionFactory", JMSFactoryType.QUEUE_XA_CF);
testContext(ctx, "XATopicConnectionFactory", JMSFactoryType.TOPIC_XA_CF);
testContext(ctx, "myConnectionFactory", JMSFactoryType.CF);
}
}