AMQ-2439: KahaDB + Network of Brokers + Restart = Duplicate Messages that cannot be removed from the data store

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@821090 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2009-10-02 16:46:37 +00:00
parent 94708a284d
commit bdd9e2a8cd
1 changed files with 37 additions and 6 deletions

View File

@ -22,7 +22,17 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -49,13 +59,22 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.index.BTreeIndex; import org.apache.kahadb.index.BTreeIndex;
import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location; import org.apache.kahadb.journal.Location;
import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.Page; import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction; import org.apache.kahadb.page.Transaction;
import org.apache.kahadb.util.*; import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
public class MessageDatabase { public class MessageDatabase {
@ -808,12 +827,24 @@ public class MessageDatabase {
long id = sd.nextMessageId++; long id = sd.nextMessageId++;
Long previous = sd.locationIndex.put(tx, location, id); Long previous = sd.locationIndex.put(tx, location, id);
if( previous == null ) { if( previous == null ) {
sd.messageIdIndex.put(tx, command.getMessageId(), id); previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location)); if( previous == null ) {
sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
} else {
// If the message ID as indexed, then the broker asked us to store a DUP
// message. Bad BOY! Don't do it, and log a warning.
LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId()+", on: "+command.getDestination());
// TODO: consider just rolling back the tx.
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
}
} else { } else {
// restore the previous value.. Looks like this was a redo of a previously // restore the previous value.. Looks like this was a redo of a previously
// added message. We don't want to assing it a new id as the other indexes would // added message. We don't want to assign it a new id as the other indexes would
// be wrong.. // be wrong..
//
// TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous); sd.locationIndex.put(tx, location, previous);
} }