mirror of https://github.com/apache/activemq.git
changes for https://issues.apache.org/activemq/browse/AMQ-2789 - message priority
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@980458 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
589d634fe3
commit
cba046834c
|
@ -98,7 +98,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
protected ExecutorService topicExecutor;
|
||||
protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
|
||||
protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
|
||||
private final WireFormat wireFormat = new OpenWireFormat();
|
||||
final WireFormat wireFormat = new OpenWireFormat();
|
||||
private SystemUsage usageManager;
|
||||
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
|
||||
private LinkedBlockingQueue<Runnable> asyncTopicJobQueue;
|
||||
|
@ -368,7 +368,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
command.setDestination(dest);
|
||||
command.setMessageId(message.getMessageId().toString());
|
||||
command.setTransactionInfo(createTransactionInfo(message.getTransactionId()));
|
||||
|
||||
command.setPriority(message.getPriority());
|
||||
command.setPrioritySupported(isPrioritizedMessages());
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
|
||||
command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||
store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), null, null);
|
||||
|
@ -472,10 +473,12 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
pageFile.tx().execute(new Transaction.Closure<Exception>() {
|
||||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
listener.recoverMessage(loadMessage(entry.getValue().location));
|
||||
Message msg = loadMessage(entry.getValue().location);
|
||||
listener.recoverMessage(msg);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -484,8 +487,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
long cursorPos = 0;
|
||||
|
||||
|
||||
public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception {
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
|
@ -494,19 +496,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Entry<Long, MessageKeys> entry = null;
|
||||
int counter = 0;
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext()
|
||||
&& listener.hasSpace();) {
|
||||
entry = iterator.next();
|
||||
listener.recoverMessage(loadMessage(entry.getValue().location));
|
||||
Message msg = loadMessage(entry.getValue().location);
|
||||
//System.err.println("RECOVER " + msg.getMessageId().getProducerSequenceId());
|
||||
listener.recoverMessage(msg);
|
||||
counter++;
|
||||
if (counter >= maxReturned || listener.hasSpace() == false) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (entry != null) {
|
||||
cursorPos = entry.getKey() + 1;
|
||||
}
|
||||
sd.orderIndex.stoppedIterating();
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
|
@ -515,7 +517,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
|
||||
public void resetBatching() {
|
||||
cursorPos = 0;
|
||||
try {
|
||||
pageFile.tx().execute(new Transaction.Closure<Exception>() {
|
||||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
sd.orderIndex.resetCursorPosition();}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to reset batching",e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -527,21 +537,22 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
// Hopefully one day the page file supports concurrent read
|
||||
// operations... but for now we must
|
||||
// externally synchronize...
|
||||
Long location;
|
||||
|
||||
indexLock.readLock().lock();
|
||||
try {
|
||||
location = pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
|
||||
public Long execute(Transaction tx) throws IOException {
|
||||
pageFile.tx().execute(new Transaction.Closure<IOException>() {
|
||||
public void execute(Transaction tx) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
return sd.messageIdIndex.get(tx, key);
|
||||
Long location = sd.messageIdIndex.get(tx, key);
|
||||
if (location != null) {
|
||||
sd.orderIndex.setBatch(tx, location);
|
||||
}
|
||||
}
|
||||
});
|
||||
}finally {
|
||||
indexLock.readLock().unlock();
|
||||
}
|
||||
if (location != null) {
|
||||
cursorPos = location + 1;
|
||||
}
|
||||
|
||||
} finally {
|
||||
unlockAsyncJobQueue();
|
||||
}
|
||||
|
@ -723,7 +734,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
// The subscription might not exist.
|
||||
return 0;
|
||||
}
|
||||
cursorPos += 1;
|
||||
MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
|
||||
|
||||
int counter = 0;
|
||||
try {
|
||||
|
@ -732,7 +743,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
if (selector != null) {
|
||||
selectorExpression = SelectorParser.parse(selector);
|
||||
}
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
if (selectorExpression != null) {
|
||||
|
@ -765,9 +776,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
cursorPos += 1;
|
||||
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
|
||||
MessageOrderCursor moc = new MessageOrderCursor(cursorPos + 1);
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
listener.recoverMessage(loadMessage(entry.getValue().location));
|
||||
|
@ -787,15 +797,15 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
pageFile.tx().execute(new Transaction.Closure<Exception>() {
|
||||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long cursorPos = sd.subscriptionCursors.get(subscriptionKey);
|
||||
if (cursorPos == null) {
|
||||
cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
cursorPos += 1;
|
||||
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
|
||||
if (moc == null) {
|
||||
long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
moc = new MessageOrderCursor(pos+1);
|
||||
}
|
||||
|
||||
Entry<Long, MessageKeys> entry = null;
|
||||
int counter = 0;
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, moc); iterator
|
||||
.hasNext();) {
|
||||
entry = iterator.next();
|
||||
if (listener.recoverMessage(loadMessage(entry.getValue().location))) {
|
||||
|
@ -806,7 +816,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
if (entry != null) {
|
||||
sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
|
||||
MessageOrderCursor copy = sd.orderIndex.cursor.copy();
|
||||
copy.increment();
|
||||
sd.subscriptionCursors.put(subscriptionKey, copy);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -94,6 +94,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
static final int CLOSED_STATE = 1;
|
||||
static final int OPEN_STATE = 2;
|
||||
static final long NOT_ACKED = -1;
|
||||
static final int VERSION = 2;
|
||||
|
||||
|
||||
protected class Metadata {
|
||||
|
@ -104,7 +105,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
protected Location firstInProgressTransactionLocation;
|
||||
protected Location producerSequenceIdTrackerLocation = null;
|
||||
protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
|
||||
|
||||
protected int version = VERSION;
|
||||
public void read(DataInput is) throws IOException {
|
||||
state = is.readInt();
|
||||
destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
|
||||
|
@ -126,6 +127,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
} catch (EOFException expectedOnUpgrade) {
|
||||
}
|
||||
try {
|
||||
version = is.readInt();
|
||||
}catch (EOFException expectedOnUpgrade) {
|
||||
version=1;
|
||||
}
|
||||
LOG.info("KahaDB is version " + version);
|
||||
}
|
||||
|
||||
public void write(DataOutput os) throws IOException {
|
||||
|
@ -152,6 +159,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
} else {
|
||||
os.writeBoolean(false);
|
||||
}
|
||||
if (version > 1) {
|
||||
os.writeInt(version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -974,22 +984,26 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
// Add the message.
|
||||
long id = sd.nextMessageId++;
|
||||
int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
|
||||
long id = sd.orderIndex.getNextMessageId(priority);
|
||||
Long previous = sd.locationIndex.put(tx, location, id);
|
||||
if( previous == null ) {
|
||||
if (previous == null) {
|
||||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||
if( previous == null ) {
|
||||
sd.orderIndex.put(tx, id, new MessageKeys(command.getMessageId(), location));
|
||||
if (previous == null) {
|
||||
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
|
||||
} else {
|
||||
// If the message ID as indexed, then the broker asked us to store a DUP
|
||||
// message. Bad BOY! Don't do it, and log a warning.
|
||||
LOG.warn("Duplicate message add attempt rejected. Message id: "+command.getMessageId());
|
||||
// If the message ID as indexed, then the broker asked us to
|
||||
// store a DUP
|
||||
// message. Bad BOY! Don't do it, and log a warning.
|
||||
LOG.warn("Duplicate message add attempt rejected. Message id: " + command.getMessageId());
|
||||
// TODO: consider just rolling back the tx.
|
||||
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||
}
|
||||
} else {
|
||||
// restore the previous value.. Looks like this was a redo of a previously
|
||||
// added message. We don't want to assign it a new id as the other indexes would
|
||||
// restore the previous value.. Looks like this was a redo of a
|
||||
// previously
|
||||
// added message. We don't want to assign it a new id as the other
|
||||
// indexes would
|
||||
// be wrong..
|
||||
//
|
||||
// TODO: consider just rolling back the tx.
|
||||
|
@ -1049,9 +1063,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||
sd.orderIndex.clear(tx);
|
||||
sd.orderIndex.unload(tx);
|
||||
tx.free(sd.orderIndex.getPageId());
|
||||
sd.orderIndex.remove(tx);
|
||||
|
||||
sd.locationIndex.clear(tx);
|
||||
sd.locationIndex.unload(tx);
|
||||
|
@ -1085,7 +1097,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
sd.subscriptions.put(tx, subscriptionKey, command);
|
||||
long ackLocation=NOT_ACKED;
|
||||
if (!command.getRetroactive()) {
|
||||
ackLocation = sd.nextMessageId-1;
|
||||
ackLocation = sd.orderIndex.nextMessageId-1;
|
||||
}
|
||||
|
||||
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
|
||||
|
@ -1273,17 +1285,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
|
||||
}
|
||||
}
|
||||
|
||||
static class StoredDestination {
|
||||
long nextMessageId;
|
||||
BTreeIndex<Long, MessageKeys> orderIndex;
|
||||
|
||||
class StoredDestination {
|
||||
|
||||
MessageOrderIndex orderIndex = new MessageOrderIndex();
|
||||
BTreeIndex<Location, Long> locationIndex;
|
||||
BTreeIndex<String, Long> messageIdIndex;
|
||||
|
||||
// These bits are only set for Topics
|
||||
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
|
||||
BTreeIndex<String, Long> subscriptionAcks;
|
||||
HashMap<String, Long> subscriptionCursors;
|
||||
HashMap<String, MessageOrderCursor> subscriptionCursors;
|
||||
TreeMap<Long, HashSet<String>> ackPositions;
|
||||
}
|
||||
|
||||
|
@ -1291,7 +1303,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
public StoredDestination readPayload(DataInput dataIn) throws IOException {
|
||||
StoredDestination value = new StoredDestination();
|
||||
value.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
|
||||
value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
|
||||
value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
|
||||
value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
|
||||
|
||||
|
@ -1299,11 +1311,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
|
||||
value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
|
||||
}
|
||||
if (metadata.version >= 2) {
|
||||
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
|
||||
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
|
||||
dataOut.writeLong(value.orderIndex.getPageId());
|
||||
dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
|
||||
dataOut.writeLong(value.locationIndex.getPageId());
|
||||
dataOut.writeLong(value.messageIdIndex.getPageId());
|
||||
if (value.subscriptions != null) {
|
||||
|
@ -1313,6 +1329,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
} else {
|
||||
dataOut.writeBoolean(false);
|
||||
}
|
||||
if (metadata.version >= 2) {
|
||||
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
|
||||
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1385,7 +1405,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
if (rc == null) {
|
||||
// Brand new destination.. allocate indexes for it.
|
||||
rc = new StoredDestination();
|
||||
rc.orderIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||
rc.orderIndex.allocate(tx);
|
||||
rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
|
||||
rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
|
||||
|
||||
|
@ -1397,15 +1417,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
// Configure the marshalers and load.
|
||||
rc.orderIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
rc.orderIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
||||
rc.orderIndex.load(tx);
|
||||
|
||||
// Figure out the next key using the last entry in the destination.
|
||||
Entry<Long, MessageKeys> lastEntry = rc.orderIndex.getLast(tx);
|
||||
if( lastEntry!=null ) {
|
||||
rc.nextMessageId = lastEntry.getKey()+1;
|
||||
}
|
||||
rc.orderIndex.configureLast(tx);
|
||||
|
||||
rc.locationIndex.setKeyMarshaller(LocationMarshaller.INSTANCE);
|
||||
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||
|
@ -1427,19 +1442,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
rc.subscriptionAcks.load(tx);
|
||||
|
||||
rc.ackPositions = new TreeMap<Long, HashSet<String>>();
|
||||
rc.subscriptionCursors = new HashMap<String, Long>();
|
||||
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
|
||||
|
||||
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
|
||||
Entry<String, Long> entry = iterator.next();
|
||||
addAckLocation(rc, entry.getValue(), entry.getKey());
|
||||
}
|
||||
|
||||
if (rc.nextMessageId == 0) {
|
||||
if (rc.orderIndex.nextMessageId == 0) {
|
||||
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
|
||||
if (!rc.ackPositions.isEmpty()) {
|
||||
Long lastAckedMessageId = rc.ackPositions.lastKey();
|
||||
if (lastAckedMessageId != NOT_ACKED) {
|
||||
rc.nextMessageId = lastAckedMessageId+1;
|
||||
rc.orderIndex.nextMessageId = lastAckedMessageId+1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1486,18 +1501,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
// Find all the entries that need to get deleted.
|
||||
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
if (entry.getKey().compareTo(sequenceId) <= 0) {
|
||||
// We don't do the actually delete while we are
|
||||
// iterating the BTree since
|
||||
// iterating would fail.
|
||||
deletes.add(entry);
|
||||
}else {
|
||||
//no point in iterating the in-order sequences anymore
|
||||
break;
|
||||
}
|
||||
}
|
||||
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
|
||||
|
||||
// Do the actual deletes.
|
||||
for (Entry<Long, MessageKeys> entry : deletes) {
|
||||
|
@ -1816,4 +1820,337 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) {
|
||||
this.databaseLockedWaitDelay = databaseLockedWaitDelay;
|
||||
}
|
||||
|
||||
|
||||
class MessageOrderCursor{
|
||||
long defaultCursorPosition;
|
||||
long lowPriorityCursorPosition;
|
||||
long highPriorityCursorPosition;
|
||||
MessageOrderCursor(){
|
||||
}
|
||||
|
||||
MessageOrderCursor(long position){
|
||||
this.defaultCursorPosition=position;
|
||||
this.lowPriorityCursorPosition=position;
|
||||
this.highPriorityCursorPosition=position;
|
||||
}
|
||||
|
||||
MessageOrderCursor(MessageOrderCursor other){
|
||||
this.defaultCursorPosition=other.defaultCursorPosition;
|
||||
this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
|
||||
this.highPriorityCursorPosition=other.highPriorityCursorPosition;
|
||||
}
|
||||
|
||||
MessageOrderCursor copy() {
|
||||
return new MessageOrderCursor(this);
|
||||
}
|
||||
|
||||
void reset() {
|
||||
this.defaultCursorPosition=0;
|
||||
this.highPriorityCursorPosition=0;
|
||||
this.lowPriorityCursorPosition=0;
|
||||
}
|
||||
|
||||
void increment() {
|
||||
if (defaultCursorPosition!=0) {
|
||||
defaultCursorPosition++;
|
||||
}
|
||||
if (highPriorityCursorPosition!=0) {
|
||||
highPriorityCursorPosition++;
|
||||
}
|
||||
if (lowPriorityCursorPosition!=0) {
|
||||
lowPriorityCursorPosition++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class MessageOrderIndex{
|
||||
long nextMessageId;
|
||||
BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
|
||||
BTreeIndex<Long, MessageKeys> lowPriorityIndex;
|
||||
BTreeIndex<Long, MessageKeys> highPriorityIndex;
|
||||
MessageOrderCursor cursor = new MessageOrderCursor();
|
||||
Long lastDefaultKey;
|
||||
Long lastHighKey;
|
||||
Long lastLowKey;
|
||||
|
||||
|
||||
MessageKeys remove(Transaction tx, Long key) throws IOException {
|
||||
MessageKeys result = defaultPriorityIndex.remove(tx, key);
|
||||
if (result == null && highPriorityIndex!=null) {
|
||||
result = highPriorityIndex.remove(tx, key);
|
||||
if (result ==null && lowPriorityIndex!=null) {
|
||||
result = lowPriorityIndex.remove(tx, key);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void load(Transaction tx) throws IOException {
|
||||
defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
||||
defaultPriorityIndex.load(tx);
|
||||
if (metadata.version >= 2) {
|
||||
lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
||||
lowPriorityIndex.load(tx);
|
||||
|
||||
highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
||||
highPriorityIndex.load(tx);
|
||||
}
|
||||
}
|
||||
|
||||
void allocate(Transaction tx) throws IOException {
|
||||
defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||
if (metadata.version >= 2) {
|
||||
lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||
highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||
}
|
||||
}
|
||||
|
||||
void configureLast(Transaction tx) throws IOException {
|
||||
// Figure out the next key using the last entry in the destination.
|
||||
if (highPriorityIndex != null) {
|
||||
Entry<Long, MessageKeys> lastEntry = highPriorityIndex.getLast(tx);
|
||||
if (lastEntry != null) {
|
||||
nextMessageId = lastEntry.getKey() + 1;
|
||||
} else {
|
||||
lastEntry = defaultPriorityIndex.getLast(tx);
|
||||
if (lastEntry != null) {
|
||||
nextMessageId = lastEntry.getKey() + 1;
|
||||
} else {
|
||||
lastEntry = lowPriorityIndex.getLast(tx);
|
||||
if (lastEntry != null) {
|
||||
nextMessageId = lastEntry.getKey() + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Entry<Long, MessageKeys> lastEntry = defaultPriorityIndex.getLast(tx);
|
||||
if (lastEntry != null) {
|
||||
nextMessageId = lastEntry.getKey() + 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void remove(Transaction tx) throws IOException {
|
||||
defaultPriorityIndex.clear(tx);
|
||||
defaultPriorityIndex.unload(tx);
|
||||
tx.free(defaultPriorityIndex.getPageId());
|
||||
if (lowPriorityIndex != null) {
|
||||
lowPriorityIndex.clear(tx);
|
||||
lowPriorityIndex.unload(tx);
|
||||
|
||||
tx.free(lowPriorityIndex.getPageId());
|
||||
}
|
||||
if (highPriorityIndex != null) {
|
||||
highPriorityIndex.clear(tx);
|
||||
highPriorityIndex.unload(tx);
|
||||
tx.free(highPriorityIndex.getPageId());
|
||||
}
|
||||
}
|
||||
|
||||
void resetCursorPosition() {
|
||||
this.cursor.reset();
|
||||
lastDefaultKey = null;
|
||||
lastHighKey = null;
|
||||
lastLowKey = null;
|
||||
}
|
||||
|
||||
void setBatch(Transaction tx, Long sequence) throws IOException {
|
||||
if (sequence != null) {
|
||||
Long nextPosition = new Long(sequence.longValue() + 1);
|
||||
if (defaultPriorityIndex.containsKey(tx, sequence)) {
|
||||
lastDefaultKey = nextPosition;
|
||||
cursor.defaultCursorPosition = nextPosition.longValue();
|
||||
} else if (highPriorityIndex != null) {
|
||||
if (highPriorityIndex.containsKey(tx, sequence)) {
|
||||
lastHighKey = nextPosition;
|
||||
cursor.highPriorityCursorPosition = nextPosition.longValue();
|
||||
} else if (lowPriorityIndex.containsKey(tx, sequence)) {
|
||||
lastLowKey = nextPosition;
|
||||
cursor.lowPriorityCursorPosition = nextPosition.longValue();
|
||||
}
|
||||
} else {
|
||||
lastDefaultKey = nextPosition;
|
||||
cursor.defaultCursorPosition = nextPosition.longValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stoppedIterating() {
|
||||
if (lastDefaultKey!=null) {
|
||||
cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
|
||||
}
|
||||
if (lastHighKey!=null) {
|
||||
cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
|
||||
}
|
||||
if (lastLowKey!=null) {
|
||||
cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
|
||||
}
|
||||
lastDefaultKey = null;
|
||||
lastHighKey = null;
|
||||
lastLowKey = null;
|
||||
}
|
||||
|
||||
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
|
||||
throws IOException {
|
||||
getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
|
||||
if (highPriorityIndex != null) {
|
||||
getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
|
||||
}
|
||||
if (lowPriorityIndex != null) {
|
||||
getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
|
||||
}
|
||||
}
|
||||
|
||||
void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
|
||||
BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx); iterator.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
if (entry.getKey().compareTo(sequenceId) <= 0) {
|
||||
// We don't do the actually delete while we are
|
||||
// iterating the BTree since
|
||||
// iterating would fail.
|
||||
deletes.add(entry);
|
||||
} else {
|
||||
// no point in iterating the in-order sequences anymore
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long getNextMessageId(int priority) {
|
||||
return nextMessageId++;
|
||||
}
|
||||
|
||||
MessageKeys get(Transaction tx, Long key) throws IOException {
|
||||
MessageKeys result = defaultPriorityIndex.get(tx, key);
|
||||
if (result == null) {
|
||||
result = highPriorityIndex.get(tx, key);
|
||||
if (result == null) {
|
||||
result = lowPriorityIndex.get(tx, key);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
|
||||
if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
|
||||
return defaultPriorityIndex.put(tx, key, value);
|
||||
} else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
|
||||
return highPriorityIndex.put(tx, key, value);
|
||||
} else {
|
||||
return lowPriorityIndex.put(tx, key, value);
|
||||
}
|
||||
}
|
||||
|
||||
Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
|
||||
return new MessageOrderIterator(tx,cursor);
|
||||
}
|
||||
|
||||
Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
|
||||
return new MessageOrderIterator(tx,m);
|
||||
}
|
||||
|
||||
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
|
||||
Iterator<Entry<Long, MessageKeys>>currentIterator;
|
||||
final Iterator<Entry<Long, MessageKeys>>highIterator;
|
||||
final Iterator<Entry<Long, MessageKeys>>defaultIterator;
|
||||
final Iterator<Entry<Long, MessageKeys>>lowIterator;
|
||||
Long lastKey;
|
||||
|
||||
|
||||
|
||||
MessageOrderIterator(Transaction tx, MessageOrderCursor m) throws IOException {
|
||||
this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition);
|
||||
if (highPriorityIndex != null) {
|
||||
this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition);
|
||||
} else {
|
||||
this.highIterator = null;
|
||||
}
|
||||
if (lowPriorityIndex != null) {
|
||||
this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition);
|
||||
} else {
|
||||
this.lowIterator = null;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean hasNext() {
|
||||
if (currentIterator == null) {
|
||||
if (highIterator != null) {
|
||||
if (highIterator.hasNext()) {
|
||||
currentIterator = highIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
if (defaultIterator.hasNext()) {
|
||||
currentIterator = defaultIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
if (lowIterator.hasNext()) {
|
||||
currentIterator = lowIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
currentIterator = defaultIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
}
|
||||
if (highIterator != null) {
|
||||
if (currentIterator.hasNext()) {
|
||||
return true;
|
||||
}
|
||||
if (currentIterator == highIterator) {
|
||||
if (defaultIterator.hasNext()) {
|
||||
currentIterator = defaultIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
if (lowIterator.hasNext()) {
|
||||
currentIterator = lowIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
if (currentIterator == defaultIterator) {
|
||||
if (lowIterator.hasNext()) {
|
||||
currentIterator = lowIterator;
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return currentIterator.hasNext();
|
||||
}
|
||||
|
||||
public Entry<Long, MessageKeys> next() {
|
||||
Entry<Long, MessageKeys> result = currentIterator.next();
|
||||
if (result != null) {
|
||||
Long key = result.getKey();
|
||||
if (highIterator != null) {
|
||||
if (currentIterator == defaultIterator) {
|
||||
lastDefaultKey = key;
|
||||
} else if (currentIterator == highIterator) {
|
||||
lastHighKey = key;
|
||||
} else {
|
||||
lastLowKey = key;
|
||||
}
|
||||
} else {
|
||||
lastDefaultKey = key;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -53,6 +53,8 @@ message KahaAddMessageCommand {
|
|||
required KahaDestination destination = 2;
|
||||
required string messageId = 3;
|
||||
required bytes message = 4;
|
||||
optional int32 priority =5 [default = 4];
|
||||
optional bool prioritySupported = 6;
|
||||
}
|
||||
|
||||
message KahaRemoveMessageCommand {
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import junit.framework.Test;
|
||||
import org.apache.activemq.store.MessagePriorityTest;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
|
||||
public class KahaDBMessagePriorityTest extends MessagePriorityTest {
|
||||
|
||||
@Override
|
||||
protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
|
||||
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
|
||||
adapter.setConcurrentStoreAndDispatchQueues(false);
|
||||
adapter.setConcurrentStoreAndDispatchTopics(false);
|
||||
adapter.deleteAllMessages();
|
||||
return adapter;
|
||||
}
|
||||
|
||||
public static Test suite() {
|
||||
return suite(KahaDBMessagePriorityTest.class);
|
||||
}
|
||||
|
||||
}
|
|
@ -16,25 +16,20 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
|
||||
import javax.jms.*;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.FileNotFoundException;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
public class KahaDBVersionTest extends TestCase {
|
||||
|
||||
|
@ -49,11 +44,11 @@ public class KahaDBVersionTest extends TestCase {
|
|||
|
||||
}
|
||||
|
||||
|
||||
public void testCreateStore() throws Exception {
|
||||
|
||||
public void XtestCreateStore() throws Exception {
|
||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||
File dir = new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
|
||||
IOHelper.deleteFile(dir);
|
||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||
kaha.setDirectory(dir);
|
||||
kaha.setJournalMaxFileLength(1024*1024);
|
||||
BrokerService broker = createBroker(kaha);
|
||||
|
@ -76,17 +71,15 @@ public class KahaDBVersionTest extends TestCase {
|
|||
Message msg = session.createTextMessage("test message:"+i);
|
||||
producer.send(msg);
|
||||
}
|
||||
connection.close();
|
||||
connection.stop();
|
||||
broker.stop();
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void XtestVersionConversion() throws Exception{
|
||||
public void testVersionConversion() throws Exception{
|
||||
File testDir = new File("target/activemq-data/kahadb/versionDB");
|
||||
IOHelper.deleteFile(testDir);
|
||||
IOHelper.copyFile(VERSION_1_DB, testDir);
|
||||
|
||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||
kaha.setDirectory(testDir);
|
||||
kaha.setJournalMaxFileLength(1024*1024);
|
||||
|
@ -100,18 +93,21 @@ public class KahaDBVersionTest extends TestCase {
|
|||
Queue queue = session.createQueue("test.queue");
|
||||
MessageConsumer queueConsumer = session.createConsumer(queue);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
TextMessage msg = (TextMessage) queueConsumer.receive();
|
||||
System.err.println(msg.getText());
|
||||
TextMessage msg = (TextMessage) queueConsumer.receive(10000);
|
||||
//System.err.println(msg.getText());
|
||||
assertNotNull(msg);
|
||||
}
|
||||
MessageConsumer topicConsumer = session.createDurableSubscriber(topic,"test");
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
TextMessage msg = (TextMessage) topicConsumer.receive();
|
||||
System.err.println(msg.getText());
|
||||
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
|
||||
//System.err.println(msg.getText());
|
||||
assertNotNull(msg);
|
||||
}
|
||||
broker.stop();
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue