Reduce contention on the AMQ Store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@618981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-06 12:23:41 +00:00
parent 65beee6895
commit 149428b5bc
13 changed files with 393 additions and 225 deletions

View File

@ -277,4 +277,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
} }
} }
protected boolean isDropped(MessageReference node) {
return false;
}
} }

View File

@ -452,7 +452,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node == null) { if (node == null) {
break; break;
} }
if (canDispatch(node)) { if(isDropped(node)) {
pending.remove();
}
else if (canDispatch(node)) {
pending.remove(); pending.remove();
// Message may have been sitting in the pending // Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message. // list a while waiting for the consumer to ak the message.
@ -574,6 +577,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @throws IOException * @throws IOException
*/ */
protected abstract boolean canDispatch(MessageReference node) throws IOException; protected abstract boolean canDispatch(MessageReference node) throws IOException;
protected abstract boolean isDropped(MessageReference node);
/** /**
* Used during acknowledgment to remove the message. * Used during acknowledgment to remove the message.

View File

@ -205,4 +205,14 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
public void destroy() { public void destroy() {
} }
protected boolean isDropped(MessageReference node) {
boolean result = false;
if(node instanceof IndirectMessageReference) {
QueueMessageReference qmr = (QueueMessageReference) node;
result = qmr.isDropped();
}
return result;
}
} }

View File

@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
*/ */
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener { public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class); private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected static final int MAX_FILL_ATTEMPTS=3;
protected final Destination regionDestination; protected final Destination regionDestination;
protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> (); protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
protected boolean cacheEnabled=false; protected boolean cacheEnabled=false;
@ -180,7 +181,10 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch(); resetBatch();
this.batchResetNeeded = false; this.batchResetNeeded = false;
} }
while (this.batchList.isEmpty() && (this.storeHasMessages || size > 0)) { //we may have to move the store cursor past messages that have
//already been delivered - but we also don't want it to spin
int fillAttempts=0;
while (fillAttempts < MAX_FILL_ATTEMPTS && this.batchList.isEmpty() && (this.storeHasMessages ||this.size >0)) {
this.storeHasMessages = false; this.storeHasMessages = false;
try { try {
doFillBatch(); doFillBatch();
@ -191,6 +195,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (!this.batchList.isEmpty()) { if (!this.batchList.isEmpty()) {
this.storeHasMessages=true; this.storeHasMessages=true;
} }
fillAttempts++;
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.activemq.store; package org.apache.activemq.store;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
@ -82,5 +83,7 @@ public interface ReferenceStore extends MessageStore {
boolean supportsExternalBatchControl(); boolean supportsExternalBatchControl();
void setBatch(MessageId startAfter); void setBatch(MessageId startAfter);
Lock getStoreLock();
} }

View File

@ -29,6 +29,7 @@ import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -78,9 +79,11 @@ public class AMQMessageStore implements MessageStore {
private Map<MessageId, ReferenceData> cpAddedMessageIds; private Map<MessageId, ReferenceData> cpAddedMessageIds;
private final boolean debug = LOG.isDebugEnabled(); private final boolean debug = LOG.isDebugEnabled();
private final AtomicReference<Location> mark = new AtomicReference<Location>(); private final AtomicReference<Location> mark = new AtomicReference<Location>();
protected final Lock lock;
public AMQMessageStore(AMQPersistenceAdapter adapter, ReferenceStore referenceStore, ActiveMQDestination destination) { public AMQMessageStore(AMQPersistenceAdapter adapter,ReferenceStore referenceStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter; this.peristenceAdapter = adapter;
this.lock=referenceStore.getStoreLock();
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
this.referenceStore = referenceStore; this.referenceStore = referenceStore;
this.destination = destination; this.destination = destination;
@ -99,7 +102,7 @@ public class AMQMessageStore implements MessageStore {
} }
/** /**
* Not synchronized since the Journal has better throughput if you increase * Not synchronize since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing. * the number of concurrent writes that it is doing.
*/ */
public final void addMessage(ConnectionContext context, final Message message) throws IOException { public final void addMessage(ConnectionContext context, final Message message) throws IOException {
@ -114,8 +117,11 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Journalled transacted message add for: " + id + ", at: " + location); LOG.debug("Journalled transacted message add for: " + id + ", at: " + location);
} }
synchronized (this) { lock.lock();
try {
inFlightTxLocations.add(location); inFlightTxLocations.add(location);
}finally {
lock.unlock();
} }
transactionStore.addMessage(this, message, location); transactionStore.addMessage(this, message, location);
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@ -124,8 +130,11 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Transacted message add commit for: " + id + ", at: " + location); LOG.debug("Transacted message add commit for: " + id + ", at: " + location);
} }
synchronized (AMQMessageStore.this) { lock.lock();
try {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
}finally {
lock.unlock();
} }
addMessage(message, location); addMessage(message, location);
} }
@ -134,8 +143,11 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Transacted message add rollback for: " + id + ", at: " + location); LOG.debug("Transacted message add rollback for: " + id + ", at: " + location);
} }
synchronized (AMQMessageStore.this) { lock.lock();
try {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
}finally {
lock.unlock();
} }
} }
}); });
@ -147,10 +159,13 @@ public class AMQMessageStore implements MessageStore {
data.setExpiration(message.getExpiration()); data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId()); data.setFileId(location.getDataFileId());
data.setOffset(location.getOffset()); data.setOffset(location.getOffset());
synchronized (this) { lock.lock();
try {
lastLocation = location; lastLocation = location;
messages.put(message.getMessageId(), data); messages.put(message.getMessageId(), data);
this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId()); this.peristenceAdapter.addInProgressDataFile(this, location.getDataFileId());
}finally {
lock.unlock();
} }
if (messages.size() > this.peristenceAdapter if (messages.size() > this.peristenceAdapter
.getMaxCheckpointMessageAddSize()) { .getMaxCheckpointMessageAddSize()) {
@ -199,8 +214,11 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location); LOG.debug("Journalled transacted message remove for: " + ack.getLastMessageId() + ", at: " + location);
} }
synchronized (this) { lock.lock();
try {
inFlightTxLocations.add(location); inFlightTxLocations.add(location);
}finally {
lock.unlock();
} }
transactionStore.removeMessage(this, ack, location); transactionStore.removeMessage(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@ -209,9 +227,12 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location); LOG.debug("Transacted message remove commit for: " + ack.getLastMessageId() + ", at: " + location);
} }
synchronized (AMQMessageStore.this) { lock.lock();
try {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
removeMessage(ack,location); removeMessage(ack,location);
}finally {
lock.unlock();
} }
} }
@ -219,8 +240,11 @@ public class AMQMessageStore implements MessageStore {
if (debug) { if (debug) {
LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location); LOG.debug("Transacted message remove rollback for: " + ack.getLastMessageId() + ", at: " + location);
} }
synchronized (AMQMessageStore.this) { lock.lock();
try {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
}finally {
lock.unlock();
} }
} }
}); });
@ -229,13 +253,16 @@ public class AMQMessageStore implements MessageStore {
final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException { final void removeMessage(final MessageAck ack, final Location location) throws InterruptedIOException {
ReferenceData data; ReferenceData data;
synchronized (this) { lock.lock();
try{
lastLocation = location; lastLocation = location;
MessageId id = ack.getLastMessageId(); MessageId id = ack.getLastMessageId();
data = messages.remove(id); data = messages.remove(id);
if (data == null) { if (data == null) {
messageAcks.add(ack); messageAcks.add(ack);
} }
}finally {
lock.unlock();
} }
if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) { if (messageAcks.size() > this.peristenceAdapter.getMaxCheckpointMessageAddSize()) {
flush(); flush();
@ -273,7 +300,8 @@ public class AMQMessageStore implements MessageStore {
LOG.debug("flush starting ..."); LOG.debug("flush starting ...");
} }
CountDownLatch countDown; CountDownLatch countDown;
synchronized (this) { lock.lock();
try {
if (lastWrittenLocation == lastLocation) { if (lastWrittenLocation == lastLocation) {
return; return;
} }
@ -281,6 +309,8 @@ public class AMQMessageStore implements MessageStore {
flushLatch = new CountDownLatch(1); flushLatch = new CountDownLatch(1);
} }
countDown = flushLatch; countDown = flushLatch;
}finally {
lock.unlock();
} }
try { try {
asyncWriteTask.wakeup(); asyncWriteTask.wakeup();
@ -300,9 +330,12 @@ public class AMQMessageStore implements MessageStore {
void asyncWrite() { void asyncWrite() {
try { try {
CountDownLatch countDown; CountDownLatch countDown;
synchronized (this) { lock.lock();
try {
countDown = flushLatch; countDown = flushLatch;
flushLatch = null; flushLatch = null;
}finally {
lock.unlock();
} }
mark.set(doAsyncWrite()); mark.set(doAsyncWrite());
if (countDown != null) { if (countDown != null) {
@ -323,13 +356,16 @@ public class AMQMessageStore implements MessageStore {
final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize(); final int maxCheckpointMessageAddSize = peristenceAdapter.getMaxCheckpointMessageAddSize();
final Location lastLocation; final Location lastLocation;
// swap out the message hash maps.. // swap out the message hash maps..
synchronized (this) { lock.lock();
try {
cpAddedMessageIds = this.messages; cpAddedMessageIds = this.messages;
cpRemovedMessageLocations = this.messageAcks; cpRemovedMessageLocations = this.messageAcks;
cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations); cpActiveJournalLocations = new ArrayList<Location>(inFlightTxLocations);
this.messages = new LinkedHashMap<MessageId, ReferenceData>(); this.messages = new LinkedHashMap<MessageId, ReferenceData>();
this.messageAcks = new ArrayList<MessageAck>(); this.messageAcks = new ArrayList<MessageAck>();
lastLocation = this.lastLocation; lastLocation = this.lastLocation;
}finally {
lock.unlock();
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " "); LOG.debug("Doing batch update... adding: " + cpAddedMessageIds.size() + " removing: " + cpRemovedMessageLocations.size() + " ");
@ -371,9 +407,12 @@ public class AMQMessageStore implements MessageStore {
} }
}); });
LOG.debug("Batch update done."); LOG.debug("Batch update done.");
synchronized (this) { lock.lock();
try {
cpAddedMessageIds = null; cpAddedMessageIds = null;
lastWrittenLocation = lastLocation; lastWrittenLocation = lastLocation;
}finally {
lock.unlock();
} }
if (cpActiveJournalLocations.size() > 0) { if (cpActiveJournalLocations.size() > 0) {
Collections.sort(cpActiveJournalLocations); Collections.sort(cpActiveJournalLocations);
@ -403,12 +442,15 @@ public class AMQMessageStore implements MessageStore {
protected Location getLocation(MessageId messageId) throws IOException { protected Location getLocation(MessageId messageId) throws IOException {
ReferenceData data = null; ReferenceData data = null;
synchronized (this) { lock.lock();
try {
// Is it still in flight??? // Is it still in flight???
data = messages.get(messageId); data = messages.get(messageId);
if (data == null && cpAddedMessageIds != null) { if (data == null && cpAddedMessageIds != null) {
data = cpAddedMessageIds.get(messageId); data = cpAddedMessageIds.get(messageId);
} }
}finally {
lock.unlock();
} }
if (data == null) { if (data == null) {
data = referenceStore.getMessageReference(messageId); data = referenceStore.getMessageReference(messageId);
@ -483,11 +525,11 @@ public class AMQMessageStore implements MessageStore {
} }
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
/*
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter( RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(
this, listener); this, listener);
if (referenceStore.supportsExternalBatchControl()) { if (referenceStore.supportsExternalBatchControl()) {
synchronized (this) { lock.lock();
try {
referenceStore.recoverNextMessages(maxReturned, referenceStore.recoverNextMessages(maxReturned,
recoveryListener); recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
@ -505,18 +547,21 @@ public class AMQMessageStore implements MessageStore {
referenceStore.setBatch(recoveryListener referenceStore.setBatch(recoveryListener
.getLastRecoveredMessageId()); .getLastRecoveredMessageId());
} }
}finally {
lock.unlock();
} }
} else { } else {
flush(); flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener); referenceStore.recoverNextMessages(maxReturned, recoveryListener);
} }
*/ /*
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener); RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
referenceStore.recoverNextMessages(maxReturned, recoveryListener); referenceStore.recoverNextMessages(maxReturned, recoveryListener);
if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) { if (recoveryListener.size() == 0 && recoveryListener.hasSpace()) {
flush(); flush();
referenceStore.recoverNextMessages(maxReturned, recoveryListener); referenceStore.recoverNextMessages(maxReturned, recoveryListener);
} }
*/
} }

View File

@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -430,7 +431,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName); AMQTopicMessageStore store = (AMQTopicMessageStore)topics.get(destinationName);
if (store == null) { if (store == null) {
TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName); TopicReferenceStore checkpointStore = referenceStoreAdapter.createTopicReferenceStore(destinationName);
store = new AMQTopicMessageStore(this, checkpointStore, destinationName); store = new AMQTopicMessageStore(this,checkpointStore, destinationName);
try { try {
store.start(); store.start();
} catch (Exception e) { } catch (Exception e) {

View File

@ -17,10 +17,6 @@
package org.apache.activemq.store.amq; package org.apache.activemq.store.amq;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
@ -33,7 +29,6 @@ import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore; import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -47,7 +42,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class); private static final Log LOG = LogFactory.getLog(AMQTopicMessageStore.class);
private TopicReferenceStore topicReferenceStore; private TopicReferenceStore topicReferenceStore;
public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) { public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
super(adapter, topicReferenceStore, destinationName); super(adapter, topicReferenceStore, destinationName);
this.topicReferenceStore = topicReferenceStore; this.topicReferenceStore = topicReferenceStore;
} }
@ -98,8 +93,11 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
if (debug) { if (debug) {
LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location); LOG.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
} }
synchronized (this) { lock.lock();
try {
inFlightTxLocations.add(location); inFlightTxLocations.add(location);
}finally {
lock.unlock();
} }
transactionStore.acknowledge(this, ack, location); transactionStore.acknowledge(this, ack, location);
context.getTransaction().addSynchronization(new Synchronization() { context.getTransaction().addSynchronization(new Synchronization() {
@ -108,9 +106,12 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
if (debug) { if (debug) {
LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location); LOG.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
} }
synchronized (AMQTopicMessageStore.this) { lock.lock();
try {
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
acknowledge(context,messageId, location, clientId,subscriptionName); acknowledge(context,messageId, location, clientId,subscriptionName);
}finally {
lock.unlock();
} }
} }
@ -118,8 +119,11 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
if (debug) { if (debug) {
LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location); LOG.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
} }
synchronized (AMQTopicMessageStore.this) { lock.lock();
try{
inFlightTxLocations.remove(location); inFlightTxLocations.remove(location);
}finally {
lock.unlock();
} }
} }
}); });
@ -149,8 +153,12 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
Location location, String clientId, String subscriptionName) Location location, String clientId, String subscriptionName)
throws IOException { throws IOException {
MessageAck ack = null; MessageAck ack = null;
synchronized (this) { lock.lock();
try {
lastLocation = location; lastLocation = location;
}finally {
lock.unlock();
}
if (topicReferenceStore.acknowledgeReference(context, clientId, if (topicReferenceStore.acknowledgeReference(context, clientId,
subscriptionName, messageId)) { subscriptionName, messageId)) {
@ -158,7 +166,7 @@ public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessag
ack.setLastMessageId(messageId); ack.setLastMessageId(messageId);
} }
}
if (ack != null) { if (ack != null) {
removeMessage(context, ack); removeMessage(context, ack);
} }

View File

@ -55,9 +55,7 @@ final class RecoveryListenerAdapter implements MessageRecoveryListener {
if (message != null) { if (message != null) {
return recoverMessage(message); return recoverMessage(message);
} else { } else {
if (LOG.isDebugEnabled()) { LOG.error("Message id " + ref + " could not be recovered from the data store - already dispatched");
LOG.debug("Message id " + ref + " could not be recovered from the data store - already dispatched");
}
} }
return false; return false;
} }

View File

@ -17,6 +17,9 @@
package org.apache.activemq.store.kahadaptor; package org.apache.activemq.store.kahadaptor;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -26,10 +29,12 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* @author rajdavies
*
*/
public class KahaReferenceStore implements ReferenceStore { public class KahaReferenceStore implements ReferenceStore {
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
@ -37,6 +42,7 @@ public class KahaReferenceStore implements ReferenceStore {
protected KahaReferenceStoreAdapter adapter; protected KahaReferenceStoreAdapter adapter;
private StoreEntry batchEntry; private StoreEntry batchEntry;
private String lastBatchId; private String lastBatchId;
protected final Lock lock = new ReentrantLock();
public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container, public KahaReferenceStore(KahaReferenceStoreAdapter adapter, MapContainer<MessageId, ReferenceRecord> container,
ActiveMQDestination destination) throws IOException { ActiveMQDestination destination) throws IOException {
@ -44,6 +50,10 @@ public class KahaReferenceStore implements ReferenceStore {
this.messageContainer = container; this.messageContainer = container;
this.destination = destination; this.destination = destination;
} }
public Lock getStoreLock() {
return lock;
}
public void start() { public void start() {
} }
@ -55,11 +65,11 @@ public class KahaReferenceStore implements ReferenceStore {
return new MessageId(((ReferenceRecord)object).getMessageId()); return new MessageId(((ReferenceRecord)object).getMessageId());
} }
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { public void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead"); throw new RuntimeException("Use addMessageReference instead");
} }
public synchronized Message getMessage(MessageId identity) throws IOException { public Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead"); throw new RuntimeException("Use addMessageReference instead");
} }
@ -73,58 +83,78 @@ public class KahaReferenceStore implements ReferenceStore {
return false; return false;
} }
public synchronized void recover(MessageRecoveryListener listener) throws Exception { public void recover(MessageRecoveryListener listener) throws Exception {
for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer lock.lock();
.getNext(entry)) { try {
ReferenceRecord record = messageContainer.getValue(entry); for (StoreEntry entry = messageContainer.getFirst(); entry != null; entry = messageContainer
if (!recoverReference(listener, record)) { .getNext(entry)) {
break; ReferenceRecord record = messageContainer.getValue(entry);
} if (!recoverReference(listener, record)) {
} break;
}
public synchronized void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
throws Exception {
StoreEntry entry = batchEntry;
if (entry == null) {
entry = messageContainer.getFirst();
} else {
entry = messageContainer.refresh(entry);
if (entry != null) {
entry = messageContainer.getNext(entry);
}
}
if (entry != null) {
int count = 0;
do {
ReferenceRecord msg = messageContainer.getValue(entry);
if (msg != null ) {
if ( recoverReference(listener, msg)) {
count++;
lastBatchId = msg.getMessageId();
}
} else {
lastBatchId = null;
} }
batchEntry = entry; }
entry = messageContainer.getNext(entry); }finally {
} while (entry != null && count < maxReturned && listener.hasSpace()); lock.unlock();
} }
} }
public synchronized void addMessageReference(ConnectionContext context, MessageId messageId, public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener)
throws Exception {
lock.lock();
try {
StoreEntry entry = batchEntry;
if (entry == null) {
entry = messageContainer.getFirst();
} else {
entry = messageContainer.refresh(entry);
if (entry != null) {
entry = messageContainer.getNext(entry);
}
}
if (entry != null) {
int count = 0;
do {
ReferenceRecord msg = messageContainer.getValue(entry);
if (msg != null ) {
if ( recoverReference(listener, msg)) {
count++;
lastBatchId = msg.getMessageId();
}
} else {
lastBatchId = null;
}
batchEntry = entry;
entry = messageContainer.getNext(entry);
} while (entry != null && count < maxReturned && listener.hasSpace());
}
}finally {
lock.unlock();
}
}
public void addMessageReference(ConnectionContext context, MessageId messageId,
ReferenceData data) throws IOException { ReferenceData data) throws IOException {
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); lock.lock();
messageContainer.put(messageId, record); try {
addInterest(record); ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
messageContainer.put(messageId, record);
addInterest(record);
}finally {
lock.unlock();
}
} }
public synchronized ReferenceData getMessageReference(MessageId identity) throws IOException { public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceRecord result = messageContainer.get(identity); lock.lock();
if (result == null) { try {
return null; ReferenceRecord result = messageContainer.get(identity);
if (result == null) {
return null;
}
return result.getData();
}finally {
lock.unlock();
} }
return result.getData();
} }
public void addReferenceFileIdsInUse() { public void addReferenceFileIdsInUse() {
@ -139,36 +169,57 @@ public class KahaReferenceStore implements ReferenceStore {
removeMessage(ack.getLastMessageId()); removeMessage(ack.getLastMessageId());
} }
public synchronized void removeMessage(MessageId msgId) throws IOException { public void removeMessage(MessageId msgId) throws IOException {
StoreEntry entry = messageContainer.getEntry(msgId); lock.lock();
if (entry != null) { try {
ReferenceRecord rr = messageContainer.remove(msgId); StoreEntry entry = messageContainer.getEntry(msgId);
if (rr != null) { if (entry != null) {
removeInterest(rr); ReferenceRecord rr = messageContainer.remove(msgId);
if (messageContainer.isEmpty() if (rr != null) {
|| (lastBatchId != null && lastBatchId.equals(msgId.toString())) removeInterest(rr);
|| (batchEntry != null && batchEntry.equals(entry))) { if (messageContainer.isEmpty()
resetBatching(); || (lastBatchId != null && lastBatchId.equals(msgId.toString()))
|| (batchEntry != null && batchEntry.equals(entry))) {
resetBatching();
}
} }
} }
}finally {
lock.unlock();
} }
} }
public synchronized void removeAllMessages(ConnectionContext context) throws IOException { public void removeAllMessages(ConnectionContext context) throws IOException {
messageContainer.clear(); lock.lock();
try {
messageContainer.clear();
}finally {
lock.unlock();
}
} }
public ActiveMQDestination getDestination() { public ActiveMQDestination getDestination() {
return destination; return destination;
} }
public synchronized void delete() { public void delete() {
messageContainer.clear(); lock.lock();
try {
messageContainer.clear();
}finally {
lock.unlock();
}
} }
public synchronized void resetBatching() { public void resetBatching() {
batchEntry = null; lock.lock();
lastBatchId = null; try {
batchEntry = null;
lastBatchId = null;
}finally {
lock.unlock();
}
} }
public int getMessageCount() { public int getMessageCount() {

View File

@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -177,7 +178,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
} }
return rc; return rc;
} }
/*
public void buildReferenceFileIdsInUse() throws IOException { public void buildReferenceFileIdsInUse() throws IOException {
recordReferences = new HashMap<Integer, AtomicInteger>(); recordReferences = new HashMap<Integer, AtomicInteger>();
Set<ActiveMQDestination> destinations = getDestinations(); Set<ActiveMQDestination> destinations = getDestinations();
@ -191,6 +192,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
} }
} }
} }
*/
protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id, protected MapContainer<MessageId, ReferenceRecord> getMapReferenceContainer(Object id,
String containerName) String containerName)
@ -249,6 +251,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
* @throws IOException * @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState() * @see org.apache.activemq.store.ReferenceStoreAdapter#recoverState()
*/ */
public void recoverState() throws IOException { public void recoverState() throws IOException {
for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) { for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
SubscriptionInfo info = i.next(); SubscriptionInfo info = i.next();

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -62,33 +63,38 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return new MessageId(((ReferenceRecord)object).getMessageId()); return new MessageId(((ReferenceRecord)object).getMessageId());
} }
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { public void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead"); throw new RuntimeException("Use addMessageReference instead");
} }
public synchronized Message getMessage(MessageId identity) throws IOException { public Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead"); throw new RuntimeException("Use addMessageReference instead");
} }
public synchronized void addMessageReference(final ConnectionContext context, final MessageId messageId, public void addMessageReference(final ConnectionContext context, final MessageId messageId,
final ReferenceData data) { final ReferenceData data) {
final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data); lock.lock();
final int subscriberCount = subscriberMessages.size(); try {
if (subscriberCount > 0) { final ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
final StoreEntry messageEntry = messageContainer.place(messageId, record); final int subscriberCount = subscriberMessages.size();
addInterest(record); if (subscriberCount > 0) {
final TopicSubAck tsa = new TopicSubAck(); final StoreEntry messageEntry = messageContainer.place(messageId, record);
tsa.setCount(subscriberCount); addInterest(record);
tsa.setMessageEntry(messageEntry); final TopicSubAck tsa = new TopicSubAck();
final StoreEntry ackEntry = ackContainer.placeLast(tsa); tsa.setCount(subscriberCount);
for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) { tsa.setMessageEntry(messageEntry);
final TopicSubContainer container = i.next(); final StoreEntry ackEntry = ackContainer.placeLast(tsa);
final ConsumerMessageRef ref = new ConsumerMessageRef(); for (final Iterator<TopicSubContainer> i = subscriberMessages.values().iterator(); i.hasNext();) {
ref.setAckEntry(ackEntry); final TopicSubContainer container = i.next();
ref.setMessageEntry(messageEntry); final ConsumerMessageRef ref = new ConsumerMessageRef();
ref.setMessageId(messageId); ref.setAckEntry(ackEntry);
container.add(ref); ref.setMessageEntry(messageEntry);
ref.setMessageId(messageId);
container.add(ref);
}
} }
}finally {
lock.unlock();
} }
} }
@ -121,100 +127,119 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return container; return container;
} }
public synchronized boolean acknowledgeReference(ConnectionContext context, public boolean acknowledgeReference(ConnectionContext context,
String clientId, String subscriptionName, MessageId messageId) String clientId, String subscriptionName, MessageId messageId)
throws IOException { throws IOException {
boolean removeMessage = false; boolean removeMessage = false;
String key = getSubscriptionKey(clientId, subscriptionName); lock.lock();
try {
TopicSubContainer container = subscriberMessages.get(key); String key = getSubscriptionKey(clientId, subscriptionName);
if (container != null) {
ConsumerMessageRef ref = null; TopicSubContainer container = subscriberMessages.get(key);
if((ref = container.remove(messageId)) != null) { if (container != null) {
TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); ConsumerMessageRef ref = null;
if (tsa != null) { if((ref = container.remove(messageId)) != null) {
if (tsa.decrementCount() <= 0) { TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
StoreEntry entry = ref.getAckEntry(); if (tsa != null) {
entry = ackContainer.refresh(entry); if (tsa.decrementCount() <= 0) {
ackContainer.remove(entry); StoreEntry entry = ref.getAckEntry();
ReferenceRecord rr = messageContainer.get(messageId); entry = ackContainer.refresh(entry);
if (rr != null) { ackContainer.remove(entry);
entry = tsa.getMessageEntry(); ReferenceRecord rr = messageContainer.get(messageId);
entry = messageContainer.refresh(entry); if (rr != null) {
messageContainer.remove(entry); entry = tsa.getMessageEntry();
removeInterest(rr); entry = messageContainer.refresh(entry);
removeMessage = true; messageContainer.remove(entry);
removeInterest(rr);
removeMessage = true;
}
} }
} }
}else{
//no message held
removeMessage = true;
} }
}else{
//no message held
removeMessage = true;
} }
}finally {
lock.unlock();
} }
return removeMessage; return removeMessage;
} }
public synchronized void acknowledge(ConnectionContext context, public void acknowledge(ConnectionContext context,
String clientId, String subscriptionName, MessageId messageId) String clientId, String subscriptionName, MessageId messageId)
throws IOException { throws IOException {
String key = getSubscriptionKey(clientId, subscriptionName); String key = getSubscriptionKey(clientId, subscriptionName);
lock.lock();
TopicSubContainer container = subscriberMessages.get(key); try {
if (container != null) { TopicSubContainer container = subscriberMessages.get(key);
ConsumerMessageRef ref = container.remove(messageId); if (container != null) {
if (ref != null) { ConsumerMessageRef ref = container.remove(messageId);
TopicSubAck tsa = ackContainer.get(ref.getAckEntry()); if (ref != null) {
if (tsa != null) { TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
if (tsa.decrementCount() <= 0) { if (tsa != null) {
StoreEntry entry = ref.getAckEntry(); if (tsa.decrementCount() <= 0) {
entry = ackContainer.refresh(entry); StoreEntry entry = ref.getAckEntry();
ackContainer.remove(entry); entry = ackContainer.refresh(entry);
ReferenceRecord rr = messageContainer.get(messageId); ackContainer.remove(entry);
if (rr != null) { ReferenceRecord rr = messageContainer.get(messageId);
entry = tsa.getMessageEntry(); if (rr != null) {
entry = messageContainer.refresh(entry); entry = tsa.getMessageEntry();
messageContainer.remove(entry); entry = messageContainer.refresh(entry);
removeInterest(rr); messageContainer.remove(entry);
removeInterest(rr);
}
} else {
ackContainer.update(ref.getAckEntry(), tsa);
} }
} else {
ackContainer.update(ref.getAckEntry(), tsa);
} }
} }
} }
} }finally {
lock.unlock();
}
} }
public synchronized void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException { public void addSubsciption(SubscriptionInfo info, boolean retroactive) throws IOException {
String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName()); String key = getSubscriptionKey(info.getClientId(), info.getSubscriptionName());
// if already exists - won't add it again as it causes data files lock.lock();
// to hang around try {
if (!subscriberContainer.containsKey(key)) { // if already exists - won't add it again as it causes data files
subscriberContainer.put(key, info); // to hang around
adapter.addSubscriberState(info); if (!subscriberContainer.containsKey(key)) {
} subscriberContainer.put(key, info);
// add the subscriber adapter.addSubscriberState(info);
addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName()); }
if (retroactive) { // add the subscriber
/* addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
* for(StoreEntry if (retroactive) {
* entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ /*
* TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry); * for(StoreEntry
* ConsumerMessageRef ref=new ConsumerMessageRef(); * entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
* ref.setAckEntry(entry); * TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
* ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); } * ConsumerMessageRef ref=new ConsumerMessageRef();
*/ * ref.setAckEntry(entry);
* ref.setMessageEntry(tsa.getMessageEntry()); container.add(ref); }
*/
}
}finally {
lock.unlock();
} }
} }
public synchronized void deleteSubscription(String clientId, String subscriptionName) throws IOException { public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); lock.lock();
if (info != null) { try {
adapter.removeSubscriberState(info); SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
} if (info != null) {
adapter.removeSubscriberState(info);
}
removeSubscriberMessageContainer(clientId,subscriptionName); removeSubscriberMessageContainer(clientId,subscriptionName);
}finally {
lock.unlock();
}
} }
public SubscriptionInfo[] getAllSubscriptions() throws IOException { public SubscriptionInfo[] getAllSubscriptions() throws IOException {
@ -233,41 +258,46 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName)); return subscriberContainer.get(getSubscriptionKey(clientId, subscriptionName));
} }
public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned,
MessageRecoveryListener listener) throws Exception { MessageRecoveryListener listener) throws Exception {
String key = getSubscriptionKey(clientId, subscriptionName); String key = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(key); lock.lock();
if (container != null) { try {
int count = 0; TopicSubContainer container = subscriberMessages.get(key);
StoreEntry entry = container.getBatchEntry(); if (container != null) {
if (entry == null) { int count = 0;
entry = container.getEntry(); StoreEntry entry = container.getBatchEntry();
} else { if (entry == null) {
entry = container.refreshEntry(entry); entry = container.getEntry();
} else {
entry = container.refreshEntry(entry);
if (entry != null) {
entry = container.getNextEntry(entry);
}
}
if (entry != null) { if (entry != null) {
entry = container.getNextEntry(entry); do {
ConsumerMessageRef consumerRef = container.get(entry);
ReferenceRecord msg = messageContainer.getValue(consumerRef
.getMessageEntry());
if (msg != null) {
if (recoverReference(listener, msg)) {
count++;
container.setBatchEntry(msg.getMessageId(), entry);
} else {
break;
}
} else {
container.reset();
}
entry = container.getNextEntry(entry);
} while (entry != null && count < maxReturned && listener.hasSpace());
} }
} }
}finally {
if (entry != null) { lock.unlock();
do {
ConsumerMessageRef consumerRef = container.get(entry);
ReferenceRecord msg = messageContainer.getValue(consumerRef
.getMessageEntry());
if (msg != null) {
if (recoverReference(listener, msg)) {
count++;
container.setBatchEntry(msg.getMessageId(), entry);
} else {
break;
}
} else {
container.reset();
}
entry = container.getNextEntry(entry);
} while (entry != null && count < maxReturned && listener.hasSpace());
}
} }
} }
@ -288,11 +318,16 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
} }
} }
public synchronized void resetBatching(String clientId, String subscriptionName) { public void resetBatching(String clientId, String subscriptionName) {
String key = getSubscriptionKey(clientId, subscriptionName); lock.lock();
TopicSubContainer topicSubContainer = subscriberMessages.get(key); try {
if (topicSubContainer != null) { String key = getSubscriptionKey(clientId, subscriptionName);
topicSubContainer.reset(); TopicSubContainer topicSubContainer = subscriberMessages.get(key);
if (topicSubContainer != null) {
topicSubContainer.reset();
}
}finally {
lock.unlock();
} }
} }

View File

@ -124,7 +124,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic
} }
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
if (sub != null) { if (sub != null) {
sub.recoverNextMessages(maxReturned, listener); sub.recoverNextMessages(maxReturned, listener);
} }