diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 6dcf41a2c1..ad8bee79bd 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -738,16 +738,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean shared, boolean global, boolean isVolatile) { - String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; + String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId; if (shared) { if (queue.contains("|")) { queue = queue.split("\\|")[0]; } if (isVolatile) { - queue += ":shared-volatile"; - } - if (global) { - queue += ":global"; + queue = "nonDurable" + "." + queue; } } return queue; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java index 51c70ee47c..3f8da1af7c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java @@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; } }, 1000); connection.close(); @@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { AddressInfo addressInfo = new AddressInfo(address); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); server.addAddressInfo(addressInfo); - server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); + server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false); AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect("myClientId")); @@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); receiver2.close(); //check its **Hasn't** been deleted - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); connection.close(); } @@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount()); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub"))); //check its been deleted connection.close(); Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null; } }, 1000); } @@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub"))); receiver2.close(); //check its been deleted Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisfied() throws Exception { - return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; + return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null; } }, 1000); connection.close(); @@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport { assertNotNull(amqpMessage); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); assertNotNull(amqpMessage); - assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); + assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount()); receiver.close(); - assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); receiver2.close(); //check its been deleted - assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); + assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub"))); connection.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java new file mode 100644 index 0000000000..c49fcff978 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.junit.Test; + +public class JMSSharedConsumerTest extends JMSClientTestSupport { + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session1.createTopic(getTopicName()); + Topic topic2 = session2.createTopic(getTopicName()); + + final MessageConsumer consumer1 = session1.createSharedConsumer(topic, "SharedConsumer"); + final MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "SharedConsumer"); + + MessageProducer producer = session1.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message message1 = consumer1.receive(100); + Message message2 = consumer2.receive(100); + + Message received = null; + if (message1 != null) { + assertNull("Message should only be delivered once per subscribtion but see twice", message2); + received = message1; + } else { + received = message2; + } + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testSharedConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + + testSharedConsumer(connection, connection2); + } + + @Test(timeout = 30000) + public void testSharedConsumerWithArtemisClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + + testSharedConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception { + + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + + testSharedConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + + testSharedConsumer(connection, connection2); + + } + + + protected String getBrokerCoreJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createCoreConnection() throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); + } + + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java new file mode 100644 index 0000000000..040506ba4b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory; +import org.junit.Test; + +public class JMSSharedDurableConsumerTest extends JMSClientTestSupport { + + @Override + protected String getConfiguredProtocols() { + return "AMQP,OPENWIRE,CORE"; + } + + private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Topic topic = session1.createTopic(getTopicName()); + Topic topic2 = session2.createTopic(getTopicName()); + + final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer"); + final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer"); + + MessageProducer producer = session1.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message message1 = consumer1.receive(100); + Message message2 = consumer2.receive(100); + + Message received = null; + if (message1 != null) { + assertNull("Message should only be delivered once per subscribtion but see twice", message2); + received = message1; + } else { + received = message2; + } + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + } finally { + connection1.close(); + connection2.close(); + } + } + + @Test(timeout = 30000) + public void testSharedDurableConsumer() throws Exception { + Connection connection = createConnection(); //AMQP + Connection connection2 = createConnection(); //AMQP + + testSharedDurableConsumer(connection, connection2); + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithArtemisClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createCoreConnection(); //CORE + + testSharedDurableConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception { + + Connection connection = createConnection(); //AMQP + Connection connection2 = createCoreConnection(); //CORE + + testSharedDurableConsumer(connection, connection2); + + } + + @Test(timeout = 30000) + public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception { + + Connection connection = createCoreConnection(); //CORE + Connection connection2 = createConnection(); //AMQP + + testSharedDurableConsumer(connection, connection2); + + } + + + protected String getBrokerCoreJMSConnectionString() { + + try { + int port = AMQP_PORT; + + String uri = null; + + if (isUseSSL()) { + uri = "tcp://127.0.0.1:" + port; + } else { + uri = "tcp://127.0.0.1:" + port; + } + + if (!getJmsConnectionURIOptions().isEmpty()) { + uri = uri + "?" + getJmsConnectionURIOptions(); + } + + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected Connection createCoreConnection() throws JMSException { + return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true); + } + + private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { + ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString); + + Connection connection = trackJMSConnection(factory.createConnection(username, password)); + + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } + + if (start) { + connection.start(); + } + + return connection; + } + +}