https://issues.apache.org/jira/browse/AMQ-3190 - Durable Subscription - missing messages when selector matching sub resumes after broker restart

the next message id needs to be tracked such that unmatched messages are not ignored from the indexi after a restart. Tracking it as part of message add
in the ackPositions.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1074511 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-02-25 13:17:14 +00:00
parent 576230be65
commit 2d121f4092
4 changed files with 165 additions and 52 deletions

View File

@ -1259,7 +1259,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack); session.sendAck(ack);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(getConsumerId() + " tracking transacted redlivery of duplicate: " + md.getMessage()); LOG.debug(getConsumerId() + " tracking transacted redelivery of duplicate: " + md.getMessage());
} }
boolean needsPoisonAck = false; boolean needsPoisonAck = false;
synchronized (deliveredMessages) { synchronized (deliveredMessages) {

View File

@ -1618,6 +1618,13 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
} }
} }
} else {
// update based on ackPositions for unmatched, last entry is always the next
if (!rc.ackPositions.isEmpty(tx)) {
Entry<Long,HashSet<String>> last = rc.ackPositions.getLast(tx);
rc.orderIndex.nextMessageId =
Math.max(rc.orderIndex.nextMessageId, last.getKey());
}
} }
} }
@ -1648,6 +1655,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
} }
} }
final HashSet nextMessageIdMarker = new HashSet<String>();
// on a new message add, all existing subs are interested in this message // on a new message add, all existing subs are interested in this message
private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
HashSet hs = new HashSet<String>(); HashSet hs = new HashSet<String>();
@ -1656,6 +1664,8 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
hs.add(entry.getKey()); hs.add(entry.getKey());
} }
sd.ackPositions.put(tx, messageSequence, hs); sd.ackPositions.put(tx, messageSequence, hs);
// add empty next to keep track of nextMessage
sd.ackPositions.put(tx, messageSequence+1, nextMessageIdMarker);
} }
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {

View File

@ -16,26 +16,16 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File; import java.io.File;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.HashSet; import java.util.HashSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -43,25 +33,42 @@ import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class DurableSubProcessWithRestartTest { public class DurableSubProcessWithRestartTest {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessWithRestartTest.class);
public static final long RUNTIME = 5 * 60 * 1000; public static final long RUNTIME = 5 * 60 * 1000;
public static final int SERVER_SLEEP = 2 * 1000; // max public static final int SERVER_SLEEP = 2 * 1000; // max
public static final int CARGO_SIZE = 400; // max public static final int CARGO_SIZE = 400; // max
public static final int MAX_CLIENTS = 5; public static final int MAX_CLIENTS = 5;
public static final Random CLIENT_LIFETIME = new Random(30 * 1000, public static final Random CLIENT_LIFETIME = new Random(5 * 1000,
2 * 60 * 1000); 2 * 5 * 1000);
public static final Random CLIENT_ONLINE = new Random(2 * 1000, 15 * 1000); public static final Random CLIENT_ONLINE = new Random(2 * 1000, 2 * 1000);
public static final Random CLIENT_OFFLINE = new Random(1 * 1000, 20 * 1000); public static final Random CLIENT_OFFLINE = new Random(10 * 1000, 10 * 1000);
public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB; public static final Persistence PERSISTENT_ADAPTER = Persistence.KAHADB;
public static final long BROKER_RESTART = 1 * 60 * 1000; public static final long BROKER_RESTART = 1 * 10 * 1000;
public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true; public static final boolean ALLOW_SUBSCRIPTION_ABANDONMENT = true;
public static final boolean CHECK_REDELIVERY = false; public static final boolean CHECK_REDELIVERY = true;
private BrokerService broker; private BrokerService broker;
private ActiveMQTopic topic; private ActiveMQTopic topic;
@ -73,8 +80,11 @@ public class DurableSubProcessWithRestartTest {
private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock( private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(
true); true);
private int restartCount = 0; private int restartCount = 0;
static final Vector<Throwable> exceptions = new Vector<Throwable>();
@Ignore("Needs some more investigation") @Test // this is a nice test but it takes 5mins, may be handy in the future
// resulting bug https://issues.apache.org/jira/browse/AMQ-3190
@Ignore("covered by org.apache.activemq.usecases.DurableSubscriptionOfflineTest.testNoMissOnMatchingSubAfterRestart()") @Test
public void testProcess() { public void testProcess() {
try { try {
server.start(); server.start();
@ -105,11 +115,12 @@ public class DurableSubProcessWithRestartTest {
} }
processLock.writeLock().lock(); processLock.writeLock().lock();
System.out.println("DONE."); assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
LOG.info("DONE.");
} }
private void restartBroker() throws Exception { private void restartBroker() throws Exception {
System.out.println("Broker restart: waiting for components."); LOG.info("Broker restart: waiting for components.");
processLock.writeLock().lock(); processLock.writeLock().lock();
try { try {
@ -117,7 +128,7 @@ public class DurableSubProcessWithRestartTest {
startBroker(false); startBroker(false);
restartCount++; restartCount++;
System.out.println("Broker restarted. count: " + restartCount); LOG.info("Broker restarted. count: " + restartCount);
} finally { } finally {
processLock.writeLock().unlock(); processLock.writeLock().unlock();
} }
@ -133,7 +144,7 @@ public class DurableSubProcessWithRestartTest {
final class Server extends Thread { final class Server extends Thread {
final String url = "vm://" final String url = "vm://"
+ getName() + DurableSubProcessWithRestartTest.getName()
+ "?" + "?"
+ "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&" + "jms.redeliveryPolicy.maximumRedeliveries=2&jms.redeliveryPolicy.initialRedeliveryDelay=500&"
+ "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&" + "jms.producerWindowSize=20971520&jms.prefetchPolicy.all=100&"
@ -182,12 +193,12 @@ public class DurableSubProcessWithRestartTest {
.randomClientType() : null; // sends this types .randomClientType() : null; // sends this types
int count = random(200); int count = random(200);
System.out.println("Sending Trans[id=" + trans + ", count=" LOG.info("Sending Trans[id=" + trans + ", count="
+ count + ", clientType=" + clientType + "]"); + count + ", clientType=" + clientType + "]");
Connection con = cf.createConnection(); Connection con = cf.createConnection();
Session sess = con Session sess = con
.createSession(true, Session.AUTO_ACKNOWLEDGE); .createSession(true, Session.SESSION_TRANSACTED);
MessageProducer prod = sess.createProducer(null); MessageProducer prod = sess.createProducer(null);
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
@ -216,7 +227,7 @@ public class DurableSubProcessWithRestartTest {
clientManager.onServerMessage(message); clientManager.onServerMessage(message);
sess.commit(); sess.commit();
System.out.println("Committed Trans[id=" + trans + ", count=" LOG.info("Committed Trans[id=" + trans + ", count="
+ count + ", clientType=" + clientType + "], ID=" + messageRover); + count + ", clientType=" + clientType + "], ID=" + messageRover);
sess.close(); sess.close();
@ -344,7 +355,7 @@ public class DurableSubProcessWithRestartTest {
} }
client.start(); client.start();
System.out.println(client.toString() + " created. " + this); LOG.info(client.toString() + " created. " + this);
} }
public void removeClient(Client client) { public void removeClient(Client client) {
@ -451,7 +462,7 @@ public class DurableSubProcessWithRestartTest {
if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0) if (!ALLOW_SUBSCRIPTION_ABANDONMENT || random(1) > 0)
unsubscribe(); unsubscribe();
else { else {
System.out.println("Client abandon the subscription. " LOG.info("Client abandon the subscription. "
+ this); + this);
// housekeeper should sweep these abandoned subscriptions // housekeeper should sweep these abandoned subscriptions
@ -462,7 +473,7 @@ public class DurableSubProcessWithRestartTest {
} }
clientManager.removeClient(this); clientManager.removeClient(this);
System.out.println(toString() + " DONE."); LOG.info(toString() + " DONE.");
} }
private void process(long millis) throws JMSException { private void process(long millis) throws JMSException {
@ -471,7 +482,7 @@ public class DurableSubProcessWithRestartTest {
boolean inTransaction = false; boolean inTransaction = false;
int transCount = 0; int transCount = 0;
System.out.println(toString() + " ONLINE."); LOG.info(toString() + " ONLINE.");
Connection con = openConnection(); Connection con = openConnection();
Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session sess = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = sess.createDurableSubscriber(topic, MessageConsumer consumer = sess.createDurableSubscriber(topic,
@ -498,7 +509,7 @@ public class DurableSubProcessWithRestartTest {
if (message.propertyExists("COMMIT")) { if (message.propertyExists("COMMIT")) {
message.acknowledge(); // CLIENT_ACKNOWLEDGE message.acknowledge(); // CLIENT_ACKNOWLEDGE
System.out.println("Received Trans[id=" LOG.info("Received Trans[id="
+ message.getIntProperty("TRANS") + ", count=" + message.getIntProperty("TRANS") + ", count="
+ transCount + "] in " + this + "."); + transCount + "] in " + this + ".");
@ -513,7 +524,7 @@ public class DurableSubProcessWithRestartTest {
sess.close(); sess.close();
con.close(); con.close();
System.out.println(toString() + " OFFLINE."); LOG.info(toString() + " OFFLINE.");
// Check if the messages are in the waiting // Check if the messages are in the waiting
// list for long time. // list for long time.
@ -539,7 +550,7 @@ public class DurableSubProcessWithRestartTest {
try { try {
Integer receivedId = (Integer) message.getObjectProperty("ID"); Integer receivedId = (Integer) message.getObjectProperty("ID");
if (processed != null && processed.contains(receivedId)) if (processed != null && processed.contains(receivedId))
System.out.println("! Message has been processed before. " LOG.info("! Message has been processed before. "
+ this + " message = " + message); + this + " message = " + message);
if (serverMessage == null) if (serverMessage == null)
@ -555,10 +566,14 @@ public class DurableSubProcessWithRestartTest {
+ " received: " + message + "\r\n" + " server: " + " received: " + message + "\r\n" + " server: "
+ serverMessage); + serverMessage);
if (!serverId.equals(receivedId)) if (!serverId.equals(receivedId)) {
exit("" + this + " failed: Received wrong message.\r\n" String detail = processed != null ?
Arrays.toString(processed.toArray()) + "\n"
: "";
exit(detail + this + " failed: Received wrong message.\r\n"
+ " received: " + message + "\r\n" + " server: " + " received: " + message + "\r\n" + " server: "
+ serverMessage); + serverMessage);
}
checkDeliveryTime(message); checkDeliveryTime(message);
@ -652,13 +667,13 @@ public class DurableSubProcessWithRestartTest {
} }
private void sweep() throws Exception { private void sweep() throws Exception {
System.out.println("Housekeeper sweeping."); LOG.info("Housekeeper sweeping.");
int closed = 0; int closed = 0;
ArrayList<String> sweeped = new ArrayList<String>(); ArrayList<String> sweeped = new ArrayList<String>();
try { try {
for (String clientId : abandonedSubscriptions) { for (String clientId : abandonedSubscriptions) {
System.out.println("Sweeping out subscription of " LOG.info("Sweeping out subscription of "
+ clientId + "."); + clientId + ".");
broker.getAdminView().destroyDurableSubscriber(clientId, broker.getAdminView().destroyDurableSubscriber(clientId,
Client.SUBSCRIPTION_NAME); Client.SUBSCRIPTION_NAME);
@ -666,12 +681,12 @@ public class DurableSubProcessWithRestartTest {
closed++; closed++;
} }
} catch (Exception ignored) { } catch (Exception ignored) {
System.out.println("Ex on destroy sub " + ignored); LOG.info("Ex on destroy sub " + ignored);
} finally { } finally {
abandonedSubscriptions.removeAll(sweeped); abandonedSubscriptions.removeAll(sweeped);
} }
System.out.println("Housekeeper sweeped out " + closed LOG.info("Housekeeper sweeped out " + closed
+ " subscriptions."); + " subscriptions.");
} }
} }
@ -717,12 +732,14 @@ public class DurableSubProcessWithRestartTest {
} }
public static void exit(String message, Throwable e) { public static void exit(String message, Throwable e) {
Throwable log = new RuntimeException(message, e); Throwable cause = new RuntimeException(message, e);
log.printStackTrace(); LOG.error(message, cause);
System.exit(0); exceptions.add(cause);
fail(cause.toString());
} }
protected void setUp() throws Exception { @Before
public void setUp() throws Exception {
topic = new ActiveMQTopic("TopicT"); topic = new ActiveMQTopic("TopicT");
startBroker(); startBroker();
@ -732,7 +749,8 @@ public class DurableSubProcessWithRestartTest {
} }
protected void tearDown() throws Exception { @After
public void tearDown() throws Exception {
destroyBroker(); destroyBroker();
} }
@ -750,6 +768,7 @@ public class DurableSubProcessWithRestartTest {
broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")"); broker = BrokerFactory.createBroker("broker:(vm://" + getName() + ")");
broker.setBrokerName(getName()); broker.setBrokerName(getName());
broker.setAdvisorySupport(false);
broker.setDeleteAllMessagesOnStartup(deleteAllMessages); broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
switch (PERSISTENT_ADAPTER) { switch (PERSISTENT_ADAPTER) {
@ -801,8 +820,8 @@ public class DurableSubProcessWithRestartTest {
broker.start(); broker.start();
} }
private String getName() { protected static String getName() {
return DurableSubProcessWithRestartTest.class.getName(); return "DurableSubProcessWithRestartTest";
} }
private static boolean delete(File path) { private static boolean delete(File path) {

View File

@ -16,6 +16,14 @@
*/ */
package org.apache.activemq.usecases; package org.apache.activemq.usecases;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.Test; import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
@ -24,13 +32,8 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import javax.jms.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.Vector;
public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport { public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupport {
@ -965,6 +968,87 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
assertEquals(1, listener.count); assertEquals(1, listener.count);
} }
// https://issues.apache.org/jira/browse/AMQ-3190
public void testNoMissOnMatchingSubAfterRestart() throws Exception {
final String filter = "filter = 'true'";
Connection con = createConnection("cli1");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", filter, true);
session.close();
con.close();
// send unmatched messages
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
int sent = 0;
// message for cli1 to keep it interested
Message message = session.createMessage();
message.setStringProperty("filter", "true");
message.setIntProperty("ID", 0);
producer.send(topic, message);
sent++;
for (int i = sent; i < 10; i++) {
message = session.createMessage();
message.setStringProperty("filter", "false");
message.setIntProperty("ID", i);
producer.send(topic, message);
sent++;
}
con.close();
LOG.info("sent: " + sent);
// new sub at id 10
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createDurableSubscriber(topic, "SubsId", filter, true);
session.close();
con.close();
destroyBroker();
createBroker(false);
con = createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
for (int i = sent; i < 30; i++) {
message = session.createMessage();
message.setStringProperty("filter", "true");
message.setIntProperty("ID", i);
producer.send(topic, message);
sent++;
}
con.close();
LOG.info("sent: " + sent);
// pick up the first of the next twenty messages
con = createConnection("cli2");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
Message m = consumer.receive(3000);
assertEquals("is message 10", 10, m.getIntProperty("ID"));
session.close();
con.close();
// pick up the first few messages for client1
con = createConnection("cli1");
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createDurableSubscriber(topic, "SubsId", filter, true);
m = consumer.receive(3000);
assertEquals("is message 0", 0, m.getIntProperty("ID"));
m = consumer.receive(3000);
assertEquals("is message 10", 10, m.getIntProperty("ID"));
session.close();
con.close();
}
public static class Listener implements MessageListener { public static class Listener implements MessageListener {
int count = 0; int count = 0;
String id = null; String id = null;