From d9d9699732e4c9de0d9f8f009a4b1fef26b37589 Mon Sep 17 00:00:00 2001 From: Michael Andre Pearce Date: Mon, 5 Jun 2017 13:45:25 +0100 Subject: [PATCH] ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour. Add test case, to prove the issue, and then obviously ensure it works, post fix. Apply changes in logic of createQueueName to handle global better and fix the behaviour. Create queues so names are same as behaviour with core client. --- .../proton/ProtonServerSenderContext.java | 7 +- .../amqp/ClientDefinedMultiConsumerTest.java | 32 ++-- .../amqp/JMSSharedConsumerTest.java | 166 ++++++++++++++++++ .../amqp/JMSSharedDurableConsumerTest.java | 166 ++++++++++++++++++ 4 files changed, 350 insertions(+), 21 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 6dcf41a2c1..ad8bee79bd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -738,16 +738,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean shared, boolean global, boolean isVolatile) { - String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; + String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId; if (shared) { if (queue.contains("|")) { queue = queue.split("\\|")[0]; } if (isVolatile) { - queue += ":shared-volatile"; - } - if (global) { - queue += ":global"; + queue = "nonDurable" + "." + queue; } } return queue; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 51c70ee47c..3f8da1af7c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; } }, 1000); connection.close(); @@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AddressInfo addressInfo = new AddressInfo(address); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); server.addAddressInfo(addressInfo); - server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); + server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect("myClientId")); @@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); receiver2.close(); //check its **Hasn't** been deleted - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); connection.close(); } @@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); //check its been deleted connection.close(); Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; } }, 1000); } @@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null; } }, 1000); connection.close(); @@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java new file mode 100644 index 0000000000..c49fcff978 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.junit.Test; + +public class JMSSharedConsumerTest extends JMSClientTestSupport { + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session1.createTopic(getTopicName()); + Topic topic2 = session2.createTopic(getTopicName()); + + final MessageConsumer consumer1 = session1.createSharedConsumer(topic, "SharedConsumer"); + final MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "SharedConsumer"); + + MessageProducer producer = session1.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message message1 = consumer1.receive(100); + Message message2 = consumer2.receive(100); + + Message received = null; + if (message1 != null) { + assertNull("Message should only be delivered once per subscribtion but see twice", message2); + received = message1; + } else { + received = message2; + } + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testSharedConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + + testSharedConsumer(connection, connection2); + } + + @Test(timeout = 30000) + public void testSharedConsumerWithArtemisClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + + testSharedConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception { + + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + + testSharedConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + + testSharedConsumer(connection, connection2); + + } + + + protected String getBrokerCoreJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createCoreConnection() throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); + } + + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java new file mode 100644 index 0000000000..040506ba4b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.junit.Test; + +public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session1.createTopic(getTopicName()); + Topic topic2 = session2.createTopic(getTopicName()); + + final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer"); + final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer"); + + MessageProducer producer = session1.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message message1 = consumer1.receive(100); + Message message2 = consumer2.receive(100); + + Message received = null; + if (message1 != null) { + assertNull("Message should only be delivered once per subscribtion but see twice", message2); + received = message1; + } else { + received = message2; + } + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testSharedDurableConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + + testSharedDurableConsumer(connection, connection2); + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithArtemisClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + + testSharedDurableConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception { + + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + + testSharedDurableConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + + testSharedDurableConsumer(connection, connection2); + + } + + + protected String getBrokerCoreJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createCoreConnection() throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); + } + + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } + +}