This closes #1339
This commit is contained in:
commit
7faaddc419
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.ra;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSRuntimeException;
|
||||
import javax.resource.NotSupportedException;
|
||||
|
||||
|
@ -62,4 +63,8 @@ public interface ActiveMQRABundle {
|
|||
|
||||
@Message(id = 159006, value = "Invalid Session Mode {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||
JMSRuntimeException invalidAcknowledgeMode(int sessionMode);
|
||||
|
||||
@Message(id = 159007, value = "Invalid Session Mode SESSION_TRANSACTED, to enable Local Transacted Sessions you can " +
|
||||
"set the allowLocalTransactions (allow-local-transactions) on the resource adapter")
|
||||
JMSException invalidSessionTransactedModeRuntimeAllowLocal();
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme
|
|||
* The topic type
|
||||
*/
|
||||
private static final String TOPIC_TYPE = Topic.class.getName();
|
||||
protected boolean allowLocalTransactions;
|
||||
|
||||
private String strConnectorClassName;
|
||||
|
||||
|
@ -171,4 +172,12 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme
|
|||
|
||||
this.useTryLock = useTryLock;
|
||||
}
|
||||
|
||||
public boolean isAllowLocalTransactions() {
|
||||
return allowLocalTransactions;
|
||||
}
|
||||
|
||||
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||
this.allowLocalTransactions = allowLocalTransactions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -600,10 +600,19 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
|
|||
return mcfProperties.isHA();
|
||||
}
|
||||
|
||||
public void setAllowLocalTransactions(Boolean allowLocalTransactions) {
|
||||
mcfProperties.setAllowLocalTransactions(allowLocalTransactions);
|
||||
}
|
||||
|
||||
public Boolean isAllowLocalTransactions() {
|
||||
return mcfProperties.isAllowLocalTransactions();
|
||||
}
|
||||
|
||||
public void setHA(Boolean ha) {
|
||||
mcfProperties.setHA(ha);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the useTryLock.
|
||||
*
|
||||
|
|
|
@ -37,6 +37,7 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements
|
|||
* Trace enabled
|
||||
*/
|
||||
private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
|
||||
protected boolean allowLocalTransactions;
|
||||
|
||||
/**
|
||||
* The user name
|
||||
|
@ -281,4 +282,11 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements
|
|||
this.jgroupsChannelRefName = jgroupsChannelRefName;
|
||||
}
|
||||
|
||||
public boolean isAllowLocalTransactions() {
|
||||
return allowLocalTransactions;
|
||||
}
|
||||
|
||||
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||
this.allowLocalTransactions = allowLocalTransactions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,6 +120,8 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
|||
*/
|
||||
private final Set<TemporaryTopic> tempTopics = new HashSet<>();
|
||||
|
||||
private boolean allowLocalTransaction;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
|
@ -829,24 +831,32 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
|||
//In the Java EE web or EJB container, when there is no active JTA transaction in progress
|
||||
// The argument {@code transacted} is ignored.
|
||||
|
||||
//The session will always be non-transacted,
|
||||
transacted = false;
|
||||
switch (acknowledgeMode) {
|
||||
//using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
|
||||
case Session.AUTO_ACKNOWLEDGE:
|
||||
case Session.DUPS_OK_ACKNOWLEDGE:
|
||||
//plus our own
|
||||
case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
|
||||
case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
|
||||
break;
|
||||
//The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used.
|
||||
case Session.CLIENT_ACKNOWLEDGE:
|
||||
throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
|
||||
//same with this although the spec doesn't explicitly say
|
||||
case Session.SESSION_TRANSACTED:
|
||||
throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
|
||||
default:
|
||||
throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
|
||||
//The session will always be non-transacted, unless allow-local-transactions is true
|
||||
if (transacted && mcf.isAllowLocalTransactions()) {
|
||||
acknowledgeMode = Session.SESSION_TRANSACTED;
|
||||
} else {
|
||||
transacted = false;
|
||||
switch (acknowledgeMode) {
|
||||
//using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
|
||||
case Session.AUTO_ACKNOWLEDGE:
|
||||
case Session.DUPS_OK_ACKNOWLEDGE:
|
||||
//plus our own
|
||||
case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
|
||||
case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
|
||||
break;
|
||||
//The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used.
|
||||
case Session.CLIENT_ACKNOWLEDGE:
|
||||
throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
|
||||
//same with this although the spec doesn't explicitly say
|
||||
case Session.SESSION_TRANSACTED:
|
||||
if (!mcf.isAllowLocalTransactions()) {
|
||||
throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntimeAllowLocal();
|
||||
}
|
||||
transacted = true;
|
||||
break;
|
||||
default:
|
||||
throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
|
|||
public String strConnectorClassName;
|
||||
|
||||
public String strConnectionParameters;
|
||||
protected boolean allowLocalTransactions;
|
||||
|
||||
/**
|
||||
* The resource adapter
|
||||
|
@ -818,6 +819,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
|
|||
public void setMaxMessages(final Integer value) {
|
||||
}
|
||||
|
||||
public boolean isAllowLocalTransactions() {
|
||||
return allowLocalTransactions;
|
||||
}
|
||||
|
||||
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||
this.allowLocalTransactions = allowLocalTransactions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o)
|
||||
|
|
|
@ -0,0 +1,271 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.ra;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
||||
import org.apache.activemq.artemis.core.security.Role;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory;
|
||||
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
||||
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSProducer;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
public class OutgoingConnectionTestNoJTA extends ActiveMQRATestBase {
|
||||
|
||||
protected ActiveMQResourceAdapter resourceAdapter;
|
||||
protected ActiveMQRAConnectionFactory qraConnectionFactory;
|
||||
protected ActiveMQRAManagedConnectionFactory mcf;
|
||||
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
|
||||
|
||||
@Override
|
||||
public boolean useSecurity() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
useDummyTransactionManager();
|
||||
super.setUp();
|
||||
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("testuser", "testpassword");
|
||||
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("guest", "guest");
|
||||
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().setDefaultUser("guest");
|
||||
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("testuser", "arole");
|
||||
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("guest", "arole");
|
||||
Role role = new Role("arole", true, true, true, true, true, true, true, true);
|
||||
Set<Role> roles = new HashSet<>();
|
||||
roles.add(role);
|
||||
server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
|
||||
|
||||
resourceAdapter = new ActiveMQResourceAdapter();
|
||||
resourceAdapter.setEntries("[\"java://jmsXA\"]");
|
||||
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
|
||||
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||
resourceAdapter.start(ctx);
|
||||
mcf = new ActiveMQRAManagedConnectionFactory();
|
||||
mcf.setAllowLocalTransactions(true);
|
||||
mcf.setResourceAdapter(resourceAdapter);
|
||||
qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
((DummyTransactionManager) ServiceUtils.getTransactionManager()).tx = null;
|
||||
if (resourceAdapter != null) {
|
||||
resourceAdapter.stop();
|
||||
}
|
||||
|
||||
qraConnectionManager.stop();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMessageSendAndReceiveSessionTransacted() throws Exception {
|
||||
setupDLQ(10);
|
||||
resourceAdapter = newResourceAdapter();
|
||||
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||
resourceAdapter.start(ctx);
|
||||
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||
mcf.setAllowLocalTransactions(true);
|
||||
mcf.setResourceAdapter(resourceAdapter);
|
||||
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||
Session s = queueConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
MessageProducer mp = s.createProducer(q);
|
||||
MessageConsumer consumer = s.createConsumer(q);
|
||||
Message message = s.createTextMessage("test");
|
||||
mp.send(message);
|
||||
s.commit();
|
||||
queueConnection.start();
|
||||
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.rollback();
|
||||
textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.commit();
|
||||
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||
assertNull(textMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMessageSendAndReceiveNotTransacted() throws Exception {
|
||||
setupDLQ(10);
|
||||
resourceAdapter = newResourceAdapter();
|
||||
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||
resourceAdapter.start(ctx);
|
||||
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||
mcf.setAllowLocalTransactions(true);
|
||||
mcf.setResourceAdapter(resourceAdapter);
|
||||
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||
Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED);
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
MessageProducer mp = s.createProducer(q);
|
||||
MessageConsumer consumer = s.createConsumer(q);
|
||||
Message message = s.createTextMessage("test");
|
||||
mp.send(message);
|
||||
s.commit();
|
||||
queueConnection.start();
|
||||
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.rollback();
|
||||
textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.commit();
|
||||
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||
assertNull(textMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMessageSendAndReceiveSessionTransacted2() throws Exception {
|
||||
setupDLQ(10);
|
||||
resourceAdapter = newResourceAdapter();
|
||||
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||
resourceAdapter.start(ctx);
|
||||
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||
mcf.setAllowLocalTransactions(true);
|
||||
mcf.setResourceAdapter(resourceAdapter);
|
||||
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||
Session s = queueConnection.createSession(Session.SESSION_TRANSACTED);
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
MessageProducer mp = s.createProducer(q);
|
||||
MessageConsumer consumer = s.createConsumer(q);
|
||||
Message message = s.createTextMessage("test");
|
||||
mp.send(message);
|
||||
s.commit();
|
||||
queueConnection.start();
|
||||
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.rollback();
|
||||
textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
s.commit();
|
||||
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||
assertNull(textMessage);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void sessionTransactedTestNoActiveJTATx() throws Exception {
|
||||
JMSContext context = qraConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
|
||||
assertEquals(context.getSessionMode(), JMSContext.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQueuSessionAckMode() throws Exception {
|
||||
|
||||
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||
|
||||
Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED);
|
||||
|
||||
s.close();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
public void testSimpleSendNoXAJMSContext() throws Exception {
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
|
||||
try (ClientSessionFactory sf = locator.createSessionFactory();
|
||||
ClientSession session = sf.createSession();
|
||||
ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE);
|
||||
JMSContext jmsctx = qraConnectionFactory.createContext();
|
||||
) {
|
||||
session.start();
|
||||
// These next 4 lines could be written in a single line however it makes difficult for debugging
|
||||
JMSProducer producer = jmsctx.createProducer();
|
||||
producer.setProperty("strvalue", "hello");
|
||||
TextMessage msgsend = jmsctx.createTextMessage("hello");
|
||||
producer.send(q, msgsend);
|
||||
|
||||
ClientMessage msg = consVerify.receive(1000);
|
||||
assertNotNull(msg);
|
||||
assertEquals("hello", msg.getStringProperty("strvalue"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleMessageSendAndReceive() throws Exception {
|
||||
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
MessageProducer mp = s.createProducer(q);
|
||||
MessageConsumer consumer = s.createConsumer(q);
|
||||
Message message = s.createTextMessage("test");
|
||||
mp.send(message);
|
||||
queueConnection.start();
|
||||
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||
assertNotNull(textMessage);
|
||||
assertEquals(textMessage.getText(), "test");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleSendNoXAJMS1() throws Exception {
|
||||
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||
try (ClientSessionFactory sf = locator.createSessionFactory();
|
||||
ClientSession session = sf.createSession();
|
||||
ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE);
|
||||
Connection conn = qraConnectionFactory.createConnection();
|
||||
) {
|
||||
Session jmsSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.start();
|
||||
MessageProducer producer = jmsSess.createProducer(q);
|
||||
// These next 4 lines could be written in a single line however it makes difficult for debugging
|
||||
TextMessage msgsend = jmsSess.createTextMessage("hello");
|
||||
msgsend.setStringProperty("strvalue", "hello");
|
||||
producer.send(msgsend);
|
||||
|
||||
ClientMessage msg = consVerify.receive(1000);
|
||||
assertNotNull(msg);
|
||||
assertEquals("hello", msg.getStringProperty("strvalue"));
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue