mirror of https://github.com/apache/activemq.git
Merge pull request #1154 from cshannon/AMQ-9436
AMQ-9436 - Ensure message audit in queue store cursor is shared
This commit is contained in:
commit
65a6b805a0
|
@ -53,7 +53,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
@Override
|
||||
public synchronized void start() throws Exception {
|
||||
started = true;
|
||||
if (isStarted()) {
|
||||
return;
|
||||
}
|
||||
super.start();
|
||||
if (nonPersistent == null) {
|
||||
if (broker.getBrokerService().isPersistent()) {
|
||||
|
@ -76,7 +78,9 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
|
|||
|
||||
@Override
|
||||
public synchronized void stop() throws Exception {
|
||||
started = false;
|
||||
if (!isStarted()) {
|
||||
return;
|
||||
}
|
||||
if (nonPersistent != null) {
|
||||
nonPersistent.destroy();
|
||||
}
|
||||
|
|
|
@ -29,10 +29,13 @@ import javax.management.ObjectName;
|
|||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQMessageAudit;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.SubscriptionStatistics;
|
||||
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
|
||||
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
|
@ -117,6 +120,18 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
|
|||
queue.initialize();
|
||||
queue.start();
|
||||
|
||||
// verify that the cursor message audit is created and set with the
|
||||
// correct audit depth and shared with the persistent and non peristent
|
||||
// cursors
|
||||
final StoreQueueCursor messages = (StoreQueueCursor) queue.getMessages();
|
||||
ActiveMQMessageAudit messageAudit = messages.getMessageAudit();
|
||||
assertNotNull(messageAudit);
|
||||
assertEquals(auditDepth, messageAudit.getAuditDepth());
|
||||
assertSame(messageAudit, messages.getPersistent().getMessageAudit());
|
||||
assertSame(messageAudit, messages.getNonPersistent().getMessageAudit());
|
||||
// Verify calling start again doesn't re-initial the audit
|
||||
messages.start();
|
||||
assertSame(messageAudit, messages.getMessageAudit());
|
||||
|
||||
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
|
||||
ProducerInfo producerInfo = new ProducerInfo();
|
||||
|
|
Loading…
Reference in New Issue