Allow a receiver link to enable consumer options on the subscription
such as exclusive and retroactive using options encoded on the address
(cherry picked from commit a35e8dc8a2)

Conflicts:
	activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
This commit is contained in:
Timothy Bish 2016-07-14 16:08:34 -04:00
parent c1a2998660
commit b547e46131
2 changed files with 98 additions and 0 deletions

View File

@ -46,6 +46,7 @@ import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.transport.amqp.ResponseHandler;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Target;
@ -313,6 +314,22 @@ public class AmqpSession implements AmqpResource {
int senderCredit = protonSender.getRemoteCredit();
// Allows the options on the destination to configure the consumerInfo
if (destination.getOptions() != null) {
Map<String, Object> options = IntrospectionSupport.extractProperties(
new HashMap<String, Object>(destination.getOptions()), "consumer.");
IntrospectionSupport.setProperties(consumerInfo, options);
if (options.size() > 0) {
String msg = "There are " + options.size()
+ " consumer options that couldn't be set on the consumer."
+ " Check the options are spelled correctly."
+ " Unknown parameters=[" + options + "]."
+ " This consumer cannot be started.";
LOG.warn(msg);
throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
}
}
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination);

View File

@ -55,8 +55,10 @@ import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@ -1177,6 +1179,85 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testZeroPrefetchWithTwoConsumers() throws Exception {
JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("Msg1"));
producer.send(session.createTextMessage("Msg2"));
// now lets receive it
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
TextMessage answer = (TextMessage)consumer1.receive(5000);
assertNotNull(answer);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
answer = (TextMessage)consumer2.receive(5000);
assertNotNull(answer);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
answer = (TextMessage)consumer2.receiveNoWait();
assertNull("Should have not received a message!", answer);
}
@Test(timeout=30000)
public void testRetroactiveConsumerSupported() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName() + "?consumer.retroactive=true");
MessageConsumer consumer = session.createConsumer(queue);
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
assertNotNull(queueView);
assertEquals(1, queueView.getSubscriptions().length);
SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
assertTrue(subscriber.isRetroactive());
consumer.close();
}
@Test(timeout=30000)
public void testExclusiveConsumerSupported() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName() + "?consumer.exclusive=true");
MessageConsumer consumer = session.createConsumer(queue);
QueueViewMBean queueView = getProxyToQueue(getDestinationName());
assertNotNull(queueView);
assertEquals(1, queueView.getSubscriptions().length);
SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName());
assertTrue(subscriber.isExclusive());
consumer.close();
}
@Test(timeout=30000)
public void testUnpplicableDestinationOption() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName() + "?consumer.unknoen=true");
try {
session.createConsumer(queue);
fail("Should have failed to create consumer");
} catch (JMSException jmsEx) {
}
}
protected void receiveMessages(MessageConsumer consumer) throws Exception {
for (int i = 0; i < 10; i++) {
Message message = consumer.receive(1000);