ACTIVEMQ6-7 - Improve Serialization on Connection Factory

https://issues.apache.org/jira/browse/ACTIVEMQ6-7

Connection Factory is now externalizable and is now serialized as a string that represents a URI. There are schemas for every possible type for connection factory and server locator.

The client JNDI representation of factories has also been changed to be consistent with this.
This commit is contained in:
Andy Taylor 2015-01-29 09:18:36 +00:00
parent b24d72900b
commit 3b76ccc92b
163 changed files with 2682 additions and 1922 deletions

View File

@ -14,15 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import java.io.Serializable;
package org.apache.activemq.utils.uri;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 9/25/12
*/
public interface BroadcastEndpointFactoryConfiguration extends Serializable
public class SchemaConstants
{
BroadcastEndpointFactory createBroadcastEndpointFactory();
public static final String TCP = "tcp";
public static final String UDP = "udp";
public static final String JGROUPS = "jgroups";
public static final String VM = "vm";
}

View File

@ -18,13 +18,13 @@
package org.apache.activemq.utils.uri;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author clebertsuconic
*/
public class URIFactory<T>
{
@ -48,11 +48,25 @@ public class URIFactory<T>
schemaFactory.setFactory(this);
}
public void removeSchema(final String schemaName)
public void removeSchema(final SchemaConstants schemaName)
{
schemas.remove(schemaName);
}
public T newObject(String uriString) throws Exception
{
URI uri = normalise(uriString);
URISchema<T> schemaFactory = schemas.get(uri.getScheme());
if (schemaFactory == null)
{
throw new NullPointerException("Schema " + uri.getScheme() + " not found");
}
return schemaFactory.newObject(uri);
}
public T newObject(URI uri) throws Exception
{
URISchema<T> schemaFactory = schemas.get(uri.getScheme());
@ -66,5 +80,72 @@ public class URIFactory<T>
return schemaFactory.newObject(uri);
}
public void populateObject(URI uri, T bean) throws Exception
{
URISchema<T> schemaFactory = schemas.get(uri.getScheme());
if (schemaFactory == null)
{
throw new NullPointerException("Schema " + uri.getScheme() + " not found");
}
schemaFactory.populateObject(uri, bean);
}
public URI createSchema(String scheme, T bean) throws Exception
{
URISchema<T> schemaFactory = schemas.get(scheme);
if (schemaFactory == null)
{
throw new NullPointerException("Schema " + scheme + " not found");
}
return schemaFactory.newURI(bean);
}
/*
* this method is used to change a string with multiple URI's in it into a valid URI.
* for instance it is possible to have the following String
* (tcp://localhost:5445,tcp://localhost:5545,tcp://localhost:5555)?somequery
* This is an invalid URI so will be changed so that the first URI is used and the
* extra ones added as part of the URI fragment, like so
* tcp://localhost:5445?someQuery#tcp://localhost:5545,tcp://localhost:5555.
*
* It is the job of the URISchema implementation to handle these fragments as needed.
* */
private URI normalise(String uri) throws URISyntaxException
{
if (uri.startsWith("("))
{
String[] split = uri.split("\\)");
String[] connectorURIS = split[0].substring(split[0].indexOf('(') + 1).split(",");
String factoryQuery = split.length > 1 ? split[1] : "";
StringBuilder builder = new StringBuilder(connectorURIS[0]);
if (factoryQuery != null && factoryQuery.length() > 0)
{
if (connectorURIS[0].contains("?"))
{
builder.append("&").append(factoryQuery.substring(1));
}
else
{
builder.append(factoryQuery);
}
}
if (connectorURIS.length > 1)
{
builder.append("#");
for (int i = 1; i < connectorURIS.length; i++)
{
if (i > 1)
{
builder.append(",");
}
builder.append(connectorURIS[i]);
}
}
return new URI(builder.toString());
}
return new URI(uri);
}
}

View File

@ -14,15 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.utils.uri;
import java.beans.PropertyDescriptor;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.FluentPropertyBeanIntrospector;
@ -40,6 +42,16 @@ public abstract class URISchema<T>
return newObject(uri, null);
}
public void populateObject(URI uri, T bean) throws Exception
{
setData(uri, bean, parseQuery(uri.getQuery(), null));
}
public URI newURI(T bean) throws Exception
{
return internalNewURI(bean);
}
private URIFactory<T> parentFactory;
@ -102,6 +114,8 @@ public abstract class URISchema<T>
protected abstract T internalNewObject(URI uri, Map<String, String> query) throws Exception;
protected abstract URI internalNewURI(T bean) throws Exception;
private static final BeanUtilsBean beanUtils = new BeanUtilsBean();
@ -185,4 +199,74 @@ public abstract class URISchema<T>
}
return obj;
}
public static void setData(URI uri, HashMap<String, Object> properties, Set<String> allowableProperties, Map<String, String> query)
{
if (allowableProperties.contains("host"))
{
properties.put("host", uri.getHost());
}
if (allowableProperties.contains("port"))
{
properties.put("port", uri.getPort());
}
if (allowableProperties.contains("userInfo"))
{
properties.put("userInfo", uri.getUserInfo());
}
for (Map.Entry<String, String> entry : query.entrySet())
{
if (allowableProperties.contains(entry.getKey()))
{
properties.put(entry.getKey(), entry.getValue());
}
}
}
public static String getData(List<String> ignored, Object... beans) throws Exception
{
StringBuilder sb = new StringBuilder();
synchronized (beanUtils)
{
for (Object bean : beans)
{
if (bean != null)
{
PropertyDescriptor[] descriptors = beanUtils.getPropertyUtils().getPropertyDescriptors(bean);
for (PropertyDescriptor descriptor : descriptors)
{
if (descriptor.getReadMethod() != null && descriptor.getWriteMethod() != null && isWriteable(descriptor, ignored))
{
String value = beanUtils.getProperty(bean, descriptor.getName());
if (value != null)
{
sb.append("&").append(descriptor.getName()).append("=").append(value);
}
}
}
}
}
}
return sb.toString();
}
private static boolean isWriteable(PropertyDescriptor descriptor, List<String> ignored)
{
if (ignored != null && ignored.contains(descriptor.getName()))
{
return false;
}
Class<?> type = descriptor.getPropertyType();
return (type == Double.class) ||
(type == double.class) ||
(type == Long.class) ||
(type == long.class) ||
(type == Integer.class) ||
(type == int.class) ||
(type == Float.class) ||
(type == float.class) ||
(type == Boolean.class) ||
(type == boolean.class) ||
(type == String.class);
}
}

View File

@ -109,6 +109,12 @@ public class URIParserTest
{
return setData(uri, new Fruit(getSchemaName()), query);
}
@Override
protected URI internalNewURI(FruitBase bean)
{
return null;
}
}
class FruitBaseSchema extends URISchema<FruitBase>
@ -125,6 +131,12 @@ public class URIParserTest
{
return setData(uri, new FruitBase(getSchemaName()), query);
}
@Override
protected URI internalNewURI(FruitBase bean)
{
return null;
}
}

View File

@ -16,8 +16,8 @@
*/
package org.apache.activemq.api.core;
import java.io.Serializable;
import java.io.Serializable;
public interface BroadcastEndpointFactory extends Serializable
{

View File

@ -38,7 +38,7 @@ public final class BroadcastGroupConfiguration implements Serializable
private long broadcastPeriod = ActiveMQDefaultConfiguration.getDefaultBroadcastPeriod();
private BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null;
private BroadcastEndpointFactory endpointFactory = null;
private List<String> connectorInfos = null;
@ -79,14 +79,14 @@ public final class BroadcastGroupConfiguration implements Serializable
return this;
}
public BroadcastEndpointFactoryConfiguration getEndpointFactoryConfiguration()
public BroadcastEndpointFactory getEndpointFactory()
{
return endpointFactoryConfiguration;
return endpointFactory;
}
public BroadcastGroupConfiguration setEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
public BroadcastGroupConfiguration setEndpointFactory(BroadcastEndpointFactory endpointFactory)
{
this.endpointFactoryConfiguration = endpointFactoryConfiguration;
this.endpointFactory = endpointFactory;
return this;
}
@ -97,7 +97,7 @@ public final class BroadcastGroupConfiguration implements Serializable
int result = 1;
result = prime * result + (int)(broadcastPeriod ^ (broadcastPeriod >>> 32));
result = prime * result + ((connectorInfos == null) ? 0 : connectorInfos.hashCode());
result = prime * result + ((endpointFactoryConfiguration == null) ? 0 : endpointFactoryConfiguration.hashCode());
result = prime * result + ((endpointFactory == null) ? 0 : endpointFactory.hashCode());
result = prime * result + ((name == null) ? 0 : name.hashCode());
return result;
}
@ -121,12 +121,12 @@ public final class BroadcastGroupConfiguration implements Serializable
}
else if (!connectorInfos.equals(other.connectorInfos))
return false;
if (endpointFactoryConfiguration == null)
if (endpointFactory == null)
{
if (other.endpointFactoryConfiguration != null)
if (other.endpointFactory != null)
return false;
}
else if (!endpointFactoryConfiguration.equals(other.endpointFactoryConfiguration))
else if (!endpointFactory.equals(other.endpointFactory))
return false;
if (name == null)
{

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import org.jgroups.JChannel;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
{
private final JChannel channel;
private final String channelName;
public ChannelBroadcastEndpointFactory(JChannel channel, String channelName)
{
this.channel = channel;
this.channelName = channelName;
}
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
return new JGroupsChannelBroadcastEndpoint(channel, channelName).initChannel();
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.api.core;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.activemq.api.core.client.ActiveMQClient;
@ -47,30 +44,10 @@ public final class DiscoveryGroupConfiguration implements Serializable
private long discoveryInitialWaitTimeout = ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT;
/*
* The localBindAddress is needed so we can be backward compatible with 2.2 clients
* */
private transient String localBindAddress = null;
/*
* The localBindPort is needed so we can be backward compatible with 2.2 clients
* */
private transient int localBindPort = -1;
/*
* The groupAddress is needed so we can be backward compatible with 2.2 clients
* */
private String groupAddress = null;
/*
* The groupPort is needed so we can be backward compatible with 2.2 clients
* */
private int groupPort = -1;
/*
* This is the actual object used by the class, it has to be transient so we can handle deserialization with a 2.2 client
* */
private transient BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration;
private BroadcastEndpointFactory endpointFactory;
public DiscoveryGroupConfiguration()
{
@ -121,51 +98,17 @@ public final class DiscoveryGroupConfiguration implements Serializable
return this;
}
public BroadcastEndpointFactoryConfiguration getBroadcastEndpointFactoryConfiguration()
public BroadcastEndpointFactory getBroadcastEndpointFactory()
{
return endpointFactoryConfiguration;
return endpointFactory;
}
public DiscoveryGroupConfiguration setBroadcastEndpointFactoryConfiguration(BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration)
public DiscoveryGroupConfiguration setBroadcastEndpointFactory(BroadcastEndpointFactory endpointFactory)
{
this.endpointFactoryConfiguration = endpointFactoryConfiguration;
if (endpointFactoryConfiguration instanceof DiscoveryGroupConfigurationCompatibilityHelper)
{
DiscoveryGroupConfigurationCompatibilityHelper dgcch = (DiscoveryGroupConfigurationCompatibilityHelper) endpointFactoryConfiguration;
localBindAddress = dgcch.getLocalBindAddress();
localBindPort = dgcch.getLocalBindPort();
groupAddress = dgcch.getGroupAddress();
groupPort = dgcch.getGroupPort();
}
this.endpointFactory = endpointFactory;
return this;
}
private void writeObject(ObjectOutputStream out) throws IOException
{
out.defaultWriteObject();
if (groupPort < 0)
{
out.writeObject(endpointFactoryConfiguration);
}
}
private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException
{
in.defaultReadObject();
if (groupPort < 0)
{
endpointFactoryConfiguration = (BroadcastEndpointFactoryConfiguration) in.readObject();
}
else
{
endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
.setGroupAddress(groupAddress)
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress)
.setLocalBindPort(localBindPort);
}
}
@Override
public boolean equals(Object o)
{

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import org.jgroups.JChannel;
import org.jgroups.ReceiverAdapter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/**
* This class is the implementation of ActiveMQ members discovery that will use JGroups.
*
* @author Howard Gao
*/
public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
private final String channelName;
private boolean clientOpened;
private boolean broadcastOpened;
private JChannelWrapper channel;
private JGroupsReceiver receiver;
public JGroupsBroadcastEndpoint(String channelName)
{
this.channelName = channelName;
}
public void broadcast(final byte[] data) throws Exception
{
if (broadcastOpened)
{
org.jgroups.Message msg = new org.jgroups.Message();
msg.setBuffer(data);
channel.send(msg);
}
}
public byte[] receiveBroadcast() throws Exception
{
if (clientOpened)
{
return receiver.receiveBroadcast();
}
else
{
return null;
}
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
{
if (clientOpened)
{
return receiver.receiveBroadcast(time, unit);
}
else
{
return null;
}
}
public synchronized void openClient() throws Exception
{
if (clientOpened)
{
return;
}
internalOpen();
receiver = new JGroupsReceiver();
channel.setReceiver(receiver);
clientOpened = true;
}
public synchronized void openBroadcaster() throws Exception
{
if (broadcastOpened) return;
internalOpen();
broadcastOpened = true;
}
public abstract JChannel createChannel() throws Exception;
public JGroupsBroadcastEndpoint initChannel() throws Exception
{
this.channel = JChannelManager.getJChannel(channelName, this);
return this;
}
protected void internalOpen() throws Exception
{
channel.connect();
}
public synchronized void close(boolean isBroadcast) throws Exception
{
if (isBroadcast)
{
broadcastOpened = false;
}
else
{
channel.removeReceiver(receiver);
clientOpened = false;
}
channel.close();
}
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
private static final class JGroupsReceiver extends ReceiverAdapter
{
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
@Override
public void receive(org.jgroups.Message msg)
{
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception
{
return dequeue.take();
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
{
return dequeue.poll(time, unit);
}
}
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*/
protected static class JChannelWrapper
{
int refCount = 1;
JChannel channel;
String channelName;
List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
public JChannelWrapper(String channelName, JChannel channel) throws Exception
{
this.refCount = 1;
this.channelName = channelName;
this.channel = channel;
}
public synchronized void close()
{
refCount--;
if (refCount == 0)
{
JChannelManager.closeChannel(this.channelName, channel);
}
}
public void removeReceiver(JGroupsReceiver receiver)
{
synchronized (receivers)
{
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception
{
if (channel.isConnected()) return;
channel.setReceiver(new ReceiverAdapter()
{
@Override
public void receive(org.jgroups.Message msg)
{
synchronized (receivers)
{
for (JGroupsReceiver r : receivers)
{
r.receive(msg);
}
}
}
});
channel.connect(channelName);
}
public void setReceiver(JGroupsReceiver jGroupsReceiver)
{
synchronized (receivers)
{
receivers.add(jGroupsReceiver);
}
}
public void send(org.jgroups.Message msg) throws Exception
{
channel.send(msg);
}
public JChannelWrapper addRef()
{
this.refCount++;
return this;
}
@Override
public String toString()
{
return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
}
}
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
* <p/>
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
protected static class JChannelManager
{
private static Map<String, JChannelWrapper> channels;
public static synchronized JChannelWrapper getJChannel(String channelName, JGroupsBroadcastEndpoint endpoint) throws Exception
{
if (channels == null)
{
channels = new HashMap<>();
}
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null)
{
wrapper = new JChannelWrapper(channelName, endpoint.createChannel());
channels.put(channelName, wrapper);
return wrapper;
}
return wrapper.addRef();
}
public static synchronized void closeChannel(String channelName, JChannel channel)
{
channel.setReceiver(null);
channel.disconnect();
channel.close();
JChannelWrapper wrapper = channels.remove(channelName);
if (wrapper == null)
{
throw new IllegalStateException("Did not find channel " + channelName);
}
}
}
}

View File

@ -1,404 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.conf.PlainConfigurator;
/**
* The configuration for creating broadcasting/discovery groups using JGroups channels
* There are two ways to constructing a JGroups channel (JChannel):
* <ol>
* <li> by passing in a JGroups configuration file<br>
* The file must exists in the activemq classpath. ActiveMQ creates a JChannel with the
* configuration file and use it for broadcasting and discovery. In standalone server
* mode ActiveMQ uses this way for constructing JChannels.</li>
* <li> by passing in a JChannel instance<br>
* This is useful when ActiveMQ needs to get a JChannel from a running JGroups service as in the
* case of AS7 integration.</li>
* </ol>
* <p>
* Note only one JChannel is needed in a VM. To avoid the channel being prematurely disconnected
* by any party, a wrapper class is used.
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
* @see JChannelWrapper, JChannelManager
*/
public final class JGroupsBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
{
private static final long serialVersionUID = 8952238567248461285L;
private final BroadcastEndpointFactory factory;
public JGroupsBroadcastGroupConfiguration(final String jgroupsFile, final String channelName)
{
factory = new BroadcastEndpointFactory()
{
private static final long serialVersionUID = 1047956472941098435L;
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
endpoint.initChannel(jgroupsFile, channelName);
return endpoint;
}
};
}
public JGroupsBroadcastGroupConfiguration(final JChannel channel, final String channelName)
{
factory = new BroadcastEndpointFactory()
{
private static final long serialVersionUID = 5110372849181145377L;
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
JGroupsBroadcastEndpoint endpoint = new JGroupsBroadcastEndpoint();
endpoint.initChannel(channel, channelName);
return endpoint;
}
};
}
@Override
public BroadcastEndpointFactory createBroadcastEndpointFactory()
{
return factory;
}
@Override
public String getLocalBindAddress()
{
return null;
}
@Override
/*
* return -1 to force deserialization of object
* */
public int getLocalBindPort()
{
return -1;
}
@Override
public String getGroupAddress()
{
return null;
}
@Override
public int getGroupPort()
{
return -1;
}
/**
* This class is the implementation of ActiveMQ members discovery that will use JGroups.
*
* @author Howard Gao
*/
private static final class JGroupsBroadcastEndpoint implements BroadcastEndpoint
{
private boolean clientOpened;
private boolean broadcastOpened;
private JChannelWrapper<?> channel;
private JGroupsReceiver receiver;
public void broadcast(final byte[] data) throws Exception
{
if (broadcastOpened)
{
Message msg = new Message();
msg.setBuffer(data);
channel.send(msg);
}
}
public byte[] receiveBroadcast() throws Exception
{
if (clientOpened)
{
return receiver.receiveBroadcast();
}
else
{
return null;
}
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
{
if (clientOpened)
{
return receiver.receiveBroadcast(time, unit);
}
else
{
return null;
}
}
public synchronized void openClient() throws Exception
{
if (clientOpened)
{
return;
}
internalOpen();
receiver = new JGroupsReceiver();
channel.setReceiver(receiver);
clientOpened = true;
}
public synchronized void openBroadcaster() throws Exception
{
if (broadcastOpened) return;
internalOpen();
broadcastOpened = true;
}
private void initChannel(final String jgroupsConfig, final String channelName) throws Exception
{
PlainConfigurator configurator = new PlainConfigurator(jgroupsConfig);
try
{
this.channel = JChannelManager.getJChannel(channelName, configurator);
return;
}
catch (Exception e)
{
this.channel = null;
}
URL configURL = Thread.currentThread().getContextClassLoader().getResource(jgroupsConfig);
if (configURL == null)
{
throw new RuntimeException("couldn't find JGroups configuration " + jgroupsConfig);
}
this.channel = JChannelManager.getJChannel(channelName, configURL);
}
private void initChannel(final JChannel channel1, final String channelName) throws Exception
{
this.channel = JChannelManager.getJChannel(channelName, channel1);
}
protected void internalOpen() throws Exception
{
channel.connect();
}
public synchronized void close(boolean isBroadcast) throws Exception
{
if (isBroadcast)
{
broadcastOpened = false;
}
else
{
channel.removeReceiver(receiver);
clientOpened = false;
}
channel.close();
}
/**
* This class is used to receive messages from a JGroups channel.
* Incoming messages are put into a queue.
*/
private static final class JGroupsReceiver extends ReceiverAdapter
{
private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<byte[]>();
@Override
public void receive(org.jgroups.Message msg)
{
dequeue.add(msg.getBuffer());
}
public byte[] receiveBroadcast() throws Exception
{
return dequeue.take();
}
public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception
{
return dequeue.poll(time, unit);
}
}
/**
* This class wraps a JChannel with a reference counter. The reference counter
* controls the life of the JChannel. When reference count is zero, the channel
* will be disconnected.
*
* @param <T>
*/
private static class JChannelWrapper<T>
{
int refCount = 1;
JChannel channel;
String channelName;
List<JGroupsReceiver> receivers = new ArrayList<JGroupsReceiver>();
public JChannelWrapper(String channelName, T t) throws Exception
{
this.refCount = 1;
this.channelName = channelName;
if (t instanceof URL)
{
this.channel = new JChannel((URL) t);
}
else if (t instanceof JChannel)
{
this.channel = (JChannel) t;
}
else if (t instanceof PlainConfigurator)
{
this.channel = new JChannel((PlainConfigurator)t);
}
else
{
throw new IllegalArgumentException("Unsupported type " + t);
}
}
public synchronized void close()
{
refCount--;
if (refCount == 0)
{
JChannelManager.closeChannel(this.channelName, channel);
}
}
public void removeReceiver(JGroupsReceiver receiver)
{
synchronized (receivers)
{
receivers.remove(receiver);
}
}
public synchronized void connect() throws Exception
{
if (channel.isConnected()) return;
channel.setReceiver(new ReceiverAdapter()
{
@Override
public void receive(Message msg)
{
synchronized (receivers)
{
for (JGroupsReceiver r : receivers)
{
r.receive(msg);
}
}
}
});
channel.connect(channelName);
}
public void setReceiver(JGroupsReceiver jGroupsReceiver)
{
synchronized (receivers)
{
receivers.add(jGroupsReceiver);
}
}
public void send(Message msg) throws Exception
{
channel.send(msg);
}
public JChannelWrapper<T> addRef()
{
this.refCount++;
return this;
}
@Override
public String toString()
{
return "JChannelWrapper of [" + channel + "] " + refCount + " " + channelName;
}
}
/**
* This class maintain a global Map of JChannels wrapped in JChannelWrapper for
* the purpose of reference counting.
* <p/>
* Wherever a JChannel is needed it should only get it by calling the getChannel()
* method of this class. The real disconnect of channels are also done here only.
*/
private static class JChannelManager
{
private static Map<String, JChannelWrapper<?>> channels;
public static synchronized <T> JChannelWrapper<?> getJChannel(String channelName, T t) throws Exception
{
if (channels == null)
{
channels = new HashMap<String, JChannelWrapper<?>>();
}
JChannelWrapper<?> wrapper = channels.get(channelName);
if (wrapper == null)
{
wrapper = new JChannelWrapper<T>(channelName, t);
channels.put(channelName, wrapper);
return wrapper;
}
return wrapper.addRef();
}
public static synchronized void closeChannel(String channelName, JChannel channel)
{
channel.setReceiver(null);
channel.disconnect();
channel.close();
JChannelWrapper<?> wrapper = channels.remove(channelName);
if (wrapper == null)
{
throw new IllegalStateException("Did not find channel " + channelName);
}
}
}
}
}

View File

@ -16,30 +16,24 @@
*/
package org.apache.activemq.api.core;
import org.jgroups.JChannel;
/**
* This interface is needed for making a DiscoveryGroupConfiguration backward
* compatible with version 2.2 clients. It is used to extract from new
* {@link org.apache.activemq.api.core.BroadcastEndpointFactoryConfiguration} the four
* UDP attributes in order to form a version 2.2 DiscoveryGroupConfiguration
* in time of serialization.
*
* @see DiscoveryGroupConfiguration#readObject(java.io.ObjectInputStream)
* @see DiscoveryGroupConfiguration#writeObject(java.io.ObjectOutputStream)
*
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 12/13/12
*/
public interface DiscoveryGroupConfigurationCompatibilityHelper
public class JGroupsChannelBroadcastEndpoint extends JGroupsBroadcastEndpoint
{
// XXX No javadocs
String getLocalBindAddress();
private final JChannel jChannel;
// XXX No javadocs
int getLocalBindPort();
public JGroupsChannelBroadcastEndpoint(JChannel jChannel, final String channelName) throws Exception
{
super(channelName);
this.jChannel = jChannel;
}
// XXX No javadocs
String getGroupAddress();
// XXX No javadocs
int getGroupPort();
@Override
public JChannel createChannel() throws Exception
{
return jChannel;
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import org.jgroups.JChannel;
import java.net.URL;
/**
* This class is the implementation of ActiveMQ members discovery that will use JGroups.
*
* @author Howard Gao
*/
public final class JGroupsFileBroadcastEndpoint extends JGroupsBroadcastEndpoint
{
private String file;
public JGroupsFileBroadcastEndpoint(final String file, final String channelName) throws Exception
{
super(channelName);
this.file = file;
}
public JChannel createChannel() throws Exception
{
URL configURL = Thread.currentThread().getContextClassLoader().getResource(file);
if (configURL == null)
{
throw new RuntimeException("couldn't find JGroups configuration " + file);
}
return new JChannel(configURL);
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFactory
{
private String file;
private String channelName;
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
return new JGroupsFileBroadcastEndpoint(file, channelName).initChannel();
}
public String getFile()
{
return file;
}
public JGroupsFileBroadcastEndpointFactory setFile(String file)
{
this.file = file;
return this;
}
public String getChannelName()
{
return channelName;
}
public JGroupsFileBroadcastEndpointFactory setChannelName(String channelName)
{
this.channelName = channelName;
return this;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
/**
* This class is the implementation of ActiveMQ members discovery that will use JGroups.
*
* @author Howard Gao
*/
public final class JGroupsPropertiesBroadcastEndpoint extends JGroupsBroadcastEndpoint
{
private String properties;
public JGroupsPropertiesBroadcastEndpoint(final String properties, final String channelName) throws Exception
{
super(channelName);
this.properties = properties;
}
@Override
public JChannel createChannel() throws Exception
{
PlainConfigurator configurator = new PlainConfigurator(properties);
return new JChannel(configurator);
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.api.core;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpointFactory
{
private String properties;
private String channelName;
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
return new JGroupsPropertiesBroadcastEndpoint(properties, channelName).initChannel();
}
public String getProperties()
{
return properties;
}
public JGroupsPropertiesBroadcastEndpointFactory setProperties(String properties)
{
this.properties = properties;
return this;
}
public String getChannelName()
{
return channelName;
}
public JGroupsPropertiesBroadcastEndpointFactory setChannelName(String channelName)
{
this.channelName = channelName;
return this;
}
}

View File

@ -36,10 +36,8 @@ import org.apache.activemq.core.client.ActiveMQClientLogger;
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> Created 18 Nov 2008 08:44:30
*/
public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFactoryConfiguration, DiscoveryGroupConfigurationCompatibilityHelper
public final class UDPBroadcastEndpointFactory implements BroadcastEndpointFactory
{
private static final long serialVersionUID = 1052413739064253955L;
private transient String localBindAddress = null;
private transient int localBindPort = -1;
@ -48,24 +46,17 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
private int groupPort = -1;
public UDPBroadcastGroupConfiguration()
public UDPBroadcastEndpointFactory()
{
}
public BroadcastEndpointFactory createBroadcastEndpointFactory()
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
return new BroadcastEndpointFactory()
{
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception
{
return new UDPBroadcastEndpoint()
.setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
.setLocalBindPort(localBindPort);
}
};
return new UDPBroadcastEndpoint()
.setGroupAddress(groupAddress != null ? InetAddress.getByName(groupAddress) : null)
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress != null ? InetAddress.getByName(localBindAddress) : null)
.setLocalBindPort(localBindPort);
}
public String getGroupAddress()
@ -73,7 +64,7 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
return groupAddress;
}
public UDPBroadcastGroupConfiguration setGroupAddress(String groupAddress)
public UDPBroadcastEndpointFactory setGroupAddress(String groupAddress)
{
this.groupAddress = groupAddress;
return this;
@ -84,7 +75,7 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
return groupPort;
}
public UDPBroadcastGroupConfiguration setGroupPort(int groupPort)
public UDPBroadcastEndpointFactory setGroupPort(int groupPort)
{
this.groupPort = groupPort;
return this;
@ -95,7 +86,7 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
return localBindPort;
}
public UDPBroadcastGroupConfiguration setLocalBindPort(int localBindPort)
public UDPBroadcastEndpointFactory setLocalBindPort(int localBindPort)
{
this.localBindPort = localBindPort;
return this;
@ -106,7 +97,7 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
return localBindAddress;
}
public UDPBroadcastGroupConfiguration setLocalBindAddress(String localBindAddress)
public UDPBroadcastEndpointFactory setLocalBindAddress(String localBindAddress)
{
this.localBindAddress = localBindAddress;
return this;
@ -324,7 +315,7 @@ public final class UDPBroadcastGroupConfiguration implements BroadcastEndpointFa
return false;
if (getClass() != obj.getClass())
return false;
UDPBroadcastGroupConfiguration other = (UDPBroadcastGroupConfiguration) obj;
UDPBroadcastEndpointFactory other = (UDPBroadcastEndpointFactory) obj;
if (groupAddress == null)
{
if (other.groupAddress != null)

View File

@ -21,6 +21,9 @@ import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy;
import org.apache.activemq.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.uri.ServerLocatorParser;
import java.net.URI;
/**
* Utility class for creating ActiveMQ {@link ClientSessionFactory} objects.
@ -112,6 +115,17 @@ public final class ActiveMQClient
public static final String DEFAULT_CORE_PROTOCOL = "CORE";
/**
* Creates a ActiveMQConnectionFactory;
*
* @return the ActiveMQConnectionFactory
*/
public static ServerLocator createServerLocator(final String url) throws Exception
{
ServerLocatorParser parser = new ServerLocatorParser();
return parser.newObject(new URI(url));
}
/**
* Create a ServerLocator which creates session factories using a static list of transportConfigurations, the ServerLocator is not updated automatically
* as the cluster topology changes, and no HA backup information is propagated to the client

View File

@ -16,8 +16,6 @@
*/
package org.apache.activemq.api.core.client;
import java.io.Serializable;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.spi.core.protocol.RemotingConnection;
@ -27,7 +25,7 @@ import org.apache.activemq.spi.core.protocol.RemotingConnection;
* Each TopologyMember represents a single server and possibly any backup server that may take over
* its duties (using the nodeId of the original server).
*/
public interface TopologyMember extends Serializable
public interface TopologyMember
{
/**
* Returns the {@code backup-group-name} of the live server and backup servers associated with

View File

@ -72,13 +72,8 @@ import org.apache.activemq.utils.UUIDGenerator;
*
* @author Tim Fox
*/
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener
{
/*needed for backward compatibility*/
@SuppressWarnings("unused")
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
/*end of compatibility fixes*/
private enum STATE
{
INITIALIZED, CLOSED, CLOSING
@ -398,7 +393,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
private static DiscoveryGroup createDiscoveryGroup(String nodeID, DiscoveryGroupConfiguration config) throws Exception
{
DiscoveryGroup group = new DiscoveryGroup(nodeID, config.getName(),
config.getRefreshTimeout(), config.getBroadcastEndpointFactoryConfiguration().createBroadcastEndpointFactory(), null);
config.getRefreshTimeout(), config.getBroadcastEndpointFactory(), null);
return group;
}

View File

@ -16,12 +16,10 @@
*/
package org.apache.activemq.spi.core.remoting;
import java.io.Serializable;
/**
* @author Clebert Suconic
*/
public interface ClientProtocolManagerFactory extends Serializable
public interface ClientProtocolManagerFactory
{
ClientProtocolManager newProtocolManager();

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public abstract class AbstractServerLocatorSchema extends URISchema<ServerLocator>
{
protected ConnectionOptions newConnectionOptions(URI uri, Map<String, String> query) throws Exception
{
return setData(uri, new ConnectionOptions(), query);
}
}

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
/**
* This will represent all the possible options you could setup on URLs
* When parsing the URL this will serve as an intermediate object
* And it could also be a pl
* @author clebertsuconic
*/
public class ConnectionOptions
{
private boolean ha;
private String host;
private int port;
public ConnectionOptions setHost(String host)
{
this.host = host;
return this;
}
public String getHost()
{
return host;
}
public ConnectionOptions setPort(int port)
{
this.port = port;
return this;
}
public int getPort()
{
return port;
}
public boolean isHa()
{
return ha;
}
public void setHa(boolean ha)
{
this.ha = ha;
}
@Override
public String toString()
{
return "ConnectionOptions{" +
"ha=" + ha +
'}';
}
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class InVMServerLocatorSchema extends AbstractServerLocatorSchema
{
@Override
public String getSchemaName()
{
return SchemaConstants.VM;
}
@Override
protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
{
TransportConfiguration tc = createTransportConfiguration(uri);
ServerLocator factory = ActiveMQClient.createServerLocatorWithoutHA(tc);
return URISchema.setData(uri, factory, query);
}
public static TransportConfiguration createTransportConfiguration(URI uri)
{
Map<String, Object> inVmTransportConfig = new HashMap<>();
inVmTransportConfig.put("serverId", uri.getHost());
return new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory", inVmTransportConfig);
}
@Override
protected URI internalNewURI(ServerLocator bean) throws Exception
{
return getUri(bean.getStaticTransportConfigurations());
}
public static URI getUri(TransportConfiguration[] configurations) throws URISyntaxException
{
String host = "0";
if (configurations != null && configurations.length > 0)
{
TransportConfiguration configuration = configurations[0];
Map<String, Object> params = configuration.getParams();
host = params.get("serverId") == null ? host : params.get("serverId").toString();
}
return new URI(SchemaConstants.VM, null, host, -1, null, null, null);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.io.NotSerializableException;
import java.net.URI;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class JGroupsServerLocatorSchema extends AbstractServerLocatorSchema
{
@Override
public String getSchemaName()
{
return SchemaConstants.JGROUPS;
}
@Override
protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConnectionOptions(uri, query);
DiscoveryGroupConfiguration dcConfig = getDiscoveryGroupConfiguration(uri, query);
if (options.isHa())
{
return ActiveMQClient.createServerLocatorWithHA(dcConfig);
}
else
{
return ActiveMQClient.createServerLocatorWithoutHA(dcConfig);
}
}
@Override
protected URI internalNewURI(ServerLocator bean) throws Exception
{
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
BroadcastEndpointFactory endpoint = dgc.getBroadcastEndpointFactory();
String auth;
if (endpoint instanceof JGroupsFileBroadcastEndpointFactory)
{
auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
}
else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory)
{
auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
}
else
{
throw new NotSerializableException(endpoint + "not serializable");
}
String query = URISchema.getData(null, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
}
public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(URI uri, Map<String, String> query) throws Exception
{
BroadcastEndpointFactory endpointFactory;
if (query.containsKey("file"))
{
endpointFactory = new JGroupsFileBroadcastEndpointFactory().setChannelName(uri.getAuthority());
}
else
{
endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory().setChannelName(uri.getAuthority());
}
URISchema.setData(uri, endpointFactory, query);
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setBroadcastEndpointFactory(endpointFactory);
URISchema.setData(uri, dcConfig, query);
return dcConfig;
}
}

View File

@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.utils.uri.URIFactory;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class ServerLocatorParser extends URIFactory<ServerLocator>
{
public ServerLocatorParser()
{
registerSchema(new TCPServerLocatorSchema());
registerSchema(new UDPServerLocatorSchema());
registerSchema(new JGroupsServerLocatorSchema());
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class TCPServerLocatorSchema extends AbstractServerLocatorSchema
{
@Override
public String getSchemaName()
{
return SchemaConstants.TCP;
}
@Override
protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConnectionOptions(uri, query);
TransportConfiguration[] configurations = getTransportConfigurations(uri, query);
if (options.isHa())
{
return ActiveMQClient.createServerLocatorWithHA(configurations);
}
else
{
return ActiveMQClient.createServerLocatorWithoutHA(configurations);
}
}
public static TransportConfiguration[] getTransportConfigurations(URI uri, Map<String, String> query) throws URISyntaxException
{
HashMap<String, Object> props = new HashMap<>();
URISchema.setData(uri, props, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, query);
List<TransportConfiguration> transportConfigurations = new ArrayList<>();
transportConfigurations.add(new TransportConfiguration(NettyConnectorFactory.class.getName(),
props,
uri.toString()));
String connectors = uri.getFragment();
if (connectors != null)
{
String[] split = connectors.split(",");
for (String s : split)
{
URI extraUri = new URI(s);
HashMap<String, Object> newProps = new HashMap<>();
URISchema.setData(extraUri, newProps, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, query);
URISchema.setData(extraUri, newProps, TransportConstants.ALLOWABLE_CONNECTOR_KEYS, URISchema.parseQuery(extraUri.getQuery(), null));
transportConfigurations.add(new TransportConfiguration(NettyConnectorFactory.class.getName(),
newProps,
extraUri.toString()));
}
}
TransportConfiguration[] configurations = new TransportConfiguration[transportConfigurations.size()];
transportConfigurations.toArray(configurations);
return configurations;
}
@Override
protected URI internalNewURI(ServerLocator bean) throws Exception
{
String query = URISchema.getData(null, bean);
TransportConfiguration[] staticConnectors = bean.getStaticTransportConfigurations();
return getURI(query, staticConnectors);
}
public static URI getURI(String query, TransportConfiguration[] staticConnectors) throws Exception
{
if (staticConnectors == null || staticConnectors.length < 1)
{
throw new Exception();
}
StringBuilder fragment = new StringBuilder();
for (int i = 1; i < staticConnectors.length; i++)
{
TransportConfiguration connector = staticConnectors[i];
Map<String, Object> params = connector.getParams();
URI extraUri = new URI(SchemaConstants.TCP, null, getHost(params), getPort(params), null, createQuery(params, null), null);
if (i > 1)
{
fragment.append(",");
}
fragment.append(extraUri.toASCIIString());
}
Map<String, Object> params = staticConnectors[0].getParams();
return new URI(SchemaConstants.TCP, null, getHost(params), getPort(params), null, createQuery(params, query), fragment.toString());
}
private static int getPort(Map<String, Object> params)
{
Object port = params.get("port");
if (port instanceof String)
{
return Integer.valueOf((String) port);
}
return port != null ? (int) port : 5445;
}
private static String getHost(Map<String, Object> params)
{
return params.get("host") != null ? (String) params.get("host") : "localhost";
}
private static String createQuery(Map<String, Object> params, String query)
{
StringBuilder cb;
if (query == null)
{
cb = new StringBuilder();
}
else
{
cb = new StringBuilder(query);
}
for (String param : params.keySet())
{
if (cb.length() > 0)
{
cb.append("&");
}
cb.append(param).append("=").append(params.get(param));
}
return cb.toString();
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class UDPServerLocatorSchema extends AbstractServerLocatorSchema
{
protected static List<String> IGNORED = new ArrayList<>();
static
{
IGNORED.add("localBindAddress");
IGNORED.add("localBindPort");
}
@Override
public String getSchemaName()
{
return SchemaConstants.UDP;
}
@Override
protected ServerLocator internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConnectionOptions(uri, query);
DiscoveryGroupConfiguration dgc = getDiscoveryGroupConfiguration(uri, query, getHost(uri), getPort(uri));
if (options.isHa())
{
return ActiveMQClient.createServerLocatorWithHA(dgc);
}
else
{
return ActiveMQClient.createServerLocatorWithoutHA(dgc);
}
}
@Override
protected URI internalNewURI(ServerLocator bean) throws Exception
{
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
dgc.setBroadcastEndpointFactory(endpoint);
String query = URISchema.getData(IGNORED, bean, dgc, endpoint);
return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
}
public static DiscoveryGroupConfiguration getDiscoveryGroupConfiguration(URI uri, Map<String, String> query, String host, int port) throws Exception
{
UDPBroadcastEndpointFactory endpointFactoryConfiguration = new UDPBroadcastEndpointFactory()
.setGroupAddress(host)
.setGroupPort(port);
URISchema.setData(uri, endpointFactoryConfiguration, query);
DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query)
.setBroadcastEndpointFactory(endpointFactoryConfiguration);
URISchema.setData(uri, dgc, query);
return dgc;
}
}

View File

@ -37,6 +37,13 @@
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>

View File

@ -29,6 +29,9 @@ import org.apache.activemq.jms.client.ActiveMQTopicConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXAConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXAQueueConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXATopicConnectionFactory;
import org.apache.activemq.uri.ConnectionFactoryParser;
import java.net.URI;
/**
* A utility class for creating ActiveMQ client-side JMS managed resources.
@ -37,6 +40,16 @@ import org.apache.activemq.jms.client.ActiveMQXATopicConnectionFactory;
*/
public class ActiveMQJMSClient
{
/**
* Creates a ActiveMQConnectionFactory;
*
* @return the ActiveMQConnectionFactory
*/
public static ActiveMQConnectionFactory createConnectionFactory(final String url) throws Exception
{
ConnectionFactoryParser parser = new ConnectionFactoryParser();
return parser.newObject(new URI(url));
}
/**
* Creates a ActiveMQConnectionFactory that receives cluster topology updates from the cluster as

View File

@ -33,16 +33,24 @@ import javax.jms.XATopicConnection;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.naming.Referenceable;
import java.io.Serializable;
import java.io.Externalizable;
import java.io.IOException;
import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.net.URI;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.jms.referenceable.ConnectionFactoryObjectFactory;
import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr;
import org.apache.activemq.uri.ConnectionFactoryParser;
import org.apache.activemq.uri.ServerLocatorParser;
/**
* ActiveMQ implementation of a JMS ConnectionFactory.
@ -50,11 +58,9 @@ import org.apache.activemq.jms.referenceable.SerializableObjectRefAddr;
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public class ActiveMQConnectionFactory implements Serializable, Referenceable, ConnectionFactory, XAConnectionFactory
public class ActiveMQConnectionFactory implements Externalizable, Referenceable, ConnectionFactory, XAConnectionFactory
{
private static final long serialVersionUID = -2810634789345348326L;
private final ServerLocator serverLocator;
private ServerLocator serverLocator;
private String clientID;
@ -62,7 +68,58 @@ public class ActiveMQConnectionFactory implements Serializable, Referenceable, C
private int transactionBatchSize = ActiveMQClient.DEFAULT_ACK_BATCH_SIZE;
private boolean readOnly;
private boolean readOnly;
public void writeExternal(ObjectOutput out) throws IOException
{
ConnectionFactoryParser parser = new ConnectionFactoryParser();
String scheme;
if (serverLocator.getDiscoveryGroupConfiguration() != null)
{
if (serverLocator.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory() instanceof UDPBroadcastEndpointFactory)
{
scheme = "udp";
}
else
{
scheme = "jgroups";
}
}
else
{
scheme = "tcp";
}
try
{
URI uri = parser.createSchema(scheme, this);
out.writeUTF(uri.toASCIIString());
}
catch (Exception e)
{
if (e instanceof IOException)
{
throw (IOException) e;
}
throw new IOException(e);
}
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
String url = in.readUTF();
ConnectionFactoryParser parser = new ConnectionFactoryParser();
ServerLocatorParser locatorParser = new ServerLocatorParser();
try
{
URI uri = new URI(url);
serverLocator = locatorParser.newObject(uri);
parser.populateObject(uri, this);
}
catch (Exception e)
{
throw new InvalidObjectException(e.getMessage());
}
}
public ActiveMQConnectionFactory()
{
@ -559,7 +616,6 @@ public class ActiveMQConnectionFactory implements Serializable, Referenceable, C
public synchronized int getInitialConnectAttempts()
{
checkWrite();
return serverLocator.getInitialConnectAttempts();
}

View File

@ -67,5 +67,4 @@ public class ActiveMQJMSConnectionFactory extends ActiveMQConnectionFactory impl
{
super(ha, initialConnectors);
}
}

View File

@ -21,34 +21,14 @@ import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.core.client.ActiveMQClientLogger;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.uri.ConnectionFactoryParser;
/**
* A factory of the ActiveMQ InitialContext which contains
@ -59,52 +39,40 @@ import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
*/
public class ActiveMQInitialContextFactory implements InitialContextFactory
{
public static final String CONNECTION_FACTORY_NAMES = "connectionFactoryNames";
public static final String REFRESH_TIMEOUT = "refreshTimeout";
public static final String DISCOVERY_INITIAL_WAIT_TIMEOUT = "discoveryInitialWaitTimeout";
private static final String[] DEFAULT_CONNECTION_FACTORY_NAMES = {"ConnectionFactory", "XAConnectionFactory", "QueueConnectionFactory", "TopicConnectionFactory"};
public static final String TCP_SCHEME = "tcp";
public static final String JGROUPS_SCHEME = "jgroups";
public static final String UDP_SCHEME = "udp";
public static final String VM_SCHEME = "vm";
public static final String HA = "ha";
public static final String CF_TYPE = "type";
public static final String QUEUE_CF = "QUEUE_CF";
public static final String TOPIC_CF = "TOPIC_CF";
public static final String QUEUE_XA_CF = "QUEUE_XA_CF";
public static final String TOPIC_XA_CF = "TOPIC_XA_CF";
public static final String XA_CF = "XA_CF";
public static final String DYNAMIC_QUEUE_CONTEXT = "dynamicQueues";
public static final String DYNAMIC_TOPIC_CONTEXT = "dynamicTopics";
private String connectionPrefix = "connection.";
private String connectionFactoryPrefix = "connectionFactory.";
private String queuePrefix = "queue.";
private String topicPrefix = "topic.";
public Context getInitialContext(Hashtable environment) throws NamingException
{
// lets create a factory
Map<String, Object> data = new ConcurrentHashMap<String, Object>();
String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++)
Map<String, Object> data = new ConcurrentHashMap<>();
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
{
ActiveMQConnectionFactory factory = null;
String name = names[i];
try
Map.Entry entry = (Map.Entry) iter.next();
String key = entry.getKey().toString();
if (key.startsWith(connectionFactoryPrefix))
{
factory = createConnectionFactory(name, environment);
String jndiName = key.substring(connectionFactoryPrefix.length());
try
{
ActiveMQConnectionFactory factory = createConnectionFactory((String) environment.get(key));
data.put(jndiName, factory);
}
catch (Exception e)
{
e.printStackTrace();
throw new NamingException("Invalid broker URL");
}
}
catch (Exception e)
{
e.printStackTrace();
throw new NamingException("Invalid broker URL");
}
data.put(name, factory);
}
createQueues(data, environment);
createTopics(data, environment);
@ -160,58 +128,6 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory
return new ReadOnlyContext(environment, data);
}
protected ActiveMQConnectionFactory createConnectionFactory(String name, Hashtable environment) throws URISyntaxException, MalformedURLException
{
Hashtable connectionFactoryProperties = new Hashtable(environment);
if (DEFAULT_CONNECTION_FACTORY_NAMES[1].equals(name))
{
connectionFactoryProperties.put(CF_TYPE, XA_CF);
}
if (DEFAULT_CONNECTION_FACTORY_NAMES[2].equals(name))
{
connectionFactoryProperties.put(CF_TYPE, QUEUE_CF);
}
if (DEFAULT_CONNECTION_FACTORY_NAMES[3].equals(name))
{
connectionFactoryProperties.put(CF_TYPE, TOPIC_CF);
}
String prefix = connectionPrefix + name + ".";
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
{
Map.Entry entry = (Map.Entry) iter.next();
String key = (String) entry.getKey();
if (key.startsWith(prefix))
{
// Rename the key...
connectionFactoryProperties.remove(key);
key = key.substring(prefix.length());
connectionFactoryProperties.put(key, entry.getValue());
}
}
return createConnectionFactory(connectionFactoryProperties);
}
protected String[] getConnectionFactoryNames(Map environment)
{
String factoryNames = (String) environment.get(CONNECTION_FACTORY_NAMES);
if (factoryNames != null)
{
List<String> list = new ArrayList<String>();
for (StringTokenizer enumeration = new StringTokenizer(factoryNames, ","); enumeration.hasMoreTokens(); )
{
list.add(enumeration.nextToken().trim());
}
int size = list.size();
if (size > 0)
{
String[] answer = new String[size];
list.toArray(answer);
return answer;
}
}
return DEFAULT_CONNECTION_FACTORY_NAMES;
}
protected void createQueues(Map<String, Object> data, Hashtable environment)
{
for (Iterator iter = environment.entrySet().iterator(); iter.hasNext(); )
@ -259,238 +175,9 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory
/**
* Factory method to create a new connection factory from the given environment
*/
protected ActiveMQConnectionFactory createConnectionFactory(Hashtable environment) throws URISyntaxException, MalformedURLException
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception
{
ActiveMQConnectionFactory connectionFactory;
Map transportConfig = new HashMap();
if (environment.containsKey(Context.PROVIDER_URL))
{
URI providerURI = new URI(((String)environment.get(Context.PROVIDER_URL)));
if (providerURI.getQuery() != null)
{
try
{
transportConfig = parseQuery(providerURI.getQuery());
}
catch (URISyntaxException e)
{
}
}
if (providerURI.getScheme().equals(TCP_SCHEME))
{
String[] connectors = providerURI.getAuthority().split(",");
TransportConfiguration[] transportConfigurations = new TransportConfiguration[connectors.length];
for (int i = 0; i < connectors.length; i++)
{
Map individualTransportConfig = new HashMap(transportConfig);
String[] hostAndPort = connectors[i].split(":");
individualTransportConfig.put(TransportConstants.HOST_PROP_NAME, hostAndPort[0]);
individualTransportConfig.put(TransportConstants.PORT_PROP_NAME, hostAndPort[1]);
transportConfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getCanonicalName(), individualTransportConfig);
}
if (Boolean.TRUE.equals(environment.get(HA)))
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(getJmsFactoryType(environment), transportConfigurations);
}
else
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), transportConfigurations);
}
}
else if (providerURI.getScheme().equals(UDP_SCHEME))
{
DiscoveryGroupConfiguration dgc = new DiscoveryGroupConfiguration()
.setRefreshTimeout(transportConfig.containsKey(REFRESH_TIMEOUT) ? Long.parseLong((String) transportConfig.get(REFRESH_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT)
.setDiscoveryInitialWaitTimeout(transportConfig.containsKey(DISCOVERY_INITIAL_WAIT_TIMEOUT) ? Long.parseLong((String) transportConfig.get(DISCOVERY_INITIAL_WAIT_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT)
.setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
.setGroupAddress(providerURI.getHost())
.setGroupPort(providerURI.getPort())
.setLocalBindAddress(transportConfig.containsKey(TransportConstants.LOCAL_ADDRESS_PROP_NAME) ? (String) transportConfig.get(TransportConstants.LOCAL_ADDRESS_PROP_NAME) : null)
.setLocalBindPort(transportConfig.containsKey(TransportConstants.LOCAL_PORT_PROP_NAME) ? Integer.parseInt((String) transportConfig.get(TransportConstants.LOCAL_PORT_PROP_NAME)) : -1));
if (Boolean.TRUE.equals(environment.get(HA)))
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, getJmsFactoryType(environment));
}
else
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, getJmsFactoryType(environment));
}
}
else if (providerURI.getScheme().equals(JGROUPS_SCHEME))
{
JGroupsBroadcastGroupConfiguration config = new JGroupsBroadcastGroupConfiguration(providerURI.getAuthority(), providerURI.getPath() != null ? providerURI.getPath() : UUID.randomUUID().toString());
DiscoveryGroupConfiguration dgc = new DiscoveryGroupConfiguration()
.setRefreshTimeout(transportConfig.containsKey(REFRESH_TIMEOUT) ? Long.parseLong((String) transportConfig.get(REFRESH_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_REFRESH_TIMEOUT)
.setDiscoveryInitialWaitTimeout(transportConfig.containsKey(DISCOVERY_INITIAL_WAIT_TIMEOUT) ? Long.parseLong((String) transportConfig.get(DISCOVERY_INITIAL_WAIT_TIMEOUT)) : ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT)
.setBroadcastEndpointFactoryConfiguration(config);
if (Boolean.TRUE.equals(environment.get(HA)))
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, getJmsFactoryType(environment));
}
else
{
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, getJmsFactoryType(environment));
}
}
else if (providerURI.getScheme().equals(VM_SCHEME))
{
Map inVmTransportConfig = new HashMap();
inVmTransportConfig.put("serverId", providerURI.getHost());
TransportConfiguration tc = new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory", inVmTransportConfig);
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), tc);
}
else
{
throw new IllegalArgumentException("Invalid scheme");
}
}
else
{
TransportConfiguration tc = new TransportConfiguration("org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory");
connectionFactory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(getJmsFactoryType(environment), tc);
}
Properties properties = new Properties();
properties.putAll(environment);
for (Object key : environment.keySet())
{
invokeSetter(connectionFactory, (String) key, environment.get(key));
}
return connectionFactory;
}
private JMSFactoryType getJmsFactoryType(Hashtable environment)
{
JMSFactoryType ultimateType = JMSFactoryType.CF; // default value
if (environment.containsKey(CF_TYPE))
{
String tempType = (String) environment.get(CF_TYPE);
if (QUEUE_CF.equals(tempType))
{
ultimateType = JMSFactoryType.QUEUE_CF;
}
else if (TOPIC_CF.equals(tempType))
{
ultimateType = JMSFactoryType.TOPIC_CF;
}
else if (QUEUE_XA_CF.equals(tempType))
{
ultimateType = JMSFactoryType.QUEUE_XA_CF;
}
else if (TOPIC_XA_CF.equals(tempType))
{
ultimateType = JMSFactoryType.TOPIC_XA_CF;
}
else if (XA_CF.equals(tempType))
{
ultimateType = JMSFactoryType.XA_CF;
}
}
return ultimateType;
}
public static Map<String, String> parseQuery(String uri) throws URISyntaxException
{
try
{
uri = uri.substring(uri.lastIndexOf("?") + 1); // get only the relevant part of the query
Map<String, String> rc = new HashMap<String, String>();
if (uri != null && !uri.isEmpty())
{
String[] parameters = uri.split("&");
for (int i = 0; i < parameters.length; i++)
{
int p = parameters[i].indexOf("=");
if (p >= 0)
{
String name = URLDecoder.decode(parameters[i].substring(0, p), "UTF-8");
String value = URLDecoder.decode(parameters[i].substring(p + 1), "UTF-8");
rc.put(name, value);
}
else
{
rc.put(parameters[i], null);
}
}
}
return rc;
}
catch (UnsupportedEncodingException e)
{
throw (URISyntaxException) new URISyntaxException(e.toString(), "Invalid encoding").initCause(e);
}
}
public String getConnectionPrefix()
{
return connectionPrefix;
}
public void setConnectionPrefix(String connectionPrefix)
{
this.connectionPrefix = connectionPrefix;
}
private void invokeSetter(Object target, final String propertyName, final Object propertyValue)
{
Method setter = null;
Method[] methods = target.getClass().getMethods();
// turn something like "consumerWindowSize" to "setConsumerWindowSize"
String setterMethodName = "set" + Character.toUpperCase(propertyName.charAt(0)) + propertyName.substring(1);
for (Method m : methods)
{
if (m.getName().equals(setterMethodName))
{
setter = m;
break;
}
}
try
{
if (setter != null)
{
ActiveMQClientLogger.LOGGER.info("Invoking: " + setter + " that takes a " + setter.getParameterTypes()[0] + " with a " + propertyValue.getClass());
if (propertyValue.getClass() == String.class && setter.getParameterTypes()[0] != String.class)
{
String stringPropertyValue = (String) propertyValue;
if (setter.getParameterTypes()[0] == Integer.TYPE)
{
setter.invoke(target, Integer.parseInt(stringPropertyValue));
}
else if (setter.getParameterTypes()[0] == Long.TYPE)
{
setter.invoke(target, Long.parseLong(stringPropertyValue));
}
else if (setter.getParameterTypes()[0] == Double.TYPE)
{
setter.invoke(target, Double.parseDouble(stringPropertyValue));
}
else if (setter.getParameterTypes()[0] == Boolean.TYPE)
{
setter.invoke(target, Boolean.parseBoolean(stringPropertyValue));
}
}
else
{
setter.invoke(target, propertyValue);
}
}
}
catch (Exception e)
{
ActiveMQClientLogger.LOGGER.warn("Caught exception during invocation of: " + setter, e);
}
ConnectionFactoryParser parser = new ConnectionFactoryParser();
return parser.newObject(uri);
}
}

View File

@ -31,15 +31,15 @@ import org.apache.activemq.utils.uri.URISchema;
public abstract class AbstractCFSchema extends URISchema<ActiveMQConnectionFactory>
{
protected ConnectionOptions newConectionOptions(URI uri, Map<String, String> query) throws Exception
protected JMSConnectionOptions newConectionOptions(URI uri, Map<String, String> query) throws Exception
{
String type = query.get("type");
// We do this check here to guarantee proper logging
if (ConnectionOptions.convertCFType(type) == null)
if (JMSConnectionOptions.convertCFType(type) == null)
{
ActiveMQClientLogger.LOGGER.invalidCFType(type, uri.toString());
}
return setData(uri, new ConnectionOptions(), query);
return setData(uri, new JMSConnectionOptions(), query);
}
}

View File

@ -28,7 +28,9 @@ public class ConnectionFactoryParser extends URIFactory<ActiveMQConnectionFactor
{
public ConnectionFactoryParser()
{
registerSchema(new TCPSchema());
registerSchema(new UDPSchema());
registerSchema(new JGroupsSchema());
registerSchema(new InVMSchema());
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class InVMSchema extends AbstractCFSchema
{
@Override
public String getSchemaName()
{
return SchemaConstants.VM;
}
@Override
protected ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
JMSConnectionOptions options = newConectionOptions(uri, query);
ActiveMQConnectionFactory factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), InVMServerLocatorSchema.createTransportConfiguration(uri));
return URISchema.setData(uri, factory, query);
}
@Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
{
return InVMServerLocatorSchema.getUri(bean.getStaticConnectors());
}
}

View File

@ -17,14 +17,17 @@
package org.apache.activemq.uri;
import java.io.NotSerializableException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
/**
@ -36,34 +39,48 @@ public class JGroupsSchema extends AbstractCFSchema
@Override
public String getSchemaName()
{
return "jgroups";
return SchemaConstants.JGROUPS;
}
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConectionOptions(uri, query);
System.out.println("authority = " + uri.getAuthority() + " path = " + uri.getPath());
JGroupsBroadcastGroupConfiguration jgroupsConfig = new JGroupsBroadcastGroupConfiguration(uri.getAuthority(), uri.getPath() != null ? uri.getPath() : UUID.randomUUID().toString());
URISchema.setData(uri, jgroupsConfig, query);
DiscoveryGroupConfiguration dcConfig = new DiscoveryGroupConfiguration().setBroadcastEndpointFactoryConfiguration(jgroupsConfig);
URISchema.setData(uri, dcConfig, query);
JMSConnectionOptions options = newConectionOptions(uri, query);
DiscoveryGroupConfiguration dcConfig = JGroupsServerLocatorSchema.getDiscoveryGroupConfiguration(uri, query);
ActiveMQConnectionFactory factory;
if (options.isHa())
{
return ActiveMQJMSClient.createConnectionFactoryWithHA(dcConfig, options.getFactoryTypeEnum());
factory = ActiveMQJMSClient.createConnectionFactoryWithHA(dcConfig, options.getFactoryTypeEnum());
}
else
{
return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dcConfig, options.getFactoryTypeEnum());
}
return URISchema.setData(uri, factory, query);
}
@Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
{
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
BroadcastEndpointFactory endpoint = dgc.getBroadcastEndpointFactory();
String auth;
if (endpoint instanceof JGroupsFileBroadcastEndpointFactory)
{
auth = ((JGroupsFileBroadcastEndpointFactory) endpoint).getChannelName();
}
else if (endpoint instanceof JGroupsPropertiesBroadcastEndpointFactory)
{
auth = ((JGroupsPropertiesBroadcastEndpointFactory) endpoint).getChannelName();
}
else
{
throw new NotSerializableException(endpoint + "not serializable");
}
String query = URISchema.getData(null, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.JGROUPS, null, auth, -1, null, query, null);
}
}

View File

@ -26,50 +26,10 @@ import org.apache.activemq.api.jms.JMSFactoryType;
* @author clebertsuconic
*/
public class ConnectionOptions
public class JMSConnectionOptions extends ConnectionOptions
{
private boolean ha;
private JMSFactoryType factoryType = JMSFactoryType.CF;
private String host;
private int port;
public ConnectionOptions setHost(String host)
{
this.host = host;
return this;
}
public String getHost()
{
return host;
}
public ConnectionOptions setPort(int port)
{
this.port = port;
return this;
}
public int getPort()
{
return port;
}
public boolean isHa()
{
return ha;
}
public void setHa(boolean ha)
{
this.ha = ha;
}
public JMSFactoryType getFactoryTypeEnum()
{
return factoryType;
@ -96,7 +56,7 @@ public class ConnectionOptions
{
if (type == null)
{
return null;
return JMSFactoryType.CF;
}
else
{
@ -112,8 +72,7 @@ public class ConnectionOptions
@Override
public String toString()
{
return "ConnectionOptions{" +
"ha=" + ha +
return "JMSConnectionOptions{" +
", factoryType=" + factoryType +
'}';
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.uri;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
import java.net.URI;
import java.util.Map;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
*/
public class TCPSchema extends AbstractCFSchema
{
@Override
public String getSchemaName()
{
return SchemaConstants.TCP;
}
@Override
protected ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
JMSConnectionOptions options = newConectionOptions(uri, query);
TransportConfiguration[] configurations = TCPServerLocatorSchema.getTransportConfigurations(uri, query);
ActiveMQConnectionFactory factory;
if (options.isHa())
{
factory = ActiveMQJMSClient.createConnectionFactoryWithHA(options.getFactoryTypeEnum(), configurations);
}
else
{
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(options.getFactoryTypeEnum(), configurations);
}
return URISchema.setData(uri, factory, query);
}
@Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
{
String query = URISchema.getData(null, bean);
TransportConfiguration[] staticConnectors = bean.getStaticConnectors();
return TCPServerLocatorSchema.getURI(query, staticConnectors);
}
}

View File

@ -17,15 +17,14 @@
package org.apache.activemq.uri;
import java.io.PrintStream;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.utils.uri.SchemaConstants;
import org.apache.activemq.utils.uri.URISchema;
/**
@ -37,29 +36,35 @@ public class UDPSchema extends AbstractCFSchema
@Override
public String getSchemaName()
{
return "udp";
return SchemaConstants.UDP;
}
@Override
public ActiveMQConnectionFactory internalNewObject(URI uri, Map<String, String> query) throws Exception
{
ConnectionOptions options = newConectionOptions(uri, query);
JMSConnectionOptions options = newConectionOptions(uri, query);
DiscoveryGroupConfiguration dgc = URISchema.setData(uri, new DiscoveryGroupConfiguration(), query)
.setBroadcastEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
.setGroupAddress(getHost(uri))
.setGroupPort(getPort(uri))
.setLocalBindAddress(query.containsKey(TransportConstants.LOCAL_ADDRESS_PROP_NAME) ? (String) query.get(TransportConstants.LOCAL_ADDRESS_PROP_NAME) : null)
.setLocalBindPort(query.containsKey(TransportConstants.LOCAL_PORT_PROP_NAME) ? Integer.parseInt((String) query.get(TransportConstants.LOCAL_PORT_PROP_NAME)) : -1));
DiscoveryGroupConfiguration dgc = UDPServerLocatorSchema.getDiscoveryGroupConfiguration(uri, query, getHost(uri), getPort(uri));
ActiveMQConnectionFactory factory;
if (options.isHa())
{
return ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, options.getFactoryTypeEnum());
factory = ActiveMQJMSClient.createConnectionFactoryWithHA(dgc, options.getFactoryTypeEnum());
}
else
{
return ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
factory = ActiveMQJMSClient.createConnectionFactoryWithoutHA(dgc, options.getFactoryTypeEnum());
}
return URISchema.setData(uri, factory, query);
}
@Override
protected URI internalNewURI(ActiveMQConnectionFactory bean) throws Exception
{
DiscoveryGroupConfiguration dgc = bean.getDiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = (UDPBroadcastEndpointFactory) dgc.getBroadcastEndpointFactory();
String query = URISchema.getData(UDPServerLocatorSchema.IGNORED, bean, dgc, endpoint);
dgc.setBroadcastEndpointFactory(endpoint);
return new URI(SchemaConstants.UDP, null, endpoint.getGroupAddress(), endpoint.getGroupPort(), null, query, null);
}
}

View File

@ -17,11 +17,31 @@
package org.apache.activemq.uri;
import javax.jms.ConnectionFactory;
import javax.jms.XAQueueConnectionFactory;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.api.core.JGroupsPropertiesBroadcastEndpointFactory;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.api.jms.JMSFactoryType;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQQueueConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQTopicConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXAQueueConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQXATopicConnectionFactory;
import org.apache.activemq.tests.util.RandomUtil;
import org.apache.commons.beanutils.BeanUtilsBean;
import org.junit.Assert;
import org.junit.Test;
@ -33,13 +53,212 @@ public class ConnectionFactoryURITest
{
ConnectionFactoryParser parser = new ConnectionFactoryParser();
@Test
public void testQUEUE_XA_CF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true&type=QUEUE_XA_CF"));
Assert.assertTrue(ActiveMQXAQueueConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testTOPICXA_CF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true&type=TOPIC_XA_CF"));
Assert.assertTrue(ActiveMQXATopicConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testQUEUE_CF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true&type=QUEUE_CF"));
Assert.assertTrue(ActiveMQQueueConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testTOPIC_CF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true&type=TOPIC_CF"));
Assert.assertTrue(ActiveMQTopicConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testCF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true&type=CF"));
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testNoCF() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("tcp://localhost:3030?ha=true"));
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testTCPAllProperties() throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("tcp://localhost:3030?ha=true");
BeanUtilsBean bean = new BeanUtilsBean();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(true, (TransportConfiguration) null);
populate(sb, bean, factory);
ActiveMQConnectionFactory factory2 = parser.newObject(new URI(sb.toString()));
checkEquals(bean, factory, factory2);
}
@Test
public void testTCPAllNettyConnectorProperties() throws Exception
{
Map<String, Object> props = new HashMap<>();
Set<String> allowableConnectorKeys = TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
StringBuilder sb = new StringBuilder();
sb.append("tcp://localhost:3030?ha=true");
populateConnectorParams(props, allowableConnectorKeys, sb);
ActiveMQConnectionFactory factory = parser.newObject(new URI(sb.toString()));
Map<String, Object> params = factory.getStaticConnectors()[0].getParams();
Assert.assertEquals(params.get("host"), "localhost");
Assert.assertEquals(params.get("port"), 3030);
for (Map.Entry<String, Object> entry : params.entrySet())
{
if (!entry.getKey().equals("host") && !entry.getKey().equals("port"))
{
Assert.assertEquals(entry.getValue(), props.get(entry.getKey()));
}
}
}
@Test
public void testTCPAllNettyConnectorPropertiesMultiple() throws Exception
{
Map<String, Object> props = new HashMap<>();
Set<String> allowableConnectorKeys = TransportConstants.ALLOWABLE_CONNECTOR_KEYS;
StringBuilder sb = new StringBuilder();
sb.append("(tcp://localhost0:5445?");//localhost1:5446,localhost2:5447,localhost3:5448)&ha=true");
populateConnectorParams(props, allowableConnectorKeys, sb);
Map<String, Object> props2 = new HashMap<>();
sb.append(",tcp://localhost1:5446?");
populateConnectorParams(props2, allowableConnectorKeys, sb);
Map<String, Object> props3 = new HashMap<>();
sb.append(",tcp://localhost2:5447?");
populateConnectorParams(props3, allowableConnectorKeys, sb);
sb.append(")?ha=true&clientID=myID");
ActiveMQConnectionFactory factory = parser.newObject(sb.toString());
TransportConfiguration[] staticConnectors = factory.getStaticConnectors();
Assert.assertEquals(3, staticConnectors.length);
checkTC(props, staticConnectors[0],0);
checkTC(props2, staticConnectors[1],1);
checkTC(props3, staticConnectors[2],2);
}
private void checkTC(Map<String, Object> props, TransportConfiguration staticConnector, int offfSet)
{
TransportConfiguration connector = staticConnector;
Assert.assertEquals(connector.getParams().get("host"), "localhost" + offfSet);
Assert.assertEquals(connector.getParams().get("port"), (5445 + offfSet));
Map<String, Object> params = connector.getParams();
for (Map.Entry<String, Object> entry : params.entrySet())
{
if (!entry.getKey().equals("host") && !entry.getKey().equals("port"))
{
Assert.assertEquals(entry.getValue(), props.get(entry.getKey()));
}
}
}
private void populateConnectorParams(Map<String, Object> props, Set<String> allowableConnectorKeys, StringBuilder sb)
{
for (String allowableConnectorKey : allowableConnectorKeys)
{
if (!allowableConnectorKey.equals("host") && !allowableConnectorKey.equals("port"))
{
String value = RandomUtil.randomString();
props.put(allowableConnectorKey, value);
sb.append("&").append(allowableConnectorKey).append("=").append(value);
}
}
}
@Test
public void testTCPURI() throws Exception
{
TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName());
HashMap<String, Object> params = new HashMap<>();
params.put("host", "localhost1");
params.put("port", 5446);
TransportConfiguration tc2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
HashMap<String, Object> params2 = new HashMap<>();
params2.put("host", "localhost2");
params2.put("port", 5447);
TransportConfiguration tc3 = new TransportConfiguration(NettyConnectorFactory.class.getName(), params2);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, tc, tc2, tc3);
URI tcp = parser.createSchema("tcp", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp);
BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory);
}
@Test
public void testUDP() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("udp://localhost:3030?ha=true&type=QUEUE_XA_CF"));
Assert.assertTrue(factory instanceof XAQueueConnectionFactory);
Assert.assertTrue(ActiveMQXAQueueConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testUDPAllProperties() throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("udp://localhost:3030?ha=true");
BeanUtilsBean bean = new BeanUtilsBean();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(true, (TransportConfiguration) null);
populate(sb, bean, factory);
ActiveMQConnectionFactory factory2 = parser.newObject(new URI(sb.toString()));
checkEquals(bean, factory, factory2);
}
@Test
public void testUDPURI() throws Exception
{
DiscoveryGroupConfiguration discoveryGroupConfiguration = new DiscoveryGroupConfiguration();
UDPBroadcastEndpointFactory endpoint = new UDPBroadcastEndpointFactory();
endpoint.setGroupPort(3333).setGroupAddress("wahey").setLocalBindPort(555).setLocalBindAddress("uhuh");
discoveryGroupConfiguration.setName("foo")
.setRefreshTimeout(12345)
.setDiscoveryInitialWaitTimeout(5678)
.setBroadcastEndpointFactory(endpoint);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
URI tcp = parser.createSchema("udp", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp);
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
Assert.assertNotNull(dgc);
BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory();
Assert.assertNotNull(befc);
Assert.assertTrue(befc instanceof UDPBroadcastEndpointFactory);
UDPBroadcastEndpointFactory ubgc = (UDPBroadcastEndpointFactory) befc;
Assert.assertEquals(ubgc.getGroupAddress(), "wahey");
Assert.assertEquals(ubgc.getGroupPort(), 3333);
//these 2 are transient
Assert.assertEquals(ubgc.getLocalBindAddress(), null);
Assert.assertEquals(ubgc.getLocalBindPort(), -1);
Assert.assertEquals(dgc.getName(), "foo");
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678);
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory);
}
@Test
@ -47,14 +266,149 @@ public class ConnectionFactoryURITest
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("udp://localhost:3030?ha=true&type=QUEUE_XA_CFInvalid"));
Assert.assertTrue(factory instanceof ConnectionFactory);
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
}
@Test
public void testJGroups() throws Exception
public void testJGroupsFile() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("jgroups://test.xml?test=33"));
ActiveMQConnectionFactory factory = parser.newObject(new URI("jgroups://channel-name?file=/path/to/some/file/channel-file.xml&test=33"));
// Assert.assertTrue(factory instanceof ConnectionFactory);
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
JGroupsFileBroadcastEndpointFactory broadcastEndpointFactory = (JGroupsFileBroadcastEndpointFactory) factory.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory();
Assert.assertEquals(broadcastEndpointFactory.getFile(), "/path/to/some/file/channel-file.xml");
Assert.assertEquals(broadcastEndpointFactory.getChannelName(), "channel-name");
}
@Test
public void testJGroupsKeyValue() throws Exception
{
ActiveMQConnectionFactory factory = parser.newObject(new URI("jgroups://channel-name?properties=param=value;param2=value2&test=33"));
Assert.assertTrue(ActiveMQJMSConnectionFactory.class.getName().equals(factory.getClass().getName()));
JGroupsPropertiesBroadcastEndpointFactory broadcastEndpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) factory.getDiscoveryGroupConfiguration().getBroadcastEndpointFactory();
Assert.assertEquals(broadcastEndpointFactory.getProperties(), "param=value;param2=value2");
Assert.assertEquals(broadcastEndpointFactory.getChannelName(), "channel-name");
}
@Test
public void testJGroupsAllProperties() throws Exception
{
StringBuilder sb = new StringBuilder();
sb.append("jgroups://?file=param=value;param=value&channelName=channelName&ha=true");
BeanUtilsBean bean = new BeanUtilsBean();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(true, (TransportConfiguration) null);
populate(sb, bean, factory);
ActiveMQConnectionFactory factory2 = parser.newObject(new URI(sb.toString()));
checkEquals(bean, factory, factory2);
}
@Test
public void testJGroupsFileURI() throws Exception
{
DiscoveryGroupConfiguration discoveryGroupConfiguration = new DiscoveryGroupConfiguration();
JGroupsFileBroadcastEndpointFactory endpointFactory = new JGroupsFileBroadcastEndpointFactory()
.setChannelName("channel-name")
.setFile("channel-file.xml");
discoveryGroupConfiguration.setName("foo")
.setRefreshTimeout(12345)
.setDiscoveryInitialWaitTimeout(5678)
.setBroadcastEndpointFactory(endpointFactory);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
URI tcp = parser.createSchema("jgroups", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp);
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
Assert.assertNotNull(dgc);
BroadcastEndpointFactory befc = dgc.getBroadcastEndpointFactory();
Assert.assertNotNull(befc);
Assert.assertTrue(befc instanceof JGroupsFileBroadcastEndpointFactory);
Assert.assertEquals(dgc.getName(), "foo");
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678);
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
JGroupsFileBroadcastEndpointFactory fileBroadcastEndpointFactory = (JGroupsFileBroadcastEndpointFactory) befc;
Assert.assertEquals(fileBroadcastEndpointFactory.getFile(), "channel-file.xml");
Assert.assertEquals(fileBroadcastEndpointFactory.getChannelName(), "channel-name");
BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory);
}
@Test
public void testJGroupsPropertiesURI() throws Exception
{
DiscoveryGroupConfiguration discoveryGroupConfiguration = new DiscoveryGroupConfiguration();
JGroupsPropertiesBroadcastEndpointFactory endpointFactory = new JGroupsPropertiesBroadcastEndpointFactory()
.setChannelName("channel-name")
.setProperties("param=val,param2-val2");
discoveryGroupConfiguration.setName("foo")
.setRefreshTimeout(12345)
.setDiscoveryInitialWaitTimeout(5678)
.setBroadcastEndpointFactory(endpointFactory);
ActiveMQConnectionFactory connectionFactoryWithHA = ActiveMQJMSClient.createConnectionFactoryWithHA(discoveryGroupConfiguration, JMSFactoryType.CF);
URI tcp = parser.createSchema("jgroups", connectionFactoryWithHA);
ActiveMQConnectionFactory factory = parser.newObject(tcp);
DiscoveryGroupConfiguration dgc = factory.getDiscoveryGroupConfiguration();
Assert.assertNotNull(dgc);
BroadcastEndpointFactory broadcastEndpointFactory = dgc.getBroadcastEndpointFactory();
Assert.assertNotNull(broadcastEndpointFactory);
Assert.assertTrue(broadcastEndpointFactory instanceof JGroupsPropertiesBroadcastEndpointFactory);
Assert.assertEquals(dgc.getName(), "foo");
Assert.assertEquals(dgc.getDiscoveryInitialWaitTimeout(), 5678);
Assert.assertEquals(dgc.getRefreshTimeout(), 12345);
JGroupsPropertiesBroadcastEndpointFactory propertiesBroadcastEndpointFactory = (JGroupsPropertiesBroadcastEndpointFactory) broadcastEndpointFactory;
Assert.assertEquals(propertiesBroadcastEndpointFactory.getProperties(), "param=val,param2-val2");
Assert.assertEquals(propertiesBroadcastEndpointFactory.getChannelName(), "channel-name");
BeanUtilsBean bean = new BeanUtilsBean();
checkEquals(bean, connectionFactoryWithHA, factory);
}
private void populate(StringBuilder sb, BeanUtilsBean bean, ActiveMQConnectionFactory factory) throws IllegalAccessException, InvocationTargetException
{
PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory);
for (PropertyDescriptor descriptor : descriptors)
{
if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null)
{
if (descriptor.getPropertyType() == String.class)
{
String value = RandomUtil.randomString();
bean.setProperty(factory, descriptor.getName(), value);
sb.append("&").append(descriptor.getName()).append("=").append(value);
}
else if (descriptor.getPropertyType() == int.class)
{
int value = RandomUtil.randomPositiveInt();
bean.setProperty(factory, descriptor.getName(), value);
sb.append("&").append(descriptor.getName()).append("=").append(value);
}
else if (descriptor.getPropertyType() == long.class)
{
long value = RandomUtil.randomPositiveLong();
bean.setProperty(factory, descriptor.getName(), value);
sb.append("&").append(descriptor.getName()).append("=").append(value);
}
else if (descriptor.getPropertyType() == double.class)
{
double value = RandomUtil.randomDouble();
bean.setProperty(factory, descriptor.getName(), value);
sb.append("&").append(descriptor.getName()).append("=").append(value);
}
}
}
}
private void checkEquals(BeanUtilsBean bean, ActiveMQConnectionFactory factory, ActiveMQConnectionFactory factory2) throws IllegalAccessException, InvocationTargetException, NoSuchMethodException
{
PropertyDescriptor[] descriptors = bean.getPropertyUtils().getPropertyDescriptors(factory);
for (PropertyDescriptor descriptor : descriptors)
{
if (descriptor.getWriteMethod() != null && descriptor.getReadMethod() != null)
{
Assert.assertEquals(descriptor.getName() + " incorrect", bean.getProperty(factory, descriptor.getName()),bean.getProperty(factory2, descriptor.getName()));
}
}
}
}

View File

@ -39,11 +39,12 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.BroadcastEndpointFactoryConfiguration;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.ChannelBroadcastEndpointFactory;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
@ -1873,13 +1874,13 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
if (discoveryAddress != null || jgroupsFileName != null || jgroupsLocatorClassName != null)
{
BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null;
BroadcastEndpointFactory endpointFactory = null;
if (jgroupsLocatorClassName != null)
{
String jchannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClassName, jchannelRefName);
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jchannel, jgroupsChannel);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
else if (discoveryAddress != null)
{
@ -1892,7 +1893,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
: raProperties.getDiscoveryLocalBindAddress();
endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
endpointFactory = new UDPBroadcastEndpointFactory()
.setGroupAddress(discoveryAddress)
.setGroupPort(discoveryPort)
.setLocalBindAddress(localBindAddress)
@ -1900,7 +1901,9 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
}
else if (jgroupsFileName != null)
{
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jgroupsFileName, jgroupsChannel);
endpointFactory = new JGroupsFileBroadcastEndpointFactory()
.setChannelName(jgroupsChannel)
.setFile(jgroupsFileName);
}
Long refreshTimeout = overrideProperties.getDiscoveryRefreshTimeout() != null ? overrideProperties.getDiscoveryRefreshTimeout()
: raProperties.getDiscoveryRefreshTimeout();
@ -1920,7 +1923,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration()
.setRefreshTimeout(refreshTimeout)
.setDiscoveryInitialWaitTimeout(initialTimeout)
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
.setBroadcastEndpointFactory(endpointFactory);
if (ActiveMQRALogger.LOGGER.isDebugEnabled())
{
@ -2008,7 +2011,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
if (connectorClassName == null)
{
BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration = null;
BroadcastEndpointFactory endpointFactory = null;
if (discoveryAddress != null)
{
Integer discoveryPort = overrideProperties.getDiscoveryPort() != null ? overrideProperties.getDiscoveryPort()
@ -2020,7 +2023,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
String localBindAddress = overrideProperties.getDiscoveryLocalBindAddress() != null ? overrideProperties.getDiscoveryLocalBindAddress()
: raProperties.getDiscoveryLocalBindAddress();
endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
endpointFactory = new UDPBroadcastEndpointFactory()
.setGroupAddress(discoveryAddress)
.setGroupPort(discoveryPort)
.setLocalBindAddress(localBindAddress)
@ -2028,7 +2031,9 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
}
else if (jgroupsFileName != null)
{
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jgroupsFileName, jgroupsChannel);
endpointFactory = new JGroupsFileBroadcastEndpointFactory()
.setChannelName(jgroupsChannel)
.setFile(jgroupsFileName);
}
else
{
@ -2037,9 +2042,9 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
{
String jgroupsChannelRefName = raProperties.getJgroupsChannelRefName();
JChannel jchannel = ActiveMQRaUtils.locateJGroupsChannel(jgroupsLocatorClass, jgroupsChannelRefName);
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jchannel, jgroupsChannel);
endpointFactory = new ChannelBroadcastEndpointFactory(jchannel, jgroupsChannel);
}
if (endpointFactoryConfiguration == null)
if (endpointFactory == null)
{
throw new IllegalArgumentException("must provide either TransportType or DiscoveryGroupAddress and DiscoveryGroupPort for ActiveMQ ResourceAdapter Connection Factory");
}
@ -2061,7 +2066,7 @@ public class ActiveMQResourceAdapter implements ResourceAdapter, Serializable
DiscoveryGroupConfiguration groupConfiguration = new DiscoveryGroupConfiguration()
.setRefreshTimeout(refreshTimeout)
.setDiscoveryInitialWaitTimeout(initialTimeout)
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
.setBroadcastEndpointFactory(endpointFactory);
groupConfiguration.setRefreshTimeout(refreshTimeout);

View File

@ -16,7 +16,6 @@
*/
package org.apache.activemq.core.config;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -35,7 +34,7 @@ import org.apache.activemq.core.settings.impl.AddressSettings;
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public interface Configuration extends Serializable
public interface Configuration
{
/**
* To be used on dependency management on the application server

View File

@ -20,6 +20,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -51,7 +52,7 @@ import org.apache.activemq.core.settings.impl.AddressSettings;
* @author <a href="mailto:ataylor@redhat.com>Andy Taylor</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*/
public class ConfigurationImpl implements Configuration
public class ConfigurationImpl implements Configuration, Serializable
{
// Constants ------------------------------------------------------------------------------

View File

@ -27,14 +27,14 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.api.core.BroadcastEndpointFactoryConfiguration;
import org.apache.activemq.api.core.BroadcastEndpointFactory;
import org.apache.activemq.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.JGroupsBroadcastGroupConfiguration;
import org.apache.activemq.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.api.core.Pair;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.core.config.BridgeConfiguration;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
@ -1268,15 +1268,17 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
// TODO: validate if either jgroups or UDP is being filled
BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration;
BroadcastEndpointFactory endpointFactory;
if (jgroupsFile != null)
{
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jgroupsFile, jgroupsChannel);
endpointFactory = new JGroupsFileBroadcastEndpointFactory()
.setFile(jgroupsFile)
.setChannelName(jgroupsChannel);
}
else
{
endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
endpointFactory = new UDPBroadcastEndpointFactory()
.setGroupAddress(groupAddress)
.setGroupPort(groupPort)
.setLocalBindAddress(localAddress)
@ -1287,7 +1289,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
.setName(name)
.setBroadcastPeriod(broadcastPeriod)
.setConnectorInfos(connectorNames)
.setEndpointFactoryConfiguration(endpointFactoryConfiguration);
.setEndpointFactory(endpointFactory);
mainConfig.getBroadcastGroupConfigurations().add(config);
}
@ -1317,14 +1319,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
String jgroupsChannel = getString(e, "jgroups-channel", null, Validators.NO_CHECK);
// TODO: validate if either jgroups or UDP is being filled
BroadcastEndpointFactoryConfiguration endpointFactoryConfiguration;
BroadcastEndpointFactory endpointFactory;
if (jgroupsFile != null)
{
endpointFactoryConfiguration = new JGroupsBroadcastGroupConfiguration(jgroupsFile, jgroupsChannel);
endpointFactory = new JGroupsFileBroadcastEndpointFactory()
.setFile(jgroupsFile)
.setChannelName(jgroupsChannel);
}
else
{
endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
endpointFactory = new UDPBroadcastEndpointFactory()
.setGroupAddress(groupAddress)
.setGroupPort(groupPort)
.setLocalBindAddress(localBindAddress)
@ -1335,7 +1339,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil
.setName(name)
.setRefreshTimeout(refreshTimeout)
.setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout)
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
.setBroadcastEndpointFactory(endpointFactory);
if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
{

View File

@ -19,7 +19,7 @@ package org.apache.activemq.core.management.impl;
import javax.management.MBeanOperationInfo;
import org.apache.activemq.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.api.core.management.BroadcastGroupControl;
import org.apache.activemq.core.persistence.StorageManager;
import org.apache.activemq.core.server.cluster.BroadcastGroup;
@ -130,9 +130,9 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
clearIO();
try
{
if (configuration.getEndpointFactoryConfiguration() instanceof UDPBroadcastGroupConfiguration)
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory)
{
return ((UDPBroadcastGroupConfiguration)configuration.getEndpointFactoryConfiguration()).getGroupAddress();
return ((UDPBroadcastEndpointFactory)configuration.getEndpointFactory()).getGroupAddress();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
}
@ -147,9 +147,9 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
clearIO();
try
{
if (configuration.getEndpointFactoryConfiguration() instanceof UDPBroadcastGroupConfiguration)
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory)
{
return ((UDPBroadcastGroupConfiguration)configuration.getEndpointFactoryConfiguration()).getGroupPort();
return ((UDPBroadcastEndpointFactory)configuration.getEndpointFactory()).getGroupPort();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
}
@ -164,9 +164,9 @@ public class BroadcastGroupControlImpl extends AbstractControl implements Broadc
clearIO();
try
{
if (configuration.getEndpointFactoryConfiguration() instanceof UDPBroadcastGroupConfiguration)
if (configuration.getEndpointFactory() instanceof UDPBroadcastEndpointFactory)
{
return ((UDPBroadcastGroupConfiguration)configuration.getEndpointFactoryConfiguration()).getLocalBindPort();
return ((UDPBroadcastEndpointFactory)configuration.getEndpointFactory()).getLocalBindPort();
}
throw new Exception("Invalid request because this is not a UDP Broadcast configuration.");
}

View File

@ -17,7 +17,6 @@
package org.apache.activemq.core.remoting.impl.invm;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -17,7 +17,6 @@
package org.apache.activemq.core.remoting.impl.invm;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.core.remoting.impl.invm;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.api.config.ActiveMQDefaultConfiguration;
/**
* A TransportConstants

View File

@ -17,7 +17,6 @@
package org.apache.activemq.core.remoting.impl.netty;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -242,21 +242,6 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle
AcceptorFactory factory = (AcceptorFactory) clazz.newInstance();
// Check valid properties
if (info.getParams() != null)
{
Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams()
.keySet());
if (!invalid.isEmpty())
{
ActiveMQServerLogger.LOGGER.invalidAcceptorKeys(ConfigurationHelper.stringSetToCommaListString(invalid));
continue;
}
}
Map<String, ProtocolManager> supportedProtocols = new ConcurrentHashMap();
String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null,

View File

@ -856,7 +856,7 @@ public final class ClusterManager implements ActiveMQComponent
if (group == null)
{
group = new BroadcastGroupImpl(nodeManager, config.getName(),
config.getBroadcastPeriod(), scheduledExecutor, config.getEndpointFactoryConfiguration().createBroadcastEndpointFactory());
config.getBroadcastPeriod(), scheduledExecutor, config.getEndpointFactory());
for (String connectorInfo : config.getConnectorInfos())
{

View File

@ -17,7 +17,6 @@
package org.apache.activemq.spi.core.remoting;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -22,7 +22,7 @@ import org.apache.activemq.api.core.BroadcastGroupConfiguration;
import org.apache.activemq.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.UDPBroadcastGroupConfiguration;
import org.apache.activemq.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.core.config.BridgeConfiguration;
import org.apache.activemq.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.core.config.Configuration;
@ -141,7 +141,7 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(2, conf.getBroadcastGroupConfigurations().size());
for (BroadcastGroupConfiguration bc : conf.getBroadcastGroupConfigurations())
{
UDPBroadcastGroupConfiguration udpBc = (UDPBroadcastGroupConfiguration) bc.getEndpointFactoryConfiguration();
UDPBroadcastEndpointFactory udpBc = (UDPBroadcastEndpointFactory) bc.getEndpointFactory();
if (bc.getName().equals("bg1"))
{
Assert.assertEquals("bg1", bc.getName());
@ -165,16 +165,16 @@ public class FileConfigurationTest extends ConfigurationImplTest
Assert.assertEquals(2, conf.getDiscoveryGroupConfigurations().size());
DiscoveryGroupConfiguration dc = conf.getDiscoveryGroupConfigurations().get("dg1");
Assert.assertEquals("dg1", dc.getName());
Assert.assertEquals("192.168.0.120", ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getGroupAddress());
assertEquals("172.16.8.10", ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getLocalBindAddress());
Assert.assertEquals(11999, ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getGroupPort());
Assert.assertEquals("192.168.0.120", ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupAddress());
assertEquals("172.16.8.10", ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getLocalBindAddress());
Assert.assertEquals(11999, ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupPort());
Assert.assertEquals(12345, dc.getRefreshTimeout());
dc = conf.getDiscoveryGroupConfigurations().get("dg2");
Assert.assertEquals("dg2", dc.getName());
Assert.assertEquals("192.168.0.121", ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getGroupAddress());
assertEquals("172.16.8.11", ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getLocalBindAddress());
Assert.assertEquals(12999, ((UDPBroadcastGroupConfiguration) dc.getBroadcastEndpointFactoryConfiguration()).getGroupPort());
Assert.assertEquals("192.168.0.121", ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupAddress());
assertEquals("172.16.8.11", ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getLocalBindAddress());
Assert.assertEquals(12999, ((UDPBroadcastEndpointFactory) dc.getBroadcastEndpointFactory()).getGroupPort());
Assert.assertEquals(23456, dc.getRefreshTimeout());
Assert.assertEquals(2, conf.getDivertConfigurations().size());

View File

@ -64,38 +64,27 @@ A JMS connection factory is used by the client to make connections to
the server. It knows the location of the server it is connecting to, as
well as many other configuration parameters.
By default, a `javax.naming.Context` instance created using the
`org.apache.activemq.jndi.ActiveMQInitialContextFactory` will
automatically have the following connection factories available for
lookup:
- `ConnectionFactory`
- `XAConnectionFactory`
- `QueueConnectionFactory`
- `TopicConnectionFactory`
Here's a simple example of the JNDI context environment for a client
looking up a connection factory to access an *embedded* instance of
ActiveMQ:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory=vm://0
It's really as simple as that. As noted previously, any JNDI context
created with the `ActiveMQInitialContextFactory` will have a set of
default connection factories available. Therefore, only the
`java.naming.factory.initial` property is required to access an embedded
broker.
In this instance we have created a connection factory that is bound to
`invmConnectionFactory`, any entry with prefix `connectionFactory.` will
create a connection factory.
In certain situations there could be multiple server instances running
within a particular JVM. In that situation each server would typically
have an InVM acceptor with a unique server-ID. A client using JMS and
JNDI can account for this by specifying a
`javax.naming.Context.PROVIDER_URL` (`String` value of
"java.naming.provider.url") in the JNDI environment like `vm://2` where
`2` is the server-ID for acceptor.
JNDI can account for this by specifying a connction factory for each
server, like so:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory0=vm://0
connectionFactory.invmConnectionFactory1=vm://1
connectionFactory.invmConnectionFactory2=vm://2
Here is a list of all the supported URL schemes:
@ -108,19 +97,17 @@ Here is a list of all the supported URL schemes:
- `jgroups`
Most clients won't be connecting to an embedded broker. Clients will
most commonly connect across a network a remote broker. In that case the
client can use the `javax.naming.Context.PROVIDER_URL` (`String` value
of "java.naming.provider.url") in the JNDI environment to specify where
to connect. Here's a simple example of a client configuring a connection
factory to connect to a remote broker running on myhost:5445:
most commonly connect across a network a remote broker. Here's a simple
example of a client configuring a connection factory to connect to a
remote broker running on myhost:5445:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://myhost:5445
connectionFactory.ConnectionFactory=tcp://myhost:5445
In the example above the client is using the `tcp` scheme for the
provider URL. A client may also specify multiple comma-delimited
host:port combinations in the URL (e.g.
`tcp://remote-host1:5445,remote-host2:5445`). Whether there is one or
`(tcp://remote-host1:5445,remote-host2:5445)`). Whether there is one or
many host:port combinations in the URL they are treated as the *initial
connector(s)* for the underlying connection.
@ -132,13 +119,24 @@ Each scheme has a specific set of properties which can be set using the
traditional URL query string format (e.g.
`scheme://host:port?key1=value1&key2=value2`) to customize the
underlying transport mechanism. For example, if a client wanted to
connect to a remote server using TCP and SSL it would use a
`Context.PROVIDER_URL` of `tcp://remote-host:5445?ssl-enabled=true`.
connect to a remote server using TCP and SSL it would create a connection
factory like so, `tcp://remote-host:5445?ssl-enabled=true`.
All the properties available for the `tcp` scheme are described in [the
documentation regarding the Netty
transport](#configuring-transports.netty).
Note if you are using the `tcp` scheme and multiple addresses then a query
can be applied to all the url's or just to an individual connector, so where
you have
- `(tcp://remote-host1:5445?httpEnabled=true,remote-host2:5445?httpEnabled=true)?clientID=1234`
then the `httpEnabled` property is only set on the individual connectors where as the `clientId`
is set on the actual connection factory. Any connector specific properties set on the whole
URI will be applied to all the connectors.
The `udp` scheme supports 4 properties:
- `local-address` - If you are running with multiple network
@ -169,48 +167,23 @@ The `udp` scheme supports 4 properties:
value for this parameter is 10000 milliseconds.
Lastly, the `jgroups` scheme is supported which provides an alternative
to the `udp` scheme for server discovery. The URL pattern is as follows
`jgroups://<jgroups-xml-conf-filename>` where
`<jgroups-xml-conf-filename>` refers to an XML file on the classpath
that contains the JGroups configuration.
to the `udp` scheme for server discovery. The URL pattern is either
`jgroups://channelName?file=jgroups-xml-conf-filename`
where`jgroups-xml-conf-filename` refers to an XML file on the classpath
that contains the JGroups configuration or it can be
`jgroups://channelName?properties=some-jgroups-properties`. In both instance the
`channelName` is the name given to the jgroups channel created.
The `refresh-timeout` and `discovery-initial-wait-timeout` properties
are supported just like with `udp`.
Although a `javax.naming.Context` instance created using the
`org.apache.activemq.jndi.ActiveMQInitialContextFactory` will
automatically have some connection factories present, it is possible for
a client to specify its own connection factories. This is done using the
`org.apache.activemq.jndi.ActiveMQInitialContextFactory.CONNECTION_FACTORY_NAMES`
property (String value of "connectionFactoryNames"). The value for this
property is a comma delimited String of all the connection factories the
client wishes to create. For example:
The default type for the default connection factory is of type `javax.jms.ConnectionFactory`.
This can be changed by setting the type like so
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactoryNames=myConnectionFactory
java.naming.provider.url=tcp://localhost:5445?type=CF
In this example, the client is creating a connection factory named
"myConnectionFactory." This replaces all the default connection
factories so that only the "myConnectionFactory" connection factory is
available to the client.
Aside from the underlying transport, the underlying connection factory
implementation can also be configured using special properties. To
configure a particular connection factory the client would follow this
pattern for the property name to set in the environment:
`connection.<connection-factory-name>.<property-name>`. For example, if
the client wanted to customize the default connection factory
"ConnectionFactory" to support high-availability then it would do this:
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://myhost:5445
connection.ConnectionFactory.ha=true
Any property available on the underlying
`org.apache.activemq.jms.client.ActiveMQConnectionFactory` can be set
this way in addition to the `ha` (boolean) and `type` (String)
properties. Here are the different options for the `type`:
In this example it is still set to the default, below shows a list of types that can be set.
#### Configuration for Connection Factory Types
<table>

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -154,7 +154,7 @@ public class ApplicationLayerFailoverExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from the server
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", "tcp://127.0.0.1:" + (5445 + server));
properties.put("connectionFactory.ConnectionFactory", "tcp://127.0.0.1:" + (5445 + server));
properties.put("queue.queue/exampleQueue", "exampleQueue");
initialContext = new InitialContext(properties);

View File

@ -59,7 +59,7 @@ public class BridgeExample extends ActiveMQExample
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", "tcp://127.0.0.1:5445");
properties.put("connectionFactory.ConnectionFactory", "tcp://127.0.0.1:5445");
properties.put("queue.queue/sausage-factory", "sausage-factory");
ic0 = new InitialContext(properties);
@ -75,7 +75,7 @@ public class BridgeExample extends ActiveMQExample
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", "tcp://127.0.0.1:5446");
properties.put("connectionFactory.ConnectionFactory", "tcp://127.0.0.1:5446");
properties.put("queue.queue/mincing-machine", "mincing-machine");
ic1 = new InitialContext(properties);

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,4 +16,4 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445

View File

@ -16,9 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1
queue.queue/exampleQueue=exampleQueue
connection.ConnectionFactory.ha=true
connection.ConnectionFactory.retryInterval=1000
connection.ConnectionFactory.retryIntervalMultiplier=1.0
connection.ConnectionFactory.reconnectAttempts=-1

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=udp://231.7.7.7:9876
connectionFactory.ConnectionFactory=udp://231.7.7.7:9876
queue.queue/exampleQueue=exampleQueue

View File

@ -60,7 +60,7 @@ public class ClusteredDurableSubscriptionExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("topic.topic/exampleTopic", "exampleTopic");
ic0 = new InitialContext(properties);
@ -74,7 +74,7 @@ public class ClusteredDurableSubscriptionExample extends ActiveMQExample
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
ic1 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1

View File

@ -62,7 +62,7 @@ public class ClusteredGroupingExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("queue.queue/exampleQueue", "exampleQueue");
ic0 = new InitialContext(properties);
@ -75,7 +75,7 @@ public class ClusteredGroupingExample extends ActiveMQExample
// Step 4. Get an initial context for looking up JNDI from server 1
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
ic1 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1
@ -84,7 +84,7 @@ public class ClusteredGroupingExample extends ActiveMQExample
// Step 4. Get an initial context for looking up JNDI from server 2
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[2]);
properties.put("connectionFactory.ConnectionFactory", args[2]);
ic2 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 2

View File

@ -58,7 +58,7 @@ public class ClusteredJgroupsExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("queue.queue/exampleQueue", "exampleQueue");
ic0 = new InitialContext(properties);
@ -71,7 +71,7 @@ public class ClusteredJgroupsExample extends ActiveMQExample
// Step 4. Get an initial context for looking up JNDI from server 1
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
ic1 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1

View File

@ -16,4 +16,4 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445

View File

@ -58,7 +58,7 @@ public class ClusteredQueueExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("queue.queue/exampleQueue", "exampleQueue");
ic0 = new InitialContext(properties);
@ -71,7 +71,7 @@ public class ClusteredQueueExample extends ActiveMQExample
// Step 4. Get an initial context for looking up JNDI from server 1
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
ic1 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1

View File

@ -56,18 +56,18 @@ public class ClusteredStandaloneExample extends ActiveMQExample
{
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("topic.topic/exampleTopic", "exampleTopic");
initialContext0 = new InitialContext(properties);
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
initialContext1 = new InitialContext(properties);
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[2]);
properties.put("connectionFactory.ConnectionFactory", args[2]);
initialContext2 = new InitialContext(properties);
// First we demonstrate a distributed topic.

View File

@ -62,7 +62,7 @@ public class StaticClusteredQueueExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 3
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[3]);
properties.put("connectionFactory.ConnectionFactory", args[3]);
properties.put("queue.queue/exampleQueue", "exampleQueue");
ic0 = new InitialContext(properties);

View File

@ -60,7 +60,7 @@ public class ClusterStaticOnewayExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("queue.queue/exampleQueue", "exampleQueue");
ic0 = new InitialContext(properties);

View File

@ -58,7 +58,7 @@ public class ClusteredTopicExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("topic.topic/exampleTopic", "exampleTopic");
ic0 = new InitialContext(properties);
@ -71,7 +71,7 @@ public class ClusteredTopicExample extends ActiveMQExample
// Step 4. Get an initial context for looking up JNDI from server 1
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
ic1 = new InitialContext(properties);
// Step 5. Look-up a JMS Connection Factory object from JNDI on server 1

View File

@ -58,17 +58,13 @@ public class ColocatedFailoverScaleDownExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI for both servers
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
initialContext1 = new InitialContext(properties);
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0] + "?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
properties.put("queue.queue/exampleQueue", "exampleQueue");
properties.put("connection.ConnectionFactory.ha", true);
properties.put("connection.ConnectionFactory.retryInterval", 1000);
properties.put("connection.ConnectionFactory.retryIntervalMultiplier", 1.0);
properties.put("connection.ConnectionFactory.reconnectAttempts", -1);
initialContext = new InitialContext(properties);
// Step 2. Look up the JMS resources from JNDI

View File

@ -56,17 +56,13 @@ public class ColocatedFailoverExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI for both servers
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
initialContext1 = new InitialContext(properties);
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0] + "?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
properties.put("queue.queue/exampleQueue", "exampleQueue");
properties.put("connection.ConnectionFactory.ha", true);
properties.put("connection.ConnectionFactory.retryInterval", 1000);
properties.put("connection.ConnectionFactory.retryIntervalMultiplier", 1.0);
properties.put("connection.ConnectionFactory.reconnectAttempts", -1);
initialContext = new InitialContext(properties);
// Step 2. Look up the JMS resources from JNDI

View File

@ -16,6 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.consumerMaxRate=10
connectionFactory.ConnectionFactory=tcp://localhost:5445?consumerMaxRate=10
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,6 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue
queue.queue/deadLetterQueue=deadLetterQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -61,7 +61,7 @@ public class DivertExample extends ActiveMQExample
// Step 1. Create an initial context to perform the JNDI lookup on the London server
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0]);
properties.put("queue.queue/orders", "orders");
properties.put("topic.topic/priceUpdates", "priceUpdates");
properties.put("topic.topic/spyTopic", "spyTopic");
@ -80,7 +80,7 @@ public class DivertExample extends ActiveMQExample
// Step 6. Create an initial context to perform the JNDI lookup on the New York server
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connectionFactory.ConnectionFactory", args[1]);
properties.put("topic.topic/newYorkPriceUpdates", "newYorkPriceUpdates");
initialContextNewYork = new InitialContext(properties);

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
topic.topic/exampleTopic=exampleTopic

View File

@ -16,6 +16,6 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue
queue.queue/expiryQueue=expiryQueue

View File

@ -67,21 +67,13 @@ public class HAPolicyAutoBackupExample extends ActiveMQExample
// Step 1. Get an initial context for looking up JNDI from server 0 and 1
Hashtable<String, Object> properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[0]);
properties.put("connectionFactory.ConnectionFactory", args[0] + "?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
properties.put("queue.queue/exampleQueue", "exampleQueue");
properties.put("connection.ConnectionFactory.ha", true);
properties.put("connection.ConnectionFactory.retryInterval", 1000);
properties.put("connection.ConnectionFactory.retryIntervalMultiplier", 1.0);
properties.put("connection.ConnectionFactory.reconnectAttempts", -1);
ic0 = new InitialContext(properties);
properties = new Hashtable<String, Object>();
properties.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.put("java.naming.provider.url", args[1]);
properties.put("connection.ConnectionFactory.ha", true);
properties.put("connection.ConnectionFactory.retryInterval", 1000);
properties.put("connection.ConnectionFactory.retryIntervalMultiplier", 1.0);
properties.put("connection.ConnectionFactory.reconnectAttempts", -1);
properties.put("connectionFactory.ConnectionFactory", args[1] + "?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
ic1 = new InitialContext(properties);
// Step 2. Look-up the JMS Queue object from JNDI

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:8080?http-enabled=true
connectionFactory.ConnectionFactory=tcp://localhost:8080?http-enabled=true
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -165,7 +165,7 @@ public class JMSBridgeExample
private static Hashtable<String, String> createJndiParams(String server)
{
Hashtable<String, String> jndiProps = new Hashtable<String, String>();
jndiProps.put("java.naming.provider.url", server);
jndiProps.put("connectionFactory.ConnectionFactory", server);
jndiProps.put("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
jndiProps.put("queue.target/queue", "target");
jndiProps.put("topic.source/topic", "topic");

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
topic.topic/exampleTopic=exampleTopic

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,6 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue
topic.topic/notificationsTopic=notificationsTopic

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,6 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue
queue.queue/expiryQueue=expiryQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.groupID=Group-0
connectionFactory.ConnectionFactory=tcp://localhost:5445?groupID=Group-0
queue.queue/exampleQueue=exampleQueue

View File

@ -16,5 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue

View File

@ -16,9 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.ha=true
connection.ConnectionFactory.retryInterval=1000
connection.ConnectionFactory.retryIntervalMultiplier=1.0
connection.ConnectionFactory.reconnectAttempts=-1
connectionFactory.ConnectionFactory=tcp://localhost:5445?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1
queue.queue/exampleQueue=exampleQueue

View File

@ -16,9 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.ha=true
connection.ConnectionFactory.retryInterval=1000
connection.ConnectionFactory.retryIntervalMultiplier=1.0
connection.ConnectionFactory.reconnectAttempts=-1
connectionFactory.ConnectionFactory=tcp://localhost:5445?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.consumerWindowSize=0
connectionFactory.ConnectionFactory=tcp://localhost:5445?consumerWindowSize=0
queue.queue/exampleQueue=exampleQueue

View File

@ -16,9 +16,5 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connection.ConnectionFactory.ha=true
connection.ConnectionFactory.retryInterval=1000
connection.ConnectionFactory.retryIntervalMultiplier=1.0
connection.ConnectionFactory.reconnectAttempts=-1
connectionFactory.ConnectionFactory=tcp://localhost:5445?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1
queue.queue/exampleQueue=exampleQueue

View File

@ -16,6 +16,6 @@
# under the License.
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:5445
connectionFactory.ConnectionFactory=tcp://localhost:5445
queue.queue/exampleQueue=exampleQueue
queue.queue/pagingQueue=pagingQueue

Some files were not shown because too many files have changed in this diff Show More