ARTEMIS-3539 allow a single connection for MDB's

https://issues.apache.org/jira/browse/ARTEMIS-3539
This commit is contained in:
Andy Taylor 2021-05-10 17:32:29 +01:00 committed by Bruscino Domenico Francesco
parent 3191d0c929
commit 6622675848
3 changed files with 111 additions and 6 deletions

View File

@ -306,19 +306,28 @@ public class ActiveMQActivation {
Exception firstException = null;
ClientSessionFactory cf = null;
for (int i = 0; i < spec.getMaxSession(); i++) {
ClientSessionFactory cf = null;
//if we are sharing the ceonnection only create 1
if (!spec.isSingleConnection()) {
cf = null;
}
ClientSession session = null;
try {
cf = factory.getServerLocator().createSessionFactory();
if (cf == null) {
cf = factory.getServerLocator().createSessionFactory();
}
session = setupSession(cf);
ActiveMQMessageHandler handler = new ActiveMQMessageHandler(factory, this, ra.getTM(), (ClientSessionInternal) session, cf, i);
handler.setup();
handlers.add(handler);
} catch (Exception e) {
if (cf != null) {
cf.close();
if (!spec.isSingleConnection()) {
cf.close();
}
}
if (session != null) {
session.close();

View File

@ -42,6 +42,8 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
private static final int DEFAULT_MAX_SESSION = 15;
private static final boolean DEFAULT_SINGLE_CONNECTION = false;
public String strConnectorClassName;
public String strConnectionParameters;
@ -108,6 +110,11 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
*/
private Integer maxSession;
/*
* Should we use a single connection for inbound
* */
private Boolean singleConnection = false;
/**
* Transaction timeout
*/
@ -604,6 +611,35 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
maxSession = value;
}
/**
* Get the number of max session
*
* @return The value
*/
public Boolean isSingleConnection() {
if (logger.isTraceEnabled()) {
logger.trace("getSingleConnection()");
}
if (singleConnection == null) {
return DEFAULT_SINGLE_CONNECTION;
}
return singleConnection;
}
/**
* Set the number of max session
*
* @param value The value
*/
public void setSingleConnection(final Boolean value) {
if (logger.isTraceEnabled()) {
logger.trace("setSingleConnection(" + value + ")");
}
singleConnection = value;
}
/**
* Get the transaction timeout
*
@ -851,9 +887,11 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
return false;
if (maxSession != null ? !maxSession.equals(that.maxSession) : that.maxSession != null)
return false;
if (singleConnection != null ? !useJNDI.equals(that.useJNDI) : that.useJNDI != null)
return false;
if (transactionTimeout != null ? !transactionTimeout.equals(that.transactionTimeout) : that.transactionTimeout != null)
return false;
if (useJNDI != null ? !useJNDI.equals(that.useJNDI) : that.useJNDI != null)
if (useJNDI != null ? !singleConnection.equals(that.singleConnection) : that.singleConnection != null)
return false;
if (jndiParams != null ? !jndiParams.equals(that.jndiParams) : that.jndiParams != null)
return false;
@ -890,6 +928,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
result = 31 * result + (user != null ? user.hashCode() : 0);
result = 31 * result + (password != null ? password.hashCode() : 0);
result = 31 * result + (maxSession != null ? maxSession.hashCode() : 0);
result = 31 * result + (singleConnection != null ? singleConnection.hashCode() : 0);
result = 31 * result + (transactionTimeout != null ? transactionTimeout.hashCode() : 0);
result = 31 * result + (useJNDI != null ? useJNDI.hashCode() : 0);
result = 31 * result + (jndiParams != null ? jndiParams.hashCode() : 0);

View File

@ -463,12 +463,69 @@ public class ResourceAdapterTest extends ActiveMQTestBase {
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("test");
spec.setMinSession(1);
spec.setMaxSession(1);
spec.setMinSession(10);
spec.setMaxSession(10);
ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec);
activation.start();
Assert.assertEquals("wrong connection count ", server.getConnectionCount(), 11);
activation.stop();
ra.stop();
locator.close();
} finally {
server.stop();
}
}
@Test
public void testStartActivationSingleConnection() throws Exception {
ActiveMQServer server = createServer(false);
try {
server.start();
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory factory = createSessionFactory(locator);
ClientSession session = factory.createSession(false, false, false);
ActiveMQDestination queue = (ActiveMQDestination) ActiveMQJMSClient.createQueue("test");
session.createQueue(new QueueConfiguration(queue.getSimpleAddress()));
session.close();
ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter();
ra.setConnectorClassName(INVM_CONNECTOR_FACTORY);
ra.setUserName("userGlobal");
ra.setPassword("passwordGlobal");
ra.start(new BootstrapContext());
Connection conn = ra.getDefaultActiveMQConnectionFactory().createConnection();
conn.close();
ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
spec.setResourceAdapter(ra);
spec.setUseJNDI(false);
spec.setUser("user");
spec.setPassword("password");
spec.setDestinationType("javax.jms.Topic");
spec.setDestination("test");
spec.setMinSession(1);
spec.setMaxSession(10);
spec.setSingleConnection(true);
ActiveMQActivation activation = new ActiveMQActivation(ra, new MessageEndpointFactory(), spec);
activation.start();
Assert.assertEquals("wrong connection count ", server.getConnectionCount(), 2);
activation.stop();
ra.stop();