mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-28 22:39:27 +00:00
ARTEMIS-4726 fix original commit
The original commit (1ee3e884b707a659d924188048c2960a3b22df35) for this issue wasn't completely correct. This commit fixes those issues so that both the messageCount and scheduledMessageCount are accurate now when a scheduled message is removed by its ID.
This commit is contained in:
parent
3244965155
commit
c47713454c
@ -275,6 +275,11 @@ public class PagedReferenceImpl extends AbstractProtocolReference implements Pag
|
|||||||
getQueue().acknowledge(tx, this, reason, consumer, true);
|
getQueue().acknowledge(tx, this, reason, consumer, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception {
|
||||||
|
getQueue().acknowledge(tx, this, reason, consumer, delivering);
|
||||||
|
}
|
||||||
|
|
||||||
/* (non-Javadoc)
|
/* (non-Javadoc)
|
||||||
* @see java.lang.Object#toString()
|
* @see java.lang.Object#toString()
|
||||||
*/
|
*/
|
||||||
|
@ -114,6 +114,8 @@ public interface MessageReference {
|
|||||||
|
|
||||||
void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception;
|
void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception;
|
||||||
|
|
||||||
|
void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception;
|
||||||
|
|
||||||
void emptyConsumerID();
|
void emptyConsumerID();
|
||||||
|
|
||||||
void setConsumerId(long consumerID);
|
void setConsumerId(long consumerID);
|
||||||
|
@ -168,6 +168,11 @@ public class GroupFirstMessageReference implements MessageReference {
|
|||||||
messageReference.acknowledge(tx, reason, consumer);
|
messageReference.acknowledge(tx, reason, consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception {
|
||||||
|
messageReference.acknowledge(tx, reason, consumer, delivering);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void emptyConsumerID() {
|
public void emptyConsumerID() {
|
||||||
messageReference.emptyConsumerID();
|
messageReference.emptyConsumerID();
|
||||||
|
@ -266,10 +266,15 @@ public class MessageReferenceImpl extends AbstractProtocolReference implements M
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
|
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
|
||||||
|
acknowledge(tx, reason, consumer, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer, boolean delivering) throws Exception {
|
||||||
if (tx == null) {
|
if (tx == null) {
|
||||||
getQueue().acknowledge(this, reason, consumer);
|
getQueue().acknowledge(this, reason, consumer);
|
||||||
} else {
|
} else {
|
||||||
getQueue().acknowledge(tx, this, reason, consumer, true);
|
getQueue().acknowledge(tx, this, reason, consumer, delivering);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,9 +175,10 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
|
|||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
MessageReference ref = iter.next().getRef();
|
MessageReference ref = iter.next().getRef();
|
||||||
if (ref.getMessage().getMessageID() == id) {
|
if (ref.getMessage().getMessageID() == id) {
|
||||||
ref.acknowledge(tx);
|
ref.acknowledge(tx, AckReason.NORMAL, null, false);
|
||||||
iter.remove();
|
iter.remove();
|
||||||
notifyScheduledReferencesUpdated();
|
notifyScheduledReferencesUpdated();
|
||||||
|
metrics.decrementMetrics(ref);
|
||||||
return ref;
|
return ref;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -557,8 +557,10 @@ public class ScheduledMessageTest extends ActiveMQTestBase {
|
|||||||
|
|
||||||
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
|
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
|
||||||
assertEquals(1, queueControl.getMessageCount());
|
assertEquals(1, queueControl.getMessageCount());
|
||||||
|
assertEquals(1, queueControl.getScheduledCount());
|
||||||
assertTrue(queueControl.removeMessage((long) queueControl.listScheduledMessages()[0].get("messageID")));
|
assertTrue(queueControl.removeMessage((long) queueControl.listScheduledMessages()[0].get("messageID")));
|
||||||
assertEquals(0, queueControl.getMessageCount());
|
assertEquals(0, queueControl.getMessageCount());
|
||||||
|
assertEquals(0, queueControl.getScheduledCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
x
Reference in New Issue
Block a user