mirror of https://github.com/apache/activemq.git
resolve AMQ-2020, we may want to push setBatch into the MessageStore inteface, see the use by the cursors when the cache is exhausted
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@722983 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e76909d357
commit
d7f34d9a6f
|
@ -863,6 +863,9 @@ public class Queue extends BaseDestination implements Task {
|
||||||
QueueMessageReference r = createMessageReference(m);
|
QueueMessageReference r = createMessageReference(m);
|
||||||
BrokerSupport.resend(context, m, dest);
|
BrokerSupport.resend(context, m, dest);
|
||||||
removeMessage(context, r);
|
removeMessage(context, r);
|
||||||
|
synchronized (messages) {
|
||||||
|
messages.rollback(r.getMessageId());
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -909,18 +912,12 @@ public class Queue extends BaseDestination implements Task {
|
||||||
IndirectMessageReference r = (IndirectMessageReference) ref;
|
IndirectMessageReference r = (IndirectMessageReference) ref;
|
||||||
if (filter.evaluate(context, r)) {
|
if (filter.evaluate(context, r)) {
|
||||||
// We should only move messages that can be locked.
|
// We should only move messages that can be locked.
|
||||||
Message m = r.getMessage();
|
moveMessageTo(context, ref.getMessage(), dest);
|
||||||
BrokerSupport.resend(context, m, dest);
|
|
||||||
removeMessage(context, r);
|
|
||||||
set.remove(r);
|
set.remove(r);
|
||||||
if (++movedCounter >= maximumMessages
|
if (++movedCounter >= maximumMessages
|
||||||
&& maximumMessages > 0) {
|
&& maximumMessages > 0) {
|
||||||
return movedCounter;
|
return movedCounter;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
synchronized (messages) {
|
|
||||||
messages.rollback(r.getMessageId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} while (set.size() < this.destinationStatistics.getMessages().getCount()
|
} while (set.size() < this.destinationStatistics.getMessages().getCount()
|
||||||
|
@ -1088,6 +1085,12 @@ public class Queue extends BaseDestination implements Task {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (ack.isPoisonAck()) {
|
||||||
|
// message gone to DLQ, is ok to allow redelivery
|
||||||
|
synchronized(messages) {
|
||||||
|
messages.rollback(reference.getMessageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1097,9 +1100,6 @@ public class Queue extends BaseDestination implements Task {
|
||||||
synchronized(pagedInMessages) {
|
synchronized(pagedInMessages) {
|
||||||
pagedInMessages.remove(reference.getMessageId());
|
pagedInMessages.remove(reference.getMessageId());
|
||||||
}
|
}
|
||||||
synchronized(messages) {
|
|
||||||
messages.rollback(reference.getMessageId());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void messageExpired(ConnectionContext context,MessageReference reference) {
|
public void messageExpired(ConnectionContext context,MessageReference reference) {
|
||||||
|
|
|
@ -43,6 +43,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
protected boolean batchResetNeeded = true;
|
protected boolean batchResetNeeded = true;
|
||||||
protected boolean storeHasMessages = false;
|
protected boolean storeHasMessages = false;
|
||||||
protected int size;
|
protected int size;
|
||||||
|
private MessageId lastCachedId;
|
||||||
|
|
||||||
protected AbstractStoreCursor(Destination destination) {
|
protected AbstractStoreCursor(Destination destination) {
|
||||||
this.regionDestination=destination;
|
this.regionDestination=destination;
|
||||||
|
@ -154,12 +155,20 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
||||||
public final synchronized void addMessageLast(MessageReference node) throws Exception {
|
public final synchronized void addMessageLast(MessageReference node) throws Exception {
|
||||||
if (cacheEnabled && hasSpace()) {
|
if (cacheEnabled && hasSpace()) {
|
||||||
recoverMessage(node.getMessage(),true);
|
recoverMessage(node.getMessage(),true);
|
||||||
}else {
|
lastCachedId = node.getMessageId();
|
||||||
|
} else {
|
||||||
|
if (cacheEnabled) {
|
||||||
|
// sync with store on disabling the cache
|
||||||
|
setBatch(lastCachedId);
|
||||||
|
}
|
||||||
cacheEnabled=false;
|
cacheEnabled=false;
|
||||||
}
|
}
|
||||||
size++;
|
size++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setBatch(MessageId messageId) {
|
||||||
|
}
|
||||||
|
|
||||||
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
|
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
|
||||||
cacheEnabled=false;
|
cacheEnabled=false;
|
||||||
size++;
|
size++;
|
||||||
|
|
|
@ -17,11 +17,14 @@
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.Queue;
|
import org.apache.activemq.broker.region.Queue;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.amq.AMQMessageStore;
|
||||||
|
import org.apache.activemq.store.kahadaptor.KahaReferenceStore;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
@ -71,6 +74,20 @@ class QueueStorePrefetch extends AbstractStoreCursor {
|
||||||
this.store.resetBatching();
|
this.store.resetBatching();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void setBatch(MessageId messageId) {
|
||||||
|
AMQMessageStore amqStore = (AMQMessageStore) store;
|
||||||
|
try {
|
||||||
|
amqStore.flush();
|
||||||
|
} catch (InterruptedIOException e) {
|
||||||
|
LOG.debug("flush on setBatch resulted in exception", e);
|
||||||
|
}
|
||||||
|
KahaReferenceStore kahaStore =
|
||||||
|
(KahaReferenceStore) amqStore.getReferenceStore();
|
||||||
|
kahaStore.setBatch(messageId);
|
||||||
|
batchResetNeeded = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
protected void doFillBatch() throws Exception {
|
protected void doFillBatch() throws Exception {
|
||||||
this.store.recoverNextMessages(this.maxBatchSize, this);
|
this.store.recoverNextMessages(this.maxBatchSize, this);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,9 +17,7 @@
|
||||||
package org.apache.activemq.broker.region.cursors;
|
package org.apache.activemq.broker.region.cursors;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.LinkedHashMap;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.region.MessageReference;
|
|
||||||
import org.apache.activemq.broker.region.Subscription;
|
import org.apache.activemq.broker.region.Subscription;
|
||||||
import org.apache.activemq.broker.region.Topic;
|
import org.apache.activemq.broker.region.Topic;
|
||||||
import org.apache.activemq.command.Message;
|
import org.apache.activemq.command.Message;
|
||||||
|
@ -39,7 +37,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
class TopicStorePrefetch extends AbstractStoreCursor {
|
class TopicStorePrefetch extends AbstractStoreCursor {
|
||||||
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
|
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
|
||||||
private TopicMessageStore store;
|
private TopicMessageStore store;
|
||||||
private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
|
|
||||||
private String clientId;
|
private String clientId;
|
||||||
private String subscriberName;
|
private String subscriberName;
|
||||||
private Subscription subscription;
|
private Subscription subscription;
|
||||||
|
|
|
@ -193,7 +193,7 @@ public class KahaReferenceStore implements ReferenceStore {
|
||||||
public void removeAllMessages(ConnectionContext context) throws IOException {
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
Set<MessageId> tmpSet = new HashSet(messageContainer.keySet());
|
Set<MessageId> tmpSet = new HashSet<MessageId>(messageContainer.keySet());
|
||||||
for (MessageId id:tmpSet) {
|
for (MessageId id:tmpSet) {
|
||||||
removeMessage(id);
|
removeMessage(id);
|
||||||
}
|
}
|
||||||
|
@ -255,5 +255,11 @@ public class KahaReferenceStore implements ReferenceStore {
|
||||||
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
|
* @see org.apache.activemq.store.ReferenceStore#setBatch(org.apache.activemq.command.MessageId)
|
||||||
*/
|
*/
|
||||||
public void setBatch(MessageId startAfter) {
|
public void setBatch(MessageId startAfter) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
batchEntry = messageContainer.getEntry(startAfter);
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.activemq.ActiveMQSession;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.region.Queue;
|
import org.apache.activemq.broker.region.Queue;
|
||||||
import org.apache.activemq.broker.region.RegionBroker;
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
|
||||||
|
|
||||||
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
||||||
|
@ -89,15 +90,13 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
|
||||||
@Override
|
@Override
|
||||||
public void testLoadRequestReply() throws Exception {
|
public void testLoadRequestReply() throws Exception {
|
||||||
super.testLoadRequestReply();
|
super.testLoadRequestReply();
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
// some checks on the slave
|
// some checks on the slave
|
||||||
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
|
AdvisoryBroker ab = (AdvisoryBroker) slave.getBroker().getAdaptor(
|
||||||
AdvisoryBroker.class);
|
AdvisoryBroker.class);
|
||||||
|
|
||||||
if (!deleteTempQueue || serverTransactional) {
|
|
||||||
// give temp destination removes a chance to perculate on connection.close
|
|
||||||
Thread.sleep(2000);
|
|
||||||
}
|
|
||||||
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
|
assertEquals("the temp queues should not be visible as they are removed", 1, ab.getAdvisoryDestinations().size());
|
||||||
|
|
||||||
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
|
RegionBroker rb = (RegionBroker) slave.getBroker().getAdaptor(
|
||||||
|
|
|
@ -0,0 +1,331 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Vector;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import javax.jms.InvalidSelectorException;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
|
import junit.framework.TestCase;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.command.MessagePull;
|
||||||
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
|
import org.apache.activemq.command.Response;
|
||||||
|
import org.apache.activemq.filter.MessageEvaluationContext;
|
||||||
|
import org.apache.activemq.state.ProducerState;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author gtully
|
||||||
|
* @see https://issues.apache.org/activemq/browse/AMQ-2020
|
||||||
|
**/
|
||||||
|
public class QueueDuplicatesFromStoreTest extends TestCase {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(QueueDuplicatesFromStoreTest.class);
|
||||||
|
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("queue-"
|
||||||
|
+ QueueDuplicatesFromStoreTest.class.getSimpleName());
|
||||||
|
BrokerService brokerService;
|
||||||
|
|
||||||
|
final static String mesageIdRoot = "11111:22222:";
|
||||||
|
final int messageBytesSize = 256;
|
||||||
|
final String text = new String(new byte[messageBytesSize]);
|
||||||
|
|
||||||
|
final int ackStartIndex = 100;
|
||||||
|
final int ackWindow = 50;
|
||||||
|
final int ackBatchSize = 50;
|
||||||
|
final int fullWindow = 200;
|
||||||
|
final int count = 20000;
|
||||||
|
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
brokerService.setUseJmx(false);
|
||||||
|
brokerService.deleteAllMessages();
|
||||||
|
brokerService.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
|
||||||
|
doTestNoDuplicateAfterCacheFullAndAcked(1024*10);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
|
||||||
|
doTestNoDuplicateAfterCacheFullAndAcked(512);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
|
||||||
|
final AMQPersistenceAdapter persistenceAdapter =
|
||||||
|
(AMQPersistenceAdapter) brokerService.getPersistenceAdapter();
|
||||||
|
final MessageStore queueMessageStore =
|
||||||
|
persistenceAdapter.createQueueMessageStore(destination);
|
||||||
|
final ConnectionContext contextNotInTx = new ConnectionContext();
|
||||||
|
final ConsumerInfo consumerInfo = new ConsumerInfo();
|
||||||
|
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||||
|
consumerInfo.setExclusive(true);
|
||||||
|
final Queue queue = new Queue(brokerService, destination,
|
||||||
|
queueMessageStore, destinationStatistics, null);
|
||||||
|
|
||||||
|
// a workaround for this issue
|
||||||
|
// queue.setUseCache(false);
|
||||||
|
queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10);
|
||||||
|
queue.setMaxAuditDepth(auditDepth);
|
||||||
|
queue.initialize();
|
||||||
|
queue.start();
|
||||||
|
|
||||||
|
|
||||||
|
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
|
||||||
|
ProducerInfo producerInfo = new ProducerInfo();
|
||||||
|
ProducerState producerState = new ProducerState(producerInfo);
|
||||||
|
producerExchange.setProducerState(producerState);
|
||||||
|
producerExchange.setConnectionContext(contextNotInTx);
|
||||||
|
|
||||||
|
final CountDownLatch receivedLatch = new CountDownLatch(count);
|
||||||
|
final AtomicLong ackedCount = new AtomicLong(0);
|
||||||
|
final AtomicLong enqueueCounter = new AtomicLong(0);
|
||||||
|
final Vector<String> errors = new Vector<String>();
|
||||||
|
|
||||||
|
// populate the queue store, exceed memory limit so that cache is disabled
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
Message message = getMessage(i);
|
||||||
|
queue.send(producerExchange, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals("store count is correct", count, queueMessageStore
|
||||||
|
.getMessageCount());
|
||||||
|
|
||||||
|
// pull from store in small windows
|
||||||
|
Subscription subscription = new Subscription() {
|
||||||
|
|
||||||
|
public void add(MessageReference node) throws Exception {
|
||||||
|
if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
|
||||||
|
errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
|
||||||
|
+ node.getMessageId().getProducerSequenceId());
|
||||||
|
}
|
||||||
|
assertEquals("is in order", enqueueCounter.get(), node
|
||||||
|
.getMessageId().getProducerSequenceId());
|
||||||
|
receivedLatch.countDown();
|
||||||
|
enqueueCounter.incrementAndGet();
|
||||||
|
node.decrementReferenceCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(ConnectionContext context, Destination destination)
|
||||||
|
throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
public int countBeforeFull() {
|
||||||
|
if (isFull()) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() {
|
||||||
|
};
|
||||||
|
|
||||||
|
public void gc() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConsumerInfo getConsumerInfo() {
|
||||||
|
return consumerInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ConnectionContext getContext() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDequeueCounter() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getDispatchedCounter() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getDispatchedQueueSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getEnqueueCounter() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInFlightSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getInFlightUsage() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ObjectName getObjectName() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPendingQueueSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPrefetchSize() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getSelector() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isBrowser() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFull() {
|
||||||
|
return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isHighWaterMark() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isLowWaterMark() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isRecoveryRequired() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isSlave() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean matches(MessageReference node,
|
||||||
|
MessageEvaluationContext context) throws IOException {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean matches(ActiveMQDestination destination) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processMessageDispatchNotification(
|
||||||
|
MessageDispatchNotification mdn) throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Response pullMessage(ConnectionContext context,
|
||||||
|
MessagePull pull) throws Exception {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MessageReference> remove(ConnectionContext context,
|
||||||
|
Destination destination) throws Exception {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setObjectName(ObjectName objectName) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSelector(String selector)
|
||||||
|
throws InvalidSelectorException,
|
||||||
|
UnsupportedOperationException {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateConsumerPrefetch(int newPrefetch) {
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean addRecoveredMessage(ConnectionContext context,
|
||||||
|
MessageReference message) throws Exception {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ActiveMQDestination getActiveMQDestination() {
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void acknowledge(ConnectionContext context, MessageAck ack)
|
||||||
|
throws Exception {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
queue.addSubscription(contextNotInTx, subscription);
|
||||||
|
int removeIndex = 0;
|
||||||
|
do {
|
||||||
|
// Simulate periodic acks in small but recent windows
|
||||||
|
long receivedCount = enqueueCounter.get();
|
||||||
|
if (receivedCount > ackStartIndex) {
|
||||||
|
if (receivedCount >= removeIndex + ackWindow) {
|
||||||
|
for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
|
||||||
|
ackedCount.incrementAndGet();
|
||||||
|
MessageAck ack = new MessageAck();
|
||||||
|
ack.setLastMessageId(new MessageId(mesageIdRoot
|
||||||
|
+ removeIndex));
|
||||||
|
ack.setMessageCount(1);
|
||||||
|
queue.removeMessage(contextNotInTx, subscription,
|
||||||
|
new IndirectMessageReference(
|
||||||
|
getMessage(removeIndex)), ack);
|
||||||
|
|
||||||
|
}
|
||||||
|
if (removeIndex % 1000 == 0) {
|
||||||
|
LOG.info("acked: " + removeIndex);
|
||||||
|
persistenceAdapter.checkpoint(true);
|
||||||
|
persistenceAdapter.cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty());
|
||||||
|
|
||||||
|
assertTrue("There are no errors: " + errors, errors.isEmpty());
|
||||||
|
assertEquals(count, enqueueCounter.get());
|
||||||
|
assertEquals("store count is correct", count - removeIndex,
|
||||||
|
queueMessageStore.getMessageCount());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Message getMessage(int i) throws Exception {
|
||||||
|
ActiveMQTextMessage message = new ActiveMQTextMessage();
|
||||||
|
message.setMessageId(new MessageId(mesageIdRoot + i));
|
||||||
|
message.setDestination(destination);
|
||||||
|
message.setPersistent(true);
|
||||||
|
message.setResponseRequired(true);
|
||||||
|
message.setText("Msg:" + i + " " + text);
|
||||||
|
assertEquals(message.getMessageId().getProducerSequenceId(), i);
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue