ARTEMIS-571 Fix issues in openwire testsuite

* Redelivery count fix
* Regression in BrokerTest
This commit is contained in:
Howard Gao 2016-06-17 10:49:01 +08:00 committed by Clebert Suconic
parent 279780c89d
commit 109ce6ded9
10 changed files with 56 additions and 18 deletions

View File

@ -43,6 +43,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer { public class AMQConsumer {
@ -271,6 +272,10 @@ public class AMQConsumer {
transaction.commit(true); transaction.commit(true);
} }
} }
if (ack.isExpiredAck()) {
//adjust delivering count for expired messages
this.serverConsumer.getQueue().decDelivering(ackList.size());
}
} }
public void browseFinished() { public void browseFinished() {
@ -314,14 +319,23 @@ public class AMQConsumer {
} }
} }
public void updateDeliveryCountAfterCancel(MessageReference ref) { public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
long seqId = ref.getMessage().getMessageID(); long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId(); long lastDelSeqId = info.getLastDeliveredSequenceId();
//because delivering count is always one greater than redelivery count
//we adjust it down before further calculating.
ref.decrementDeliveryCount();
// This is a specific rule of the protocol // This is a specific rule of the protocol
if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) { if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
ref.decrementDeliveryCount(); // this takes care of un-acked messages in non-tx deliveries
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
} }
return true;
} }
/** /**

View File

@ -123,8 +123,7 @@ public class AMQSession implements SessionCallback {
@Override @Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) { public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
if (consumer.getProtocolData() != null) { if (consumer.getProtocolData() != null) {
((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref); return ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
return true;
} }
else { else {
return false; return false;

View File

@ -249,4 +249,6 @@ public interface Queue extends Bindable {
* @return the user who created this queue * @return the user who created this queue
*/ */
SimpleString getUser(); SimpleString getUser();
void decDelivering(int size);
} }

View File

@ -2854,6 +2854,11 @@ public class QueueImpl implements Queue {
deliveringCount.decrementAndGet(); deliveringCount.decrementAndGet();
} }
@Override
public void decDelivering(int size) {
deliveringCount.addAndGet(-size);
}
private void configureExpiry(final AddressSettings settings) { private void configureExpiry(final AddressSettings settings) {
this.expiryAddress = settings == null ? null : settings.getExpiryAddress(); this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
} }

View File

@ -1289,5 +1289,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public SimpleString getUser() { public SimpleString getUser() {
return null; return null;
} }
@Override
public void decDelivering(int size) {
}
} }
} }

View File

@ -41,12 +41,12 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MBeanServerFactory; import javax.management.MBeanServerFactory;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase { public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
protected final Map<String, SimpleString> testQueues = new HashMap<>(); protected final Map<String, SimpleString> testQueues = new HashMap<>();
protected JMSServerManagerImpl jmsServer; protected JMSServerManagerImpl jmsServer;
protected MBeanServer mbeanServer; protected MBeanServer mbeanServer;
@ -251,9 +251,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
} }
} }
public long getAMQueueMessageCount(String physicalName) { public long getAMQueueMessageCount(ActiveMQDestination amq5Dest) {
if (amq5Dest.isTopic()) {
throw new IllegalArgumentException("Method only accept queue type parameter.");
}
long count = 0; long count = 0;
String qname = "jms.queue." + physicalName; String qname = null;
if (amq5Dest.isTemporary()) {
qname = "jms.tempqueue." + amq5Dest.getPhysicalName();
}
else {
qname = "jms.queue." + amq5Dest.getPhysicalName();
}
Binding binding = server.getPostOffice().getBinding(new SimpleString(qname)); Binding binding = server.getPostOffice().getBinding(new SimpleString(qname));
if (binding != null) { if (binding != null) {
QueueImpl q = (QueueImpl) binding.getBindable(); QueueImpl q = (QueueImpl) binding.getBindable();

View File

@ -57,8 +57,8 @@ public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
assertEquals(((TextMessage) unackMessage).getText(), text); assertEquals(((TextMessage) unackMessage).getText(), text);
assertFalse(unackMessage.getJMSRedelivered()); assertFalse(unackMessage.getJMSRedelivered());
assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1); assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1);
consumeSession.close();
consumer.close(); consumer.close();
consumeSession.close();
// receive then acknowledge // receive then acknowledge
consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

View File

@ -136,9 +136,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
} }
// now lets receive it // now lets receive it
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
TextMessage answer = (TextMessage) consumer.receiveNoWait(); TextMessage answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1"); assertEquals("Should have received a message!", answer.getText(), "Msg1");
answer = (TextMessage) consumer.receiveNoWait(); answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2"); assertEquals("Should have received a message!", answer.getText(), "Msg2");
if (transacted) { if (transacted) {
session.commit(); session.commit();
@ -157,9 +157,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
// now lets receive it // now lets receive it
MessageConsumer consumer1 = session.createConsumer(queue); MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue); MessageConsumer consumer2 = session.createConsumer(queue);
TextMessage answer = (TextMessage) consumer1.receiveNoWait(); TextMessage answer = (TextMessage) consumer1.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1"); assertEquals("Should have received a message!", answer.getText(), "Msg1");
answer = (TextMessage) consumer2.receiveNoWait(); answer = (TextMessage) consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2"); assertEquals("Should have received a message!", answer.getText(), "Msg2");
answer = (TextMessage) consumer2.receiveNoWait(); answer = (TextMessage) consumer2.receiveNoWait();

View File

@ -459,7 +459,7 @@ public class BrokerTest extends BrokerTestSupport {
//due to async tx operations, we need some time for message count to go down //due to async tx operations, we need some time for message count to go down
Thread.sleep(1000); Thread.sleep(1000);
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); long messageCount = wrapper.getAMQueueMessageCount(destination);
// The queue should now only have the remaining 2 messages // The queue should now only have the remaining 2 messages
assertEquals(2, messageCount); assertEquals(2, messageCount);
@ -1473,15 +1473,15 @@ public class BrokerTest extends BrokerTestSupport {
assertEquals(m.getMessageId(), message1.getMessageId()); assertEquals(m.getMessageId(), message1.getMessageId());
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); long messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2); assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2); assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
//give some time for broker to count down //give some time for broker to count down
Thread.sleep(2000); Thread.sleep(2000);
messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 1); assertTrue(messageCount == 1);
} }

View File

@ -570,4 +570,8 @@ public class FakeQueue implements Queue {
public SimpleString getUser() { public SimpleString getUser() {
return null; return null;
} }
}
@Override
public void decDelivering(int size) {
}
}