resolve AMQ-2123, deal with the topic dispatch case where a subscription arrives between store of message and dispatch of message

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@747951 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-25 23:03:10 +00:00
parent 3cbe388595
commit 8e6446ffcd
2 changed files with 119 additions and 9 deletions

View File

@ -164,8 +164,11 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
} }
} }
}else{ }else{
//no message held
removeMessage = true; if (ackContainer.isEmpty() || isUnreferencedBySubscribers(subscriberMessages, messageId)) {
// no message reference held
removeMessage = true;
}
} }
} }
}finally { }finally {
@ -174,6 +177,28 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return removeMessage; return removeMessage;
} }
// verify that no subscriber has a reference to this message. In the case where the subscribers
// references are persisted but more than the persisted consumers get the message, the ack from the non
// persisted consumer would remove the message in error
//
// see: https://issues.apache.org/activemq/browse/AMQ-2123
private boolean isUnreferencedBySubscribers(
Map<String, TopicSubContainer> subscriberContainers, MessageId messageId) {
boolean isUnreferenced = true;
for (TopicSubContainer container: subscriberContainers.values()) {
if (!container.isEmpty()) {
for (Iterator i = container.iterator(); i.hasNext();) {
ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
if (messageId.equals(ref.getMessageId())) {
isUnreferenced = false;
break;
}
}
}
}
return isUnreferenced;
}
public void acknowledge(ConnectionContext context, public void acknowledge(ConnectionContext context,
String clientId, String subscriptionName, MessageId messageId) throws IOException { String clientId, String subscriptionName, MessageId messageId) throws IOException {
acknowledgeReference(context, clientId, subscriptionName, messageId); acknowledgeReference(context, clientId, subscriptionName, messageId);

View File

@ -16,6 +16,13 @@
*/ */
package org.apache.activemq.bugs; package org.apache.activemq.bugs;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage; import javax.jms.BytesMessage;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
@ -28,6 +35,7 @@ import javax.jms.Topic;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -45,14 +53,91 @@ public class DurableConsumerTest extends TestCase {
protected String bindAddress="tcp://localhost:61616"; protected String bindAddress="tcp://localhost:61616";
protected byte[] payload = new byte[1024*32];
protected byte[] payload = new byte[1024*16];
protected ConnectionFactory factory; protected ConnectionFactory factory;
protected Vector<Exception> exceptions = new Vector<Exception>();
public void testConcurrentDurableConsumer() throws Exception {
factory = createConnectionFactory();
final String topicName = getName();
final int numMessages = 500;
int numConsumers = 20;
final CountDownLatch counsumerStarted = new CountDownLatch(0);
final AtomicInteger receivedCount = new AtomicInteger();
Runnable consumer = new Runnable() {
public void run() {
final String consumerName = Thread.currentThread().getName();
int acked = 0;
int received = 0;
try {
while (acked < numMessages/2) {
// take one message and close, ack on occasion
Connection consumerConnection = factory.createConnection();
((ActiveMQConnection)consumerConnection).setWatchTopicAdvisories(false);
consumerConnection.setClientID(consumerName);
Session consumerSession = consumerConnection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
Topic topic = consumerSession.createTopic(topicName);
consumerConnection.start();
MessageConsumer consumer = consumerSession
.createDurableSubscriber(topic, consumerName);
counsumerStarted.countDown();
Message msg = null;
do {
msg = consumer.receive(5000);
if (msg != null) {
receivedCount.incrementAndGet();
if (received++ % 2 == 0) {
msg.acknowledge();
acked++;
}
}
} while (msg == null);
consumerConnection.close();
}
assertTrue(received >= acked);
} catch (Exception e) {
e.printStackTrace();
exceptions.add(e);
}
}
};
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i<numConsumers ; i++) {
executor.execute(consumer);
}
assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
Connection producerConnection = factory.createConnection();
((ActiveMQConnection)producerConnection).setWatchTopicAdvisories(false);
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = producerSession.createTopic(topicName);
MessageProducer producer = producerSession.createProducer(topic);
producerConnection.start();
for (int i =0; i < numMessages; i++) {
BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(payload);
producer.send(msg);
if (i != 0 && i%100==0) {
LOG.info("Sent msg " + i);
}
}
Thread.sleep(2000);
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages);
assertTrue(exceptions.isEmpty());
}
public void testConsumer() throws Exception{ public void testConsumer() throws Exception{
factory = createConnectionFactory(); factory = createConnectionFactory();
@ -108,8 +193,6 @@ public class DurableConsumerTest extends TestCase {
broker = createBroker(true); broker = createBroker(true);
} }
super.setUp(); super.setUp();
} }
@ -144,6 +227,8 @@ public class DurableConsumerTest extends TestCase {
answer.setDeleteAllMessagesOnStartup(deleteStore); answer.setDeleteAllMessagesOnStartup(deleteStore);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setUseShutdownHook(false); answer.setUseShutdownHook(false);
answer.setUseJmx(false);
answer.setAdvisorySupport(false);
} }
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {