This commit is contained in:
Clebert Suconic 2019-05-15 18:49:29 -04:00
commit ea973ce776
20 changed files with 1186 additions and 9 deletions

View File

@ -68,7 +68,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMes
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage;
import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage;
import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.filter.FilterException;
import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.selector.impl.SelectorParser;
import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.CompositeAddress;
@ -234,7 +234,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg; ActiveMQTextMessage msg;
if (enable1xPrefixes) { if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session); msg = new ActiveMQTextCompatibleMessage(session);
} else { } else {
msg = new ActiveMQTextMessage(session); msg = new ActiveMQTextMessage(session);
} }
@ -249,7 +249,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ActiveMQTextMessage msg; ActiveMQTextMessage msg;
if (enable1xPrefixes) { if (enable1xPrefixes) {
msg = new ActiveMQTextCompabileMessage(session); msg = new ActiveMQTextCompatibleMessage(session);
} else { } else {
msg = new ActiveMQTextMessage(session); msg = new ActiveMQTextMessage(session);
} }

View File

@ -190,7 +190,7 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage {
} }
case ActiveMQTextMessage.TYPE: // 3 case ActiveMQTextMessage.TYPE: // 3
{ {
msg = new ActiveMQTextCompabileMessage(message, session); msg = new ActiveMQTextCompatibleMessage(message, session);
break; break;
} }
default: { default: {

View File

@ -21,11 +21,12 @@ import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage { public class ActiveMQTextCompatibleMessage extends ActiveMQTextMessage {
@Override @Override
public void setJMSReplyTo(Destination dest) throws JMSException { public void setJMSReplyTo(Destination dest) throws JMSException {
@ -40,15 +41,21 @@ public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage {
return replyTo; return replyTo;
} }
public ActiveMQTextCompabileMessage(ClientSession session) { public ActiveMQTextCompatibleMessage(ClientSession session) {
super(session); super(session);
} }
public ActiveMQTextCompabileMessage(ClientMessage message, ClientSession session) { public ActiveMQTextCompatibleMessage(ClientMessage message, ClientSession session) {
super(message, session); super(message, session);
} }
public ActiveMQTextCompabileMessage(TextMessage foreign, ClientSession session) throws JMSException { public ActiveMQTextCompatibleMessage(TextMessage foreign, ClientSession session) throws JMSException {
super(foreign, session); super(foreign, session);
} }
@Override
protected SimpleString checkPrefix(SimpleString address) {
return ActiveMQCompatibleMessage.checkPrefix1X(address);
}
} }

View File

@ -347,7 +347,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case CREATE_QUEUE: { case CREATE_QUEUE: {
CreateQueueMessage request = (CreateQueueMessage) packet; CreateQueueMessage request = (CreateQueueMessage) packet;
requiresResponse = request.isRequiresResponse(); requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); session.createQueue(request.getAddress(), request.getQueueName(), getRoutingTypeFromAddress(request.getAddress()), request.getFilterString(), request.isTemporary(), request.isDurable());
if (requiresResponse) { if (requiresResponse) {
response = createNullResponseMessage(packet); response = createNullResponseMessage(packet);
} }
@ -634,6 +634,14 @@ public class ServerSessionPacketHandler implements ChannelHandler {
} }
} }
private RoutingType getRoutingTypeFromAddress(SimpleString address) {
if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX) || address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) {
return RoutingType.ANYCAST;
}
return RoutingType.MULTICAST;
}
private Packet createNullResponseMessage(Packet packet) { private Packet createNullResponseMessage(Packet packet) {
final Packet response; final Packet response;
if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) { if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) {

View File

@ -0,0 +1,53 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
String replyQueueAddress = "jms.queue.myReplyQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(replyQueueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Queue temporaryQueue = session.createQueue("myTemporaryQueue");
MessageConsumer consumer = session.createConsumer(temporaryQueue);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(temporaryQueue);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + temporaryQueue);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,49 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToTempQueue
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Queue temporaryQueue = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(temporaryQueue);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(temporaryQueue);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + temporaryQueue);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,52 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
String replyTopicAddress = "jms.topic.myReplyTopic";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyTopicAddress), RoutingType.MULTICAST));

View File

@ -0,0 +1,58 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
for (int i = 0; i < 5; i++) {
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
System.out.println("Received " + message + " from: " + myQueue);
GroovyRun.assertEquals("myQueue", ((Queue)message.getJMSDestination()).getQueueName());
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
}
queueConsumer.close();
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,71 @@
package jmsReplyToTempTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
System.out.println("myQueue::" + myQueue);
TemporaryTopic replyTopic = session.createTemporaryTopic();
MessageConsumer consumer = session.createConsumer(replyTopic);
System.out.println("Temporary Topic " + replyTopic);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
sendMessage(session.createTextMessage("hello"), replyTopic, myQueue, queueProducer);
sendMessage(session.createMapMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createObjectMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createStreamMessage(), replyTopic, myQueue, queueProducer);
sendMessage(session.createMessage(), replyTopic, myQueue, queueProducer);
session.commit();
System.out.println("Receiving message from: " + replyTopic);
for (int i = 0; i < 5; i++) {
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
}
GroovyRun.assertNull(consumer.receiveNoWait());
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();
void sendMessage(Message message, TemporaryTopic replyTopic, Queue myQueue, MessageProducer queueProducer) {
message.setJMSReplyTo(replyTopic);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
}

View File

@ -0,0 +1,49 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.server.JournalType
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// starts an artemis server
import org.apache.activemq.artemis.core.server.impl.AddressInfo
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS
String folder = arg[0];
String queueAddress = "jms.queue.myQueue";
configuration = new ConfigurationImpl();
configuration.setJournalType(JournalType.NIO);
configuration.setBrokerInstance(new File(folder + "/server"));
configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616");
configuration.setSecurityEnabled(false);
configuration.setPersistenceEnabled(false);
jmsConfiguration = new JMSConfigurationImpl();
server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfiguration);
server.start();
server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST));
server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false);

View File

@ -0,0 +1,55 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue myQueue = session.createQueue("myQueue");
MessageConsumer queueConsumer = session.createConsumer(myQueue);
consumerCreated.countDown();
connection.start()
Message message = queueConsumer.receive(5000);
GroovyRun.assertNotNull(message)
session.commit();
System.out.println("Received " + message + " from: " + myQueue);
queueConsumer.close();
System.out.println("Sending message to: " + message.getJMSReplyTo());
MessageProducer producer = session.createProducer(message.getJMSReplyTo());
message = session.createMessage();
producer.send(message);
session.commit();
connection.close();
latch.countDown();

View File

@ -0,0 +1,55 @@
package jmsReplyToTopic
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.apache.activemq.artemis.tests.compatibility.GroovyRun
import javax.jms.*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
try {
cf.setEnable1xPrefixes(true);
} catch (Throwable totallyIgnored) {
// older versions will not have this method, dont even bother about seeing the stack trace or exception
}
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
connection.start();
Queue myQueue = session.createQueue("myQueue");
Topic replyTopic = session.createTopic("myReplyTopic");
MessageConsumer consumer = session.createConsumer(replyTopic);
MessageProducer queueProducer = session.createProducer(myQueue)
queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createMessage();
message.setJMSReplyTo(replyTopic);
System.out.println("Sending " + message + " to: " + myQueue);
queueProducer.send(message);
session.commit();
System.out.println("Receiving message from: " + replyTopic);
message = consumer.receive(10000);
GroovyRun.assertNotNull(message);
session.commit();
System.out.println("Received message: " + message);
connection.close();
senderLatch.countDown();

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToQueueTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToQueueTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToQueue() throws Throwable {
evaluate(serverClassloader, "jmsReplyToQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToQueue/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToQueue/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from queue", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTempQueueTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTempQueueTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTempQueue() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTempQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTempQueue/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTempQueue/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from temporary queue", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTempTopicTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTempTopicTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTempTopic() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTempTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTempTopic/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTempTopic/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from temporary topic", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.compatibility;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase;
import org.apache.activemq.artemis.utils.FileUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE;
import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT;
@RunWith(Parameterized.class)
public class JmsReplyToTopicTest extends VersionedBase {
@Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}")
public static Collection getParameters() {
List<Object[]> combinations = new ArrayList<>();
combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE});
combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT});
return combinations;
}
public JmsReplyToTopicTest(String server, String sender, String receiver) throws Exception {
super(server, sender, receiver);
}
@Before
public void setUp() throws Throwable {
FileUtil.deleteDirectory(serverFolder.getRoot());
}
@After
public void stopTest() throws Exception {
execute(serverClassloader, "server.stop()");
}
@Test
public void testJmsReplyToTopic() throws Throwable {
evaluate(serverClassloader, "jmsReplyToTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server);
CountDownLatch consumerCreated = new CountDownLatch(1);
CountDownLatch receiverLatch = new CountDownLatch(1);
CountDownLatch senderLatch = new CountDownLatch(1);
setVariable(receiverClassloader, "latch", receiverLatch);
setVariable(receiverClassloader, "consumerCreated", consumerCreated);
AtomicInteger errors = new AtomicInteger(0);
Thread t1 = new Thread() {
@Override
public void run() {
try {
evaluate(receiverClassloader, "jmsReplyToTopic/receiveMessages.groovy", receiver);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t1.start();
Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS));
setVariable(senderClassloader, "senderLatch", senderLatch);
Thread t2 = new Thread() {
@Override
public void run() {
try {
evaluate(senderClassloader, "jmsReplyToTopic/sendMessagesAddress.groovy", sender);
} catch (Throwable e) {
e.printStackTrace();
errors.incrementAndGet();
}
}
};
t2.start();
try {
Assert.assertTrue("Sender did not get message from topic", senderLatch.await(10, TimeUnit.SECONDS));
Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS));
} finally {
t1.join(TimeUnit.SECONDS.toMillis(1));
t2.join(TimeUnit.SECONDS.toMillis(1));
if (t1.isAlive()) {
t1.interrupt();
}
if (t2.isAlive()) {
t2.interrupt();
}
}
}
}