[AMQ-8325] JMS 2.0 XA supported operations

This commit is contained in:
Matt Pavlovich 2023-08-30 10:54:16 -05:00
parent 9f374dfcb5
commit 09ae41adec
9 changed files with 340 additions and 21 deletions

View File

@ -62,7 +62,7 @@ public class ActiveMQContext implements JMSContext {
private final ActiveMQConnection activemqConnection; private final ActiveMQConnection activemqConnection;
private final AtomicLong connectionCounter; private final AtomicLong connectionCounter;
private ActiveMQSession activemqSession = null; protected ActiveMQSession activemqSession = null;
// Configuration // Configuration
private boolean autoStart = DEFAULT_AUTO_START; private boolean autoStart = DEFAULT_AUTO_START;
@ -526,7 +526,7 @@ public class ActiveMQContext implements JMSContext {
} }
} }
private void checkContextState() { protected void checkContextState() {
if (activemqConnection == null) { if (activemqConnection == null) {
throw new JMSRuntimeException("Connection not available"); throw new JMSRuntimeException("Connection not available");
} }
@ -556,5 +556,4 @@ public class ActiveMQContext implements JMSContext {
} }
return this.activemqMessageProducer; return this.activemqMessageProducer;
} }
} }

View File

@ -30,6 +30,7 @@ import jakarta.jms.XATopicConnectionFactory;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.JMSExceptionSupport;
/** /**
* A factory of {@link XAConnection} instances * A factory of {@link XAConnection} instances
@ -80,15 +81,23 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException { public XATopicConnection createXATopicConnection(String userName, String password) throws JMSException {
return (XATopicConnection) createActiveMQConnection(userName, password); return (XATopicConnection) createActiveMQConnection(userName, password);
} }
@Override @Override
public XAJMSContext createXAContext() { public XAJMSContext createXAContext() {
throw new UnsupportedOperationException("createXAContext() is not supported"); try {
return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection());
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
} }
@Override @Override
public XAJMSContext createXAContext(String userName, String password) { public XAJMSContext createXAContext(String userName, String password) {
throw new UnsupportedOperationException("createXAContext(userName, password) is not supported"); try {
return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection(userName, password));
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
} }
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception { protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.transaction.xa.XAResource;
import jakarta.jms.JMSContext;
import jakarta.jms.XAJMSContext;
import jakarta.jms.XASession;
public class ActiveMQXAContext extends ActiveMQContext implements XAJMSContext {
ActiveMQXAContext(ActiveMQXAConnection activemqXAConnection) {
super(activemqXAConnection);
}
@Override
public JMSContext getContext() {
return this;
}
@Override
public XAResource getXAResource() {
checkContextState();
return ((XASession) activemqSession).getXAResource();
}
}

View File

@ -30,6 +30,7 @@ import jakarta.jms.XATopicConnectionFactory;
import org.apache.activemq.management.JMSStatsImpl; import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.JMSExceptionSupport;
public class ActiveMQXASslConnectionFactory extends ActiveMQSslConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory { public class ActiveMQXASslConnectionFactory extends ActiveMQSslConnectionFactory implements XAConnectionFactory, XAQueueConnectionFactory, XATopicConnectionFactory {
@ -76,12 +77,20 @@ public class ActiveMQXASslConnectionFactory extends ActiveMQSslConnectionFactory
@Override @Override
public XAJMSContext createXAContext() { public XAJMSContext createXAContext() {
throw new UnsupportedOperationException("createXAContext() is not supported"); try {
return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection());
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
} }
@Override @Override
public XAJMSContext createXAContext(String userName, String password) { public XAJMSContext createXAContext(String userName, String password) {
throw new UnsupportedOperationException("createXAContext(userName, password) is not supported"); try {
return new ActiveMQXAContext((ActiveMQXAConnection)createXAConnection(userName, password));
} catch (JMSException e) {
throw JMSExceptionSupport.convertToJMSRuntimeException(e);
}
} }
@Override @Override

View File

@ -50,16 +50,4 @@ public class ActiveMQJMS2ConnectionTest extends ActiveMQJMS2TestBase {
verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED); verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED);
} }
private void verifySession(Session session, int acknowledgeMode) throws JMSException {
try {
assertNotNull(session);
assertEquals(acknowledgeMode, session.getAcknowledgeMode());
assertEquals(acknowledgeMode == Session.SESSION_TRANSACTED, session.getTransacted());
} finally {
if (session != null) {
session.close();
}
}
}
} }

View File

@ -16,12 +16,27 @@
*/ */
package org.apache.activemq.jms2; package org.apache.activemq.jms2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.Enumeration;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import jakarta.jms.Connection; import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSConsumer;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.JMSProducer;
import jakarta.jms.MessageProducer; import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.QueueBrowser;
import jakarta.jms.Session; import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import javax.management.JMX; import javax.management.JMX;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
@ -124,7 +139,19 @@ public abstract class ActiveMQJMS2TestBase {
return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class); return JMX.newMBeanProxy(mbeanServer, BrokerMBeanSupport.createDestinationName(BrokerMBeanSupport.createBrokerObjectName(DEFAULT_JMX_DOMAIN_NAME, DEFAULT_JMX_BROKER_NAME).toString(), destination), TopicViewMBean.class);
} }
private static String cleanParameterizedMethodName(String methodName) { protected void verifySession(Session session, int acknowledgeMode) throws JMSException {
try {
assertNotNull(session);
assertEquals(acknowledgeMode, session.getAcknowledgeMode());
assertEquals(acknowledgeMode == Session.SESSION_TRANSACTED, session.getTransacted());
} finally {
if (session != null) {
session.close();
}
}
}
protected static String cleanParameterizedMethodName(String methodName) {
// clean up parameterized method string: TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES] // clean up parameterized method string: TESTMESSAGETIMESTAMPTIMETOLIVE[DESTINATIONTYPE=QUEUE, MESSAGETYPE=BYTES]
// returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES // returns: TESTMESSAGETIMESTAMPTIMETOLIVE.QUEUE.BYTES
@ -138,4 +165,50 @@ public abstract class ActiveMQJMS2TestBase {
return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1]; return step1[0] + "." + step3[0].split("=", 2)[1] + "." + step3[1].split("=", 2)[1];
} }
protected static void sendMessage(JMSContext jmsContext, Destination testDestination, String textBody) {
assertNotNull(jmsContext);
JMSProducer jmsProducer = jmsContext.createProducer();
jmsProducer.send(testDestination, textBody);
}
protected static void browseMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody, boolean expectFound) throws JMSException {
assertNotNull(jmsContext);
assertTrue(Queue.class.isAssignableFrom(testDestination.getClass()));
Queue testQueue = Queue.class.cast(testDestination);
try(QueueBrowser queueBrowser = jmsContext.createBrowser(testQueue)) {
Enumeration<?> messageEnumeration = queueBrowser.getEnumeration();
assertNotNull(messageEnumeration);
boolean found = false;
while(!found && messageEnumeration.hasMoreElements()) {
jakarta.jms.Message message = (jakarta.jms.Message)messageEnumeration.nextElement();
assertNotNull(message);
assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
found = true;
}
assertEquals(expectFound, found);
}
}
protected static void recvMessage(JMSContext jmsContext, Destination testDestination, String expectedTextBody) throws JMSException {
assertNotNull(jmsContext);
try(JMSConsumer jmsConsumer = jmsContext.createConsumer(testDestination)) {
jakarta.jms.Message message = jmsConsumer.receive(1000l);
assertNotNull(message);
assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
}
}
protected static void recvMessageDurable(JMSContext jmsContext, Topic testTopic, String subscriptionName, String selector, boolean noLocal, String expectedTextBody) throws JMSException {
assertNotNull(jmsContext);
try(JMSConsumer jmsConsumer = jmsContext.createDurableConsumer(testTopic, subscriptionName, selector, noLocal)) {
jakarta.jms.Message message = jmsConsumer.receive(1000l);
assertNotNull(message);
assertTrue(TextMessage.class.isAssignableFrom(message.getClass()));
assertEquals(expectedTextBody, TextMessage.class.cast(message).getText());
}
}
} }

View File

@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms2;
import org.junit.Test;
import jakarta.jms.Session;
public class ActiveMQJMS2XAConnectionTest extends ActiveMQJMS2XATestBase {
// XA connection always creates SESSION_TRANSACTED
@Test
public void testCreateSession() throws Exception {
verifySession(connection.createSession(), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateSessionAckModeAuto() throws Exception {
verifySession(connection.createSession(Session.AUTO_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateSessionAckModeClient() throws Exception {
verifySession(connection.createSession(Session.CLIENT_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateSessionAckModeDups() throws Exception {
verifySession(connection.createSession(Session.DUPS_OK_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateSessionAckModeTrans() throws Exception {
verifySession(connection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateXASession() throws Exception {
verifySession(xaConnection.createSession(), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateXASessionAckModeAuto() throws Exception {
verifySession(xaConnection.createSession(Session.AUTO_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateXASessionAckModeClient() throws Exception {
verifySession(xaConnection.createSession(Session.CLIENT_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateXASessionAckModeDups() throws Exception {
verifySession(xaConnection.createSession(Session.DUPS_OK_ACKNOWLEDGE), Session.SESSION_TRANSACTED);
}
@Test
public void testCreateXASessionAckModeTrans() throws Exception {
verifySession(xaConnection.createSession(Session.SESSION_TRANSACTED), Session.SESSION_TRANSACTED);
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms2;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import jakarta.jms.Destination;
import jakarta.jms.JMSContext;
import jakarta.jms.JMSException;
import jakarta.jms.Session;
import org.apache.activemq.ActiveMQXAContext;
import org.junit.Before;
import org.junit.Test;
public class ActiveMQJMS2XAContextTest extends ActiveMQJMS2XATestBase {
@Before
@Override
public void setUp() throws Exception {
super.setUp();
// [AMQ-8325] We override ackMode for unit tests. Actual XA features is tested elsewhere
activemqXAConnectionFactory.setXaAckMode(Session.AUTO_ACKNOWLEDGE);
}
@Test
public void testConnectionFactoryCreateXAContext() {
try(JMSContext jmsContext = activemqXAConnectionFactory.createXAContext()) {
assertNotNull(jmsContext);
jmsContext.start();
assertTrue(ActiveMQXAContext.class.isAssignableFrom(jmsContext.getClass()));
Destination destination = jmsContext.createQueue(methodNameDestinationName);
sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
} catch (JMSException e) {
fail(e.getMessage());
}
}
@Test
public void testConnectionFactoryCreateContextUserPass() {
try(JMSContext jmsContext = activemqXAConnectionFactory.createXAContext(DEFAULT_JMS_USER, DEFAULT_JMS_PASS)) {
assertNotNull(jmsContext);
jmsContext.start();
assertTrue(ActiveMQXAContext.class.isAssignableFrom(jmsContext.getClass()));
Destination destination = jmsContext.createQueue(methodNameDestinationName);
sendMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
recvMessage(jmsContext, destination, "Test-" + methodNameDestinationName);
} catch (JMSException e) {
fail(e.getMessage());
}
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms2;
import java.lang.management.ManagementFactory;
import jakarta.jms.Session;
import jakarta.jms.XAConnection;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.junit.After;
import org.junit.Before;
public abstract class ActiveMQJMS2XATestBase extends ActiveMQJMS2TestBase {
protected ActiveMQXAConnectionFactory activemqXAConnectionFactory = null;
protected XAConnection xaConnection = null;
@Before
@Override
public void setUp() throws Exception {
activemqXAConnectionFactory = new ActiveMQXAConnectionFactory("vm://localhost?marshal=false&broker.persistent=false");
xaConnection = activemqXAConnectionFactory.createXAConnection();
// [AMQ-8325] Test using standard JMS connection with XAConnectionFactory
activemqConnectionFactory = activemqXAConnectionFactory;
connection = activemqConnectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
methodNameDestinationName = "AMQ.JMS2." + cleanParameterizedMethodName(testName.getMethodName().toUpperCase());
messageProducer = session.createProducer(session.createQueue(methodNameDestinationName));
mbeanServer = ManagementFactory.getPlatformMBeanServer();
}
@After
@Override
public void tearDown() {
super.tearDown();
if(xaConnection != null) {
try { xaConnection.close(); } catch (Exception e) { } finally { xaConnection = null; }
}
}
}