Allows the store to detect changes in the Audit and only write if its dirty.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1416989 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-12-04 15:25:54 +00:00
parent 7461c78a19
commit 54d56df705
2 changed files with 128 additions and 47 deletions

View File

@ -30,8 +30,8 @@ import org.apache.activemq.util.LRUCache;
/**
* Provides basic audit functions for Messages without sync
*
*
*
*
*/
public class ActiveMQMessageAuditNoSync implements Serializable {
@ -41,11 +41,11 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
public static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
private final LRUCache<Object, BitArrayBin> map;
private transient boolean modified = true;
/**
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack =
* 64
* Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64
*/
public ActiveMQMessageAuditNoSync() {
this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
@ -53,17 +53,16 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
/**
* Construct a MessageAudit
*
*
* @param auditDepth range of ids to track
* @param maximumNumberOfProducersToTrack number of producers expected in
* the system
* @param maximumNumberOfProducersToTrack number of producers expected in the system
*/
public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
this.auditDepth = auditDepth;
this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
this.map = new LRUCache<Object, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
}
/**
* @return the auditDepth
*/
@ -76,6 +75,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
*/
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
this.modified = true;
}
/**
@ -88,15 +88,15 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
/**
* @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
*/
public void setMaximumNumberOfProducersToTrack(
int maximumNumberOfProducersToTrack) {
public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) {
this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
this.modified = true;
}
/**
* Checks if this message has been seen before
*
*
* @param message
* @return true if the message is a duplicate
* @throws JMSException
@ -108,7 +108,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
/**
* checks whether this messageId has been seen before and adds this
* messageId to the list
*
*
* @param id
* @return true if the message is a duplicate
*/
@ -120,10 +120,12 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(seed, bab);
modified = true;
}
long index = IdGenerator.getSequenceFromId(id);
if (index >= 0) {
answer = bab.setBit(index, true);
modified = true;
}
}
return answer;
@ -131,7 +133,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
/**
* Checks if this message has been seen before
*
*
* @param message
* @return true if the message is a duplicate
*/
@ -139,16 +141,16 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
MessageId id = message.getMessageId();
return isDuplicate(id);
}
/**
* Checks if this messageId has been seen before
*
*
* @param id
* @return true if the message is a duplicate
*/
public boolean isDuplicate(final MessageId id) {
boolean answer = false;
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
@ -156,6 +158,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
modified = true;
}
answer = bab.setBit(id.getProducerSequenceId(), true);
}
@ -165,17 +168,17 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
/**
* mark this message as being received
*
*
* @param message
*/
public void rollback(final MessageReference message) {
MessageId id = message.getMessageId();
rollback(id);
}
/**
* mark this message as being received
*
*
* @param id
*/
public void rollback(final MessageId id) {
@ -185,6 +188,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
BitArrayBin bab = map.get(pid);
if (bab != null) {
bab.setBit(id.getProducerSequenceId(), false);
modified = true;
}
}
}
@ -197,10 +201,11 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
if (bab != null) {
long index = IdGenerator.getSequenceFromId(id);
bab.setBit(index, false);
modified = true;
}
}
}
/**
* Check the message is in order
* @param msg
@ -210,7 +215,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
public boolean isInOrder(Message msg) throws JMSException {
return isInOrder(msg.getJMSMessageID());
}
/**
* Check the message id is in order
* @param id
@ -218,7 +223,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
*/
public boolean isInOrder(final String id) {
boolean answer = true;
if (id != null) {
String seed = IdGenerator.getSeedFromId(id);
if (seed != null) {
@ -226,22 +231,22 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
if (bab != null) {
long index = IdGenerator.getSequenceFromId(id);
answer = bab.isInOrder(index);
modified = true;
}
}
}
return answer;
}
/**
* Check the MessageId is in order
* @param message
* @param message
* @return
*/
public boolean isInOrder(final MessageReference message) {
return isInOrder(message.getMessageId());
}
/**
* Check the MessageId is in order
* @param id
@ -257,6 +262,7 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
if (bab == null) {
bab = new BitArrayBin(auditDepth);
map.put(pid, bab);
modified = true;
}
answer = bab.isInOrder(id.getProducerSequenceId());
@ -277,4 +283,36 @@ public class ActiveMQMessageAuditNoSync implements Serializable {
public void clear() {
map.clear();
}
/**
* Returns if the Audit has been modified since last check, this method does not
* reset the modified flag. If the caller needs to reset the flag in order to avoid
* serializing an unchanged Audit then its up the them to reset it themselves.
*
* @return true if the Audit has been modified.
*/
public boolean isModified() {
return this.modified;
}
public void setModified(boolean modified) {
this.modified = modified;
}
/**
* Reads and returns the current modified state of the Audit, once called the state is
* reset to false. This method is useful for code the needs to know if it should write
* out the Audit or otherwise execute some logic based on the Audit having changed since
* last check.
*
* @return true if the Audit has been modified since last check.
*/
public boolean modified() {
if (this.modified) {
this.modified = false;
return true;
}
return false;
}
}

View File

@ -68,10 +68,6 @@ import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
@ -81,17 +77,21 @@ import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.Marshaller;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LockFile;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -182,12 +182,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
class MetadataMarshaller extends VariableMarshaller<Metadata> {
@Override
public Metadata readPayload(DataInput dataIn) throws IOException {
Metadata rc = new Metadata();
rc.read(dataIn);
return rc;
}
@Override
public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
object.write(dataOut);
}
@ -250,6 +252,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
final PageFile pageFile = getPageFile();
pageFile.load();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
if (pageFile.getPageCount() == 0) {
// First time this is created.. Initialize the metadata
@ -275,6 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Perhaps we should just keep an index of file
storedDestinations.clear();
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
Entry<String, StoredDestination> entry = iterator.next();
@ -394,6 +398,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
try {
if (metadata.page != null) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, true);
}
@ -422,6 +427,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (metadata.page != null) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
tx.store(metadata.page, metadataMarshaller, true);
}
@ -501,6 +507,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// We may have to undo some index updates.
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
recoverIndex(tx);
}
@ -613,6 +620,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1;
@Override
public boolean isInterestedInKeysBetween(Location first, Location second) {
if( first==null ) {
return !ss.contains(0, second.getDataFileId());
@ -623,6 +631,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@Override
public void visit(List<Location> keys, List<Long> values) {
for (Location l : keys) {
int fileId = l.getDataFileId();
@ -765,6 +774,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return;
}
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, cleanup);
}
@ -785,6 +795,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
checkpointUpdate(tx, false);
}
@ -907,6 +918,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} else {
// just recover producer audit
data.visit(new Visitor() {
@Override
public void visit(KahaAddMessageCommand command) throws IOException {
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
}
@ -978,6 +990,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
upadateIndex(tx, command, location);
}
@ -997,6 +1010,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
@ -1011,6 +1025,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
@ -1024,6 +1039,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
updateIndex(tx, command, location);
}
@ -1073,6 +1089,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.indexLock.writeLock().lock();
try {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
op.execute(tx);
@ -1360,6 +1377,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Use a visitor to cut down the number of pages that we load
entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
int last=-1;
@Override
public boolean isInterestedInKeysBetween(Location first, Location second) {
if( first==null ) {
SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
@ -1385,6 +1403,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@Override
public void visit(List<Location> keys, List<Long> values) {
for (Location l : keys) {
int fileId = l.getDataFileId();
@ -1445,19 +1464,22 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
};
private Location checkpointProducerAudit() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
oout.writeObject(metadata.producerSequenceIdTracker);
oout.flush();
oout.close();
// using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
try {
location.getLatch().await();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
oout.writeObject(metadata.producerSequenceIdTracker);
oout.flush();
oout.close();
// using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
try {
location.getLatch().await();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.toString());
}
return location;
}
return location;
return metadata.producerSequenceIdTrackerLocation;
}
public HashSet<Integer> getJournalFilesBeingReplicated() {
@ -1495,10 +1517,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
@Override
public MessageKeys readPayload(DataInput dataIn) throws IOException {
return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
}
@Override
public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
dataOut.writeUTF(object.messageId);
LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
@ -1528,6 +1552,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.priority = priority;
}
@Override
public String toString() {
return "[" + lastAckedSequence + ":" + priority + "]";
}
@ -1535,11 +1560,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected class LastAckMarshaller implements Marshaller<LastAck> {
@Override
public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
dataOut.writeLong(object.lastAckedSequence);
dataOut.writeByte(object.priority);
}
@Override
public LastAck readPayload(DataInput dataIn) throws IOException {
LastAck lastAcked = new LastAck();
lastAcked.lastAckedSequence = dataIn.readLong();
@ -1549,14 +1576,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return lastAcked;
}
@Override
public int getFixedSize() {
return 9;
}
@Override
public LastAck deepCopy(LastAck source) {
return new LastAck(source);
}
@Override
public boolean isDeepCopySupported() {
return true;
}
@ -1581,6 +1611,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
@Override
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
final StoredDestination value = new StoredDestination();
value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
@ -1595,6 +1626,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
BTreeIndex<Long, HashSet<String>> oldAckPositions =
new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
@ -1638,6 +1670,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} else {
// upgrade
pageFile.tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
@ -1655,6 +1688,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return value;
}
@Override
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
dataOut.writeLong(value.locationIndex.getPageId());
@ -1675,12 +1709,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
@Override
public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
rc.mergeFramed((InputStream)dataIn);
return rc;
}
@Override
public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
object.writeFramed((OutputStream)dataOut);
}
@ -2275,6 +2311,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.checksumJournalFiles = checksumJournalFiles;
}
@Override
public void setBrokerService(BrokerService brokerService) {
this.brokerService = brokerService;
}
@ -2408,6 +2445,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@Override
public String toString() {
return "MessageOrderCursor:[def:" + defaultCursorPosition
+ ", low:" + lowPriorityCursorPosition
@ -2664,6 +2702,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
@Override
public boolean hasNext() {
if (currentIterator == null) {
if (highIterator != null) {
@ -2712,6 +2751,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return currentIterator.hasNext();
}
@Override
public Entry<Long, MessageKeys> next() {
Entry<Long, MessageKeys> result = currentIterator.next();
if (result != null) {
@ -2731,6 +2771,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return result;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
@ -2741,6 +2782,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
@Override
public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oout = new ObjectOutputStream(baos);
@ -2752,6 +2794,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
dataOut.write(data);
}
@Override
@SuppressWarnings("unchecked")
public HashSet<String> readPayload(DataInput dataIn) throws IOException {
int dataLen = dataIn.readInt();