AMQ-6703 - fix regression in MBeanTest - StoreQueueCursor not sharing its audit - have purge use rollback and delegate to both cursors

This commit is contained in:
gtully 2018-05-18 11:36:54 +01:00
parent 72613aaba6
commit 2eff835ee2
2 changed files with 8 additions and 7 deletions

View File

@ -1307,6 +1307,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
try { try {
QueueMessageReference r = (QueueMessageReference) ref; QueueMessageReference r = (QueueMessageReference) ref;
removeMessage(c, r); removeMessage(c, r);
messages.rollback(r.getMessageId());
} catch (IOException e) { } catch (IOException e) {
} }
} }
@ -1314,10 +1315,6 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// store // store
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
if (getMessages().getMessageAudit() != null) {
getMessages().getMessageAudit().clear();
}
if (this.destinationStatistics.getMessages().getCount() > 0) { if (this.destinationStatistics.getMessages().getCount() > 0) {
LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
} }

View File

@ -20,6 +20,7 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -77,9 +78,6 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
public synchronized void stop() throws Exception { public synchronized void stop() throws Exception {
started = false; started = false;
if (nonPersistent != null) { if (nonPersistent != null) {
// nonPersistent.clear();
// nonPersistent.stop();
// nonPersistent.gc();
nonPersistent.destroy(); nonPersistent.destroy();
} }
persistent.stop(); persistent.stop();
@ -264,6 +262,12 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
} }
} }
@Override
public void rollback(MessageId id) {
nonPersistent.rollback(id);
persistent.rollback(id);
}
@Override @Override
public void setUseCache(boolean useCache) { public void setUseCache(boolean useCache) {
super.setUseCache(useCache); super.setUseCache(useCache);