diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java index 2ded69914f..6f31749612 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivation.java @@ -306,19 +306,28 @@ public class ActiveMQActivation { Exception firstException = null; + ClientSessionFactory cf = null; + for (int i = 0; i < spec.getMaxSession(); i++) { - ClientSessionFactory cf = null; + //if we are sharing the ceonnection only create 1 + if (!spec.isSingleConnection()) { + cf = null; + } ClientSession session = null; try { - cf = factory.getServerLocator().createSessionFactory(); + if (cf == null) { + cf = factory.getServerLocator().createSessionFactory(); + } session = setupSession(cf); ActiveMQMessageHandler handler = new ActiveMQMessageHandler(factory, this, ra.getTM(), (ClientSessionInternal) session, cf, i); handler.setup(); handlers.add(handler); } catch (Exception e) { if (cf != null) { - cf.close(); + if (!spec.isSingleConnection()) { + cf.close(); + } } if (session != null) { session.close(); diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java index af111e7895..215775b6fa 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java @@ -42,6 +42,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen private static final int DEFAULT_MAX_SESSION = 15; + private static final boolean DEFAULT_SINGLE_CONNECTION = false; + public String strConnectorClassName; public String strConnectionParameters; @@ -108,6 +110,11 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen */ private Integer maxSession; + /* + * Should we use a single connection for inbound + * */ + private Boolean singleConnection = false; + /** * Transaction timeout */ @@ -604,6 +611,35 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen maxSession = value; } + /** + * Get the number of max session + * + * @return The value + */ + public Boolean isSingleConnection() { + if (logger.isTraceEnabled()) { + logger.trace("getSingleConnection()"); + } + + if (singleConnection == null) { + return DEFAULT_SINGLE_CONNECTION; + } + + return singleConnection; + } + + /** + * Set the number of max session + * + * @param value The value + */ + public void setSingleConnection(final Boolean value) { + if (logger.isTraceEnabled()) { + logger.trace("setSingleConnection(" + value + ")"); + } + singleConnection = value; + } + /** * Get the transaction timeout * @@ -851,9 +887,11 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen return false; if (maxSession != null ? !maxSession.equals(that.maxSession) : that.maxSession != null) return false; + if (singleConnection != null ? !useJNDI.equals(that.useJNDI) : that.useJNDI != null) + return false; if (transactionTimeout != null ? !transactionTimeout.equals(that.transactionTimeout) : that.transactionTimeout != null) return false; - if (useJNDI != null ? !useJNDI.equals(that.useJNDI) : that.useJNDI != null) + if (useJNDI != null ? !singleConnection.equals(that.singleConnection) : that.singleConnection != null) return false; if (jndiParams != null ? !jndiParams.equals(that.jndiParams) : that.jndiParams != null) return false; @@ -890,6 +928,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen result = 31 * result + (user != null ? user.hashCode() : 0); result = 31 * result + (password != null ? password.hashCode() : 0); result = 31 * result + (maxSession != null ? maxSession.hashCode() : 0); + result = 31 * result + (singleConnection != null ? singleConnection.hashCode() : 0); result = 31 * result + (transactionTimeout != null ? transactionTimeout.hashCode() : 0); result = 31 * result + (useJNDI != null ? useJNDI.hashCode() : 0); result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java index 2ad2477ab7..e382556f97 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/ra/ResourceAdapterTest.java @@ -463,12 +463,69 @@ public class ResourceAdapterTest extends ActiveMQTestBase { spec.setDestinationType("javax.jms.Topic"); spec.setDestination("test"); - spec.setMinSession(1); - spec.setMaxSession(1); + spec.setMinSession(10); + spec.setMaxSession(10); ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec); activation.start(); + Assert.assertEquals("wrong connection count ", server.getConnectionCount(), 11); + activation.stop(); + + ra.stop(); + + locator.close(); + + } finally { + server.stop(); + } + } + + @Test + public void testStartActivationSingleConnection() throws Exception { + ActiveMQServer server = createServer(false); + + try { + + server.start(); + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory factory = createSessionFactory(locator); + ClientSession session = factory.createSession(false, false, false); + ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test"); + session.createQueue(new QueueConfiguration(queue.getSimpleAddress())); + session.close(); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + + ra.setConnectorClassName(INVM_CONNECTOR_FACTORY); + ra.setUserName("userGlobal"); + ra.setPassword("passwordGlobal"); + ra.start(new BootstrapContext()); + + Connection conn = ra.getDefaultActiveMQConnectionFactory().createConnection(); + + conn.close(); + + ActiveMQActivationSpec spec = new ActiveMQActivationSpec(); + + spec.setResourceAdapter(ra); + + spec.setUseJNDI(false); + + spec.setUser("user"); + spec.setPassword("password"); + + spec.setDestinationType("javax.jms.Topic"); + spec.setDestination("test"); + + spec.setMinSession(1); + spec.setMaxSession(10); + spec.setSingleConnection(true); + + ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec); + + activation.start(); + Assert.assertEquals("wrong connection count ", server.getConnectionCount(), 2); activation.stop(); ra.stop();