diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 7df3b30653..9954bee21b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -718,17 +718,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } - // an ack for an unmatched message is stored as a negative sequence id - // if sub has been getting unmatched acks, we need to reset - protected Long resetForSelectors(SubscriptionInfo info, Long position) { - if (info.getSelector() != null) { - if (position < NOT_ACKED) { - position = NOT_ACKED; - } - } - return position; - } - public int getMessageCount(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); @@ -737,12 +726,11 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { return pageFile.tx().execute(new Transaction.CallableClosure() { public Integer execute(Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(dest, tx); - Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); if (cursorPos == null) { // The subscription might not exist. return 0; } - cursorPos = resetForSelectors(info, cursorPos); int counter = 0; try { @@ -752,7 +740,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { selectorExpression = SelectorParser.parse(selector); } sd.orderIndex.resetCursorPosition(); - sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos)); + sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -787,9 +775,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { pageFile.tx().execute(new Transaction.Closure() { public void execute(Transaction tx) throws Exception { StoredDestination sd = getStoredDestination(dest, tx); - Long cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); - cursorPos = resetForSelectors(info, cursorPos); - sd.orderIndex.setBatch(tx, extractSequenceId(cursorPos)); + LastAck cursorPos = sd.subscriptionAcks.get(tx, subscriptionKey); + sd.orderIndex.setBatch(tx, cursorPos); for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator .hasNext();) { Entry entry = iterator.next(); @@ -815,9 +802,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { sd.orderIndex.resetCursorPosition(); MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); if (moc == null) { - Long pos = sd.subscriptionAcks.get(tx, subscriptionKey); - pos = resetForSelectors(info, pos); - sd.orderIndex.setBatch(tx, extractSequenceId(pos)); + LastAck pos = sd.subscriptionAcks.get(tx, subscriptionKey); + sd.orderIndex.setBatch(tx, pos); moc = sd.orderIndex.cursor; } else { sd.orderIndex.cursor.sync(moc); @@ -839,9 +825,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { if (entry != null) { MessageOrderCursor copy = sd.orderIndex.cursor.copy(); sd.subscriptionCursors.put(subscriptionKey, copy); - if (LOG.isDebugEnabled()) { - LOG.debug("updated moc: " + copy + ", recovered: " + counter); - } } } }); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 246c369324..dbdc0c8ab9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -983,7 +983,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // Skip adding the message to the index if this is a topic and there are // no subscriptions. - if (sd.subscriptions != null && sd.ackPositions.isEmpty(tx)) { + if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { return; } @@ -995,6 +995,9 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); if (previous == null) { sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); + if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { + addAckLocationForNewMessage(tx, sd, id); + } } else { // If the message ID as indexed, then the broker asked us to // store a DUP @@ -1018,13 +1021,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); } - protected Long extractSequenceId(Long prev) { - if (prev < NOT_ACKED) { - prev = Math.abs(prev) + UNMATCHED_SEQ; - } - return prev; - } - void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); if (!command.hasSubscriptionKey()) { @@ -1046,26 +1042,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // Make sure it's a valid message id... if (sequence != null) { String subscriptionKey = command.getSubscriptionKey(); - Long ackSequenceToStore = sequence; - if (command.getAck() == UNMATCHED) { - // store negative sequence to indicate that it was unmatched - ackSequenceToStore = new Long(UNMATCHED_SEQ - sequence); + if (command.getAck() != UNMATCHED) { + sd.orderIndex.get(tx, sequence); + byte priority = sd.orderIndex.lastGetPriority(); + sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); } - - Long prev = sd.subscriptionAcks.put(tx, subscriptionKey, ackSequenceToStore); - if (prev != null) { - if (ackSequenceToStore != sequence) { - // unmatched, need to add ack locations for the intermediate sequences - for (long matchedGapSequence = extractSequenceId(prev) + 1; matchedGapSequence < sequence; matchedGapSequence++) { - addAckLocation(tx, sd, matchedGapSequence, subscriptionKey); - } - } - // The following method handles deleting un-referenced messages. - removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev)); - } - - // Add it to the new location set. - addAckLocation(tx, sd, sequence, subscriptionKey); + // The following method handles deleting un-referenced messages. + removeAckLocation(tx, sd, subscriptionKey, sequence); } } @@ -1127,20 +1110,17 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar long ackLocation=NOT_ACKED; if (!command.getRetroactive()) { ackLocation = sd.orderIndex.nextMessageId-1; + } else { + addAckLocationForRetroactiveSub(tx, sd, ackLocation, subscriptionKey); } - - sd.subscriptionAcks.put(tx, subscriptionKey, ackLocation); - addAckLocation(tx, sd, ackLocation, subscriptionKey); + sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); } else { // delete the sub... String subscriptionKey = command.getSubscriptionKey(); sd.subscriptions.remove(tx, subscriptionKey); - Long prev = sd.subscriptionAcks.remove(tx, subscriptionKey); - if( prev!=null ) { - removeAckLocation(tx, sd, subscriptionKey, extractSequenceId(prev)); - } + sd.subscriptionAcks.remove(tx, subscriptionKey); + removeAckLocationsForSub(tx, sd, subscriptionKey); } - } /** @@ -1318,7 +1298,64 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); } } - + + class LastAck { + long lastAckedSequence; + byte priority; + + public LastAck(LastAck source) { + this.lastAckedSequence = source.lastAckedSequence; + this.priority = source.priority; + } + + public LastAck() { + this.priority = MessageOrderIndex.HI; + } + + public LastAck(long ackLocation) { + this.lastAckedSequence = ackLocation; + this.priority = MessageOrderIndex.HI; + } + + public LastAck(long ackLocation, byte priority) { + this.lastAckedSequence = ackLocation; + this.priority = priority; + } + + public String toString() { + return "[" + lastAckedSequence + ":" + priority + "]"; + } + } + + protected class LastAckMarshaller implements Marshaller { + + public void writePayload(LastAck object, DataOutput dataOut) throws IOException { + dataOut.writeLong(object.lastAckedSequence); + dataOut.writeByte(object.priority); + } + + public LastAck readPayload(DataInput dataIn) throws IOException { + LastAck lastAcked = new LastAck(); + lastAcked.lastAckedSequence = dataIn.readLong(); + if (metadata.version >= 3) { + lastAcked.priority = dataIn.readByte(); + } + return lastAcked; + } + + public int getFixedSize() { + return 9; + } + + public LastAck deepCopy(LastAck source) { + return new LastAck(source); + } + + public boolean isDeepCopySupported() { + return true; + } + } + class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -1327,7 +1364,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar // These bits are only set for Topics BTreeIndex subscriptions; - BTreeIndex subscriptionAcks; + BTreeIndex subscriptionAcks; HashMap subscriptionCursors; BTreeIndex> ackPositions; } @@ -1342,7 +1379,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (dataIn.readBoolean()) { value.subscriptions = new BTreeIndex(pageFile, dataIn.readLong()); - value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); + value.subscriptionAcks = new BTreeIndex(pageFile, dataIn.readLong()); if (metadata.version >= 3) { value.ackPositions = new BTreeIndex>(pageFile, dataIn.readLong()); } else { @@ -1482,7 +1519,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar if (topic) { rc.subscriptions = new BTreeIndex(pageFile, tx.allocate()); - rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); + rc.subscriptionAcks = new BTreeIndex(pageFile, tx.allocate()); rc.ackPositions = new BTreeIndex>(pageFile, tx.allocate()); } metadata.destinations.put(tx, key, rc); @@ -1510,7 +1547,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar rc.subscriptions.load(tx); rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); - rc.subscriptionAcks.setValueMarshaller(LongMarshaller.INSTANCE); + rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); rc.subscriptionAcks.load(tx); rc.ackPositions.setKeyMarshaller(LongMarshaller.INSTANCE); @@ -1520,19 +1557,27 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar rc.subscriptionCursors = new HashMap(); if (metadata.version < 3) { - // on upgrade need to fill ackLocation - for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - addAckLocation(tx, rc, extractSequenceId(entry.getValue()), entry.getKey()); + + // on upgrade need to fill ackLocation with available messages past last ack + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { + Entry entry = iterator.next(); + for (Iterator> orderIterator = + rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { + Long sequence = orderIterator.next().getKey(); + addAckLocation(tx, rc, sequence, entry.getKey()); + } + // modify so it is upgraded + rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); } } if (rc.orderIndex.nextMessageId == 0) { // check for existing durable sub all acked out - pull next seq from acks as messages are gone - if (!rc.ackPositions.isEmpty(tx)) { - Long lastAckedMessageId = rc.ackPositions.getLast(tx).getKey(); - if (lastAckedMessageId != NOT_ACKED) { - rc.orderIndex.nextMessageId = lastAckedMessageId+1; + if (!rc.subscriptionAcks.isEmpty(tx)) { + for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + rc.orderIndex.nextMessageId = + Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); } } } @@ -1546,11 +1591,6 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar return rc; } - /** - * @param sd - * @param messageSequence - * @param subscriptionKey - */ private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { HashSet hs = sd.ackPositions.get(tx, messageSequence); if (hs == null) { @@ -1561,6 +1601,34 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar sd.ackPositions.put(tx, messageSequence, hs); } + // new sub is interested in potentially all existing messages + private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { + for (Iterator>> iterator = sd.ackPositions.iterator(tx, messageSequence); iterator.hasNext(); ) { + Entry> entry = iterator.next(); + entry.getValue().add(subscriptionKey); + sd.ackPositions.put(tx, entry.getKey(), entry.getValue()); + } + } + + // on a new message add, all existing subs are interested in this message + private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { + HashSet hs = new HashSet(); + for (Iterator> iterator = sd.subscriptionAcks.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + hs.add(entry.getKey()); + } + sd.ackPositions.put(tx, messageSequence, hs); + } + + private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + if (!sd.ackPositions.isEmpty(tx)) { + Long end = sd.ackPositions.getLast(tx).getKey(); + for (Long sequence = sd.ackPositions.getFirst(tx).getKey(); sequence <= end; sequence++) { + removeAckLocation(tx, sd, subscriptionKey, sequence); + } + } + } + /** * @param tx * @param sd @@ -1578,21 +1646,15 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar HashSet firstSet = sd.ackPositions.getFirst(tx).getValue(); sd.ackPositions.remove(tx, sequenceId); - // Did we just empty out the first set in the - // ordered list of ack locations? Then it's time to - // delete some messages. - if (hs == firstSet) { + // Find all the entries that need to get deleted. + ArrayList> deletes = new ArrayList>(); + sd.orderIndex.getDeleteList(tx, deletes, sequenceId); - // Find all the entries that need to get deleted. - ArrayList> deletes = new ArrayList>(); - sd.orderIndex.getDeleteList(tx, deletes, sequenceId); - - // Do the actual deletes. - for (Entry entry : deletes) { - sd.locationIndex.remove(tx, entry.getValue().location); - sd.messageIdIndex.remove(tx,entry.getValue().messageId); - sd.orderIndex.remove(tx,entry.getKey()); - } + // Do the actual deletes. + for (Entry entry : deletes) { + sd.locationIndex.remove(tx, entry.getValue().location); + sd.messageIdIndex.remove(tx, entry.getValue().messageId); + sd.orderIndex.remove(tx, entry.getKey()); } } } @@ -1905,7 +1967,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar this.databaseLockedWaitDelay = databaseLockedWaitDelay; } - + class MessageOrderCursor{ long defaultCursorPosition; long lowPriorityCursorPosition; @@ -1960,7 +2022,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } - class MessageOrderIndex{ + class MessageOrderIndex { + static final byte HI = 9; + static final byte LO = 0; + static final byte DEF = 4; + long nextMessageId; BTreeIndex defaultPriorityIndex; BTreeIndex lowPriorityIndex; @@ -1969,8 +2035,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar Long lastDefaultKey; Long lastHighKey; Long lastLowKey; - - + byte lastGetPriority; + MessageKeys remove(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.remove(tx, key); if (result == null && highPriorityIndex!=null) { @@ -2072,6 +2138,29 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar } } } + + void setBatch(Transaction tx, LastAck last) throws IOException { + setBatch(tx, last.lastAckedSequence); + if (cursor.defaultCursorPosition == 0 + && cursor.highPriorityCursorPosition == 0 + && cursor.lowPriorityCursorPosition == 0) { + long next = last.lastAckedSequence + 1; + switch (last.priority) { + case DEF: + cursor.defaultCursorPosition = next; + cursor.highPriorityCursorPosition = next; + break; + case HI: + cursor.highPriorityCursorPosition = next; + break; + case LO: + cursor.lowPriorityCursorPosition = next; + cursor.defaultCursorPosition = next; + cursor.highPriorityCursorPosition = next; + break; + } + } + } void stoppedIterating() { if (lastDefaultKey!=null) { @@ -2116,7 +2205,12 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar result = highPriorityIndex.get(tx, key); if (result == null) { result = lowPriorityIndex.get(tx, key); + lastGetPriority = LO; + } else { + lastGetPriority = HI; } + } else { + lastGetPriority = DEF; } return result; } @@ -2138,7 +2232,11 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar Iterator> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ return new MessageOrderIterator(tx,m); } - + + public byte lastGetPriority() { + return lastGetPriority; + } + class MessageOrderIterator implements Iterator>{ Iterator>currentIterator; final Iterator>highIterator; diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java index ec9348d8d8..4be5897639 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java @@ -23,6 +23,8 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.util.IOHelper; import javax.jms.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -33,11 +35,15 @@ import java.io.FileNotFoundException; */ public class KahaDBVersionTest extends TestCase { + static final Log LOG = LogFactory.getLog(KahaDBVersionTest.class); final static File VERSION_1_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion1"); final static File VERSION_2_DB= new File("src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion2"); + + BrokerService broker = null; + protected BrokerService createBroker(KahaDBPersistenceAdapter kaha) throws Exception { - BrokerService broker = new BrokerService(); + broker = new BrokerService(); broker.setUseJmx(false); broker.setPersistenceAdapter(kaha); broker.start(); @@ -45,6 +51,11 @@ public class KahaDBVersionTest extends TestCase { } + protected void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + } + } public void XtestCreateStore() throws Exception { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); @@ -57,12 +68,12 @@ public class KahaDBVersionTest extends TestCase { Connection connection = cf.createConnection(); connection.setClientID("test"); connection.start(); - producerSomeMessages(connection); + producerSomeMessages(connection, 1000); connection.close(); broker.stop(); } - private void producerSomeMessages(Connection connection) throws Exception { + private void producerSomeMessages(Connection connection, int numToSend) throws Exception { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("test.topic"); Queue queue = session.createQueue("test.queue"); @@ -70,15 +81,17 @@ public class KahaDBVersionTest extends TestCase { consumer.close(); MessageProducer producer = session.createProducer(topic); producer.setPriority(9); - for (int i =0; i < 1000; i++) { + for (int i =0; i < numToSend; i++) { Message msg = session.createTextMessage("test message:"+i); producer.send(msg); } + LOG.info("sent " + numToSend +" to topic"); producer = session.createProducer(queue); - for (int i =0; i < 1000; i++) { + for (int i =0; i < numToSend; i++) { Message msg = session.createTextMessage("test message:"+i); producer.send(msg); } + LOG.info("sent " + numToSend +" to queue"); } public void testVersion1Conversion() throws Exception{ @@ -94,6 +107,7 @@ public class KahaDBVersionTest extends TestCase { File testDir = new File("target/activemq-data/kahadb/versionDB"); IOHelper.deleteFile(testDir); IOHelper.copyFile(existingStore, testDir); + final int numToSend = 1000; // on repeat store will be upgraded for (int repeats = 0; repeats < 3; repeats++) { @@ -111,21 +125,27 @@ public class KahaDBVersionTest extends TestCase { if (repeats > 0) { // upgraded store will be empty so generated some more messages - producerSomeMessages(connection); + producerSomeMessages(connection, numToSend); } MessageConsumer queueConsumer = session.createConsumer(queue); - for (int i = 0; i < 1000; i++) { + int count = 0; + for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { TextMessage msg = (TextMessage) queueConsumer.receive(10000); + count++; //System.err.println(msg.getText()); assertNotNull(msg); } + LOG.info("Consumed " + count + " from queue"); + count = 0; MessageConsumer topicConsumer = session.createDurableSubscriber(topic, "test"); - for (int i = 0; i < 1000; i++) { + for (int i = 0; i < (repeats == 0 ? 1000 : numToSend); i++) { TextMessage msg = (TextMessage) topicConsumer.receive(10000); + count++; //System.err.println(msg.getText()); assertNotNull(msg); } + LOG.info("Consumed " + count + " from topic"); connection.close(); broker.stop(); diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java new file mode 100644 index 0000000000..26735eeaae --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubProcessTest.java @@ -0,0 +1,678 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +import javax.jms.*; +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.Vector; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; + +// see https://issues.apache.org/activemq/browse/AMQ-2985 +// this demonstrated receiving old messages eventually along with validating order receipt +public class DurableSubProcessTest extends org.apache.activemq.TestSupport { + private static final Log LOG = LogFactory.getLog(DurableSubProcessTest.class); + public static final long RUNTIME = 4 * 60 * 1000; + + public static final int SERVER_SLEEP = 2 * 1000; // max + public static final int CARGO_SIZE = 10; // max + + public static final int MAX_CLIENTS = 7; + public static final Random CLIENT_LIFETIME = new Random(30 * 1000, 2 * 60 * 1000); + public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000); + public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000); + + public static final boolean PERSISTENT_BROKER = true; + public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true; + + + private BrokerService broker; + private ActiveMQTopic topic; + + private ClientManager clientManager; + private Server server; + private HouseKeeper houseKeeper; + + static final Vector exceptions = new Vector(); + + @Test + public void testProcess() { + try { + server.start(); + clientManager.start(); + + if (ALLOW_SUBSCRIPTION_ABANDONMENT) + houseKeeper.start(); + + Thread.sleep(RUNTIME); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + catch (Throwable e) { + exit("DurableSubProcessTest.testProcess failed.", e); + } + LOG.info("DONE."); + } + + /** + * Creates batch of messages in a transaction periodically. + * The last message in the transaction is always a special + * message what contains info about the whole transaction. + *

Notifies the clients about the created messages also. + */ + final class Server extends Thread { + + final String url = "vm://" + DurableSubProcessTest.this.getName() + "?" + + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&" + + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" + + "jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=false&" + + "jms.alwaysSyncSend=true&jms.dispatchAsync=false&" + + "jms.watchTopicAdvisories=false&" + + "waitForStart=200&create=false"; + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + final Object sendMutex = new Object(); + final String[] cargos = new String[500]; + + int transRover = 0; + int messageRover = 0; + + public Server() { + super("Server"); + setDaemon(true); + } + + @Override + public void run() { + try { + while (true) { + DurableSubProcessTest.sleepRandom(SERVER_SLEEP); + send(); + } + } + catch (Throwable e) { + exit("Server.run failed", e); + } + } + + public void send() throws JMSException { + // do not create new clients now + // ToDo: Test this case later. + synchronized (sendMutex) { + int trans = ++transRover; + boolean relevantTrans = random(2) > 1; + ClientType clientType = relevantTrans ? ClientType.randomClientType() : null; // sends this types + int count = random(200); + + LOG.info("Sending Trans[id=" + trans + ", count=" + count + ", clientType=" + clientType + "]"); + + Connection con = cf.createConnection(); + Session sess = con.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer prod = sess.createProducer(null); + + for (int i = 0; i < count; i++) { + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + String type = clientType != null ? clientType.randomMessageType() : ClientType.randomNonRelevantMessageType(); + message.setStringProperty("TYPE", type); + + if (CARGO_SIZE > 0) + message.setStringProperty("CARGO", getCargo(CARGO_SIZE)); + + prod.send(topic, message); + clientManager.onServerMessage(message); + } + + Message message = sess.createMessage(); + message.setIntProperty("ID", ++messageRover); + message.setIntProperty("TRANS", trans); + message.setBooleanProperty("COMMIT", true); + message.setBooleanProperty("RELEVANT", relevantTrans); + prod.send(topic, message); + clientManager.onServerMessage(message); + + sess.commit(); + sess.close(); + con.close(); + } + } + + private String getCargo(int length) { + if (length == 0) + return null; + + if (length < cargos.length) { + String result = cargos[length]; + if (result == null) { + result = getCargoImpl(length); + cargos[length] = result; + } + return result; + } + return getCargoImpl(length); + } + + private String getCargoImpl(int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = length; --i >=0; ) { + sb.append('a'); + } + return sb.toString(); + } + } + + /** + * Clients listen on different messages in the topic. + * The 'TYPE' property helps the client to select the + * proper messages. + */ + private enum ClientType { + A ("a", "b", "c"), + B ("c", "d", "e"), + C ("d", "e", "f"), + D ("g", "h"); + + public final String[] messageTypes; + public final HashSet messageTypeSet; + public final String selector; + + ClientType(String... messageTypes) { + this.messageTypes = messageTypes; + messageTypeSet = new HashSet(Arrays.asList(messageTypes)); + + StringBuilder sb = new StringBuilder("TYPE in ("); + for (int i = 0; i < messageTypes.length; i++) { + if (i > 0) + sb.append(", "); + sb.append('\'').append(messageTypes[i]).append('\''); + } + sb.append(')'); + selector = sb.toString(); + } + + public static ClientType randomClientType() { + return values()[DurableSubProcessTest.random(values().length - 1)]; + } + + public final String randomMessageType() { + return messageTypes[DurableSubProcessTest.random(messageTypes.length - 1)]; + } + + public static String randomNonRelevantMessageType() { + return Integer.toString(DurableSubProcessTest.random(20)); + } + + public final boolean isRelevant(String messageType) { + return messageTypeSet.contains(messageType); + } + + @Override + public final String toString() { + return this.name() /*+ '[' + selector + ']'*/; + } + } + + /** + * Creates new cliens. + */ + private final class ClientManager extends Thread { + + private int clientRover = 0; + + private final CopyOnWriteArrayList clients = new CopyOnWriteArrayList(); + + public ClientManager() { + super("ClientManager"); + setDaemon(true); + } + + @Override + public void run() { + try { + while (true) { + if (clients.size() < MAX_CLIENTS) + createNewClient(); + + int size = clients.size(); + sleepRandom(size * 3 * 1000, size * 6 * 1000); + } + } + catch (Throwable e) { + exit("ClientManager.run failed.", e); + } + } + + private void createNewClient() throws JMSException { + ClientType type = ClientType.randomClientType(); + + Client client; + synchronized (server.sendMutex) { + client = new Client(++clientRover, type, CLIENT_LIFETIME, CLIENT_ONLINE, CLIENT_OFFLINE); + clients.add(client); + } + client.start(); + + LOG.info(client.toString() + " created. " + this); + } + + public void removeClient(Client client) { + clients.remove(client); + } + + public void onServerMessage(Message message) throws JMSException { + for (Client client: clients) { + client.onServerMessage(message); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("ClientManager[count="); + sb.append(clients.size()); + sb.append(", clients="); + boolean sep = false; + for (Client client: clients) { + if (sep) sb.append(", "); + else sep = true; + sb.append(client.toString()); + } + sb.append(']'); + return sb.toString(); + } + } + + /** + * Consumes massages from a durable subscription. + * Goes online/offline periodically. Checks the incoming messages + * against the sent messages of the server. + */ + private final class Client extends Thread { + + String url = "failover:(tcp://localhost:61656?wireFormat.maxInactivityDuration=0)?" + + "jms.watchTopicAdvisories=false&" + + "jms.alwaysSyncSend=true&jms.dispatchAsync=true&" + + "jms.producerWindowSize=20971520&" + + "jms.copyMessageOnSend=false&" + + "initialReconnectDelay=100&maxReconnectDelay=30000&maxReconnectAttempts=0&" + + "useExponentialBackOff=true"; + final ConnectionFactory cf = new ActiveMQConnectionFactory(url); + + public static final String SUBSCRIPTION_NAME = "subscription"; + + private final int id; + private final String conClientId; + + private final Random lifetime; + private final Random online; + private final Random offline; + + private final ClientType clientType; + private final String selector; + + private final ConcurrentLinkedQueue waitingList = new ConcurrentLinkedQueue(); + + public Client(int id, ClientType clientType, Random lifetime, Random online, Random offline) throws JMSException { + super("Client" + id); + setDaemon(true); + + this.id = id; + conClientId = "cli" + id; + this.clientType = clientType; + selector = "(COMMIT = true and RELEVANT = true) or " + clientType.selector; + + this.lifetime = lifetime; + this.online = online; + this.offline = offline; + + subscribe(); + } + + @Override + public void run() { + long end = System.currentTimeMillis() + lifetime.next(); + try { + boolean sleep = false; + while (true) { + long max = end - System.currentTimeMillis(); + if (max <= 0) + break; + + if (sleep) offline.sleepRandom(); + else sleep = true; + + process(online.next()); + } + + if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0) + unsubscribe(); + else { + LOG.info("Client abandon the subscription. " + this); + + // housekeeper should sweep these abandoned subscriptions + houseKeeper.abandonedSubscriptions.add(conClientId); + } + } + catch (Throwable e) { + exit(toString() + " failed.", e); + } + + clientManager.removeClient(this); + LOG.info(toString() + " DONE."); + } + + private void process(long millis) throws JMSException { + long end = System.currentTimeMillis() + millis; + long hardEnd = end + 2000; // wait to finish the transaction. + boolean inTransaction = false; + int transCount = 0; + + LOG.info(toString() + " ONLINE."); + Connection con = openConnection(); + Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = sess.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, false); + try { + do { + long max = end - System.currentTimeMillis(); + if (max <= 0) { + if (!inTransaction) + break; + + max = hardEnd - System.currentTimeMillis(); + if (max <= 0) + exit("" + this + " failed: Transaction is not finished."); + } + + Message message = consumer.receive(max); + if (message == null) + continue; + + onClientMessage(message); + + if (message.propertyExists("COMMIT")) { + message.acknowledge(); + + LOG.info("Received Trans[id=" + message.getIntProperty("TRANS") + ", count=" + transCount + "] in " + this + "."); + + inTransaction = false; + transCount = 0; + } + else { + inTransaction = true; + transCount++; + } + } while (true); + } + finally { + sess.close(); + con.close(); + + LOG.info(toString() + " OFFLINE."); + + // Check if the messages are in the waiting + // list for long time. + Message topMessage = waitingList.peek(); + if (topMessage != null) + checkDeliveryTime(topMessage); + } + } + + public void onServerMessage(Message message) throws JMSException { + if (Boolean.TRUE.equals(message.getObjectProperty("COMMIT"))) { + if (Boolean.TRUE.equals(message.getObjectProperty("RELEVANT"))) + waitingList.add(message); + } + else { + String messageType = message.getStringProperty("TYPE"); + if (clientType.isRelevant(messageType)) + waitingList.add(message); + } + } + + public void onClientMessage(Message message) { + Message serverMessage = waitingList.poll(); + try { + if (serverMessage == null) + exit("" + this + " failed: There is no next server message, but received: " + message); + + Integer receivedId = (Integer) message.getObjectProperty("ID"); + Integer serverId = (Integer) serverMessage.getObjectProperty("ID"); + if (receivedId == null || serverId == null) + exit("" + this + " failed: message ID not found.\r\n" + + " received: " + message + "\r\n" + + " server: " + serverMessage); + + if (!serverId.equals(receivedId)) + exit("" + this + " failed: Received wrong message.\r\n" + + " received: " + message + "\r\n" + + " server: " + serverMessage); + + checkDeliveryTime(message); + } + catch (Throwable e) { + exit("" + this + ".onClientMessage failed.\r\n" + + " received: " + message + "\r\n" + + " server: " + serverMessage, e); + } + } + + /** + * Checks if the message was not delivered fast enough. + */ + public void checkDeliveryTime(Message message) throws JMSException { + long creation = message.getJMSTimestamp(); + long min = System.currentTimeMillis() - (offline.max + online.min); + + if (min > creation) { + SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS"); + exit("" + this + ".checkDeliveryTime failed. Message time: " + df.format(new Date(creation)) + ", min: " + df.format(new Date(min)) + "\r\n" + message); + } + } + + private Connection openConnection() throws JMSException { + Connection con = cf.createConnection(); + con.setClientID(conClientId); + con.start(); + return con; + } + + private void subscribe() throws JMSException { + Connection con = openConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, SUBSCRIPTION_NAME, selector, true); + session.close(); + con.close(); + } + + private void unsubscribe() throws JMSException { + Connection con = openConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe(SUBSCRIPTION_NAME); + session.close(); + con.close(); + } + + @Override + public String toString() { + return "Client[id=" + id + ", type=" + clientType + "]"; + } + } + + /** + * Sweeps out not-used durable subscriptions. + */ + private final class HouseKeeper extends Thread { + + private HouseKeeper() { + super("HouseKeeper"); + setDaemon(true); + } + + public final CopyOnWriteArrayList abandonedSubscriptions = new CopyOnWriteArrayList(); + + @Override + public void run() { + while (true) { + try { + Thread.sleep(60 * 1000); + sweep(); + } + catch (InterruptedException ex) { + break; + } + catch (Throwable e) { + Exception log = new Exception("HouseKeeper failed.", e); + log.printStackTrace(); + } + } + } + + private void sweep() throws Exception { + LOG.info("Housekeeper sweeping."); + + int closed = 0; + ArrayList sweeped = new ArrayList(); + try { + for (String clientId: abandonedSubscriptions) { + sweeped.add(clientId); + LOG.info("Sweeping out subscription of " + clientId + "."); + broker.getAdminView().destroyDurableSubscriber(clientId, Client.SUBSCRIPTION_NAME); + closed++; + } + } + finally { + abandonedSubscriptions.removeAll(sweeped); + } + + LOG.info("Housekeeper sweeped out " + closed + " subscriptions."); + } + } + + public static int random(int max) { + return (int) (Math.random() * (max + 1)); + } + + public static int random(int min, int max) { + return random(max - min) + min; + } + + public static void sleepRandom(int maxMillis) throws InterruptedException { + Thread.sleep(random(maxMillis)); + } + + public static void sleepRandom(int minMillis, int maxMillis) throws InterruptedException { + Thread.sleep(random(minMillis, maxMillis)); + } + + public static final class Random { + + final int min; + final int max; + + Random(int min, int max) { + this.min = min; + this.max = max; + } + + public int next() { + return random(min, max); + } + + public void sleepRandom() throws InterruptedException { + DurableSubProcessTest.sleepRandom(min, max); + } + } + + public static void exit(String message) { + exit(message, null); + } + + public static void exit(String message, Throwable e) { + Throwable log = new RuntimeException(message, e); + log.printStackTrace(); + LOG.error(message, e); + exceptions.add(e); + fail(message); + } + + protected void setUp() throws Exception { + topic = (ActiveMQTopic) createDestination(); + startBroker(); + + clientManager = new ClientManager(); + server = new Server(); + houseKeeper = new HouseKeeper(); + + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + + destroyBroker(); + } + + private void startBroker() throws Exception { + startBroker(true); + } + + private void startBroker(boolean deleteAllMessages) throws Exception { + if (broker != null) + return; + + broker = BrokerFactory.createBroker("broker:(vm://localhost)"); + broker.setBrokerName(getName()); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + + if (PERSISTENT_BROKER) { + broker.setPersistent(true); + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File("activemq-data/" + getName())); + broker.setPersistenceAdapter(persistenceAdapter); + } + else + broker.setPersistent(false); + + broker.addConnector("tcp://localhost:61656"); + + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getTempUsage().setLimit(256 * 1024 * 1024); + broker.getSystemUsage().getStoreUsage().setLimit(256 * 1024 * 1024); + + broker.start(); + } + + private void destroyBroker() throws Exception { + if (broker == null) + return; + + broker.stop(); + broker = null; + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java index 360ceb2475..1d39dc9546 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java @@ -466,7 +466,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals("offline consumer got all", sent, listener.count); } - public void x_initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { + public void initCombosForTestMixOfOnLineAndOfflineSubsGetAllMatched() throws Exception { this.addCombinationValues("defaultPersistenceAdapter", new Object[]{ PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}); } @@ -639,7 +639,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp int filtered = 0; for (int i = 0; i < 10; i++) { - boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + boolean filter = (int) (Math.random() * 2) >= 1; if (filter) filtered++; @@ -664,7 +664,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp producer = session.createProducer(null); for (int i = 0; i < 10; i++) { - boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + boolean filter = (int) (Math.random() * 2) >= 1; if (filter) filtered++; @@ -702,6 +702,198 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp assertEquals(filtered, listener3.count); } + + public void testInterleavedOfflineSubscriptionCanConsumeAfterUnsub() throws Exception { + // create offline subs 1 + Connection con = createConnection("offCli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // create offline subs 2 + con = createConnection("offCli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", null, true); + session.close(); + con.close(); + + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + boolean filter = (int) (Math.random() * 2) >= 1; + + sent++; + + Message message = session.createMessage(); + message.setStringProperty("filter", filter ? "true" : "false"); + producer.send(topic, message); + } + + Thread.sleep(1 * 1000); + + Connection con2 = createConnection("offCli1"); + Session session2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + session2.unsubscribe("SubsId"); + session2.close(); + con2.close(); + + // consume all messages + con = createConnection("offCli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", null, true); + Listener listener = new Listener("SubsId"); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals("offline consumer got all", sent, listener.count); + } + + + public void testUnmatchedSubUnsubscribeDeletesAll() throws Exception { + // create offline subs 1 + Connection con = createConnection("offCli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int filtered = 0; + for (int i = 0; i < 10; i++) { + boolean filter = (i %2 == 0); //(int) (Math.random() * 2) >= 1; + if (filter) + filtered++; + + Message message = session.createMessage(); + message.setStringProperty("filter", filter ? "true" : "false"); + producer.send(topic, message); + } + + LOG.info("sent: " + filtered); + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + // test offline subs + con = createConnection("offCli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.unsubscribe("SubsId"); + session.close(); + con.close(); + + + con = createConnection("offCli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", "filter = 'true'", true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + + Thread.sleep(3 * 1000); + + session.close(); + con.close(); + + assertEquals(0, listener.count); + } + + + public void testAllConsumed() throws Exception { + final String filter = "filter = 'true'"; + Connection con = createConnection("cli1"); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", filter, true); + session.close(); + con.close(); + + con = createConnection("cli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createDurableSubscriber(topic, "SubsId", filter, true); + session.close(); + con.close(); + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(null); + + int sent = 0; + for (int i = 0; i < 10; i++) { + Message message = session.createMessage(); + message.setStringProperty("filter", "true"); + producer.send(topic, message); + sent++; + } + + LOG.info("sent: " + sent); + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + con = createConnection("cli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); + Listener listener = new Listener(); + consumer.setMessageListener(listener); + Thread.sleep(3 * 1000); + session.close(); + con.close(); + + assertEquals(sent, listener.count); + + LOG.info("cli2 pull 2"); + con = createConnection("cli2"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); + assertNotNull("got message", consumer.receive(2000)); + assertNotNull("got message", consumer.receive(2000)); + session.close(); + con.close(); + + + // send messages + con = createConnection(); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(null); + + sent = 0; + for (int i = 0; i < 2; i++) { + Message message = session.createMessage(); + message.setStringProperty("filter", i==1 ? "true" : "false"); + producer.send(topic, message); + sent++; + } + LOG.info("sent: " + sent); + Thread.sleep(1 * 1000); + session.close(); + con.close(); + + LOG.info("cli1 again, should get 1 new ones"); + con = createConnection("cli1"); + session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "SubsId", filter, true); + listener = new Listener(); + consumer.setMessageListener(listener); + Thread.sleep(3 * 1000); + session.close(); + con.close(); + + assertEquals(1, listener.count); + } + public static class Listener implements MessageListener { int count = 0; String id = null;