This closes #581

This commit is contained in:
Clebert Suconic 2016-06-17 14:58:54 -04:00
commit 07b57e524a
10 changed files with 56 additions and 18 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
@ -271,6 +272,10 @@ public class AMQConsumer {
transaction.commit(true);
}
}
if (ack.isExpiredAck()) {
//adjust delivering count for expired messages
this.serverConsumer.getQueue().decDelivering(ackList.size());
}
}
public void browseFinished() {
@ -314,14 +319,23 @@ public class AMQConsumer {
}
}
public void updateDeliveryCountAfterCancel(MessageReference ref) {
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
//because delivering count is always one greater than redelivery count
//we adjust it down before further calculating.
ref.decrementDeliveryCount();
// This is a specific rule of the protocol
if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
ref.decrementDeliveryCount();
if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
// this takes care of un-acked messages in non-tx deliveries
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
}
return true;
}
/**

View File

@ -123,8 +123,7 @@ public class AMQSession implements SessionCallback {
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
if (consumer.getProtocolData() != null) {
((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
return true;
return ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
}
else {
return false;

View File

@ -249,4 +249,6 @@ public interface Queue extends Bindable {
* @return the user who created this queue
*/
SimpleString getUser();
void decDelivering(int size);
}

View File

@ -2854,6 +2854,11 @@ public class QueueImpl implements Queue {
deliveringCount.decrementAndGet();
}
@Override
public void decDelivering(int size) {
deliveringCount.addAndGet(-size);
}
private void configureExpiry(final AddressSettings settings) {
this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
}

View File

@ -1289,5 +1289,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public SimpleString getUser() {
return null;
}
@Override
public void decDelivering(int size) {
}
}
}

View File

@ -41,12 +41,12 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
protected final Map<String, SimpleString> testQueues = new HashMap<>();
protected JMSServerManagerImpl jmsServer;
protected MBeanServer mbeanServer;
@ -251,9 +251,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
}
}
public long getAMQueueMessageCount(String physicalName) {
public long getAMQueueMessageCount(ActiveMQDestination amq5Dest) {
if (amq5Dest.isTopic()) {
throw new IllegalArgumentException("Method only accept queue type parameter.");
}
long count = 0;
String qname = "jms.queue." + physicalName;
String qname = null;
if (amq5Dest.isTemporary()) {
qname = "jms.tempqueue." + amq5Dest.getPhysicalName();
}
else {
qname = "jms.queue." + amq5Dest.getPhysicalName();
}
Binding binding = server.getPostOffice().getBinding(new SimpleString(qname));
if (binding != null) {
QueueImpl q = (QueueImpl) binding.getBindable();

View File

@ -57,8 +57,8 @@ public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
assertEquals(((TextMessage) unackMessage).getText(), text);
assertFalse(unackMessage.getJMSRedelivered());
assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1);
consumeSession.close();
consumer.close();
consumeSession.close();
// receive then acknowledge
consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

View File

@ -136,9 +136,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
}
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
TextMessage answer = (TextMessage) consumer.receiveNoWait();
TextMessage answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
answer = (TextMessage) consumer.receiveNoWait();
answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
if (transacted) {
session.commit();
@ -157,9 +157,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
// now lets receive it
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
TextMessage answer = (TextMessage) consumer1.receiveNoWait();
TextMessage answer = (TextMessage) consumer1.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
answer = (TextMessage) consumer2.receiveNoWait();
answer = (TextMessage) consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
answer = (TextMessage) consumer2.receiveNoWait();

View File

@ -459,7 +459,7 @@ public class BrokerTest extends BrokerTestSupport {
//due to async tx operations, we need some time for message count to go down
Thread.sleep(1000);
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
long messageCount = wrapper.getAMQueueMessageCount(destination);
// The queue should now only have the remaining 2 messages
assertEquals(2, messageCount);
@ -1473,15 +1473,15 @@ public class BrokerTest extends BrokerTestSupport {
assertEquals(m.getMessageId(), message1.getMessageId());
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
long messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
//give some time for broker to count down
Thread.sleep(2000);
messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 1);
}

View File

@ -570,4 +570,8 @@ public class FakeQueue implements Queue {
public SimpleString getUser() {
return null;
}
}
@Override
public void decDelivering(int size) {
}
}