From 0bc9ab2c49bbf1796523280c1606530ac3687005 Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Wed, 7 Jun 2017 09:37:25 +0100 Subject: [PATCH] ARTEMIS-1211 - Allow local transactions when no jta in Resource Adapter https://issues.apache.org/jira/browse/ARTEMIS-1211 --- .../activemq/artemis/ra/ActiveMQRABundle.java | 5 + .../artemis/ra/ActiveMQRAMCFProperties.java | 9 + .../ActiveMQRAManagedConnectionFactory.java | 9 + .../artemis/ra/ActiveMQRAProperties.java | 8 + .../ra/ActiveMQRASessionFactoryImpl.java | 46 +-- .../ra/inflow/ActiveMQActivationSpec.java | 9 + .../ra/OutgoingConnectionTestNoJTA.java | 271 ++++++++++++++++++ 7 files changed, 339 insertions(+), 18 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTestNoJTA.java diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRABundle.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRABundle.java index f2e97227f8..b2842682a3 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRABundle.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRABundle.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.ra; +import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.resource.NotSupportedException; @@ -62,4 +63,8 @@ public interface ActiveMQRABundle { @Message(id = 159006, value = "Invalid Session Mode {0}", format = Message.Format.MESSAGE_FORMAT) JMSRuntimeException invalidAcknowledgeMode(int sessionMode); + + @Message(id = 159007, value = "Invalid Session Mode SESSION_TRANSACTED, to enable Local Transacted Sessions you can " + + "set the allowLocalTransactions (allow-local-transactions) on the resource adapter") + JMSException invalidSessionTransactedModeRuntimeAllowLocal(); } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMCFProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMCFProperties.java index 945d4341c7..30923fd4af 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMCFProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAMCFProperties.java @@ -43,6 +43,7 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme * The topic type */ private static final String TOPIC_TYPE = Topic.class.getName(); + protected boolean allowLocalTransactions; private String strConnectorClassName; @@ -171,4 +172,12 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme this.useTryLock = useTryLock; } + + public boolean isAllowLocalTransactions() { + return allowLocalTransactions; + } + + public void setAllowLocalTransactions(boolean allowLocalTransactions) { + this.allowLocalTransactions = allowLocalTransactions; + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java index a3541a6ec4..cc38b67ff7 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAManagedConnectionFactory.java @@ -600,10 +600,19 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti return mcfProperties.isHA(); } + public void setAllowLocalTransactions(Boolean allowLocalTransactions) { + mcfProperties.setAllowLocalTransactions(allowLocalTransactions); + } + + public Boolean isAllowLocalTransactions() { + return mcfProperties.isAllowLocalTransactions(); + } + public void setHA(Boolean ha) { mcfProperties.setHA(ha); } + /** * Get the useTryLock. * diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAProperties.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAProperties.java index 95a58d3222..dffd5b3afb 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAProperties.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRAProperties.java @@ -37,6 +37,7 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements * Trace enabled */ private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + protected boolean allowLocalTransactions; /** * The user name @@ -281,4 +282,11 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements this.jgroupsChannelRefName = jgroupsChannelRefName; } + public boolean isAllowLocalTransactions() { + return allowLocalTransactions; + } + + public void setAllowLocalTransactions(boolean allowLocalTransactions) { + this.allowLocalTransactions = allowLocalTransactions; + } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRASessionFactoryImpl.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRASessionFactoryImpl.java index 15746ddb9b..12d6ae28ea 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRASessionFactoryImpl.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/ActiveMQRASessionFactoryImpl.java @@ -120,6 +120,8 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon */ private final Set tempTopics = new HashSet<>(); + private boolean allowLocalTransaction; + /** * Constructor * @@ -829,24 +831,32 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon //In the Java EE web or EJB container, when there is no active JTA transaction in progress // The argument {@code transacted} is ignored. - //The session will always be non-transacted, - transacted = false; - switch (acknowledgeMode) { - //using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. - case Session.AUTO_ACKNOWLEDGE: - case Session.DUPS_OK_ACKNOWLEDGE: - //plus our own - case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: - case ActiveMQJMSConstants.PRE_ACKNOWLEDGE: - break; - //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used. - case Session.CLIENT_ACKNOWLEDGE: - throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); - //same with this although the spec doesn't explicitly say - case Session.SESSION_TRANSACTED: - throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime(); - default: - throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode); + //The session will always be non-transacted, unless allow-local-transactions is true + if (transacted && mcf.isAllowLocalTransactions()) { + acknowledgeMode = Session.SESSION_TRANSACTED; + } else { + transacted = false; + switch (acknowledgeMode) { + //using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. + case Session.AUTO_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + //plus our own + case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: + case ActiveMQJMSConstants.PRE_ACKNOWLEDGE: + break; + //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used. + case Session.CLIENT_ACKNOWLEDGE: + throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); + //same with this although the spec doesn't explicitly say + case Session.SESSION_TRANSACTED: + if (!mcf.isAllowLocalTransactions()) { + throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntimeAllowLocal(); + } + transacted = true; + break; + default: + throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode); + } } } diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java index 35abba9e75..959df8c7c3 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQActivationSpec.java @@ -53,6 +53,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen public String strConnectorClassName; public String strConnectionParameters; + protected boolean allowLocalTransactions; /** * The resource adapter @@ -818,6 +819,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen public void setMaxMessages(final Integer value) { } + public boolean isAllowLocalTransactions() { + return allowLocalTransactions; + } + + public void setAllowLocalTransactions(boolean allowLocalTransactions) { + this.allowLocalTransactions = allowLocalTransactions; + } + @Override public boolean equals(Object o) { if (this == o) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTestNoJTA.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTestNoJTA.java new file mode 100644 index 0000000000..7b2ba8dfbb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/OutgoingConnectionTestNoJTA.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.ra; + +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl; +import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager; +import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory; +import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter; +import org.apache.activemq.artemis.service.extensions.ServiceUtils; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.JMSContext; +import javax.jms.JMSProducer; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.HashSet; +import java.util.Set; + +public class OutgoingConnectionTestNoJTA extends ActiveMQRATestBase { + + protected ActiveMQResourceAdapter resourceAdapter; + protected ActiveMQRAConnectionFactory qraConnectionFactory; + protected ActiveMQRAManagedConnectionFactory mcf; + ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager(); + + @Override + public boolean useSecurity() { + return true; + } + + @Override + @Before + public void setUp() throws Exception { + useDummyTransactionManager(); + super.setUp(); + ((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("testuser", "testpassword"); + ((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("guest", "guest"); + ((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().setDefaultUser("guest"); + ((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("testuser", "arole"); + ((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("guest", "arole"); + Role role = new Role("arole", true, true, true, true, true, true, true, true); + Set roles = new HashSet<>(); + roles.add(role); + server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles); + + resourceAdapter = new ActiveMQResourceAdapter(); + resourceAdapter.setEntries("[\"java://jmsXA\"]"); + resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName()); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setAllowLocalTransactions(true); + mcf.setResourceAdapter(resourceAdapter); + qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + } + + @Override + @After + public void tearDown() throws Exception { + ((DummyTransactionManager) ServiceUtils.getTransactionManager()).tx = null; + if (resourceAdapter != null) { + resourceAdapter.stop(); + } + + qraConnectionManager.stop(); + super.tearDown(); + } + + @Test + public void testSimpleMessageSendAndReceiveSessionTransacted() throws Exception { + setupDLQ(10); + resourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setAllowLocalTransactions(true); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + Session s = queueConnection.createSession(true, Session.SESSION_TRANSACTED); + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + MessageProducer mp = s.createProducer(q); + MessageConsumer consumer = s.createConsumer(q); + Message message = s.createTextMessage("test"); + mp.send(message); + s.commit(); + queueConnection.start(); + TextMessage textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.rollback(); + textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.commit(); + textMessage = (TextMessage) consumer.receiveNoWait(); + assertNull(textMessage); + } + + @Test + public void testSimpleMessageSendAndReceiveNotTransacted() throws Exception { + setupDLQ(10); + resourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setAllowLocalTransactions(true); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED); + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + MessageProducer mp = s.createProducer(q); + MessageConsumer consumer = s.createConsumer(q); + Message message = s.createTextMessage("test"); + mp.send(message); + s.commit(); + queueConnection.start(); + TextMessage textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.rollback(); + textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.commit(); + textMessage = (TextMessage) consumer.receiveNoWait(); + assertNull(textMessage); + } + + @Test + public void testSimpleMessageSendAndReceiveSessionTransacted2() throws Exception { + setupDLQ(10); + resourceAdapter = newResourceAdapter(); + MyBootstrapContext ctx = new MyBootstrapContext(); + resourceAdapter.start(ctx); + ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory(); + mcf.setAllowLocalTransactions(true); + mcf.setResourceAdapter(resourceAdapter); + ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager); + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + Session s = queueConnection.createSession(Session.SESSION_TRANSACTED); + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + MessageProducer mp = s.createProducer(q); + MessageConsumer consumer = s.createConsumer(q); + Message message = s.createTextMessage("test"); + mp.send(message); + s.commit(); + queueConnection.start(); + TextMessage textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.rollback(); + textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + s.commit(); + textMessage = (TextMessage) consumer.receiveNoWait(); + assertNull(textMessage); + } + + @Test + public void sessionTransactedTestNoActiveJTATx() throws Exception { + JMSContext context = qraConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED); + assertEquals(context.getSessionMode(), JMSContext.AUTO_ACKNOWLEDGE); + } + + + @Test + public void testQueuSessionAckMode() throws Exception { + + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + + Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED); + + s.close(); + } + + + + @Test + public void testSimpleSendNoXAJMSContext() throws Exception { + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + + try (ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(); + ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE); + JMSContext jmsctx = qraConnectionFactory.createContext(); + ) { + session.start(); + // These next 4 lines could be written in a single line however it makes difficult for debugging + JMSProducer producer = jmsctx.createProducer(); + producer.setProperty("strvalue", "hello"); + TextMessage msgsend = jmsctx.createTextMessage("hello"); + producer.send(q, msgsend); + + ClientMessage msg = consVerify.receive(1000); + assertNotNull(msg); + assertEquals("hello", msg.getStringProperty("strvalue")); + } + } + + @Test + public void testSimpleMessageSendAndReceive() throws Exception { + QueueConnection queueConnection = qraConnectionFactory.createQueueConnection(); + Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + MessageProducer mp = s.createProducer(q); + MessageConsumer consumer = s.createConsumer(q); + Message message = s.createTextMessage("test"); + mp.send(message); + queueConnection.start(); + TextMessage textMessage = (TextMessage) consumer.receive(1000); + assertNotNull(textMessage); + assertEquals(textMessage.getText(), "test"); + } + + @Test + public void testSimpleSendNoXAJMS1() throws Exception { + Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE); + try (ClientSessionFactory sf = locator.createSessionFactory(); + ClientSession session = sf.createSession(); + ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE); + Connection conn = qraConnectionFactory.createConnection(); + ) { + Session jmsSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.start(); + MessageProducer producer = jmsSess.createProducer(q); + // These next 4 lines could be written in a single line however it makes difficult for debugging + TextMessage msgsend = jmsSess.createTextMessage("hello"); + msgsend.setStringProperty("strvalue", "hello"); + producer.send(msgsend); + + ClientMessage msg = consVerify.receive(1000); + assertNotNull(msg); + assertEquals("hello", msg.getStringProperty("strvalue")); + } + } +}