diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 20a8b9f480..bb1436cf63 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -46,6 +46,7 @@ import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.transport.amqp.AmqpProtocolConverter; import org.apache.activemq.transport.amqp.AmqpProtocolException; import org.apache.activemq.transport.amqp.ResponseHandler; +import org.apache.activemq.util.IntrospectionSupport; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Target; @@ -313,6 +314,22 @@ public class AmqpSession implements AmqpResource { int senderCredit = protonSender.getRemoteCredit(); + // Allows the options on the destination to configure the consumerInfo + if (destination.getOptions() != null) { + Map options = IntrospectionSupport.extractProperties( + new HashMap(destination.getOptions()), "consumer."); + IntrospectionSupport.setProperties(consumerInfo, options); + if (options.size() > 0) { + String msg = "There are " + options.size() + + " consumer options that couldn't be set on the consumer." + + " Check the options are spelled correctly." + + " Unknown parameters=[" + options + "]." + + " This consumer cannot be started."; + LOG.warn(msg); + throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg); + } + } + consumerInfo.setSelector(selector); consumerInfo.setNoRangeAcks(true); consumerInfo.setDestination(destination); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index e4b8d7a68f..358365619f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -55,8 +55,10 @@ import org.apache.activemq.broker.jmx.BrokerView; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; +import org.apache.qpid.jms.JmsConnectionFactory; import org.junit.Test; import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; @@ -1177,6 +1179,85 @@ public class JMSClientTest extends JMSClientTestSupport { } } + @Test(timeout = 60000) + public void testZeroPrefetchWithTwoConsumers() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0")); + connection = cf.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + + // now lets receive it + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer1.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + answer = (TextMessage)consumer2.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + + answer = (TextMessage)consumer2.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + @Test(timeout=30000) + public void testRetroactiveConsumerSupported() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName() + "?consumer.retroactive=true"); + MessageConsumer consumer = session.createConsumer(queue); + + QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + assertNotNull(queueView); + assertEquals(1, queueView.getSubscriptions().length); + + SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName()); + assertTrue(subscriber.isRetroactive()); + + consumer.close(); + } + + @Test(timeout=30000) + public void testExclusiveConsumerSupported() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName() + "?consumer.exclusive=true"); + MessageConsumer consumer = session.createConsumer(queue); + + QueueViewMBean queueView = getProxyToQueue(getDestinationName()); + assertNotNull(queueView); + assertEquals(1, queueView.getSubscriptions().length); + + SubscriptionViewMBean subscriber = getProxyToQueueSubscriber(getDestinationName()); + assertTrue(subscriber.isExclusive()); + + consumer.close(); + } + + @Test(timeout=30000) + public void testUnpplicableDestinationOption() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName() + "?consumer.unknoen=true"); + try { + session.createConsumer(queue); + fail("Should have failed to create consumer"); + } catch (JMSException jmsEx) { + } + } + protected void receiveMessages(MessageConsumer consumer) throws Exception { for (int i = 0; i < 10; i++) { Message message = consumer.receive(1000);