This commit is contained in:
Clebert Suconic 2017-06-05 18:15:09 -04:00
commit 56485a204c
4 changed files with 350 additions and 21 deletions

View File

@ -738,16 +738,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
boolean shared, boolean shared,
boolean global, boolean global,
boolean isVolatile) { boolean isVolatile) {
String queue = clientId == null || clientId.isEmpty() ? pubId : clientId + "." + pubId; String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
if (shared) { if (shared) {
if (queue.contains("|")) { if (queue.contains("|")) {
queue = queue.split("\\|")[0]; queue = queue.split("\\|")[0];
} }
if (isVolatile) { if (isVolatile) {
queue += ":shared-volatile"; queue = "nonDurable" + "." + queue;
}
if (global) {
queue += ":global";
} }
} }
return queue; return queue;

View File

@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
} }
}, 1000); }, 1000);
connection.close(); connection.close();
@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address); AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST); addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.addAddressInfo(addressInfo); server.addAddressInfo(addressInfo);
server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false); server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient(); AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId")); AmqpConnection connection = addConnection(client.connect("myClientId"));
@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
receiver2.close(); receiver2.close();
//check its **Hasn't** been deleted //check its **Hasn't** been deleted
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
connection.close(); connection.close();
} }
@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
//check its been deleted //check its been deleted
connection.close(); connection.close();
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
} }
}, 1000); }, 1000);
} }
@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
Wait.waitFor(new Wait.Condition() { Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisfied() throws Exception { public boolean isSatisfied() throws Exception {
return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null; return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null;
} }
}, 1000); }, 1000);
connection.close(); connection.close();
@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS); amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage); assertNotNull(amqpMessage);
assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount()); assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount());
receiver.close(); receiver.close();
assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
receiver2.close(); receiver2.close();
//check its been deleted //check its been deleted
assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global"))); assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
connection.close(); connection.close();
} }

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.junit.Test;
public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session1.createTopic(getTopicName());
Topic topic2 = session2.createTopic(getTopicName());
final MessageConsumer consumer1 = session1.createSharedConsumer(topic, "SharedConsumer");
final MessageConsumer consumer2 = session2.createSharedConsumer(topic2, "SharedConsumer");
MessageProducer producer = session1.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message message1 = consumer1.receive(100);
Message message2 = consumer2.receive(100);
Message received = null;
if (message1 != null) {
assertNull("Message should only be delivered once per subscribtion but see twice", message2);
received = message1;
} else {
received = message2;
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testSharedConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithArtemisClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testSharedConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testSharedConsumer(connection, connection2);
}
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.junit.Test;
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
private void testSharedDurableConsumer(Connection connection1, Connection connection2) throws JMSException {
try {
Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session1.createTopic(getTopicName());
Topic topic2 = session2.createTopic(getTopicName());
final MessageConsumer consumer1 = session1.createSharedDurableConsumer(topic, "SharedConsumer");
final MessageConsumer consumer2 = session2.createSharedDurableConsumer(topic2, "SharedConsumer");
MessageProducer producer = session1.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection1.start();
TextMessage message = session1.createTextMessage();
message.setText("hello");
producer.send(message);
Message message1 = consumer1.receive(100);
Message message2 = consumer2.receive(100);
Message received = null;
if (message1 != null) {
assertNull("Message should only be delivered once per subscribtion but see twice", message2);
received = message1;
} else {
received = message2;
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
} finally {
connection1.close();
connection2.close();
}
}
@Test(timeout = 30000)
public void testSharedDurableConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createConnection(); //AMQP
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createCoreConnection(); //CORE
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception {
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
testSharedDurableConsumer(connection, connection2);
}
@Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception {
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
testSharedDurableConsumer(connection, connection2);
}
protected String getBrokerCoreJMSConnectionString() {
try {
int port = AMQP_PORT;
String uri = null;
if (isUseSSL()) {
uri = "tcp://127.0.0.1:" + port;
} else {
uri = "tcp://127.0.0.1:" + port;
}
if (!getJmsConnectionURIOptions().isEmpty()) {
uri = uri + "?" + getJmsConnectionURIOptions();
}
return uri;
} catch (Exception e) {
throw new RuntimeException();
}
}
protected Connection createCoreConnection() throws JMSException {
return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
}
private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
Connection connection = trackJMSConnection(factory.createConnection(username, password));
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
if (start) {
connection.start();
}
return connection;
}
}