diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index f6223a658f..5041b575ec 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -68,7 +68,7 @@ import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQCompatibleMes import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQMapCompatibleMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQObjectCompatibleMessage; import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQStreamCompatibleMessage; -import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompabileMessage; +import org.apache.activemq.artemis.jms.client.compatible1X.ActiveMQTextCompatibleMessage; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.activemq.artemis.utils.CompositeAddress; @@ -234,7 +234,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { ActiveMQTextMessage msg; if (enable1xPrefixes) { - msg = new ActiveMQTextCompabileMessage(session); + msg = new ActiveMQTextCompatibleMessage(session); } else { msg = new ActiveMQTextMessage(session); } @@ -249,7 +249,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { ActiveMQTextMessage msg; if (enable1xPrefixes) { - msg = new ActiveMQTextCompabileMessage(session); + msg = new ActiveMQTextCompatibleMessage(session); } else { msg = new ActiveMQTextMessage(session); } diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java index ec4e720d07..9248e8e415 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQCompatibleMessage.java @@ -190,7 +190,7 @@ public class ActiveMQCompatibleMessage extends ActiveMQMessage { } case ActiveMQTextMessage.TYPE: // 3 { - msg = new ActiveMQTextCompabileMessage(message, session); + msg = new ActiveMQTextCompatibleMessage(message, session); break; } default: { diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java similarity index 75% rename from artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java rename to artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java index ae8aa52213..4b0d09ca9b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompabileMessage.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/compatible1X/ActiveMQTextCompatibleMessage.java @@ -21,11 +21,12 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage; -public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage { +public class ActiveMQTextCompatibleMessage extends ActiveMQTextMessage { @Override public void setJMSReplyTo(Destination dest) throws JMSException { @@ -40,15 +41,21 @@ public class ActiveMQTextCompabileMessage extends ActiveMQTextMessage { return replyTo; } - public ActiveMQTextCompabileMessage(ClientSession session) { + public ActiveMQTextCompatibleMessage(ClientSession session) { super(session); } - public ActiveMQTextCompabileMessage(ClientMessage message, ClientSession session) { + public ActiveMQTextCompatibleMessage(ClientMessage message, ClientSession session) { super(message, session); } - public ActiveMQTextCompabileMessage(TextMessage foreign, ClientSession session) throws JMSException { + public ActiveMQTextCompatibleMessage(TextMessage foreign, ClientSession session) throws JMSException { super(foreign, session); } + + + @Override + protected SimpleString checkPrefix(SimpleString address) { + return ActiveMQCompatibleMessage.checkPrefix1X(address); + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 6730b15c4f..f32c013beb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -347,7 +347,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { case CREATE_QUEUE: { CreateQueueMessage request = (CreateQueueMessage) packet; requiresResponse = request.isRequiresResponse(); - session.createQueue(request.getAddress(), request.getQueueName(), RoutingType.MULTICAST, request.getFilterString(), request.isTemporary(), request.isDurable()); + session.createQueue(request.getAddress(), request.getQueueName(), getRoutingTypeFromAddress(request.getAddress()), request.getFilterString(), request.isTemporary(), request.isDurable()); if (requiresResponse) { response = createNullResponseMessage(packet); } @@ -634,6 +634,14 @@ public class ServerSessionPacketHandler implements ChannelHandler { } } + private RoutingType getRoutingTypeFromAddress(SimpleString address) { + if (address.startsWith(PacketImpl.OLD_QUEUE_PREFIX) || address.startsWith(PacketImpl.OLD_TEMP_QUEUE_PREFIX)) { + return RoutingType.ANYCAST; + } + return RoutingType.MULTICAST; + } + + private Packet createNullResponseMessage(Packet packet) { final Packet response; if (!packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange()) { diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy new file mode 100644 index 0000000000..444eaf179c --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/artemisServer.groovy @@ -0,0 +1,53 @@ +package jmsReplyToQueue + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +import org.apache.activemq.artemis.core.server.impl.AddressInfo +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String queueAddress = "jms.queue.myQueue"; +String replyQueueAddress = "jms.queue.myReplyQueue"; + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/server")); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616"); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(false); + + +jmsConfiguration = new JMSConfigurationImpl(); + +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST)); +server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST)); +server.getActiveMQServer().createQueue(SimpleString.toSimpleString(replyQueueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(replyQueueAddress), null, true, false); diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy new file mode 100644 index 0000000000..2eb9fc5b17 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/receiveMessages.groovy @@ -0,0 +1,55 @@ +package jmsReplyToQueue + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue myQueue = session.createQueue("myQueue"); +MessageConsumer queueConsumer = session.createConsumer(myQueue); +consumerCreated.countDown(); +connection.start() + +Message message = queueConsumer.receive(5000); +GroovyRun.assertNotNull(message) +session.commit(); +System.out.println("Received " + message + " from: " + myQueue); +queueConsumer.close(); + +System.out.println("Sending message to: " + message.getJMSReplyTo()); +MessageProducer producer = session.createProducer(message.getJMSReplyTo()); +message = session.createMessage(); +producer.send(message); +session.commit(); + +connection.close(); + +latch.countDown(); + + + diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy new file mode 100644 index 0000000000..fd6baf86e5 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToQueue/sendMessagesAddress.groovy @@ -0,0 +1,55 @@ +package jmsReplyToQueue + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +connection.start(); + +Queue myQueue = session.createQueue("myQueue"); +Queue temporaryQueue = session.createQueue("myTemporaryQueue"); +MessageConsumer consumer = session.createConsumer(temporaryQueue); + +MessageProducer queueProducer = session.createProducer(myQueue) + +queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); +Message message = session.createMessage(); +message.setJMSReplyTo(temporaryQueue); +System.out.println("Sending " + message + " to: " + myQueue); +queueProducer.send(message); +session.commit(); + +System.out.println("Receiving message from: " + temporaryQueue); +message = consumer.receive(10000); +GroovyRun.assertNotNull(message); +session.commit(); +System.out.println("Received message: " + message); + +connection.close(); +senderLatch.countDown(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy new file mode 100644 index 0000000000..2b06830f1c --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/artemisServer.groovy @@ -0,0 +1,49 @@ +package jmsReplyToTempQueue + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +import org.apache.activemq.artemis.core.server.impl.AddressInfo +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String queueAddress = "jms.queue.myQueue"; + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/server")); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616"); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(false); + + +jmsConfiguration = new JMSConfigurationImpl(); + +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST)); +server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false); diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy new file mode 100644 index 0000000000..a7a115789c --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/receiveMessages.groovy @@ -0,0 +1,55 @@ +package jmsReplyToTempQueue + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue myQueue = session.createQueue("myQueue"); +MessageConsumer queueConsumer = session.createConsumer(myQueue); +consumerCreated.countDown(); +connection.start() + +Message message = queueConsumer.receive(5000); +GroovyRun.assertNotNull(message) +session.commit(); +System.out.println("Received " + message + " from: " + myQueue); +queueConsumer.close(); + +System.out.println("Sending message to: " + message.getJMSReplyTo()); +MessageProducer producer = session.createProducer(message.getJMSReplyTo()); +message = session.createMessage(); +producer.send(message); +session.commit(); + +connection.close(); + +latch.countDown(); + + + diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy new file mode 100644 index 0000000000..968b8ad196 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempQueue/sendMessagesAddress.groovy @@ -0,0 +1,55 @@ +package jmsReplyToTempQueue + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +connection.start(); + +Queue myQueue = session.createQueue("myQueue"); +Queue temporaryQueue = session.createTemporaryQueue(); +MessageConsumer consumer = session.createConsumer(temporaryQueue); + +MessageProducer queueProducer = session.createProducer(myQueue) + +queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); +Message message = session.createMessage(); +message.setJMSReplyTo(temporaryQueue); +System.out.println("Sending " + message + " to: " + myQueue); +queueProducer.send(message); +session.commit(); + +System.out.println("Receiving message from: " + temporaryQueue); +message = consumer.receive(10000); +GroovyRun.assertNotNull(message); +session.commit(); +System.out.println("Received message: " + message); + +connection.close(); +senderLatch.countDown(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy new file mode 100644 index 0000000000..9e85473889 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/artemisServer.groovy @@ -0,0 +1,52 @@ +package jmsReplyToTempTopic + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +import org.apache.activemq.artemis.core.server.impl.AddressInfo +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String queueAddress = "jms.queue.myQueue"; +String replyTopicAddress = "jms.topic.myReplyTopic"; + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/server")); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616"); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(false); + + +jmsConfiguration = new JMSConfigurationImpl(); + +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST)); +server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(replyTopicAddress), RoutingType.MULTICAST)); diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy new file mode 100644 index 0000000000..0ab25bf2a7 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/receiveMessages.groovy @@ -0,0 +1,58 @@ +package jmsReplyToTempTopic + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue myQueue = session.createQueue("myQueue"); +MessageConsumer queueConsumer = session.createConsumer(myQueue); +consumerCreated.countDown(); +connection.start() + +for (int i = 0; i < 5; i++) { + Message message = queueConsumer.receive(5000); + GroovyRun.assertNotNull(message) + System.out.println("Received " + message + " from: " + myQueue); + + GroovyRun.assertEquals("myQueue", ((Queue)message.getJMSDestination()).getQueueName()); + + System.out.println("Sending message to: " + message.getJMSReplyTo()); + MessageProducer producer = session.createProducer(message.getJMSReplyTo()); + message = session.createMessage(); + producer.send(message); +} +queueConsumer.close(); +session.commit(); + +connection.close(); + +latch.countDown(); + + + diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy new file mode 100644 index 0000000000..97b9fedbf0 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTempTopic/sendMessagesAddress.groovy @@ -0,0 +1,71 @@ +package jmsReplyToTempTopic + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +connection.start(); + +Queue myQueue = session.createQueue("myQueue"); + +System.out.println("myQueue::" + myQueue); +TemporaryTopic replyTopic = session.createTemporaryTopic(); +MessageConsumer consumer = session.createConsumer(replyTopic); + +System.out.println("Temporary Topic " + replyTopic); + +MessageProducer queueProducer = session.createProducer(myQueue) + +queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); +sendMessage(session.createTextMessage("hello"), replyTopic, myQueue, queueProducer); +sendMessage(session.createMapMessage(), replyTopic, myQueue, queueProducer); +sendMessage(session.createObjectMessage(), replyTopic, myQueue, queueProducer); +sendMessage(session.createStreamMessage(), replyTopic, myQueue, queueProducer); +sendMessage(session.createMessage(), replyTopic, myQueue, queueProducer); +session.commit(); + + +System.out.println("Receiving message from: " + replyTopic); +for (int i = 0; i < 5; i++) { + message = consumer.receive(10000); + GroovyRun.assertNotNull(message); +} +GroovyRun.assertNull(consumer.receiveNoWait()); +session.commit(); +System.out.println("Received message: " + message); + +connection.close(); +senderLatch.countDown(); + + +void sendMessage(Message message, TemporaryTopic replyTopic, Queue myQueue, MessageProducer queueProducer) { + message.setJMSReplyTo(replyTopic); + System.out.println("Sending " + message + " to: " + myQueue); + queueProducer.send(message); +} \ No newline at end of file diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy new file mode 100644 index 0000000000..37a6aa0c95 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/artemisServer.groovy @@ -0,0 +1,49 @@ +package jmsReplyToTopic + +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl +import org.apache.activemq.artemis.core.server.JournalType + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// starts an artemis server +import org.apache.activemq.artemis.core.server.impl.AddressInfo +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS + +String folder = arg[0]; +String queueAddress = "jms.queue.myQueue"; + +configuration = new ConfigurationImpl(); +configuration.setJournalType(JournalType.NIO); +configuration.setBrokerInstance(new File(folder + "/server")); +configuration.addAcceptorConfiguration("artemis", "tcp://0.0.0.0:61616"); +configuration.setSecurityEnabled(false); +configuration.setPersistenceEnabled(false); + + +jmsConfiguration = new JMSConfigurationImpl(); + +server = new EmbeddedJMS(); +server.setConfiguration(configuration); +server.setJmsConfiguration(jmsConfiguration); +server.start(); + +server.getActiveMQServer().addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST)); +server.getActiveMQServer().createQueue(SimpleString.toSimpleString(queueAddress), RoutingType.ANYCAST, SimpleString.toSimpleString(queueAddress), null, true, false); diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy new file mode 100644 index 0000000000..6883f4e0cc --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/receiveMessages.groovy @@ -0,0 +1,55 @@ +package jmsReplyToTopic + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +Queue myQueue = session.createQueue("myQueue"); +MessageConsumer queueConsumer = session.createConsumer(myQueue); +consumerCreated.countDown(); +connection.start() + +Message message = queueConsumer.receive(5000); +GroovyRun.assertNotNull(message) +session.commit(); +System.out.println("Received " + message + " from: " + myQueue); +queueConsumer.close(); + +System.out.println("Sending message to: " + message.getJMSReplyTo()); +MessageProducer producer = session.createProducer(message.getJMSReplyTo()); +message = session.createMessage(); +producer.send(message); +session.commit(); + +connection.close(); + +latch.countDown(); + + + diff --git a/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy new file mode 100644 index 0000000000..02f6137502 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/jmsReplyToTopic/sendMessagesAddress.groovy @@ -0,0 +1,55 @@ +package jmsReplyToTopic + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import javax.jms.* + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); +try { + cf.setEnable1xPrefixes(true); +} catch (Throwable totallyIgnored) { + // older versions will not have this method, dont even bother about seeing the stack trace or exception +} +Connection connection = cf.createConnection(); +Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +connection.start(); + +Queue myQueue = session.createQueue("myQueue"); +Topic replyTopic = session.createTopic("myReplyTopic"); +MessageConsumer consumer = session.createConsumer(replyTopic); + +MessageProducer queueProducer = session.createProducer(myQueue) + +queueProducer.setDeliveryMode(DeliveryMode.PERSISTENT); +Message message = session.createMessage(); +message.setJMSReplyTo(replyTopic); +System.out.println("Sending " + message + " to: " + myQueue); +queueProducer.send(message); +session.commit(); + +System.out.println("Receiving message from: " + replyTopic); +message = consumer.receive(10000); +GroovyRun.assertNotNull(message); +session.commit(); +System.out.println("Received message: " + message); + +connection.close(); +senderLatch.countDown(); \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java new file mode 100644 index 0000000000..393054490c --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToQueueTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +@RunWith(Parameterized.class) +public class JmsReplyToQueueTest extends VersionedBase { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public JmsReplyToQueueTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + } + + @After + public void stopTest() throws Exception { + execute(serverClassloader, "server.stop()"); + } + + @Test + public void testJmsReplyToQueue() throws Throwable { + evaluate(serverClassloader, "jmsReplyToQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server); + + CountDownLatch consumerCreated = new CountDownLatch(1); + CountDownLatch receiverLatch = new CountDownLatch(1); + CountDownLatch senderLatch = new CountDownLatch(1); + + setVariable(receiverClassloader, "latch", receiverLatch); + setVariable(receiverClassloader, "consumerCreated", consumerCreated); + + AtomicInteger errors = new AtomicInteger(0); + Thread t1 = new Thread() { + @Override + public void run() { + try { + evaluate(receiverClassloader, "jmsReplyToQueue/receiveMessages.groovy", receiver); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t1.start(); + + Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS)); + + setVariable(senderClassloader, "senderLatch", senderLatch); + Thread t2 = new Thread() { + @Override + public void run() { + try { + evaluate(senderClassloader, "jmsReplyToQueue/sendMessagesAddress.groovy", sender); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t2.start(); + + try { + Assert.assertTrue("Sender did not get message from queue", senderLatch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS)); + } finally { + + t1.join(TimeUnit.SECONDS.toMillis(1)); + t2.join(TimeUnit.SECONDS.toMillis(1)); + + if (t1.isAlive()) { + t1.interrupt(); + } + + if (t2.isAlive()) { + t2.interrupt(); + } + } + + } + +} \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java new file mode 100644 index 0000000000..3bb5cc0a1c --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempQueueTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +@RunWith(Parameterized.class) +public class JmsReplyToTempQueueTest extends VersionedBase { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public JmsReplyToTempQueueTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + } + + @After + public void stopTest() throws Exception { + execute(serverClassloader, "server.stop()"); + } + + @Test + public void testJmsReplyToTempQueue() throws Throwable { + evaluate(serverClassloader, "jmsReplyToTempQueue/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server); + + CountDownLatch consumerCreated = new CountDownLatch(1); + CountDownLatch receiverLatch = new CountDownLatch(1); + CountDownLatch senderLatch = new CountDownLatch(1); + + setVariable(receiverClassloader, "latch", receiverLatch); + setVariable(receiverClassloader, "consumerCreated", consumerCreated); + + AtomicInteger errors = new AtomicInteger(0); + Thread t1 = new Thread() { + @Override + public void run() { + try { + evaluate(receiverClassloader, "jmsReplyToTempQueue/receiveMessages.groovy", receiver); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t1.start(); + + Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS)); + + setVariable(senderClassloader, "senderLatch", senderLatch); + Thread t2 = new Thread() { + @Override + public void run() { + try { + evaluate(senderClassloader, "jmsReplyToTempQueue/sendMessagesAddress.groovy", sender); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t2.start(); + + try { + Assert.assertTrue("Sender did not get message from temporary queue", senderLatch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS)); + } finally { + + t1.join(TimeUnit.SECONDS.toMillis(1)); + t2.join(TimeUnit.SECONDS.toMillis(1)); + + if (t1.isAlive()) { + t1.interrupt(); + } + + if (t2.isAlive()) { + t2.interrupt(); + } + } + + } + +} \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java new file mode 100644 index 0000000000..1efa3de5d9 --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTempTopicTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.compatibility; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +@RunWith(Parameterized.class) +public class JmsReplyToTempTopicTest extends VersionedBase { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public JmsReplyToTempTopicTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + } + + @After + public void stopTest() throws Exception { + execute(serverClassloader, "server.stop()"); + } + + @Test + public void testJmsReplyToTempTopic() throws Throwable { + evaluate(serverClassloader, "jmsReplyToTempTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server); + + CountDownLatch consumerCreated = new CountDownLatch(1); + CountDownLatch receiverLatch = new CountDownLatch(1); + CountDownLatch senderLatch = new CountDownLatch(1); + + setVariable(receiverClassloader, "latch", receiverLatch); + setVariable(receiverClassloader, "consumerCreated", consumerCreated); + + AtomicInteger errors = new AtomicInteger(0); + Thread t1 = new Thread() { + @Override + public void run() { + try { + evaluate(receiverClassloader, "jmsReplyToTempTopic/receiveMessages.groovy", receiver); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t1.start(); + + Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS)); + + setVariable(senderClassloader, "senderLatch", senderLatch); + Thread t2 = new Thread() { + @Override + public void run() { + try { + evaluate(senderClassloader, "jmsReplyToTempTopic/sendMessagesAddress.groovy", sender); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t2.start(); + + try { + Assert.assertTrue("Sender did not get message from temporary topic", senderLatch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS)); + } finally { + + t1.join(TimeUnit.SECONDS.toMillis(1)); + t2.join(TimeUnit.SECONDS.toMillis(1)); + + if (t1.isAlive()) { + t1.interrupt(); + } + + if (t2.isAlive()) { + t2.interrupt(); + } + } + + } + +} \ No newline at end of file diff --git a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java new file mode 100644 index 0000000000..3d2406b7ae --- /dev/null +++ b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/JmsReplyToTopicTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.tests.compatibility.base.VersionedBase; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.ONE_FIVE; +import static org.apache.activemq.artemis.tests.compatibility.GroovyRun.SNAPSHOT; + +@RunWith(Parameterized.class) +public class JmsReplyToTopicTest extends VersionedBase { + + @Parameterized.Parameters(name = "server={0}, producer={1}, consumer={2}") + public static Collection getParameters() { + List combinations = new ArrayList<>(); + combinations.add(new Object[]{SNAPSHOT, ONE_FIVE, SNAPSHOT}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, ONE_FIVE}); + combinations.add(new Object[]{SNAPSHOT, SNAPSHOT, SNAPSHOT}); + return combinations; + } + + public JmsReplyToTopicTest(String server, String sender, String receiver) throws Exception { + super(server, sender, receiver); + } + + + @Before + public void setUp() throws Throwable { + FileUtil.deleteDirectory(serverFolder.getRoot()); + } + + @After + public void stopTest() throws Exception { + execute(serverClassloader, "server.stop()"); + } + + @Test + public void testJmsReplyToTopic() throws Throwable { + evaluate(serverClassloader, "jmsReplyToTopic/artemisServer.groovy", serverFolder.getRoot().getAbsolutePath(), server); + + CountDownLatch consumerCreated = new CountDownLatch(1); + CountDownLatch receiverLatch = new CountDownLatch(1); + CountDownLatch senderLatch = new CountDownLatch(1); + + setVariable(receiverClassloader, "latch", receiverLatch); + setVariable(receiverClassloader, "consumerCreated", consumerCreated); + + AtomicInteger errors = new AtomicInteger(0); + Thread t1 = new Thread() { + @Override + public void run() { + try { + evaluate(receiverClassloader, "jmsReplyToTopic/receiveMessages.groovy", receiver); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t1.start(); + + Assert.assertTrue(consumerCreated.await(10, TimeUnit.SECONDS)); + + setVariable(senderClassloader, "senderLatch", senderLatch); + Thread t2 = new Thread() { + @Override + public void run() { + try { + evaluate(senderClassloader, "jmsReplyToTopic/sendMessagesAddress.groovy", sender); + } catch (Throwable e) { + e.printStackTrace(); + errors.incrementAndGet(); + } + } + }; + t2.start(); + + try { + Assert.assertTrue("Sender did not get message from topic", senderLatch.await(10, TimeUnit.SECONDS)); + Assert.assertTrue("Receiver did not receive messages", receiverLatch.await(10, TimeUnit.SECONDS)); + } finally { + + t1.join(TimeUnit.SECONDS.toMillis(1)); + t2.join(TimeUnit.SECONDS.toMillis(1)); + + if (t1.isAlive()) { + t1.interrupt(); + } + + if (t2.isAlive()) { + t2.interrupt(); + } + } + + } + +} \ No newline at end of file