This commit is contained in:
gtully 2014-12-05 14:49:48 +00:00
parent a876a34141
commit bbb2acca2f
3 changed files with 85 additions and 2 deletions

View File

@ -204,7 +204,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
private long consumerFailoverRedeliveryWaitPeriod;
private Scheduler scheduler;
private boolean messagePrioritySupported = true;
private boolean messagePrioritySupported = false;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
private boolean rmIdFromConnectionId = false;

View File

@ -267,7 +267,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
this.failoverRedeliveryWaitPeriod = session.connection.getConsumerFailoverRedeliveryWaitPeriod();
this.nonBlockingRedelivery = session.connection.isNonBlockingRedelivery();
this.transactedIndividualAck = session.connection.isTransactedIndividualAck() || this.nonBlockingRedelivery;
this.transactedIndividualAck = session.connection.isTransactedIndividualAck()
|| this.nonBlockingRedelivery
|| session.connection.isMessagePrioritySupported();
this.consumerExpiryCheckEnabled = session.connection.isConsumerExpiryCheckEnabled();
if (messageListener != null) {
setMessageListener(messageListener);

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BatchedMessagePriorityConsumerTest extends JmsTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(BatchedMessagePriorityConsumerTest.class);
public void testBatchWithLowPriorityFirstAndClientSupport() throws Exception {
doTestBatchWithLowPriorityFirst(true);
}
public void testBatchWithLowPriorityFirstAndClientSupportOff() throws Exception {
doTestBatchWithLowPriorityFirst(false);
}
public void doTestBatchWithLowPriorityFirst(boolean clientPrioritySupport) throws Exception {
connection.start();
connection.setMessagePrioritySupported(clientPrioritySupport);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE);
MessageProducer producer = session.createProducer(destination);
producer.setPriority(0);
sendMessages(session, producer, 2);
producer.close();
MessageProducer producer2 = session.createProducer(destination);
producer2.setPriority(9);
sendMessages(session, producer2, 3);
producer2.close();
session.close();
Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = consumerSession.createConsumer(destination);
for (int i = 0; i < 5; i++) {
Message message = messageConsumer.receive(4000);
LOG.info("MessageID: " + message.getJMSMessageID());
}
consumerSession.commit();
consumerSession.close();
// should be nothing left
consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = consumerSession.createConsumer(destination);
assertNull("No message left", messageConsumer.receive(1000));
consumerSession.close();
}
}