ARTEMIS-2339 Compatibility around prefixing

There are a few issues with prefixing and compatibility.
This is basically an issue when integrated with Wildfly or any other case
where prefix is activated
and playing with older versions.
This commit is contained in:
Clebert Suconic 2019-05-15 09:10:53 -04:00
parent c24248adf1
commit b9b6fddeea
20 changed files with 1186 additions and 9 deletions

View File

@ -68,7 +68,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMes
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage;
import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress;
@ -234,7 +234,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session);
msg = new ActiveMQTextCompatibleMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}
@ -249,7 +249,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg;
if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session);
msg = new ActiveMQTextCompatibleMessage(session);
} else {
msg = new ActiveMQTextMessage(session);
}

View File

@ -190,7 +190,7 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage {
}
case ActiveMQTextMessage.TYPE: // 3
{
msg = new ActiveMQTextCompabileMessage(message, session);
msg = new ActiveMQTextCompatibleMessage(message, session);
break;
}
default: {

View File

@ -21,11 +21,12 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
public class ActiveMQTextCompatibleMessage extends ActiveMQTextMessage {
@Override
public void setJMSReplyTo(Destination dest) throws JMSException {
@ -40,15 +41,21 @@ public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
return replyTo;
}
public ActiveMQTextCompabileMessage(ClientSession session) {
public ActiveMQTextCompatibleMessage(ClientSession session) {
super(session);
}
public ActiveMQTextCompabileMessage(ClientMessage message, ClientSession session) {
public ActiveMQTextCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session);
}
public ActiveMQTextCompabileMessage(TextMessage foreign, ClientSession session) throws JMSException {
public ActiveMQTextCompatibleMessage(TextMessage foreign, ClientSession session) throws JMSException {
super(foreign, session);
}
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
}

View File

@ -347,7 +347,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable());
session.createQueue(request.getAddress(), request.getQueueName(), getRoutingTypeFromAddress(request.getAddress()), request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@ -634,6 +634,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
}
private RoutingType getRoutingTypeFromAddress(SimpleString address) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX) || address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
return RoutingType.ANYCAST;
}
return RoutingType.MULTICAST;
}
private Packet createNullResponseMessage(Packet packet) {
final Packet response;
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {

View File

@ -0,0 +1,53 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
String replyQueueAddress = "jms.queue.myReplyQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(replyQueueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Queue temporaryQueue = session.createQueue("myTemporaryQueue");
MessageConsumer consumer = session.createConsumer(temporaryQueue);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(temporaryQueue);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + temporaryQueue);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,49 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Queue temporaryQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(temporaryQueue);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(temporaryQueue);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + temporaryQueue);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,52 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
String replyTopicAddress = "jms.topic.myReplyTopic";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyTopicAddress), RoutingType.MULTICAST));

View File

@ -0,0 +1,58 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
for (int i = 0; i < 5; i++) {
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
System.out.println("Received " + message + " from: " + myQueue);
GroovyRun.assertEquals("myQueue", ((Queue)message.getJMSDestination()).getQueueName());
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
}
queueConsumer.close();
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,71 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
System.out.println("myQueue::" + myQueue);
TemporaryTopic replyTopic = session.createTemporaryTopic();
MessageConsumer consumer = session.createConsumer(replyTopic);
System.out.println("Temporary Topic " + replyTopic);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendMessage(session.createTextMessage("hello"), replyTopic, myQueue, queueProducer);
sendMessage(session.createMapMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createObjectMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createStreamMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createMessage(), replyTopic, myQueue, queueProducer);
session.commit();
System.out.println("Receiving message from: " + replyTopic);
for (int i = 0; i < 5; i++) {
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
}
GroovyRun.assertNull(consumer.receiveNoWait());
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();
void sendMessage(Message message, TemporaryTopic replyTopic, Queue myQueue, MessageProducer queueProducer) {
message.setJMSReplyTo(replyTopic);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
}

View File

@ -0,0 +1,49 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Topic replyTopic = session.createTopic("myReplyTopic");
MessageConsumer consumer = session.createConsumer(replyTopic);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(replyTopic);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + replyTopic);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToQueueTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToQueueTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToQueue() throws Throwable {
evaluate(serverClassloader, "jmsReplyToQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToQueue/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToQueue/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from queue", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTempQueueTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTempQueueTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTempQueue() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTempQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTempQueue/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTempQueue/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from temporary queue", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTempTopicTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTempTopicTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTempTopic() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTempTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTempTopic/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTempTopic/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from temporary topic", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTopicTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTopicTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTopic() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTopic/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTopic/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from topic", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}