diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java index d0ee800b73..ca07725855 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java @@ -153,7 +153,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne * @return Returns the Connection. */ public Connection createConnection() throws JMSException { - return createActiveMQConnection(userName, password); + return createActiveMQConnection(); } /** @@ -168,7 +168,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne * @throws JMSException */ public QueueConnection createQueueConnection() throws JMSException { - return createActiveMQConnection(userName, password); + return createActiveMQConnection(); } /** @@ -183,7 +183,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne * @throws JMSException */ public TopicConnection createTopicConnection() throws JMSException { - return createActiveMQConnection(userName, password); + return createActiveMQConnection(); } /** @@ -204,17 +204,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne // // ///////////////////////////////////////////// + + protected ActiveMQConnection createActiveMQConnection() throws JMSException { + return createActiveMQConnection(userName, password); + } + /** * @return Returns the Connection. */ - private ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { + protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } Transport transport; try { transport = TransportFactory.connect(brokerURL,DEFAULT_CONNECTION_EXECUTOR); - ActiveMQConnection connection = new ActiveMQConnection(transport, factoryStats); + ActiveMQConnection connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); @@ -245,6 +250,11 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne } } + protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { + ActiveMQConnection connection = new ActiveMQConnection(transport, stats); + return connection; + } + // ///////////////////////////////////////////// // // Property Accessors diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java new file mode 100644 index 0000000000..579cbe1c35 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java @@ -0,0 +1,86 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.management.JMSStatsImpl; +import org.apache.activemq.transport.Transport; + +import javax.jms.JMSException; +import javax.jms.XAConnection; +import javax.jms.XAConnectionFactory; +import javax.jms.XAQueueConnection; +import javax.jms.XAQueueConnectionFactory; +import javax.jms.XATopicConnection; +import javax.jms.XATopicConnectionFactory; + +import java.net.URI; + +/** + * A factory of {@link XAConnection} instances + * + * @version $Revision$ + */ +public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory { + + public ActiveMQXAConnectionFactory() { + } + + public ActiveMQXAConnectionFactory(String userName, String password, String brokerURL) { + super(userName, password, brokerURL); + } + + public ActiveMQXAConnectionFactory(String userName, String password, URI brokerURL) { + super(userName, password, brokerURL); + } + + public ActiveMQXAConnectionFactory(String brokerURL) { + super(brokerURL); + } + + public ActiveMQXAConnectionFactory(URI brokerURL) { + super(brokerURL); + } + + public XAConnection createXAConnection() throws JMSException { + return (XAConnection) createActiveMQConnection(); + } + + public XAConnection createXAConnection(String userName, String password) throws JMSException { + return (XAConnection) createActiveMQConnection(userName, password); + } + + public XAQueueConnection createXAQueueConnection() throws JMSException { + return (XAQueueConnection) createActiveMQConnection(); + } + + public XAQueueConnection createXAQueueConnection(String userName, String password) throws JMSException { + return (XAQueueConnection) createActiveMQConnection(userName, password); + } + + public XATopicConnection createXATopicConnection() throws JMSException { + return (XATopicConnection) createActiveMQConnection(); + } + + public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException { + return (XATopicConnection) createActiveMQConnection(userName, password); + } + + protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { + ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, stats); + return connection; + } +} diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java new file mode 100644 index 0000000000..5fba3b7295 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQXAConnectionFactoryTest.java @@ -0,0 +1,126 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.net.URI; +import java.net.URISyntaxException; + +import javax.jms.*; +import javax.jms.JMSException; + +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; + +public class ActiveMQXAConnectionFactoryTest extends CombinationTestSupport { + + public void testUseURIToSetOptionsOnConnectionFactory() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?jms.useAsyncSend=true"); + assertTrue(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm://localhost", cf.getBrokerURL()); + + cf = new ActiveMQXAConnectionFactory("vm://localhost?jms.useAsyncSend=false"); + assertFalse(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm://localhost", cf.getBrokerURL()); + + cf = new ActiveMQXAConnectionFactory("vm:(broker:()/localhost)?jms.useAsyncSend=true"); + assertTrue(cf.isUseAsyncSend()); + // the broker url have been adjusted. + assertEquals("vm:(broker:()/localhost)", cf.getBrokerURL()); + } + + public void testCreateVMConnectionWithEmbdeddBroker() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + // Make sure the broker is not created until the connection is instantiated. + assertNull( BrokerRegistry.getInstance().lookup("localhost") ); + Connection connection = cf.createConnection(); + // This should create the connection. + assertNotNull(connection); + // Verify the broker was created. + assertNotNull( BrokerRegistry.getInstance().lookup("localhost") ); + connection.close(); + // Verify the broker was destroyed. + assertNull( BrokerRegistry.getInstance().lookup("localhost") ); + } + + public void testGetBrokerName() throws URISyntaxException, JMSException { + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("vm://localhost?broker.persistent=false"); + ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + connection.start(); + + String brokerName = connection.getBrokerName(); + System.out.println("Got broker name: " + brokerName); + + assertNotNull("No broker name available!", brokerName); + connection.close(); + } + + public void testCreateTcpConnectionUsingAllocatedPort() throws Exception { + assertCreateConnection("tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"); + } + public void testCreateTcpConnectionUsingKnownPort() throws Exception { + assertCreateConnection("tcp://localhost:61610?wireFormat.tcpNoDelayEnabled=true"); + } + + protected void assertCreateConnection(String uri) throws Exception { + // Start up a broker with a tcp connector. + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + TransportConnector connector = broker.addConnector(uri); + broker.start(); + + URI temp = new URI(uri); + //URI connectURI = connector.getServer().getConnectURI(); + // TODO this sometimes fails when using the actual local host name + URI currentURI = connector.getServer().getConnectURI(); + + // sometimes the actual host name doesn't work in this test case + // e.g. on OS X so lets use the original details but just use the actual port + URI connectURI = new URI(temp.getScheme(), temp.getUserInfo(), temp.getHost(), currentURI.getPort(), temp.getPath(), temp.getQuery(), temp.getFragment()); + + + System.out.println("connection URI is: " + connectURI); + + // This should create the connection. + ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory(connectURI); + Connection connection = cf.createConnection(); + + assertXAConnection(connection); + + assertNotNull(connection); + connection.close(); + + connection = cf.createXAConnection(); + + assertXAConnection(connection); + + assertNotNull(connection); + connection.close(); + + broker.stop(); + } + + private void assertXAConnection(Connection connection) { + assertTrue("Should be an XAConnection", connection instanceof XAConnection); + assertTrue("Should be an XATopicConnection", connection instanceof XATopicConnection); + assertTrue("Should be an XAQueueConnection", connection instanceof XAQueueConnection); + } + +}