mirror of https://github.com/apache/activemq.git
resolve https://issues.apache.org/activemq/browse/AMQ-2985 - issue with duplicates. track acklocations at send time such that out of order acks can remove messages, ensures unmatched messages do not build up. track priority with subscriptionAcks such that a restart or batch reset can select the correct cursor and not wrap around producing duplicates. ensure orderly upgrade of store. additional long running test (4 min) that validates
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1038276 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
32db9c5a93
commit
8871c67e8f
|
@ -718,17 +718,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
|
||||
// an ack for an unmatched message is stored as a negative sequence id
|
||||
// if sub has been getting unmatched acks, we need to reset
|
||||
protected Long resetForSelectors(SubscriptionInfo info, Long position) {
|
||||
if (info.getSelector() != null) {
|
||||
if (position < NOT_ACKED) {
|
||||
position = NOT_ACKED;
|
||||
}
|
||||
}
|
||||
return position;
|
||||
}
|
||||
|
||||
public int getMessageCount(String clientId, String subscriptionName) throws IOException {
|
||||
final String subscriptionKey = subscriptionKey(clientId, subscriptionName);
|
||||
final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName);
|
||||
|
@ -737,12 +726,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
|
||||
public Integer execute(Transaction tx) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
if (cursorPos == null) {
|
||||
// The subscription might not exist.
|
||||
return 0;
|
||||
}
|
||||
cursorPos = resetForSelectors(info, cursorPos);
|
||||
|
||||
int counter = 0;
|
||||
try {
|
||||
|
@ -752,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
selectorExpression = SelectorParser.parse(selector);
|
||||
}
|
||||
sd.orderIndex.resetCursorPosition();
|
||||
sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
|
||||
sd.orderIndex.setBatch(tx, cursorPos);
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
|
@ -787,9 +775,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
pageFile.tx().execute(new Transaction.Closure<Exception>() {
|
||||
public void execute(Transaction tx) throws Exception {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
cursorPos = resetForSelectors(info, cursorPos);
|
||||
sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos));
|
||||
LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
sd.orderIndex.setBatch(tx, cursorPos);
|
||||
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
|
||||
.hasNext();) {
|
||||
Entry<Long, MessageKeys> entry = iterator.next();
|
||||
|
@ -815,9 +802,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
sd.orderIndex.resetCursorPosition();
|
||||
MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey);
|
||||
if (moc == null) {
|
||||
Long pos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
pos = resetForSelectors(info, pos);
|
||||
sd.orderIndex.setBatch(tx, extractSequenceId(pos));
|
||||
LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey);
|
||||
sd.orderIndex.setBatch(tx, pos);
|
||||
moc = sd.orderIndex.cursor;
|
||||
} else {
|
||||
sd.orderIndex.cursor.sync(moc);
|
||||
|
@ -839,9 +825,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
if (entry != null) {
|
||||
MessageOrderCursor copy = sd.orderIndex.cursor.copy();
|
||||
sd.subscriptionCursors.put(subscriptionKey, copy);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated moc: " + copy + ", recovered: " + counter);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -983,7 +983,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
// Skip adding the message to the index if this is a topic and there are
|
||||
// no subscriptions.
|
||||
if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) {
|
||||
if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -995,6 +995,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||
if (previous == null) {
|
||||
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
|
||||
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
||||
addAckLocationForNewMessage(tx, sd, id);
|
||||
}
|
||||
} else {
|
||||
// If the message ID as indexed, then the broker asked us to
|
||||
// store a DUP
|
||||
|
@ -1018,13 +1021,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
||||
}
|
||||
|
||||
protected Long extractSequenceId(Long prev) {
|
||||
if (prev < NOT_ACKED) {
|
||||
prev = Math.abs(prev) + UNMATCHED_SEQ;
|
||||
}
|
||||
return prev;
|
||||
}
|
||||
|
||||
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||
if (!command.hasSubscriptionKey()) {
|
||||
|
@ -1046,26 +1042,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
// Make sure it's a valid message id...
|
||||
if (sequence != null) {
|
||||
String subscriptionKey = command.getSubscriptionKey();
|
||||
Long ackSequenceToStore = sequence;
|
||||
if (command.getAck() == UNMATCHED) {
|
||||
// store negative sequence to indicate that it was unmatched
|
||||
ackSequenceToStore = new Long(UNMATCHED_SEQ - sequence);
|
||||
}
|
||||
|
||||
Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore);
|
||||
if (prev != null) {
|
||||
if (ackSequenceToStore != sequence) {
|
||||
// unmatched, need to add ack locations for the intermediate sequences
|
||||
for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) {
|
||||
addAckLocation(tx, sd, matchedGapSequence, subscriptionKey);
|
||||
}
|
||||
if (command.getAck() != UNMATCHED) {
|
||||
sd.orderIndex.get(tx, sequence);
|
||||
byte priority = sd.orderIndex.lastGetPriority();
|
||||
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
|
||||
}
|
||||
// The following method handles deleting un-referenced messages.
|
||||
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
|
||||
}
|
||||
|
||||
// Add it to the new location set.
|
||||
addAckLocation(tx, sd, sequence, subscriptionKey);
|
||||
removeAckLocation(tx, sd, subscriptionKey, sequence);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1127,22 +1110,19 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
long ackLocation=NOT_ACKED;
|
||||
if (!command.getRetroactive()) {
|
||||
ackLocation = sd.orderIndex.nextMessageId-1;
|
||||
} else {
|
||||
addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey);
|
||||
}
|
||||
|
||||
sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation);
|
||||
addAckLocation(tx, sd, ackLocation, subscriptionKey);
|
||||
sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
|
||||
} else {
|
||||
// delete the sub...
|
||||
String subscriptionKey = command.getSubscriptionKey();
|
||||
sd.subscriptions.remove(tx, subscriptionKey);
|
||||
Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey);
|
||||
if( prev!=null ) {
|
||||
removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev));
|
||||
sd.subscriptionAcks.remove(tx, subscriptionKey);
|
||||
removeAckLocationsForSub(tx, sd, subscriptionKey);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tx
|
||||
* @throws IOException
|
||||
|
@ -1319,6 +1299,63 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
class LastAck {
|
||||
long lastAckedSequence;
|
||||
byte priority;
|
||||
|
||||
public LastAck(LastAck source) {
|
||||
this.lastAckedSequence = source.lastAckedSequence;
|
||||
this.priority = source.priority;
|
||||
}
|
||||
|
||||
public LastAck() {
|
||||
this.priority = MessageOrderIndex.HI;
|
||||
}
|
||||
|
||||
public LastAck(long ackLocation) {
|
||||
this.lastAckedSequence = ackLocation;
|
||||
this.priority = MessageOrderIndex.HI;
|
||||
}
|
||||
|
||||
public LastAck(long ackLocation, byte priority) {
|
||||
this.lastAckedSequence = ackLocation;
|
||||
this.priority = priority;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "[" + lastAckedSequence + ":" + priority + "]";
|
||||
}
|
||||
}
|
||||
|
||||
protected class LastAckMarshaller implements Marshaller<LastAck> {
|
||||
|
||||
public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
|
||||
dataOut.writeLong(object.lastAckedSequence);
|
||||
dataOut.writeByte(object.priority);
|
||||
}
|
||||
|
||||
public LastAck readPayload(DataInput dataIn) throws IOException {
|
||||
LastAck lastAcked = new LastAck();
|
||||
lastAcked.lastAckedSequence = dataIn.readLong();
|
||||
if (metadata.version >= 3) {
|
||||
lastAcked.priority = dataIn.readByte();
|
||||
}
|
||||
return lastAcked;
|
||||
}
|
||||
|
||||
public int getFixedSize() {
|
||||
return 9;
|
||||
}
|
||||
|
||||
public LastAck deepCopy(LastAck source) {
|
||||
return new LastAck(source);
|
||||
}
|
||||
|
||||
public boolean isDeepCopySupported() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
class StoredDestination {
|
||||
|
||||
MessageOrderIndex orderIndex = new MessageOrderIndex();
|
||||
|
@ -1327,7 +1364,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
// These bits are only set for Topics
|
||||
BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
|
||||
BTreeIndex<String, Long> subscriptionAcks;
|
||||
BTreeIndex<String, LastAck> subscriptionAcks;
|
||||
HashMap<String, MessageOrderCursor> subscriptionCursors;
|
||||
BTreeIndex<Long, HashSet<String>> ackPositions;
|
||||
}
|
||||
|
@ -1342,7 +1379,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
if (dataIn.readBoolean()) {
|
||||
value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
|
||||
value.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
|
||||
value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
|
||||
if (metadata.version >= 3) {
|
||||
value.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
|
||||
} else {
|
||||
|
@ -1482,7 +1519,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
|
||||
if (topic) {
|
||||
rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
|
||||
rc.subscriptionAcks = new BTreeIndex<String, Long>(pageFile, tx.allocate());
|
||||
rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
|
||||
rc.ackPositions = new BTreeIndex<Long, HashSet<String>>(pageFile, tx.allocate());
|
||||
}
|
||||
metadata.destinations.put(tx, key, rc);
|
||||
|
@ -1510,7 +1547,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
rc.subscriptions.load(tx);
|
||||
|
||||
rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
|
||||
rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||
rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
|
||||
rc.subscriptionAcks.load(tx);
|
||||
|
||||
rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||
|
@ -1520,19 +1557,27 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
|
||||
|
||||
if (metadata.version < 3) {
|
||||
// on upgrade need to fill ackLocation
|
||||
for (Iterator<Entry<String, Long>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
|
||||
Entry<String, Long> entry = iterator.next();
|
||||
addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey());
|
||||
|
||||
// on upgrade need to fill ackLocation with available messages past last ack
|
||||
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
|
||||
Entry<String, LastAck> entry = iterator.next();
|
||||
for (Iterator<Entry<Long, MessageKeys>> orderIterator =
|
||||
rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
|
||||
Long sequence = orderIterator.next().getKey();
|
||||
addAckLocation(tx, rc, sequence, entry.getKey());
|
||||
}
|
||||
// modify so it is upgraded
|
||||
rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
if (rc.orderIndex.nextMessageId == 0) {
|
||||
// check for existing durable sub all acked out - pull next seq from acks as messages are gone
|
||||
if (!rc.ackPositions.isEmpty(tx)) {
|
||||
Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey();
|
||||
if (lastAckedMessageId != NOT_ACKED) {
|
||||
rc.orderIndex.nextMessageId = lastAckedMessageId+1;
|
||||
if (!rc.subscriptionAcks.isEmpty(tx)) {
|
||||
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
|
||||
Entry<String, LastAck> entry = iterator.next();
|
||||
rc.orderIndex.nextMessageId =
|
||||
Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1546,11 +1591,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
return rc;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sd
|
||||
* @param messageSequence
|
||||
* @param subscriptionKey
|
||||
*/
|
||||
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
|
||||
HashSet<String> hs = sd.ackPositions.get(tx, messageSequence);
|
||||
if (hs == null) {
|
||||
|
@ -1561,6 +1601,34 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
sd.ackPositions.put(tx, messageSequence, hs);
|
||||
}
|
||||
|
||||
// new sub is interested in potentially all existing messages
|
||||
private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
|
||||
for (Iterator<Entry<Long, HashSet<String>>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) {
|
||||
Entry<Long, HashSet<String>> entry = iterator.next();
|
||||
entry.getValue().add(subscriptionKey);
|
||||
sd.ackPositions.put(tx, entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// on a new message add, all existing subs are interested in this message
|
||||
private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
|
||||
HashSet hs = new HashSet<String>();
|
||||
for (Iterator<Entry<String, LastAck>> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) {
|
||||
Entry<String, LastAck> entry = iterator.next();
|
||||
hs.add(entry.getKey());
|
||||
}
|
||||
sd.ackPositions.put(tx, messageSequence, hs);
|
||||
}
|
||||
|
||||
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
||||
if (!sd.ackPositions.isEmpty(tx)) {
|
||||
Long end = sd.ackPositions.getLast(tx).getKey();
|
||||
for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) {
|
||||
removeAckLocation(tx, sd, subscriptionKey, sequence);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tx
|
||||
* @param sd
|
||||
|
@ -1578,11 +1646,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
HashSet<String> firstSet = sd.ackPositions.getFirst(tx).getValue();
|
||||
sd.ackPositions.remove(tx, sequenceId);
|
||||
|
||||
// Did we just empty out the first set in the
|
||||
// ordered list of ack locations? Then it's time to
|
||||
// delete some messages.
|
||||
if (hs == firstSet) {
|
||||
|
||||
// Find all the entries that need to get deleted.
|
||||
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
|
||||
sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
|
||||
|
@ -1597,7 +1660,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String key(KahaDestination destination) {
|
||||
return destination.getType().getNumber() + ":" + destination.getName();
|
||||
|
@ -1961,6 +2023,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
|
||||
class MessageOrderIndex {
|
||||
static final byte HI = 9;
|
||||
static final byte LO = 0;
|
||||
static final byte DEF = 4;
|
||||
|
||||
long nextMessageId;
|
||||
BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
|
||||
BTreeIndex<Long, MessageKeys> lowPriorityIndex;
|
||||
|
@ -1969,7 +2035,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
Long lastDefaultKey;
|
||||
Long lastHighKey;
|
||||
Long lastLowKey;
|
||||
|
||||
byte lastGetPriority;
|
||||
|
||||
MessageKeys remove(Transaction tx, Long key) throws IOException {
|
||||
MessageKeys result = defaultPriorityIndex.remove(tx, key);
|
||||
|
@ -2073,6 +2139,29 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
}
|
||||
}
|
||||
|
||||
void setBatch(Transaction tx, LastAck last) throws IOException {
|
||||
setBatch(tx, last.lastAckedSequence);
|
||||
if (cursor.defaultCursorPosition == 0
|
||||
&& cursor.highPriorityCursorPosition == 0
|
||||
&& cursor.lowPriorityCursorPosition == 0) {
|
||||
long next = last.lastAckedSequence + 1;
|
||||
switch (last.priority) {
|
||||
case DEF:
|
||||
cursor.defaultCursorPosition = next;
|
||||
cursor.highPriorityCursorPosition = next;
|
||||
break;
|
||||
case HI:
|
||||
cursor.highPriorityCursorPosition = next;
|
||||
break;
|
||||
case LO:
|
||||
cursor.lowPriorityCursorPosition = next;
|
||||
cursor.defaultCursorPosition = next;
|
||||
cursor.highPriorityCursorPosition = next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void stoppedIterating() {
|
||||
if (lastDefaultKey!=null) {
|
||||
cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
|
||||
|
@ -2116,7 +2205,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
result = highPriorityIndex.get(tx, key);
|
||||
if (result == null) {
|
||||
result = lowPriorityIndex.get(tx, key);
|
||||
lastGetPriority = LO;
|
||||
} else {
|
||||
lastGetPriority = HI;
|
||||
}
|
||||
} else {
|
||||
lastGetPriority = DEF;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -2139,6 +2233,10 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
|
|||
return new MessageOrderIterator(tx,m);
|
||||
}
|
||||
|
||||
public byte lastGetPriority() {
|
||||
return lastGetPriority;
|
||||
}
|
||||
|
||||
class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
|
||||
Iterator<Entry<Long, MessageKeys>>currentIterator;
|
||||
final Iterator<Entry<Long, MessageKeys>>highIterator;
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.activemq.command.ActiveMQQueue;
|
|||
import org.apache.activemq.util.IOHelper;
|
||||
|
||||
import javax.jms.*;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
|
@ -33,11 +35,15 @@ import java.io.FileNotFoundException;
|
|||
*/
|
||||
public class KahaDBVersionTest extends TestCase {
|
||||
|
||||
static final Log LOG = LogFactory.getLog(KahaDBVersionTest.class);
|
||||
final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1");
|
||||
final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2");
|
||||
|
||||
BrokerService broker = null;
|
||||
|
||||
protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception {
|
||||
|
||||
BrokerService broker = new BrokerService();
|
||||
broker = new BrokerService();
|
||||
broker.setUseJmx(false);
|
||||
broker.setPersistenceAdapter(kaha);
|
||||
broker.start();
|
||||
|
@ -45,6 +51,11 @@ public class KahaDBVersionTest extends TestCase {
|
|||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (broker != null) {
|
||||
broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public void XtestCreateStore() throws Exception {
|
||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||
|
@ -57,12 +68,12 @@ public class KahaDBVersionTest extends TestCase {
|
|||
Connection connection = cf.createConnection();
|
||||
connection.setClientID("test");
|
||||
connection.start();
|
||||
producerSomeMessages(connection);
|
||||
producerSomeMessages(connection, 1000);
|
||||
connection.close();
|
||||
broker.stop();
|
||||
}
|
||||
|
||||
private void producerSomeMessages(Connection connection) throws Exception {
|
||||
private void producerSomeMessages(Connection connection, int numToSend) throws Exception {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic("test.topic");
|
||||
Queue queue = session.createQueue("test.queue");
|
||||
|
@ -70,15 +81,17 @@ public class KahaDBVersionTest extends TestCase {
|
|||
consumer.close();
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
producer.setPriority(9);
|
||||
for (int i =0; i < 1000; i++) {
|
||||
for (int i =0; i < numToSend; i++) {
|
||||
Message msg = session.createTextMessage("test message:"+i);
|
||||
producer.send(msg);
|
||||
}
|
||||
LOG.info("sent " + numToSend +" to topic");
|
||||
producer = session.createProducer(queue);
|
||||
for (int i =0; i < 1000; i++) {
|
||||
for (int i =0; i < numToSend; i++) {
|
||||
Message msg = session.createTextMessage("test message:"+i);
|
||||
producer.send(msg);
|
||||
}
|
||||
LOG.info("sent " + numToSend +" to queue");
|
||||
}
|
||||
|
||||
public void testVersion1Conversion() throws Exception{
|
||||
|
@ -94,6 +107,7 @@ public class KahaDBVersionTest extends TestCase {
|
|||
File testDir = new File("target/activemq-data/kahadb/versionDB");
|
||||
IOHelper.deleteFile(testDir);
|
||||
IOHelper.copyFile(existingStore, testDir);
|
||||
final int numToSend = 1000;
|
||||
|
||||
// on repeat store will be upgraded
|
||||
for (int repeats = 0; repeats < 3; repeats++) {
|
||||
|
@ -111,21 +125,27 @@ public class KahaDBVersionTest extends TestCase {
|
|||
|
||||
if (repeats > 0) {
|
||||
// upgraded store will be empty so generated some more messages
|
||||
producerSomeMessages(connection);
|
||||
producerSomeMessages(connection, numToSend);
|
||||
}
|
||||
|
||||
MessageConsumer queueConsumer = session.createConsumer(queue);
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int count = 0;
|
||||
for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
|
||||
TextMessage msg = (TextMessage) queueConsumer.receive(10000);
|
||||
count++;
|
||||
//System.err.println(msg.getText());
|
||||
assertNotNull(msg);
|
||||
}
|
||||
LOG.info("Consumed " + count + " from queue");
|
||||
count = 0;
|
||||
MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test");
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) {
|
||||
TextMessage msg = (TextMessage) topicConsumer.receive(10000);
|
||||
count++;
|
||||
//System.err.println(msg.getText());
|
||||
assertNotNull(msg);
|
||||
}
|
||||
LOG.info("Consumed " + count + " from topic");
|
||||
connection.close();
|
||||
|
||||
broker.stop();
|
||||
|
|
|
@ -0,0 +1,678 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.usecases;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.*;
|
||||
import java.io.File;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.HashSet;
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
// see https://issues.apache.org/activemq/browse/AMQ-2985
|
||||
// this demonstrated receiving old messages eventually along with validating order receipt
|
||||
public class DurableSubProcessTest extends org.apache.activemq.TestSupport {
|
||||
private static final Log LOG = LogFactory.getLog(DurableSubProcessTest.class);
|
||||
public static final long RUNTIME = 4 * 60 * 1000;
|
||||
|
||||
public static final int SERVER_SLEEP = 2 * 1000; // max
|
||||
public static final int CARGO_SIZE = 10; // max
|
||||
|
||||
public static final int MAX_CLIENTS = 7;
|
||||
public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000);
|
||||
public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000);
|
||||
public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000);
|
||||
|
||||
public static final boolean PERSISTENT_BROKER = true;
|
||||
public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
|
||||
|
||||
|
||||
private BrokerService broker;
|
||||
private ActiveMQTopic topic;
|
||||
|
||||
private ClientManager clientManager;
|
||||
private Server server;
|
||||
private HouseKeeper houseKeeper;
|
||||
|
||||
static final Vector<Throwable> exceptions = new Vector<Throwable>();
|
||||
|
||||
@Test
|
||||
public void testProcess() {
|
||||
try {
|
||||
server.start();
|
||||
clientManager.start();
|
||||
|
||||
if (ALLOW_SUBSCRIPTION_ABANDONMENT)
|
||||
houseKeeper.start();
|
||||
|
||||
Thread.sleep(RUNTIME);
|
||||
assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
|
||||
}
|
||||
catch (Throwable e) {
|
||||
exit("DurableSubProcessTest.testProcess failed.", e);
|
||||
}
|
||||
LOG.info("DONE.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates batch of messages in a transaction periodically.
|
||||
* The last message in the transaction is always a special
|
||||
* message what contains info about the whole transaction.
|
||||
* <p>Notifies the clients about the created messages also.
|
||||
*/
|
||||
final class Server extends Thread {
|
||||
|
||||
final String url = "vm://" + DurableSubProcessTest.this.getName() + "?" +
|
||||
"jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&" +
|
||||
"jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" +
|
||||
"jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&" +
|
||||
"jms.alwaysSyncSend=true&jms.dispatchAsync=false&" +
|
||||
"jms.watchTopicAdvisories=false&" +
|
||||
"waitForStart=200&create=false";
|
||||
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||
|
||||
final Object sendMutex = new Object();
|
||||
final String[] cargos = new String[500];
|
||||
|
||||
int transRover = 0;
|
||||
int messageRover = 0;
|
||||
|
||||
public Server() {
|
||||
super("Server");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
DurableSubProcessTest.sleepRandom(SERVER_SLEEP);
|
||||
send();
|
||||
}
|
||||
}
|
||||
catch (Throwable e) {
|
||||
exit("Server.run failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
public void send() throws JMSException {
|
||||
// do not create new clients now
|
||||
// ToDo: Test this case later.
|
||||
synchronized (sendMutex) {
|
||||
int trans = ++transRover;
|
||||
boolean relevantTrans = random(2) > 1;
|
||||
ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends this types
|
||||
int count = random(200);
|
||||
|
||||
LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]");
|
||||
|
||||
Connection con = cf.createConnection();
|
||||
Session sess = con.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer prod = sess.createProducer(null);
|
||||
|
||||
for (int i = 0; i < count; i++) {
|
||||
Message message = sess.createMessage();
|
||||
message.setIntProperty("ID", ++messageRover);
|
||||
String type = clientType != null ? clientType.randomMessageType() : ClientType.randomNonRelevantMessageType();
|
||||
message.setStringProperty("TYPE", type);
|
||||
|
||||
if (CARGO_SIZE > 0)
|
||||
message.setStringProperty("CARGO", getCargo(CARGO_SIZE));
|
||||
|
||||
prod.send(topic, message);
|
||||
clientManager.onServerMessage(message);
|
||||
}
|
||||
|
||||
Message message = sess.createMessage();
|
||||
message.setIntProperty("ID", ++messageRover);
|
||||
message.setIntProperty("TRANS", trans);
|
||||
message.setBooleanProperty("COMMIT", true);
|
||||
message.setBooleanProperty("RELEVANT", relevantTrans);
|
||||
prod.send(topic, message);
|
||||
clientManager.onServerMessage(message);
|
||||
|
||||
sess.commit();
|
||||
sess.close();
|
||||
con.close();
|
||||
}
|
||||
}
|
||||
|
||||
private String getCargo(int length) {
|
||||
if (length == 0)
|
||||
return null;
|
||||
|
||||
if (length < cargos.length) {
|
||||
String result = cargos[length];
|
||||
if (result == null) {
|
||||
result = getCargoImpl(length);
|
||||
cargos[length] = result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
return getCargoImpl(length);
|
||||
}
|
||||
|
||||
private String getCargoImpl(int length) {
|
||||
StringBuilder sb = new StringBuilder(length);
|
||||
for (int i = length; --i >=0; ) {
|
||||
sb.append('a');
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clients listen on different messages in the topic.
|
||||
* The 'TYPE' property helps the client to select the
|
||||
* proper messages.
|
||||
*/
|
||||
private enum ClientType {
|
||||
A ("a", "b", "c"),
|
||||
B ("c", "d", "e"),
|
||||
C ("d", "e", "f"),
|
||||
D ("g", "h");
|
||||
|
||||
public final String[] messageTypes;
|
||||
public final HashSet<String> messageTypeSet;
|
||||
public final String selector;
|
||||
|
||||
ClientType(String... messageTypes) {
|
||||
this.messageTypes = messageTypes;
|
||||
messageTypeSet = new HashSet<String>(Arrays.asList(messageTypes));
|
||||
|
||||
StringBuilder sb = new StringBuilder("TYPE in (");
|
||||
for (int i = 0; i < messageTypes.length; i++) {
|
||||
if (i > 0)
|
||||
sb.append(", ");
|
||||
sb.append('\'').append(messageTypes[i]).append('\'');
|
||||
}
|
||||
sb.append(')');
|
||||
selector = sb.toString();
|
||||
}
|
||||
|
||||
public static ClientType randomClientType() {
|
||||
return values()[DurableSubProcessTest.random(values().length - 1)];
|
||||
}
|
||||
|
||||
public final String randomMessageType() {
|
||||
return messageTypes[DurableSubProcessTest.random(messageTypes.length - 1)];
|
||||
}
|
||||
|
||||
public static String randomNonRelevantMessageType() {
|
||||
return Integer.toString(DurableSubProcessTest.random(20));
|
||||
}
|
||||
|
||||
public final boolean isRelevant(String messageType) {
|
||||
return messageTypeSet.contains(messageType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String toString() {
|
||||
return this.name() /*+ '[' + selector + ']'*/;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates new cliens.
|
||||
*/
|
||||
private final class ClientManager extends Thread {
|
||||
|
||||
private int clientRover = 0;
|
||||
|
||||
private final CopyOnWriteArrayList<Client> clients = new CopyOnWriteArrayList<Client>();
|
||||
|
||||
public ClientManager() {
|
||||
super("ClientManager");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (true) {
|
||||
if (clients.size() < MAX_CLIENTS)
|
||||
createNewClient();
|
||||
|
||||
int size = clients.size();
|
||||
sleepRandom(size * 3 * 1000, size * 6 * 1000);
|
||||
}
|
||||
}
|
||||
catch (Throwable e) {
|
||||
exit("ClientManager.run failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewClient() throws JMSException {
|
||||
ClientType type = ClientType.randomClientType();
|
||||
|
||||
Client client;
|
||||
synchronized (server.sendMutex) {
|
||||
client = new Client(++clientRover, type, CLIENT_LIFETIME, CLIENT_ONLINE, CLIENT_OFFLINE);
|
||||
clients.add(client);
|
||||
}
|
||||
client.start();
|
||||
|
||||
LOG.info(client.toString() + " created. " + this);
|
||||
}
|
||||
|
||||
public void removeClient(Client client) {
|
||||
clients.remove(client);
|
||||
}
|
||||
|
||||
public void onServerMessage(Message message) throws JMSException {
|
||||
for (Client client: clients) {
|
||||
client.onServerMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("ClientManager[count=");
|
||||
sb.append(clients.size());
|
||||
sb.append(", clients=");
|
||||
boolean sep = false;
|
||||
for (Client client: clients) {
|
||||
if (sep) sb.append(", ");
|
||||
else sep = true;
|
||||
sb.append(client.toString());
|
||||
}
|
||||
sb.append(']');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Consumes massages from a durable subscription.
|
||||
* Goes online/offline periodically. Checks the incoming messages
|
||||
* against the sent messages of the server.
|
||||
*/
|
||||
private final class Client extends Thread {
|
||||
|
||||
String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?" +
|
||||
"jms.watchTopicAdvisories=false&" +
|
||||
"jms.alwaysSyncSend=true&jms.dispatchAsync=true&" +
|
||||
"jms.producerWindowSize=20971520&" +
|
||||
"jms.copyMessageOnSend=false&" +
|
||||
"initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" +
|
||||
"useExponentialBackOff=true";
|
||||
final ConnectionFactory cf = new ActiveMQConnectionFactory(url);
|
||||
|
||||
public static final String SUBSCRIPTION_NAME = "subscription";
|
||||
|
||||
private final int id;
|
||||
private final String conClientId;
|
||||
|
||||
private final Random lifetime;
|
||||
private final Random online;
|
||||
private final Random offline;
|
||||
|
||||
private final ClientType clientType;
|
||||
private final String selector;
|
||||
|
||||
private final ConcurrentLinkedQueue<Message> waitingList = new ConcurrentLinkedQueue<Message>();
|
||||
|
||||
public Client(int id, ClientType clientType, Random lifetime, Random online, Random offline) throws JMSException {
|
||||
super("Client" + id);
|
||||
setDaemon(true);
|
||||
|
||||
this.id = id;
|
||||
conClientId = "cli" + id;
|
||||
this.clientType = clientType;
|
||||
selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector;
|
||||
|
||||
this.lifetime = lifetime;
|
||||
this.online = online;
|
||||
this.offline = offline;
|
||||
|
||||
subscribe();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long end = System.currentTimeMillis() + lifetime.next();
|
||||
try {
|
||||
boolean sleep = false;
|
||||
while (true) {
|
||||
long max = end - System.currentTimeMillis();
|
||||
if (max <= 0)
|
||||
break;
|
||||
|
||||
if (sleep) offline.sleepRandom();
|
||||
else sleep = true;
|
||||
|
||||
process(online.next());
|
||||
}
|
||||
|
||||
if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
|
||||
unsubscribe();
|
||||
else {
|
||||
LOG.info("Client abandon the subscription. " + this);
|
||||
|
||||
// housekeeper should sweep these abandoned subscriptions
|
||||
houseKeeper.abandonedSubscriptions.add(conClientId);
|
||||
}
|
||||
}
|
||||
catch (Throwable e) {
|
||||
exit(toString() + " failed.", e);
|
||||
}
|
||||
|
||||
clientManager.removeClient(this);
|
||||
LOG.info(toString() + " DONE.");
|
||||
}
|
||||
|
||||
private void process(long millis) throws JMSException {
|
||||
long end = System.currentTimeMillis() + millis;
|
||||
long hardEnd = end + 2000; // wait to finish the transaction.
|
||||
boolean inTransaction = false;
|
||||
int transCount = 0;
|
||||
|
||||
LOG.info(toString() + " ONLINE.");
|
||||
Connection con = openConnection();
|
||||
Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false);
|
||||
try {
|
||||
do {
|
||||
long max = end - System.currentTimeMillis();
|
||||
if (max <= 0) {
|
||||
if (!inTransaction)
|
||||
break;
|
||||
|
||||
max = hardEnd - System.currentTimeMillis();
|
||||
if (max <= 0)
|
||||
exit("" + this + " failed: Transaction is not finished.");
|
||||
}
|
||||
|
||||
Message message = consumer.receive(max);
|
||||
if (message == null)
|
||||
continue;
|
||||
|
||||
onClientMessage(message);
|
||||
|
||||
if (message.propertyExists("COMMIT")) {
|
||||
message.acknowledge();
|
||||
|
||||
LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + ".");
|
||||
|
||||
inTransaction = false;
|
||||
transCount = 0;
|
||||
}
|
||||
else {
|
||||
inTransaction = true;
|
||||
transCount++;
|
||||
}
|
||||
} while (true);
|
||||
}
|
||||
finally {
|
||||
sess.close();
|
||||
con.close();
|
||||
|
||||
LOG.info(toString() + " OFFLINE.");
|
||||
|
||||
// Check if the messages are in the waiting
|
||||
// list for long time.
|
||||
Message topMessage = waitingList.peek();
|
||||
if (topMessage != null)
|
||||
checkDeliveryTime(topMessage);
|
||||
}
|
||||
}
|
||||
|
||||
public void onServerMessage(Message message) throws JMSException {
|
||||
if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) {
|
||||
if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT")))
|
||||
waitingList.add(message);
|
||||
}
|
||||
else {
|
||||
String messageType = message.getStringProperty("TYPE");
|
||||
if (clientType.isRelevant(messageType))
|
||||
waitingList.add(message);
|
||||
}
|
||||
}
|
||||
|
||||
public void onClientMessage(Message message) {
|
||||
Message serverMessage = waitingList.poll();
|
||||
try {
|
||||
if (serverMessage == null)
|
||||
exit("" + this + " failed: There is no next server message, but received: " + message);
|
||||
|
||||
Integer receivedId = (Integer) message.getObjectProperty("ID");
|
||||
Integer serverId = (Integer) serverMessage.getObjectProperty("ID");
|
||||
if (receivedId == null || serverId == null)
|
||||
exit("" + this + " failed: message ID not found.\r\n" +
|
||||
" received: " + message + "\r\n" +
|
||||
" server: " + serverMessage);
|
||||
|
||||
if (!serverId.equals(receivedId))
|
||||
exit("" + this + " failed: Received wrong message.\r\n" +
|
||||
" received: " + message + "\r\n" +
|
||||
" server: " + serverMessage);
|
||||
|
||||
checkDeliveryTime(message);
|
||||
}
|
||||
catch (Throwable e) {
|
||||
exit("" + this + ".onClientMessage failed.\r\n" +
|
||||
" received: " + message + "\r\n" +
|
||||
" server: " + serverMessage, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the message was not delivered fast enough.
|
||||
*/
|
||||
public void checkDeliveryTime(Message message) throws JMSException {
|
||||
long creation = message.getJMSTimestamp();
|
||||
long min = System.currentTimeMillis() - (offline.max + online.min);
|
||||
|
||||
if (min > creation) {
|
||||
SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
|
||||
exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message);
|
||||
}
|
||||
}
|
||||
|
||||
private Connection openConnection() throws JMSException {
|
||||
Connection con = cf.createConnection();
|
||||
con.setClientID(conClientId);
|
||||
con.start();
|
||||
return con;
|
||||
}
|
||||
|
||||
private void subscribe() throws JMSException {
|
||||
Connection con = openConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true);
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
private void unsubscribe() throws JMSException {
|
||||
Connection con = openConnection();
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.unsubscribe(SUBSCRIPTION_NAME);
|
||||
session.close();
|
||||
con.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Client[id=" + id + ", type=" + clientType + "]";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sweeps out not-used durable subscriptions.
|
||||
*/
|
||||
private final class HouseKeeper extends Thread {
|
||||
|
||||
private HouseKeeper() {
|
||||
super("HouseKeeper");
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
public final CopyOnWriteArrayList<String> abandonedSubscriptions = new CopyOnWriteArrayList<String>();
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(60 * 1000);
|
||||
sweep();
|
||||
}
|
||||
catch (InterruptedException ex) {
|
||||
break;
|
||||
}
|
||||
catch (Throwable e) {
|
||||
Exception log = new Exception("HouseKeeper failed.", e);
|
||||
log.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sweep() throws Exception {
|
||||
LOG.info("Housekeeper sweeping.");
|
||||
|
||||
int closed = 0;
|
||||
ArrayList<String> sweeped = new ArrayList<String>();
|
||||
try {
|
||||
for (String clientId: abandonedSubscriptions) {
|
||||
sweeped.add(clientId);
|
||||
LOG.info("Sweeping out subscription of " + clientId + ".");
|
||||
broker.getAdminView().destroyDurableSubscriber(clientId, Client.SUBSCRIPTION_NAME);
|
||||
closed++;
|
||||
}
|
||||
}
|
||||
finally {
|
||||
abandonedSubscriptions.removeAll(sweeped);
|
||||
}
|
||||
|
||||
LOG.info("Housekeeper sweeped out " + closed + " subscriptions.");
|
||||
}
|
||||
}
|
||||
|
||||
public static int random(int max) {
|
||||
return (int) (Math.random() * (max + 1));
|
||||
}
|
||||
|
||||
public static int random(int min, int max) {
|
||||
return random(max - min) + min;
|
||||
}
|
||||
|
||||
public static void sleepRandom(int maxMillis) throws InterruptedException {
|
||||
Thread.sleep(random(maxMillis));
|
||||
}
|
||||
|
||||
public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException {
|
||||
Thread.sleep(random(minMillis, maxMillis));
|
||||
}
|
||||
|
||||
public static final class Random {
|
||||
|
||||
final int min;
|
||||
final int max;
|
||||
|
||||
Random(int min, int max) {
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
public int next() {
|
||||
return random(min, max);
|
||||
}
|
||||
|
||||
public void sleepRandom() throws InterruptedException {
|
||||
DurableSubProcessTest.sleepRandom(min, max);
|
||||
}
|
||||
}
|
||||
|
||||
public static void exit(String message) {
|
||||
exit(message, null);
|
||||
}
|
||||
|
||||
public static void exit(String message, Throwable e) {
|
||||
Throwable log = new RuntimeException(message, e);
|
||||
log.printStackTrace();
|
||||
LOG.error(message, e);
|
||||
exceptions.add(e);
|
||||
fail(message);
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
topic = (ActiveMQTopic) createDestination();
|
||||
startBroker();
|
||||
|
||||
clientManager = new ClientManager();
|
||||
server = new Server();
|
||||
houseKeeper = new HouseKeeper();
|
||||
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
|
||||
destroyBroker();
|
||||
}
|
||||
|
||||
private void startBroker() throws Exception {
|
||||
startBroker(true);
|
||||
}
|
||||
|
||||
private void startBroker(boolean deleteAllMessages) throws Exception {
|
||||
if (broker != null)
|
||||
return;
|
||||
|
||||
broker = BrokerFactory.createBroker("broker:(vm://localhost)");
|
||||
broker.setBrokerName(getName());
|
||||
broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
|
||||
|
||||
if (PERSISTENT_BROKER) {
|
||||
broker.setPersistent(true);
|
||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setDirectory(new File("activemq-data/" + getName()));
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
}
|
||||
else
|
||||
broker.setPersistent(false);
|
||||
|
||||
broker.addConnector("tcp://localhost:61656");
|
||||
|
||||
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
|
||||
broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024);
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024);
|
||||
|
||||
broker.start();
|
||||
}
|
||||
|
||||
private void destroyBroker() throws Exception {
|
||||
if (broker == null)
|
||||
return;
|
||||
|
||||
broker.stop();
|
||||
broker = null;
|
||||
}
|
||||
}
|
|
@ -466,7 +466,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals("offline consumer got all", sent, listener.count);
|
||||
}
|
||||
|
||||
public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception {
|
||||
this.addCombinationValues("defaultPersistenceAdapter",
|
||||
new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC});
|
||||
}
|
||||
|
@ -639,7 +639,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
|
||||
int filtered = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
|
||||
boolean filter = (int) (Math.random() * 2) >= 1;
|
||||
if (filter)
|
||||
filtered++;
|
||||
|
||||
|
@ -664,7 +664,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
producer = session.createProducer(null);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
|
||||
boolean filter = (int) (Math.random() * 2) >= 1;
|
||||
if (filter)
|
||||
filtered++;
|
||||
|
||||
|
@ -702,6 +702,198 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
|
|||
assertEquals(filtered, listener3.count);
|
||||
}
|
||||
|
||||
|
||||
public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception {
|
||||
// create offline subs 1
|
||||
Connection con = createConnection("offCli1");
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// create offline subs 2
|
||||
con = createConnection("offCli2");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
boolean filter = (int) (Math.random() * 2) >= 1;
|
||||
|
||||
sent++;
|
||||
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", filter ? "true" : "false");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
|
||||
Thread.sleep(1 * 1000);
|
||||
|
||||
Connection con2 = createConnection("offCli1");
|
||||
Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session2.unsubscribe("SubsId");
|
||||
session2.close();
|
||||
con2.close();
|
||||
|
||||
// consume all messages
|
||||
con = createConnection("offCli2");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true);
|
||||
Listener listener = new Listener("SubsId");
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals("offline consumer got all", sent, listener.count);
|
||||
}
|
||||
|
||||
|
||||
public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception {
|
||||
// create offline subs 1
|
||||
Connection con = createConnection("offCli1");
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
int filtered = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1;
|
||||
if (filter)
|
||||
filtered++;
|
||||
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", filter ? "true" : "false");
|
||||
producer.send(topic, message);
|
||||
}
|
||||
|
||||
LOG.info("sent: " + filtered);
|
||||
Thread.sleep(1 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// test offline subs
|
||||
con = createConnection("offCli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.unsubscribe("SubsId");
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
con = createConnection("offCli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
|
||||
Thread.sleep(3 * 1000);
|
||||
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals(0, listener.count);
|
||||
}
|
||||
|
||||
|
||||
public void testAllConsumed() throws Exception {
|
||||
final String filter = "filter = 'true'";
|
||||
Connection con = createConnection("cli1");
|
||||
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
con = createConnection("cli2");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
|
||||
int sent = 0;
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", "true");
|
||||
producer.send(topic, message);
|
||||
sent++;
|
||||
}
|
||||
|
||||
LOG.info("sent: " + sent);
|
||||
Thread.sleep(1 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
con = createConnection("cli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
Listener listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
Thread.sleep(3 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals(sent, listener.count);
|
||||
|
||||
LOG.info("cli2 pull 2");
|
||||
con = createConnection("cli2");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
assertNotNull("got message", consumer.receive(2000));
|
||||
assertNotNull("got message", consumer.receive(2000));
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
|
||||
// send messages
|
||||
con = createConnection();
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
producer = session.createProducer(null);
|
||||
|
||||
sent = 0;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Message message = session.createMessage();
|
||||
message.setStringProperty("filter", i==1 ? "true" : "false");
|
||||
producer.send(topic, message);
|
||||
sent++;
|
||||
}
|
||||
LOG.info("sent: " + sent);
|
||||
Thread.sleep(1 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
LOG.info("cli1 again, should get 1 new ones");
|
||||
con = createConnection("cli1");
|
||||
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
|
||||
listener = new Listener();
|
||||
consumer.setMessageListener(listener);
|
||||
Thread.sleep(3 * 1000);
|
||||
session.close();
|
||||
con.close();
|
||||
|
||||
assertEquals(1, listener.count);
|
||||
}
|
||||
|
||||
public static class Listener implements MessageListener {
|
||||
int count = 0;
|
||||
String id = null;
|
||||
|
|
Loading…
Reference in New Issue