ARTEMIS-1211 - Allow local transactions when no jta in Resource Adapter
https://issues.apache.org/jira/browse/ARTEMIS-1211
This commit is contained in:
parent
487cc45425
commit
d2594a280b
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.ra;
|
package org.apache.activemq.artemis.ra;
|
||||||
|
|
||||||
|
import javax.jms.JMSException;
|
||||||
import javax.jms.JMSRuntimeException;
|
import javax.jms.JMSRuntimeException;
|
||||||
import javax.resource.NotSupportedException;
|
import javax.resource.NotSupportedException;
|
||||||
|
|
||||||
|
@ -62,4 +63,8 @@ public interface ActiveMQRABundle {
|
||||||
|
|
||||||
@Message(id = 159006, value = "Invalid Session Mode {0}", format = Message.Format.MESSAGE_FORMAT)
|
@Message(id = 159006, value = "Invalid Session Mode {0}", format = Message.Format.MESSAGE_FORMAT)
|
||||||
JMSRuntimeException invalidAcknowledgeMode(int sessionMode);
|
JMSRuntimeException invalidAcknowledgeMode(int sessionMode);
|
||||||
|
|
||||||
|
@Message(id = 159007, value = "Invalid Session Mode SESSION_TRANSACTED, to enable Local Transacted Sessions you can " +
|
||||||
|
"set the allowLocalTransactions (allow-local-transactions) on the resource adapter")
|
||||||
|
JMSException invalidSessionTransactedModeRuntimeAllowLocal();
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme
|
||||||
* The topic type
|
* The topic type
|
||||||
*/
|
*/
|
||||||
private static final String TOPIC_TYPE = Topic.class.getName();
|
private static final String TOPIC_TYPE = Topic.class.getName();
|
||||||
|
protected boolean allowLocalTransactions;
|
||||||
|
|
||||||
private String strConnectorClassName;
|
private String strConnectorClassName;
|
||||||
|
|
||||||
|
@ -171,4 +172,12 @@ public class ActiveMQRAMCFProperties extends ConnectionFactoryProperties impleme
|
||||||
|
|
||||||
this.useTryLock = useTryLock;
|
this.useTryLock = useTryLock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAllowLocalTransactions() {
|
||||||
|
return allowLocalTransactions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||||
|
this.allowLocalTransactions = allowLocalTransactions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -608,10 +608,19 @@ public final class ActiveMQRAManagedConnectionFactory implements ManagedConnecti
|
||||||
return mcfProperties.isHA();
|
return mcfProperties.isHA();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setAllowLocalTransactions(Boolean allowLocalTransactions) {
|
||||||
|
mcfProperties.setAllowLocalTransactions(allowLocalTransactions);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean isAllowLocalTransactions() {
|
||||||
|
return mcfProperties.isAllowLocalTransactions();
|
||||||
|
}
|
||||||
|
|
||||||
public void setHA(Boolean ha) {
|
public void setHA(Boolean ha) {
|
||||||
mcfProperties.setHA(ha);
|
mcfProperties.setHA(ha);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the useTryLock.
|
* Get the useTryLock.
|
||||||
*
|
*
|
||||||
|
|
|
@ -37,6 +37,7 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements
|
||||||
* Trace enabled
|
* Trace enabled
|
||||||
*/
|
*/
|
||||||
private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
|
private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled();
|
||||||
|
protected boolean allowLocalTransactions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The user name
|
* The user name
|
||||||
|
@ -281,4 +282,11 @@ public class ActiveMQRAProperties extends ConnectionFactoryProperties implements
|
||||||
this.jgroupsChannelRefName = jgroupsChannelRefName;
|
this.jgroupsChannelRefName = jgroupsChannelRefName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAllowLocalTransactions() {
|
||||||
|
return allowLocalTransactions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||||
|
this.allowLocalTransactions = allowLocalTransactions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,8 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
||||||
*/
|
*/
|
||||||
private final Set<TemporaryTopic> tempTopics = new HashSet<>();
|
private final Set<TemporaryTopic> tempTopics = new HashSet<>();
|
||||||
|
|
||||||
|
private boolean allowLocalTransaction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
*
|
*
|
||||||
|
@ -829,24 +831,32 @@ public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForCon
|
||||||
//In the Java EE web or EJB container, when there is no active JTA transaction in progress
|
//In the Java EE web or EJB container, when there is no active JTA transaction in progress
|
||||||
// The argument {@code transacted} is ignored.
|
// The argument {@code transacted} is ignored.
|
||||||
|
|
||||||
//The session will always be non-transacted,
|
//The session will always be non-transacted, unless allow-local-transactions is true
|
||||||
transacted = false;
|
if (transacted && mcf.isAllowLocalTransactions()) {
|
||||||
switch (acknowledgeMode) {
|
acknowledgeMode = Session.SESSION_TRANSACTED;
|
||||||
//using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
|
} else {
|
||||||
case Session.AUTO_ACKNOWLEDGE:
|
transacted = false;
|
||||||
case Session.DUPS_OK_ACKNOWLEDGE:
|
switch (acknowledgeMode) {
|
||||||
//plus our own
|
//using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE.
|
||||||
case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
|
case Session.AUTO_ACKNOWLEDGE:
|
||||||
case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
|
case Session.DUPS_OK_ACKNOWLEDGE:
|
||||||
break;
|
//plus our own
|
||||||
//The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used.
|
case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE:
|
||||||
case Session.CLIENT_ACKNOWLEDGE:
|
case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
|
||||||
throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
|
break;
|
||||||
//same with this although the spec doesn't explicitly say
|
//The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used.
|
||||||
case Session.SESSION_TRANSACTED:
|
case Session.CLIENT_ACKNOWLEDGE:
|
||||||
throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime();
|
throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime();
|
||||||
default:
|
//same with this although the spec doesn't explicitly say
|
||||||
throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
|
case Session.SESSION_TRANSACTED:
|
||||||
|
if (!mcf.isAllowLocalTransactions()) {
|
||||||
|
throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntimeAllowLocal();
|
||||||
|
}
|
||||||
|
transacted = true;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
|
||||||
public String strConnectorClassName;
|
public String strConnectorClassName;
|
||||||
|
|
||||||
public String strConnectionParameters;
|
public String strConnectionParameters;
|
||||||
|
protected boolean allowLocalTransactions;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The resource adapter
|
* The resource adapter
|
||||||
|
@ -818,6 +819,14 @@ public class ActiveMQActivationSpec extends ConnectionFactoryProperties implemen
|
||||||
public void setMaxMessages(final Integer value) {
|
public void setMaxMessages(final Integer value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAllowLocalTransactions() {
|
||||||
|
return allowLocalTransactions;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAllowLocalTransactions(boolean allowLocalTransactions) {
|
||||||
|
this.allowLocalTransactions = allowLocalTransactions;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o)
|
if (this == o)
|
||||||
|
|
|
@ -0,0 +1,271 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.ra;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||||
|
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
||||||
|
import org.apache.activemq.artemis.core.security.Role;
|
||||||
|
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionFactoryImpl;
|
||||||
|
import org.apache.activemq.artemis.ra.ActiveMQRAConnectionManager;
|
||||||
|
import org.apache.activemq.artemis.ra.ActiveMQRAManagedConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
|
||||||
|
import org.apache.activemq.artemis.service.extensions.ServiceUtils;
|
||||||
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSContext;
|
||||||
|
import javax.jms.JMSProducer;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.QueueConnection;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class OutgoingConnectionTestNoJTA extends ActiveMQRATestBase {
|
||||||
|
|
||||||
|
protected ActiveMQResourceAdapter resourceAdapter;
|
||||||
|
protected ActiveMQRAConnectionFactory qraConnectionFactory;
|
||||||
|
protected ActiveMQRAManagedConnectionFactory mcf;
|
||||||
|
ActiveMQRAConnectionManager qraConnectionManager = new ActiveMQRAConnectionManager();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean useSecurity() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
useDummyTransactionManager();
|
||||||
|
super.setUp();
|
||||||
|
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("testuser", "testpassword");
|
||||||
|
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addUser("guest", "guest");
|
||||||
|
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().setDefaultUser("guest");
|
||||||
|
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("testuser", "arole");
|
||||||
|
((ActiveMQJAASSecurityManager) server.getSecurityManager()).getConfiguration().addRole("guest", "arole");
|
||||||
|
Role role = new Role("arole", true, true, true, true, true, true, true, true);
|
||||||
|
Set<Role> roles = new HashSet<>();
|
||||||
|
roles.add(role);
|
||||||
|
server.getSecurityRepository().addMatch(MDBQUEUEPREFIXED, roles);
|
||||||
|
|
||||||
|
resourceAdapter = new ActiveMQResourceAdapter();
|
||||||
|
resourceAdapter.setEntries("[\"java://jmsXA\"]");
|
||||||
|
resourceAdapter.setConnectorClassName(InVMConnectorFactory.class.getName());
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
resourceAdapter.start(ctx);
|
||||||
|
mcf = new ActiveMQRAManagedConnectionFactory();
|
||||||
|
mcf.setAllowLocalTransactions(true);
|
||||||
|
mcf.setResourceAdapter(resourceAdapter);
|
||||||
|
qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
((DummyTransactionManager) ServiceUtils.getTransactionManager()).tx = null;
|
||||||
|
if (resourceAdapter != null) {
|
||||||
|
resourceAdapter.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
qraConnectionManager.stop();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleMessageSendAndReceiveSessionTransacted() throws Exception {
|
||||||
|
setupDLQ(10);
|
||||||
|
resourceAdapter = newResourceAdapter();
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
resourceAdapter.start(ctx);
|
||||||
|
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||||
|
mcf.setAllowLocalTransactions(true);
|
||||||
|
mcf.setResourceAdapter(resourceAdapter);
|
||||||
|
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||||
|
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||||
|
Session s = queueConnection.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
MessageProducer mp = s.createProducer(q);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q);
|
||||||
|
Message message = s.createTextMessage("test");
|
||||||
|
mp.send(message);
|
||||||
|
s.commit();
|
||||||
|
queueConnection.start();
|
||||||
|
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.rollback();
|
||||||
|
textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.commit();
|
||||||
|
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||||
|
assertNull(textMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleMessageSendAndReceiveNotTransacted() throws Exception {
|
||||||
|
setupDLQ(10);
|
||||||
|
resourceAdapter = newResourceAdapter();
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
resourceAdapter.start(ctx);
|
||||||
|
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||||
|
mcf.setAllowLocalTransactions(true);
|
||||||
|
mcf.setResourceAdapter(resourceAdapter);
|
||||||
|
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||||
|
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||||
|
Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED);
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
MessageProducer mp = s.createProducer(q);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q);
|
||||||
|
Message message = s.createTextMessage("test");
|
||||||
|
mp.send(message);
|
||||||
|
s.commit();
|
||||||
|
queueConnection.start();
|
||||||
|
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.rollback();
|
||||||
|
textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.commit();
|
||||||
|
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||||
|
assertNull(textMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleMessageSendAndReceiveSessionTransacted2() throws Exception {
|
||||||
|
setupDLQ(10);
|
||||||
|
resourceAdapter = newResourceAdapter();
|
||||||
|
MyBootstrapContext ctx = new MyBootstrapContext();
|
||||||
|
resourceAdapter.start(ctx);
|
||||||
|
ActiveMQRAManagedConnectionFactory mcf = new ActiveMQRAManagedConnectionFactory();
|
||||||
|
mcf.setAllowLocalTransactions(true);
|
||||||
|
mcf.setResourceAdapter(resourceAdapter);
|
||||||
|
ActiveMQRAConnectionFactory qraConnectionFactory = new ActiveMQRAConnectionFactoryImpl(mcf, qraConnectionManager);
|
||||||
|
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||||
|
Session s = queueConnection.createSession(Session.SESSION_TRANSACTED);
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
MessageProducer mp = s.createProducer(q);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q);
|
||||||
|
Message message = s.createTextMessage("test");
|
||||||
|
mp.send(message);
|
||||||
|
s.commit();
|
||||||
|
queueConnection.start();
|
||||||
|
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.rollback();
|
||||||
|
textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
s.commit();
|
||||||
|
textMessage = (TextMessage) consumer.receiveNoWait();
|
||||||
|
assertNull(textMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sessionTransactedTestNoActiveJTATx() throws Exception {
|
||||||
|
JMSContext context = qraConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
|
||||||
|
assertEquals(context.getSessionMode(), JMSContext.AUTO_ACKNOWLEDGE);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueuSessionAckMode() throws Exception {
|
||||||
|
|
||||||
|
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||||
|
|
||||||
|
Session s = queueConnection.createSession(false, Session.SESSION_TRANSACTED);
|
||||||
|
|
||||||
|
s.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendNoXAJMSContext() throws Exception {
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
|
||||||
|
try (ClientSessionFactory sf = locator.createSessionFactory();
|
||||||
|
ClientSession session = sf.createSession();
|
||||||
|
ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE);
|
||||||
|
JMSContext jmsctx = qraConnectionFactory.createContext();
|
||||||
|
) {
|
||||||
|
session.start();
|
||||||
|
// These next 4 lines could be written in a single line however it makes difficult for debugging
|
||||||
|
JMSProducer producer = jmsctx.createProducer();
|
||||||
|
producer.setProperty("strvalue", "hello");
|
||||||
|
TextMessage msgsend = jmsctx.createTextMessage("hello");
|
||||||
|
producer.send(q, msgsend);
|
||||||
|
|
||||||
|
ClientMessage msg = consVerify.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals("hello", msg.getStringProperty("strvalue"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleMessageSendAndReceive() throws Exception {
|
||||||
|
QueueConnection queueConnection = qraConnectionFactory.createQueueConnection();
|
||||||
|
Session s = queueConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
MessageProducer mp = s.createProducer(q);
|
||||||
|
MessageConsumer consumer = s.createConsumer(q);
|
||||||
|
Message message = s.createTextMessage("test");
|
||||||
|
mp.send(message);
|
||||||
|
queueConnection.start();
|
||||||
|
TextMessage textMessage = (TextMessage) consumer.receive(1000);
|
||||||
|
assertNotNull(textMessage);
|
||||||
|
assertEquals(textMessage.getText(), "test");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSimpleSendNoXAJMS1() throws Exception {
|
||||||
|
Queue q = ActiveMQJMSClient.createQueue(MDBQUEUE);
|
||||||
|
try (ClientSessionFactory sf = locator.createSessionFactory();
|
||||||
|
ClientSession session = sf.createSession();
|
||||||
|
ClientConsumer consVerify = session.createConsumer("jms.queue." + MDBQUEUE);
|
||||||
|
Connection conn = qraConnectionFactory.createConnection();
|
||||||
|
) {
|
||||||
|
Session jmsSess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
session.start();
|
||||||
|
MessageProducer producer = jmsSess.createProducer(q);
|
||||||
|
// These next 4 lines could be written in a single line however it makes difficult for debugging
|
||||||
|
TextMessage msgsend = jmsSess.createTextMessage("hello");
|
||||||
|
msgsend.setStringProperty("strvalue", "hello");
|
||||||
|
producer.send(msgsend);
|
||||||
|
|
||||||
|
ClientMessage msg = consVerify.receive(1000);
|
||||||
|
assertNotNull(msg);
|
||||||
|
assertEquals("hello", msg.getStringProperty("strvalue"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue