diff --git a/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBean.java b/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBean.java new file mode 100644 index 0000000000..67b26d4364 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBean.java @@ -0,0 +1,170 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.spring; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.FactoryBean; + +/** + * A helper class for creating a failover configured {@link ActiveMQConnectionFactory} + * which supports one or more TCP based hostname/ports which can all be configured in a + * consistent way without too much URL hacking. + * + * @version $Revision: 1.1 $ + */ +public class ActiveMQConnectionFactoryFactoryBean implements FactoryBean { + private List tcpHostAndPorts = new ArrayList(); + + // tcp properties + private Long maxInactivityDuration; + private String tcpProperties; + + // failover properties + private Long maxReconnectDelay; + private String failoverProperties; + + public Object getObject() throws Exception { + ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory(); + String brokerURL = getBrokerURL(); + answer.setBrokerURL(brokerURL); + return answer; + } + + public String getBrokerURL() { + StringBuffer buffer = new StringBuffer("failover:("); + int counter = 0; + for (String tcpHostAndPort : tcpHostAndPorts) { + if (counter++ > 0) { + buffer.append(","); + } + buffer.append(createTcpHostAndPortUrl(tcpHostAndPort)); + } + buffer.append(")"); + + List parameters = new ArrayList(); + if (maxReconnectDelay != null) { + parameters.add("maxReconnectDelay=" + maxReconnectDelay); + } + if (notEmpty(failoverProperties)) { + parameters.add(failoverProperties); + } + buffer.append(asQueryString(parameters)); + return buffer.toString(); + } + + public Class getObjectType() { + return ActiveMQConnectionFactory.class; + } + + public boolean isSingleton() { + return true; + } + + // Properties + //------------------------------------------------------------------------- + + public List getTcpHostAndPorts() { + return tcpHostAndPorts; + } + + public void setTcpHostAndPorts(List tcpHostAndPorts) { + this.tcpHostAndPorts = tcpHostAndPorts; + } + + public void setTcpHostAndPort(String tcpHostAndPort) { + tcpHostAndPorts = new ArrayList(); + tcpHostAndPorts.add(tcpHostAndPort); + } + + public Long getMaxInactivityDuration() { + return maxInactivityDuration; + } + + public void setMaxInactivityDuration(Long maxInactivityDuration) { + this.maxInactivityDuration = maxInactivityDuration; + } + + public String getTcpProperties() { + return tcpProperties; + } + + public void setTcpProperties(String tcpProperties) { + this.tcpProperties = tcpProperties; + } + + public Long getMaxReconnectDelay() { + return maxReconnectDelay; + } + + public void setMaxReconnectDelay(Long maxReconnectDelay) { + this.maxReconnectDelay = maxReconnectDelay; + } + + public String getFailoverProperties() { + return failoverProperties; + } + + public void setFailoverProperties(String failoverProperties) { + this.failoverProperties = failoverProperties; + } + + // Implementation methods + //------------------------------------------------------------------------- + + /** + * Turns a list of query string key=value strings into a query URL string + * of the form "?a=x&b=y" + */ + protected String asQueryString(List parameters) { + int size = parameters.size(); + if (size < 1) { + return ""; + } + else { + StringBuffer buffer = new StringBuffer("?"); + buffer.append(parameters.get(0)); + for (int i = 1; i < size; i++) { + buffer.append("&"); + buffer.append(parameters.get(i)); + } + return buffer.toString(); + } + } + + /** + * Allows us to add any TCP specific URI configurations + */ + protected String createTcpHostAndPortUrl(String tcpHostAndPort) { + List parameters = new ArrayList(); + if (maxInactivityDuration != null) { + parameters.add("wireFormat.maxInactivityDuration=" + maxInactivityDuration); + } + if (notEmpty(tcpProperties)) { + parameters.add(tcpProperties); + } + return tcpHostAndPort + asQueryString(parameters); + } + + + protected boolean notEmpty(String text) { + return text != null && text.length() > 0; + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBeanTest.java new file mode 100644 index 0000000000..2b24cf66a4 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/spring/ActiveMQConnectionFactoryFactoryBeanTest.java @@ -0,0 +1,94 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.spring; + +import java.util.Arrays; + +import junit.framework.TestCase; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.1 $ + */ +public class ActiveMQConnectionFactoryFactoryBeanTest extends TestCase { + private static final transient Log LOG = LogFactory.getLog(ActiveMQConnectionFactoryFactoryBeanTest.class); + + private ActiveMQConnectionFactoryFactoryBean factory; + + + public void testSingleTcpURL() throws Exception { + factory.setTcpHostAndPort("tcp://localhost:61616"); + assertCreatedURL("failover:(tcp://localhost:61616)"); + } + + public void testSingleTcpURLWithInactivityTimeout() throws Exception { + factory.setTcpHostAndPort("tcp://localhost:61616"); + factory.setMaxInactivityDuration(60000L); + assertCreatedURL("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=60000)"); + } + + public void testSingleTcpURLWithInactivityTimeoutAndTcpNoDelay() throws Exception { + factory.setTcpHostAndPort("tcp://localhost:61616"); + factory.setMaxInactivityDuration(50000L); + factory.setTcpProperties("tcpNoDelayEnabled=true"); + assertCreatedURL("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=50000&tcpNoDelayEnabled=true)"); + } + + public void testSingleTcpURLWithInactivityTimeoutAndMaxReconnectDelay() throws Exception { + factory.setTcpHostAndPort("tcp://localhost:61616"); + factory.setMaxInactivityDuration(60000L); + factory.setMaxReconnectDelay(50000L); + assertCreatedURL("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=60000)?maxReconnectDelay=50000"); + } + + public void testSingleTcpURLWithInactivityTimeoutAndMaxReconnectDelayAndFailoverProperty() throws Exception { + factory.setTcpHostAndPort("tcp://localhost:61616"); + factory.setMaxInactivityDuration(40000L); + factory.setMaxReconnectDelay(30000L); + factory.setFailoverProperties("useExponentialBackOff=false"); + + assertCreatedURL("failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=40000)?maxReconnectDelay=30000&useExponentialBackOff=false"); + } + + public void testMultipleTcpURLsWithInactivityTimeoutAndMaxReconnectDelayAndFailoverProperty() throws Exception { + factory.setTcpHostAndPorts(Arrays.asList(new String[] {"tcp://localhost:61618", "tcp://foo:61619"})); + factory.setMaxInactivityDuration(40000L); + factory.setMaxReconnectDelay(30000L); + factory.setFailoverProperties("useExponentialBackOff=false"); + + assertCreatedURL("failover:(tcp://localhost:61618?wireFormat.maxInactivityDuration=40000,tcp://foo:61619?wireFormat.maxInactivityDuration=40000)?maxReconnectDelay=30000&useExponentialBackOff=false"); + } + + protected void assertCreatedURL(String expectedURL) throws Exception { + String url = factory.getBrokerURL(); + LOG.debug("Generated URL: " + url); + + assertEquals("URL", expectedURL, url); + Object value = factory.getObject(); + assertTrue("Value should be an ActiveMQConnectionFactory", value instanceof ActiveMQConnectionFactory); + ActiveMQConnectionFactory connectionFactory = (ActiveMQConnectionFactory) value; + String brokerURL = connectionFactory.getBrokerURL(); + assertEquals("brokerURL", expectedURL, brokerURL); + } + + @Override + protected void setUp() throws Exception { + factory = new ActiveMQConnectionFactoryFactoryBean(); + } +}