From 09ae41adec4b538ea372e00e95e47df06e669efe Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Wed, 30 Aug 2023 10:54:16 -0500 Subject: [PATCH] [AMQ-8325] JMS 2.0 XA supported operations --- .../org/apache/activemq/ActiveMQContext.java | 5 +- .../activemq/ActiveMQXAConnectionFactory.java | 15 +++- .../apache/activemq/ActiveMQXAContext.java | 42 +++++++++++ .../ActiveMQXASslConnectionFactory.java | 13 +++- .../jms2/ActiveMQJMS2ConnectionTest.java | 12 --- .../activemq/jms2/ActiveMQJMS2TestBase.java | 75 ++++++++++++++++++- .../jms2/ActiveMQJMS2XAConnectionTest.java | 75 +++++++++++++++++++ .../jms2/ActiveMQJMS2XAContextTest.java | 68 +++++++++++++++++ .../activemq/jms2/ActiveMQJMS2XATestBase.java | 56 ++++++++++++++ 9 files changed, 340 insertions(+), 21 deletions(-) create mode 100644 activemq-client/src/main/java/org/apache/activemq/ActiveMQXAContext.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAConnectionTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAContextTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XATestBase.java diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java index d7d4aa7b85..0e4295c37c 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQContext.java @@ -62,7 +62,7 @@ public class ActiveMQContext implements JMSContext { private final ActiveMQConnection activemqConnection; private final AtomicLong connectionCounter; - private ActiveMQSession activemqSession = null; + protected ActiveMQSession activemqSession = null; // Configuration private boolean autoStart = DEFAULT_AUTO_START; @@ -526,7 +526,7 @@ public class ActiveMQContext implements JMSContext { } } - private void checkContextState() { + protected void checkContextState() { if (activemqConnection == null) { throw new JMSRuntimeException("Connection not available"); } @@ -556,5 +556,4 @@ public class ActiveMQContext implements JMSContext { } return this.activemqMessageProducer; } - } diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java index 61bf024fc6..fdfa5a6dbd 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAConnectionFactory.java @@ -30,6 +30,7 @@ import jakarta.jms.XATopicConnectionFactory; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.JMSExceptionSupport; /** * A factory of {@link XAConnection} instances @@ -80,15 +81,23 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException { return (XATopicConnection) createActiveMQConnection(userName, password); } - + @Override public XAJMSContext createXAContext() { - throw new UnsupportedOperationException("createXAContext() is not supported"); + try { + return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection()); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } } @Override public XAJMSContext createXAContext(String userName, String password) { - throw new UnsupportedOperationException("createXAContext(userName, password) is not supported"); + try { + return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection(userName, password)); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } } protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAContext.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAContext.java new file mode 100644 index 0000000000..5095ffbc51 --- /dev/null +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXAContext.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.transaction.xa.XAResource; + +import jakarta.jms.JMSContext; +import jakarta.jms.XAJMSContext; +import jakarta.jms.XASession; + +public class ActiveMQXAContext extends ActiveMQContext implements XAJMSContext { + + ActiveMQXAContext(ActiveMQXAConnection activemqXAConnection) { + super(activemqXAConnection); + } + + @Override + public JMSContext getContext() { + return this; + } + + @Override + public XAResource getXAResource() { + checkContextState(); + return ((XASession) activemqSession).getXAResource(); + } + +} diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASslConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASslConnectionFactory.java index e399faaf20..bd6b27a149 100644 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASslConnectionFactory.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQXASslConnectionFactory.java @@ -30,6 +30,7 @@ import jakarta.jms.XATopicConnectionFactory; import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.transport.Transport; +import org.apache.activemq.util.JMSExceptionSupport; public class ActiveMQXASslConnectionFactory extends ActiveMQSslConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory { @@ -76,12 +77,20 @@ public class ActiveMQXASslConnectionFactory extends ActiveMQSslConnectionFactory @Override public XAJMSContext createXAContext() { - throw new UnsupportedOperationException("createXAContext() is not supported"); + try { + return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection()); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } } @Override public XAJMSContext createXAContext(String userName, String password) { - throw new UnsupportedOperationException("createXAContext(userName, password) is not supported"); + try { + return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection(userName, password)); + } catch (JMSException e) { + throw JMSExceptionSupport.convertToJMSRuntimeException(e); + } } @Override diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java index 6f1eef6b77..3755a06976 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2ConnectionTest.java @@ -50,16 +50,4 @@ public class ActiveMQJMS2ConnectionTest extends ActiveMQJMS2TestBase { verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED); } - private void verifySession(Session session, int acknowledgeMode) throws JMSException { - try { - assertNotNull(session); - assertEquals(acknowledgeMode, session.getAcknowledgeMode()); - assertEquals(acknowledgeMode == Session.SESSION_TRANSACTED, session.getTransacted()); - } finally { - if (session != null) { - session.close(); - } - } - } - } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java index f0d85cb00c..596c39233d 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2TestBase.java @@ -16,12 +16,27 @@ */ package org.apache.activemq.jms2; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.lang.management.ManagementFactory; +import java.util.Enumeration; import java.util.LinkedList; import java.util.List; import jakarta.jms.Connection; +import jakarta.jms.Destination; +import jakarta.jms.JMSConsumer; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.JMSProducer; import jakarta.jms.MessageProducer; +import jakarta.jms.Queue; +import jakarta.jms.QueueBrowser; import jakarta.jms.Session; +import jakarta.jms.TextMessage; +import jakarta.jms.Topic; + import javax.management.JMX; import javax.management.MBeanServer; import org.apache.activemq.ActiveMQConnectionFactory; @@ -124,7 +139,19 @@ public abstract class ActiveMQJMS2TestBase { return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class); } - private static String cleanParameterizedMethodName(String methodName) { + protected void verifySession(Session session, int acknowledgeMode) throws JMSException { + try { + assertNotNull(session); + assertEquals(acknowledgeMode, session.getAcknowledgeMode()); + assertEquals(acknowledgeMode == Session.SESSION_TRANSACTED, session.getTransacted()); + } finally { + if (session != null) { + session.close(); + } + } + } + + protected static String cleanParameterizedMethodName(String methodName) { // clean up parameterized method string: TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES] // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES @@ -138,4 +165,50 @@ public abstract class ActiveMQJMS2TestBase { return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1]; } + + protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) { + assertNotNull(jmsContext); + JMSProducer jmsProducer = jmsContext.createProducer(); + jmsProducer.send(testDestination, textBody); + } + + protected static void browseMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody, boolean expectFound) throws JMSException { + assertNotNull(jmsContext); + assertTrue(Queue.class.isAssignableFrom(testDestination.getClass())); + Queue testQueue = Queue.class.cast(testDestination); + try(QueueBrowser queueBrowser = jmsContext.createBrowser(testQueue)) { + Enumeration messageEnumeration = queueBrowser.getEnumeration(); + assertNotNull(messageEnumeration); + + boolean found = false; + while(!found && messageEnumeration.hasMoreElements()) { + jakarta.jms.Message message = (jakarta.jms.Message)messageEnumeration.nextElement(); + assertNotNull(message); + assertTrue(TextMessage.class.isAssignableFrom(message.getClass())); + assertEquals(expectedTextBody, TextMessage.class.cast(message).getText()); + found = true; + } + assertEquals(expectFound, found); + } + } + + protected static void recvMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody) throws JMSException { + assertNotNull(jmsContext); + try(JMSConsumer jmsConsumer = jmsContext.createConsumer(testDestination)) { + jakarta.jms.Message message = jmsConsumer.receive(1000l); + assertNotNull(message); + assertTrue(TextMessage.class.isAssignableFrom(message.getClass())); + assertEquals(expectedTextBody, TextMessage.class.cast(message).getText()); + } + } + + protected static void recvMessageDurable(JMSContext jmsContext, Topic testTopic, String subscriptionName, String selector, boolean noLocal, String expectedTextBody) throws JMSException { + assertNotNull(jmsContext); + try(JMSConsumer jmsConsumer = jmsContext.createDurableConsumer(testTopic, subscriptionName, selector, noLocal)) { + jakarta.jms.Message message = jmsConsumer.receive(1000l); + assertNotNull(message); + assertTrue(TextMessage.class.isAssignableFrom(message.getClass())); + assertEquals(expectedTextBody, TextMessage.class.cast(message).getText()); + } + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAConnectionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAConnectionTest.java new file mode 100644 index 0000000000..e16ba7c216 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAConnectionTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms2; + +import org.junit.Test; +import jakarta.jms.Session; + +public class ActiveMQJMS2XAConnectionTest extends ActiveMQJMS2XATestBase { + + + // XA connection always creates SESSION_TRANSACTED + @Test + public void testCreateSession() throws Exception { + verifySession(connection.createSession(), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateSessionAckModeAuto() throws Exception { + verifySession(connection.createSession(Session.AUTO_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateSessionAckModeClient() throws Exception { + verifySession(connection.createSession(Session.CLIENT_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateSessionAckModeDups() throws Exception { + verifySession(connection.createSession(Session.DUPS_OK_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateSessionAckModeTrans() throws Exception { + verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateXASession() throws Exception { + verifySession(xaConnection.createSession(), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateXASessionAckModeAuto() throws Exception { + verifySession(xaConnection.createSession(Session.AUTO_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateXASessionAckModeClient() throws Exception { + verifySession(xaConnection.createSession(Session.CLIENT_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateXASessionAckModeDups() throws Exception { + verifySession(xaConnection.createSession(Session.DUPS_OK_ACKNOWLEDGE), Session.SESSION_TRANSACTED); + } + + @Test + public void testCreateXASessionAckModeTrans() throws Exception { + verifySession(xaConnection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAContextTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAContextTest.java new file mode 100644 index 0000000000..239d2117bd --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XAContextTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms2; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import jakarta.jms.Destination; +import jakarta.jms.JMSContext; +import jakarta.jms.JMSException; +import jakarta.jms.Session; +import org.apache.activemq.ActiveMQXAContext; +import org.junit.Before; +import org.junit.Test; + +public class ActiveMQJMS2XAContextTest extends ActiveMQJMS2XATestBase { + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + // [AMQ-8325] We override ackMode for unit tests. Actual XA features is tested elsewhere + activemqXAConnectionFactory.setXaAckMode(Session.AUTO_ACKNOWLEDGE); + } + + @Test + public void testConnectionFactoryCreateXAContext() { + try(JMSContext jmsContext = activemqXAConnectionFactory.createXAContext()) { + assertNotNull(jmsContext); + jmsContext.start(); + assertTrue(ActiveMQXAContext.class.isAssignableFrom(jmsContext.getClass())); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName); + recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName); + } catch (JMSException e) { + fail(e.getMessage()); + } + } + + @Test + public void testConnectionFactoryCreateContextUserPass() { + try(JMSContext jmsContext = activemqXAConnectionFactory.createXAContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) { + assertNotNull(jmsContext); + jmsContext.start(); + assertTrue(ActiveMQXAContext.class.isAssignableFrom(jmsContext.getClass())); + Destination destination = jmsContext.createQueue(methodNameDestinationName); + sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName); + recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName); + } catch (JMSException e) { + fail(e.getMessage()); + } + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XATestBase.java b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XATestBase.java new file mode 100644 index 0000000000..df083a4eb4 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/jms2/ActiveMQJMS2XATestBase.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms2; + +import java.lang.management.ManagementFactory; +import jakarta.jms.Session; +import jakarta.jms.XAConnection; + +import org.apache.activemq.ActiveMQXAConnectionFactory; +import org.junit.After; +import org.junit.Before; + +public abstract class ActiveMQJMS2XATestBase extends ActiveMQJMS2TestBase { + + protected ActiveMQXAConnectionFactory activemqXAConnectionFactory = null; + protected XAConnection xaConnection = null; + + @Before + @Override + public void setUp() throws Exception { + activemqXAConnectionFactory = new ActiveMQXAConnectionFactory("vm://localhost?marshal=false&broker.persistent=false"); + xaConnection = activemqXAConnectionFactory.createXAConnection(); + + // [AMQ-8325] Test using standard JMS connection with XAConnectionFactory + activemqConnectionFactory = activemqXAConnectionFactory; + connection = activemqConnectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + methodNameDestinationName = "AMQ.JMS2." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase()); + messageProducer = session.createProducer(session.createQueue(methodNameDestinationName)); + mbeanServer = ManagementFactory.getPlatformMBeanServer(); + } + + @After + @Override + public void tearDown() { + super.tearDown(); + if(xaConnection != null) { + try { xaConnection.close(); } catch (Exception e) { } finally { xaConnection = null; } + } + } +}