mirror of https://github.com/apache/activemq.git
You can now configure the prefetchPolicy and redeliveryPolicy using the jndi properties. You can also do it using the Broker URL git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@418495 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9befb11435
commit
7f0e13571f
|
@ -20,7 +20,6 @@ import java.net.URI;
|
|||
import java.net.URISyntaxException;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Enumeration;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
@ -30,10 +29,8 @@ import javax.jms.QueueConnectionFactory;
|
|||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicConnectionFactory;
|
||||
import javax.naming.Context;
|
||||
import javax.naming.Referenceable;
|
||||
import javax.naming.Reference;
|
||||
import javax.naming.NamingException;
|
||||
|
||||
import org.apache.activemq.jndi.JNDIBaseStorable;
|
||||
import org.apache.activemq.management.JMSStatsImpl;
|
||||
import org.apache.activemq.management.StatsCapable;
|
||||
import org.apache.activemq.management.StatsImpl;
|
||||
|
@ -43,7 +40,6 @@ import org.apache.activemq.util.IntrospectionSupport;
|
|||
import org.apache.activemq.util.JMSExceptionSupport;
|
||||
import org.apache.activemq.util.URISupport;
|
||||
import org.apache.activemq.util.URISupport.CompositeData;
|
||||
import org.apache.activemq.jndi.JNDIBaseStorable;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
@ -277,7 +273,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
try {
|
||||
|
||||
Map map = URISupport.parseQuery(this.brokerURL.getQuery());
|
||||
if( IntrospectionSupport.setProperties(this, map, "jms.") ) {
|
||||
if( buildFromMap(IntrospectionSupport.extractProperties(map, "jms.")) ) {
|
||||
this.brokerURL = URISupport.createRemainingURI(this.brokerURL, map);
|
||||
}
|
||||
|
||||
|
@ -289,7 +285,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
// It might be a composite URI.
|
||||
try {
|
||||
CompositeData data = URISupport.parseComposite(this.brokerURL);
|
||||
if( IntrospectionSupport.setProperties(this, data.getParameters(), "jms.") ) {
|
||||
if( buildFromMap(IntrospectionSupport.extractProperties(data.getParameters(), "jms.")) ) {
|
||||
this.brokerURL = data.toURI();
|
||||
}
|
||||
} catch (URISyntaxException e) {
|
||||
|
@ -391,8 +387,6 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
properties = new Properties();
|
||||
}
|
||||
|
||||
IntrospectionSupport.setProperties(this, properties);
|
||||
|
||||
String temp = properties.getProperty(Context.PROVIDER_URL);
|
||||
if (temp == null || temp.length() == 0) {
|
||||
temp = properties.getProperty("brokerURL");
|
||||
|
@ -400,6 +394,28 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
if (temp != null && temp.length() > 0) {
|
||||
setBrokerURL(temp);
|
||||
}
|
||||
|
||||
buildFromMap(properties);
|
||||
}
|
||||
|
||||
public boolean buildFromMap(Map properties) {
|
||||
boolean rc=false;
|
||||
|
||||
ActiveMQPrefetchPolicy p = new ActiveMQPrefetchPolicy();
|
||||
if( IntrospectionSupport.setProperties(p, properties, "prefetchPolicy.") ) {
|
||||
setPrefetchPolicy(p);
|
||||
rc = true;
|
||||
}
|
||||
|
||||
RedeliveryPolicy rp = new RedeliveryPolicy();
|
||||
if ( IntrospectionSupport.setProperties(rp, properties, "redeliveryPolicy.") ) {
|
||||
setRedeliveryPolicy(rp);
|
||||
rc = true;
|
||||
}
|
||||
|
||||
rc |= IntrospectionSupport.setProperties(this, properties);
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
public void populateProperties(Properties props) {
|
||||
|
@ -414,6 +430,9 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
|
|||
props.setProperty("clientID", getClientID());
|
||||
}
|
||||
|
||||
IntrospectionSupport.getProperties(getPrefetchPolicy(), props, "prefetchPolicy.");
|
||||
IntrospectionSupport.getProperties(getRedeliveryPolicy(), props, "redeliveryPolicy.");
|
||||
|
||||
props.setProperty("copyMessageOnSend", Boolean.toString(isCopyMessageOnSend()));
|
||||
props.setProperty("disableTimeStampsByDefault", Boolean.toString(isDisableTimeStampsByDefault()));
|
||||
props.setProperty("objectMessageSerializationDefered", Boolean.toString(isObjectMessageSerializationDefered()));
|
||||
|
|
|
@ -35,6 +35,53 @@ import java.util.Map.Entry;
|
|||
|
||||
public class IntrospectionSupport {
|
||||
|
||||
|
||||
static public boolean getProperties(Object target, Map props, String optionPrefix) {
|
||||
|
||||
boolean rc = false;
|
||||
if( target == null )
|
||||
throw new IllegalArgumentException("target was null.");
|
||||
if( props == null )
|
||||
throw new IllegalArgumentException("props was null.");
|
||||
|
||||
if( optionPrefix == null )
|
||||
optionPrefix="";
|
||||
|
||||
Class clazz = target.getClass();
|
||||
Method[] methods = clazz.getMethods();
|
||||
for (int i = 0; i < methods.length; i++) {
|
||||
Method method = methods[i];
|
||||
String name = method.getName();
|
||||
Class type = method.getReturnType();
|
||||
Class params[] = method.getParameterTypes();
|
||||
if( name.startsWith("get") && params.length==0 &&
|
||||
type!=null && isSettableType(type)) {
|
||||
|
||||
try {
|
||||
|
||||
Object value = method.invoke(target, new Object[]{});
|
||||
if( value == null )
|
||||
continue;
|
||||
|
||||
String strValue = convertToString(value, type);
|
||||
if( strValue ==null )
|
||||
continue;
|
||||
|
||||
name = name.substring(3,4).toLowerCase()+name.substring(4);
|
||||
props.put(optionPrefix+name, strValue);
|
||||
rc = true;
|
||||
|
||||
} catch ( Throwable ignore) {
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static public boolean setProperties(Object target, Map props, String optionPrefix) {
|
||||
boolean rc = false;
|
||||
if( target == null )
|
||||
|
@ -75,7 +122,9 @@ public class IntrospectionSupport {
|
|||
return rc;
|
||||
}
|
||||
|
||||
public static void setProperties(Object target, Map props) {
|
||||
public static boolean setProperties(Object target, Map props) {
|
||||
boolean rc = false;
|
||||
|
||||
if( target == null )
|
||||
throw new IllegalArgumentException("target was null.");
|
||||
if( props == null )
|
||||
|
@ -85,8 +134,11 @@ public class IntrospectionSupport {
|
|||
Map.Entry entry = (Entry) iter.next();
|
||||
if( setProperty(target, (String) entry.getKey(), entry.getValue()) ) {
|
||||
iter.remove();
|
||||
rc=true;
|
||||
}
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
private static boolean setProperty(Object target, String name, Object value) {
|
||||
|
@ -121,6 +173,18 @@ public class IntrospectionSupport {
|
|||
return null;
|
||||
}
|
||||
|
||||
private static String convertToString(Object value, Class type) throws URISyntaxException {
|
||||
PropertyEditor editor = PropertyEditorManager.findEditor(type);
|
||||
if( editor != null ) {
|
||||
editor.setValue(value);
|
||||
return editor.getAsText();
|
||||
}
|
||||
if( type == URI.class ) {
|
||||
return ((URI)value).toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static Method findSetterMethod(Class clazz, String name) {
|
||||
// Build the method name.
|
||||
name = "set"+name.substring(0,1).toUpperCase()+name.substring(1);
|
||||
|
|
|
@ -59,4 +59,27 @@ public class InitialContextTest extends TestCase {
|
|||
|
||||
assertEquals("the brokerURL should match", expected, connectionFactory.getBrokerURL());
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void testConnectionFactoryPolicyConfig() throws Exception {
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
|
||||
properties.put(Context.PROVIDER_URL, "tcp://localhost:65432");
|
||||
properties.put("prefetchPolicy.queuePrefetch", "777");
|
||||
properties.put("redeliveryPolicy.maximumRedeliveries", "15");
|
||||
properties.put("redeliveryPolicy.backOffMultiplier", "32");
|
||||
|
||||
InitialContext context = new InitialContext(properties);
|
||||
assertTrue("Created context", context != null);
|
||||
|
||||
ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) context.lookup("ConnectionFactory");
|
||||
|
||||
assertTrue("Should have created a ConnectionFactory", connectionFactory != null);
|
||||
|
||||
assertEquals(777, connectionFactory.getPrefetchPolicy().getQueuePrefetch());
|
||||
assertEquals(15, connectionFactory.getRedeliveryPolicy().getMaximumRedeliveries());
|
||||
assertEquals(32, connectionFactory.getRedeliveryPolicy().getBackOffMultiplier());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,10 @@ public class ObjectFactoryTest extends CombinationTestSupport {
|
|||
factory.setUseCompression(true);
|
||||
factory.setUseRetroactiveConsumer(true);
|
||||
factory.setUserName("user");
|
||||
factory.getPrefetchPolicy().setQueuePrefetch(777);
|
||||
factory.getRedeliveryPolicy().setMaximumRedeliveries(15);
|
||||
factory.getRedeliveryPolicy().setBackOffMultiplier((short) 32);
|
||||
|
||||
|
||||
// Create reference
|
||||
Reference ref = JNDIReferenceFactory.createReference(factory.getClass().getName(), factory);
|
||||
|
@ -61,6 +65,9 @@ public class ObjectFactoryTest extends CombinationTestSupport {
|
|||
assertEquals(factory.isUseCompression(), temp.isUseCompression());
|
||||
assertEquals(factory.isUseRetroactiveConsumer(), temp.isUseRetroactiveConsumer());
|
||||
assertEquals(factory.getUserName(), temp.getUserName());
|
||||
assertEquals(factory.getPrefetchPolicy().getQueuePrefetch(), temp.getPrefetchPolicy().getQueuePrefetch());
|
||||
assertEquals(factory.getRedeliveryPolicy().getMaximumRedeliveries(), temp.getRedeliveryPolicy().getMaximumRedeliveries());
|
||||
assertEquals(factory.getRedeliveryPolicy().getBackOffMultiplier(), temp.getRedeliveryPolicy().getBackOffMultiplier());
|
||||
}
|
||||
|
||||
public void testDestination() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue