ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour.

Add test case, to prove the issue, and then obviously ensure it works, post fix.
Apply changes in logic of createQueueName to handle global better and fix the behaviour.
Create queues so names are same as behaviour with core client.
This commit is contained in:
Michael Andre Pearce 2017-06-05 13:45:25 +01:00
parent 7036e50383
commit d9d9699732
4 changed files with 350 additions and 21 deletions

View File

@ -738,16 +738,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean shared,
boolean global,
boolean isVolatile) {
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId;
String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
if (shared) {
if (queue.contains("|")) {
queue = queue.split("\\|")[0];
}
if (isVolatile) {
queue += ":shared-volatile";
}
if (global) {
queue += ":global";
queue = "nonDurable" + "." + queue;
}
}
return queue;

View File

@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
receiver2.close();
//check its been deleted
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
}
}, 1000);
connection.close();
@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
receiver2.close();
//check its **Hasn't** been deleted
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
connection.close();
}
@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
//check its been deleted
connection.close();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
}
}, 1000);
}
@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")));
receiver2.close();
//check its been deleted
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null;
}
}, 1000);
connection.close();
@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount());
receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
receiver2.close();
//check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
connection.close();
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.junit.Test;
public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session1.createTopic(getTopicName());
Topic topic2 = session2.createTopic(getTopicName());
final MessageConsumer consumer1 = session1.createSharedConsumer(topic, "SharedConsumer");
final MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "SharedConsumer");
MessageProducer producer = session1.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message message1 = consumer1.receive(100);
Message message2 = consumer2.receive(100);
Message received = null;
if (message1 != null) {
assertNull("Message should only be delivered once per subscribtion but see twice", message2);
received = message1;
} else {
received = message2;
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testSharedConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithArtemisClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testSharedConsumer(connection, connection2);
}
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.junit.Test;
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session1.createTopic(getTopicName());
Topic topic2 = session2.createTopic(getTopicName());
final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer");
final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer");
MessageProducer producer = session1.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message message1 = consumer1.receive(100);
Message message2 = consumer2.receive(100);
Message received = null;
if (message1 != null) {
assertNull("Message should only be delivered once per subscribtion but see twice", message2);
received = message1;
} else {
received = message2;
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testSharedDurableConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testSharedDurableConsumer(connection, connection2);
}
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
}