mirror of https://github.com/apache/activemq.git
resolve AMQ-2149|https://issues.apache.org/activemq/browse/AMQ-2149 - isses with kaha reference store and duplicate messages, recovery of duplicaes and recovery with memory limits and session duplicate acks resulting in out of order message dispatch, more detail in the jira and test case
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@758678 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
27ea85e15a
commit
132f662209
|
@ -1067,7 +1067,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(getConsumerId() + " Ignoring Duplicate: " + md.getMessage());
|
||||
}
|
||||
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
|
||||
acknowledge(md);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -212,6 +212,9 @@ public class TransactionContext implements XAResource {
|
|||
localTransactionEventListener.beginEvent();
|
||||
}
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Being:" + transactionId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -230,6 +233,10 @@ public class TransactionContext implements XAResource {
|
|||
|
||||
beforeEnd();
|
||||
if (transactionId != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Rollback:" + transactionId);
|
||||
}
|
||||
|
||||
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
|
||||
this.transactionId = null;
|
||||
this.connection.asyncSendPacket(info);
|
||||
|
@ -260,6 +267,10 @@ public class TransactionContext implements XAResource {
|
|||
|
||||
// Only send commit if the transaction was started.
|
||||
if (transactionId != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Commit:" + transactionId);
|
||||
}
|
||||
|
||||
TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
|
||||
this.transactionId = null;
|
||||
// Notify the listener that the tx was committed back
|
||||
|
|
|
@ -432,6 +432,10 @@ public class ManagedRegionBroker extends RegionBroker {
|
|||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Failed to browse messages for Subscription " + view, e);
|
||||
|
|
|
@ -18,8 +18,6 @@ package org.apache.activemq.broker.region;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
|
|
@ -21,6 +21,8 @@ import java.util.ArrayList;
|
|||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -69,6 +71,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
|
||||
private boolean slowConsumer;
|
||||
|
||||
private CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
|
||||
|
||||
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
|
||||
super(broker,context, info);
|
||||
this.usageManager=usageManager;
|
||||
|
@ -186,6 +190,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
// Handle the standard acknowledgment case.
|
||||
boolean callDispatchMatched = false;
|
||||
Destination destination = null;
|
||||
|
||||
if (!isSlave()) {
|
||||
while(!okForAckAsDispatchDone.await(100, TimeUnit.MILLISECONDS)) {
|
||||
LOG.warn("Ack before disaptch, waiting for recovery dispatch: " + ack);
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("ack:" + ack);
|
||||
}
|
||||
synchronized(dispatchLock) {
|
||||
if (ack.isStandardAck()) {
|
||||
// First check if the ack matches the dispatched. When using failover this might
|
||||
|
@ -608,6 +621,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
if (message == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
okForAckAsDispatchDone.countDown();
|
||||
|
||||
// No reentrant lock - Patch needed to IndirectMessageReference on method lock
|
||||
if (!isSlave()) {
|
||||
|
||||
|
@ -648,6 +664,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
|
|||
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
|
||||
}
|
||||
}
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(info.getDestination().getPhysicalName() + " dispatched: " + message.getMessageId());
|
||||
}
|
||||
if (info.isDispatchAsync()) {
|
||||
try {
|
||||
dispatchPending();
|
||||
|
|
|
@ -217,6 +217,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}else {
|
||||
int messageCount = store.getMessageCount();
|
||||
|
@ -540,8 +544,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node) throws IOException {
|
||||
messageConsumed(context, node);
|
||||
if (store != null && node.isPersistent()) {
|
||||
// the original ack may be a ranged ack, but we are trying to delete
|
||||
// a specific
|
||||
// the original ack may be a ranged ack, but we are trying to delete a specific
|
||||
// message store here so we need to convert to a non ranged ack.
|
||||
if (ack.getMessageCount() > 0) {
|
||||
// Dup the ack
|
||||
|
@ -1542,8 +1545,6 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
|
||||
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
|
||||
if (oldPercentUsage > newPercentUsage) {
|
||||
synchronized(messagesWaitingForSpace) {
|
||||
if (!messagesWaitingForSpace.isEmpty() && !memoryUsage.isFull()) {
|
||||
try {
|
||||
this.taskRunner.wakeup();
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -1552,5 +1553,3 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -238,6 +238,10 @@ public class Topic extends BaseDestination implements Task{
|
|||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -494,6 +498,10 @@ public class Topic extends BaseDestination implements Task{
|
|||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
|
||||
if (msgs != null) {
|
||||
|
|
|
@ -271,7 +271,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
|
|||
this.useCache = useCache;
|
||||
}
|
||||
|
||||
protected synchronized boolean isDuplicate(MessageId messageId) {
|
||||
public synchronized boolean isDuplicate(MessageId messageId) {
|
||||
if (!enableAudit || audit==null) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -72,6 +72,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
}
|
||||
|
||||
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
|
||||
boolean recovered = false;
|
||||
if (!isDuplicate(message.getMessageId())) {
|
||||
if (!cached) {
|
||||
message.setRegionDestination(regionDestination);
|
||||
|
@ -82,13 +83,14 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
|
|||
message.incrementReferenceCount();
|
||||
batchList.put(message.getMessageId(), message);
|
||||
clearIterator(true);
|
||||
recovered = true;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring batched duplicated from store: " + message);
|
||||
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
|
||||
}
|
||||
storeHasMessages = true;
|
||||
}
|
||||
return true;
|
||||
return recovered;
|
||||
}
|
||||
|
||||
public final void reset() {
|
||||
|
|
|
@ -18,7 +18,6 @@ package org.apache.activemq.broker.region.cursors;
|
|||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
|
|
@ -26,4 +26,5 @@ public interface MessageRecoveryListener {
|
|||
boolean recoverMessage(Message message) throws Exception;
|
||||
boolean recoverMessageReference(MessageId ref) throws Exception;
|
||||
boolean hasSpace();
|
||||
boolean isDuplicate(MessageId ref);
|
||||
}
|
||||
|
|
|
@ -67,8 +67,9 @@ public interface ReferenceStore extends MessageStore {
|
|||
|
||||
/**
|
||||
* Adds a message reference to the message store
|
||||
* @return true if reference was added, false if it is a duplicate and not added
|
||||
*/
|
||||
void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
|
||||
boolean addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException;
|
||||
|
||||
/**
|
||||
* Looks up a message using either the String messageID or the
|
||||
|
|
|
@ -380,13 +380,19 @@ public class AMQMessageStore extends AbstractMessageStore {
|
|||
while (iterator.hasNext()) {
|
||||
Entry<MessageId, ReferenceData> entry = iterator.next();
|
||||
try {
|
||||
referenceStore.addMessageReference(context, entry.getKey(), entry.getValue());
|
||||
if (referenceStore.addMessageReference(context, entry.getKey(), entry.getValue())) {
|
||||
size++;
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("not adding duplicate reference: " + entry.getKey() + ", " + entry.getValue());
|
||||
}
|
||||
}
|
||||
AMQMessageStore.this.peristenceAdapter.removeInProgressDataFile(AMQMessageStore.this, entry
|
||||
.getValue().getFileId());
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Message could not be added to long term store: " + e.getMessage(), e);
|
||||
}
|
||||
size++;
|
||||
|
||||
// Commit the batch if it's getting too big
|
||||
if (size >= maxCheckpointMessageAddSize) {
|
||||
persitanceAdapter.commitTransaction(context);
|
||||
|
|
|
@ -40,6 +40,10 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
|
|||
return listener.hasSpace();
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return listener.isDuplicate(id);
|
||||
}
|
||||
|
||||
public boolean recoverMessage(Message message) throws Exception {
|
||||
if (listener.hasSpace()) {
|
||||
listener.recoverMessage(message);
|
||||
|
@ -55,7 +59,8 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
|
|||
if (message != null) {
|
||||
return recoverMessage(message);
|
||||
} else {
|
||||
throw new IllegalStateException("Message id " + ref + " could not be recovered from the data store - already dispatched");
|
||||
throw new IllegalStateException("Message id " + ref + " could not be recovered from the data store for: " + store.getDestination().getQualifiedName()
|
||||
+ " - already dispatched");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,9 +29,9 @@ import org.apache.activemq.command.MessageAck;
|
|||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.StoreEntry;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.ReferenceStore;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
|
@ -122,11 +122,17 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
|
|||
if ( recoverReference(listener, msg)) {
|
||||
count++;
|
||||
lastBatchId = msg.getMessageId();
|
||||
} else {
|
||||
} else if (!listener.isDuplicate(new MessageId(msg.getMessageId()))) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(destination.getQualifiedName() + " did not recover:" + msg.getMessageId());
|
||||
LOG.debug(destination.getQualifiedName() + " did not recover (will retry) message: " + msg.getMessageId());
|
||||
}
|
||||
// give usage limits a chance to reclaim
|
||||
break;
|
||||
} else {
|
||||
// skip duplicate and continue
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(destination.getQualifiedName() + " skipping duplicate, " + msg.getMessageId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
lastBatchId = null;
|
||||
|
@ -140,16 +146,26 @@ public class KahaReferenceStore extends AbstractMessageStore implements Referenc
|
|||
}
|
||||
}
|
||||
|
||||
public void addMessageReference(ConnectionContext context, MessageId messageId,
|
||||
public boolean addMessageReference(ConnectionContext context, MessageId messageId,
|
||||
ReferenceData data) throws IOException {
|
||||
|
||||
boolean uniqueueReferenceAdded = false;
|
||||
lock.lock();
|
||||
try {
|
||||
if (!messageContainer.containsKey(messageId)) {
|
||||
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
|
||||
messageContainer.put(messageId, record);
|
||||
uniqueueReferenceAdded = true;
|
||||
addInterest(record);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(destination.getPhysicalName() + " ignoring duplicated (add) message reference:" + messageId);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return uniqueueReferenceAdded;
|
||||
}
|
||||
|
||||
public ReferenceData getMessageReference(MessageId identity) throws IOException {
|
||||
|
|
|
@ -75,7 +75,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
throw new RuntimeException("Use addMessageReference instead");
|
||||
}
|
||||
|
||||
public void addMessageReference(final ConnectionContext context, final MessageId messageId,
|
||||
public boolean addMessageReference(final ConnectionContext context, final MessageId messageId,
|
||||
final ReferenceData data) {
|
||||
lock.lock();
|
||||
try {
|
||||
|
@ -100,6 +100,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
|
|||
}finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public ReferenceData getMessageReference(final MessageId identity) throws IOException {
|
||||
|
|
|
@ -486,7 +486,9 @@ public class FailoverTransport implements CompositeTransport {
|
|||
return;
|
||||
|
||||
} catch (IOException e) {
|
||||
LOG.debug("Send oneway attempt: " + i + " failed.");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
|
||||
}
|
||||
handleTransportFailure(e);
|
||||
}
|
||||
}
|
||||
|
@ -622,6 +624,9 @@ public class FailoverTransport implements CompositeTransport {
|
|||
}
|
||||
for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
|
||||
Command command = iter2.next();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("restore, replay: " + command);
|
||||
}
|
||||
t.oneway(command);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,7 +268,11 @@ public abstract class Usage<T extends Usage> implements Service {
|
|||
}
|
||||
|
||||
};
|
||||
if (started.get()) {
|
||||
getExecutor().execute(listenerNotifier);
|
||||
} else {
|
||||
LOG.warn("not notifying usage change to listeners on shutdown");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,8 +22,6 @@ import java.util.Timer;
|
|||
import java.util.TimerTask;
|
||||
import java.util.Vector;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -33,11 +31,15 @@ import javax.jms.MessageListener;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
|
||||
import org.apache.activemq.usage.MemoryUsage;
|
||||
|
@ -53,7 +55,7 @@ public class AMQ2149Test extends TestCase {
|
|||
|
||||
private static final Log LOG = LogFactory.getLog(AMQ2149Test.class);
|
||||
|
||||
private static final long BROKER_STOP_PERIOD = 15 * 1000;
|
||||
private static final long BROKER_STOP_PERIOD = 20 * 1000;
|
||||
|
||||
private static final String BROKER_CONNECTOR = "tcp://localhost:61617";
|
||||
private static final String BROKER_URL = "failover:("+ BROKER_CONNECTOR
|
||||
|
@ -62,7 +64,7 @@ public class AMQ2149Test extends TestCase {
|
|||
private final String SEQ_NUM_PROPERTY = "seqNum";
|
||||
|
||||
final int MESSAGE_LENGTH_BYTES = 75000;
|
||||
final int MAX_TO_SEND = 2000;
|
||||
final int MAX_TO_SEND = 1500;
|
||||
final long SLEEP_BETWEEN_SEND_MS = 3;
|
||||
final int NUM_SENDERS_AND_RECEIVERS = 10;
|
||||
final Object brokerLock = new Object();
|
||||
|
@ -144,7 +146,7 @@ public class AMQ2149Test extends TestCase {
|
|||
public void onMessage(Message message) {
|
||||
try {
|
||||
final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
|
||||
if ((seqNum % 100) == 0) {
|
||||
if ((seqNum % 500) == 0) {
|
||||
LOG.info(queueName + " received " + seqNum);
|
||||
}
|
||||
if (seqNum != nextExpectedSeqNum) {
|
||||
|
@ -192,7 +194,7 @@ public class AMQ2149Test extends TestCase {
|
|||
|
||||
public void run() {
|
||||
final String longString = buildLongString();
|
||||
while (nextSequenceNumber <= MAX_TO_SEND) {
|
||||
while (nextSequenceNumber < MAX_TO_SEND) {
|
||||
try {
|
||||
final Message message = session
|
||||
.createTextMessage(longString);
|
||||
|
@ -219,7 +221,20 @@ public class AMQ2149Test extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void x_testOrderWithMemeUsageLimit() throws Exception {
|
||||
// no need to run this unless there are some issues with the others
|
||||
public void vanilaVerify_testOrder() throws Exception {
|
||||
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
broker.deleteAllMessages();
|
||||
}
|
||||
});
|
||||
|
||||
verifyOrderedMessageReceipt();
|
||||
verifyStats(false);
|
||||
}
|
||||
|
||||
public void testOrderWithMemeUsageLimit() throws Exception {
|
||||
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
|
@ -234,9 +249,10 @@ public class AMQ2149Test extends TestCase {
|
|||
});
|
||||
|
||||
verifyOrderedMessageReceipt();
|
||||
verifyStats(false);
|
||||
}
|
||||
|
||||
public void testOrderWithRestartVMIndex() throws Exception {
|
||||
public void testOrderWithRestartAndVMIndex() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
|
@ -260,10 +276,11 @@ public class AMQ2149Test extends TestCase {
|
|||
} finally {
|
||||
timer.cancel();
|
||||
}
|
||||
verifyStats(true);
|
||||
}
|
||||
|
||||
|
||||
public void x_testOrderWithRestart() throws Exception {
|
||||
public void testOrderWithRestart() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
broker.deleteAllMessages();
|
||||
|
@ -278,10 +295,45 @@ public class AMQ2149Test extends TestCase {
|
|||
} finally {
|
||||
timer.cancel();
|
||||
}
|
||||
|
||||
verifyStats(true);
|
||||
}
|
||||
|
||||
|
||||
public void x_testOrderWithRestartWithForceRecover() throws Exception {
|
||||
public void testOrderWithRestartAndNoCache() throws Exception {
|
||||
|
||||
PolicyEntry noCache = new PolicyEntry();
|
||||
noCache.setUseCache(false);
|
||||
final PolicyMap policyMap = new PolicyMap();
|
||||
policyMap.setDefaultEntry(noCache);
|
||||
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.deleteAllMessages();
|
||||
}
|
||||
});
|
||||
|
||||
final Timer timer = new Timer();
|
||||
schedualRestartTask(timer, new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
verifyOrderedMessageReceipt();
|
||||
} finally {
|
||||
timer.cancel();
|
||||
}
|
||||
|
||||
verifyStats(true);
|
||||
}
|
||||
|
||||
|
||||
// no need to run this unless there are issues with the other restart tests
|
||||
|
||||
public void eaiserToRepoduce_testOrderWithRestartWithForceRecover() throws Exception {
|
||||
createBroker(new Configurer() {
|
||||
public void configure(BrokerService broker) throws Exception {
|
||||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
|
@ -297,11 +349,6 @@ public class AMQ2149Test extends TestCase {
|
|||
AMQPersistenceAdapterFactory persistenceFactory =
|
||||
(AMQPersistenceAdapterFactory) broker.getPersistenceFactory();
|
||||
persistenceFactory.setForceRecoverReferenceStore(true);
|
||||
// PolicyEntry auditDepthPolicy = new PolicyEntry();
|
||||
// auditDepthPolicy.setMaxAuditDepth(2000);
|
||||
// PolicyMap policyMap = new PolicyMap();
|
||||
// policyMap.setDefaultEntry(auditDepthPolicy);
|
||||
// broker.setDestinationPolicy(policyMap);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -310,6 +357,23 @@ public class AMQ2149Test extends TestCase {
|
|||
} finally {
|
||||
timer.cancel();
|
||||
}
|
||||
|
||||
verifyStats(true);
|
||||
}
|
||||
|
||||
private void verifyStats(boolean brokerRestarts) throws Exception {
|
||||
RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
|
||||
|
||||
for (Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
|
||||
DestinationStatistics stats = dest.getDestinationStatistics();
|
||||
if (brokerRestarts) {
|
||||
assertTrue("qneue/dequeue match for: " + dest.getName(),
|
||||
stats.getEnqueues().getCount() <= stats.getDequeues().getCount());
|
||||
} else {
|
||||
assertEquals("qneue/dequeue match for: " + dest.getName(),
|
||||
stats.getEnqueues().getCount(), stats.getDequeues().getCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void schedualRestartTask(final Timer timer, final Configurer configurer) {
|
||||
|
@ -319,6 +383,7 @@ public class AMQ2149Test extends TestCase {
|
|||
LOG.info("stopping broker..");
|
||||
try {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
} catch (Exception e) {
|
||||
LOG.error("ex on broker stop", e);
|
||||
exceptions.add(e);
|
||||
|
@ -355,7 +420,7 @@ public class AMQ2149Test extends TestCase {
|
|||
threads.add(thread);
|
||||
}
|
||||
|
||||
final long expiry = System.currentTimeMillis() + 1000 * 60 * 10;
|
||||
final long expiry = System.currentTimeMillis() + 1000 * 60 * 20;
|
||||
while(!threads.isEmpty() && exceptions.isEmpty() && System.currentTimeMillis() < expiry) {
|
||||
Thread sendThread = threads.firstElement();
|
||||
sendThread.join(1000*10);
|
||||
|
|
|
@ -24,10 +24,14 @@ import java.util.Map;
|
|||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
public class MapContainerTest extends TestCase {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(MapContainerTest.class);
|
||||
protected static final int COUNT = 10;
|
||||
|
||||
protected String name = "test";
|
||||
|
@ -180,6 +184,71 @@ public class MapContainerTest extends TestCase {
|
|||
assertTrue(container.isEmpty());
|
||||
}
|
||||
|
||||
|
||||
public void testDuplicatesOk() throws Exception {
|
||||
StoreEntry first, entry;
|
||||
|
||||
container.put("M1", "DD");
|
||||
first = container.getFirst();
|
||||
LOG.info("First=" + first);
|
||||
assertEquals(-1, first.getNextItem());
|
||||
|
||||
// add duplicate
|
||||
String old = container.put("M1", "DD");
|
||||
assertNotNull(old);
|
||||
assertEquals(1, container.size());
|
||||
|
||||
entry = container.getFirst();
|
||||
LOG.info("New First=" + entry);
|
||||
assertEquals(-1, entry.getNextItem());
|
||||
|
||||
assertEquals(first, entry);
|
||||
|
||||
container.remove("M1");
|
||||
|
||||
entry = container.getFirst();
|
||||
assertNull(entry);
|
||||
}
|
||||
|
||||
|
||||
public void testDuplicatesFreeListShared() throws Exception {
|
||||
StoreEntry batchEntry;
|
||||
|
||||
MapContainer other = store.getMapContainer(getName()+"2", "test", true);
|
||||
other.load();
|
||||
other.put("M1", "DD");
|
||||
|
||||
container.put("M1", "DD");
|
||||
batchEntry = container.getFirst();
|
||||
LOG.info("First=" + batchEntry);
|
||||
assertEquals(-1, batchEntry.getNextItem());
|
||||
|
||||
// have something on free list before duplicate
|
||||
other.remove("M1");
|
||||
|
||||
// add duplicate
|
||||
String old = container.put("M1", "DD");
|
||||
assertNotNull(old);
|
||||
assertEquals(1, container.size());
|
||||
|
||||
// entry now on free list on its own
|
||||
batchEntry = container.refresh(batchEntry);
|
||||
assertEquals(-1, batchEntry.getNextItem());
|
||||
LOG.info("refreshed=" + batchEntry);
|
||||
|
||||
// ack
|
||||
container.remove("M1");
|
||||
|
||||
//container is valid (empty)
|
||||
assertNull(container.getFirst());
|
||||
|
||||
// batchEntry now has next as there is another on the free list
|
||||
batchEntry = container.refresh(batchEntry);
|
||||
LOG.info("refreshed=" + batchEntry);
|
||||
|
||||
assertTrue(batchEntry.getNextItem() != -1);
|
||||
}
|
||||
|
||||
protected Store getStore() throws IOException {
|
||||
return StoreFactory.open(name, "rw");
|
||||
}
|
||||
|
@ -188,7 +257,7 @@ public class MapContainerTest extends TestCase {
|
|||
super.setUp();
|
||||
name = System.getProperty("basedir", ".") + "/target/activemq-data/map-container.db";
|
||||
store = getStore();
|
||||
container = store.getMapContainer("test", "test", true);
|
||||
container = store.getMapContainer(getName(), "test", true);
|
||||
container.load();
|
||||
testMap = new HashMap<String, String>();
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
|
|
|
@ -66,7 +66,7 @@ public class JPAReferenceStore extends AbstractMessageStore implements Reference
|
|||
throw new RuntimeException("Use addMessageReference instead");
|
||||
}
|
||||
|
||||
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
|
||||
public boolean addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
|
||||
EntityManager manager = adapter.beginEntityManager(context);
|
||||
try {
|
||||
|
||||
|
@ -85,6 +85,7 @@ public class JPAReferenceStore extends AbstractMessageStore implements Reference
|
|||
throw IOExceptionSupport.create(e);
|
||||
}
|
||||
adapter.commitEntityManager(context, manager);
|
||||
return true;
|
||||
}
|
||||
|
||||
public ReferenceData getMessageReference(MessageId identity) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue