git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@693915 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-10 18:01:46 +00:00
parent 8d03599be6
commit 9ce91fa625
6 changed files with 501 additions and 36 deletions

View File

@ -40,6 +40,7 @@ import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.management.JMSConsumerStatsImpl;
import org.apache.activemq.management.StatsCapable;
@ -607,14 +608,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
if (this.optimizeAcknowledge) {
synchronized(deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
MessageDispatch md = deliveredMessages.getFirst();
ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
deliveredMessages.clear();
ackCounter = 0;
}
}
synchronized(deliveredMessages) {
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
}
}
}
if (ack != null) {
final MessageAck ackToSend = ack;
@ -756,17 +756,21 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ackCounter++;
if (ackCounter >= (info
.getCurrentPrefetchSize() * .65)) {
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.sendAck(ack);
ackCounter = 0;
deliveredMessages.clear();
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
ackCounter = 0;
session.sendAck(ack);
}
}
deliveryingAcknowledgements.set(false);
}
} else {
MessageAck ack = new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
session.sendAck(ack);
deliveredMessages.clear();
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
@ -781,6 +785,25 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
}
/**
* Creates a MessageAck for all messages contained in deliveredMessages.
* Caller should hold the lock for deliveredMessages.
*
* @param type Ack-Type (i.e. MessageAck.STANDARD_ACK_TYPE)
* @return <code>null</code> if nothing to ack.
*/
private MessageAck makeAckForAllDeliveredMessages(byte type) {
synchronized (deliveredMessages) {
if (deliveredMessages.isEmpty())
return null;
MessageDispatch md = deliveredMessages.getFirst();
MessageAck ack = new MessageAck(md, type, deliveredMessages.size());
ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
return ack;
}
}
private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
// Don't acknowledge now, but we may need to let the broker know the
@ -814,6 +837,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
deliveredCounter++;
if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter - additionalWindowSize)) {
MessageAck ack = new MessageAck(md, ackType, deliveredCounter);
ack.setFirstMessageId(deliveredMessages.getLast().getMessage().getMessageId());
ack.setTransactionId(session.getTransactionContext().getTransactionId());
session.sendAck(ack);
additionalWindowSize = deliveredCounter;
@ -834,13 +858,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
*/
public void acknowledge() throws JMSException {
synchronized(deliveredMessages) {
if (deliveredMessages.isEmpty()) {
return;
}
// Acknowledge the last message.
MessageDispatch lastMd = deliveredMessages.get(0);
MessageAck ack = new MessageAck(lastMd, MessageAck.STANDARD_ACK_TYPE, deliveredMessages.size());
// Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack == null)
return; // no msgs
if (session.isTransacted()) {
session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId());
@ -897,6 +919,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (lastMd.getMessage().getRedeliveryCounter() > 0) {
redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
}
MessageId firstMsgId = deliveredMessages.getLast().getMessage().getMessageId();
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
MessageDispatch md = (MessageDispatch)iter.next();
@ -910,6 +933,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// Acknowledge the last message.
MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
// ensure we don't filter this as a duplicate
session.connection.rollbackDuplicate(this, lastMd.getMessage());
@ -919,6 +943,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else {
MessageAck ack = new MessageAck(lastMd, MessageAck.REDELIVERED_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
session.sendAck(ack,true);
// stop the delivery of messages.

View File

@ -180,9 +180,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
Destination destination = null;
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
// First check if the ack matches the dispatched. When using failover this might
// not be the case. We don't ever want to ack the wrong messages.
assertAckMatchesDispatched(ack);
// Acknowledge all dispatched messages up till the message id of
// the
// acknowledgment.
// the acknowledgment.
int index = 0;
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
@ -263,11 +266,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// this only happens after a reconnect - get an ack which is not
// valid
if (!callDispatchMatched) {
if (LOG.isDebugEnabled()) {
LOG
.debug("Could not correlate acknowledgment with dispatched message: "
+ ack);
}
LOG.error("Could not correlate acknowledgment with dispatched message: "
+ ack);
}
} else if (ack.isIndividualAck()) {
// Message was delivered and acknowledge - but only delete the
@ -409,6 +409,45 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
/**
* Checks an ack versus the contents of the dispatched list.
*
* @param ack
* @param firstAckedMsg
* @param lastAckedMsg
* @throws JMSException if it does not match
*/
protected void assertAckMatchesDispatched(MessageAck ack)
throws JMSException {
MessageId firstAckedMsg = ack.getFirstMessageId();
MessageId lastAckedMsg = ack.getLastMessageId();
int checkCount = 0;
boolean checkFoundStart = false;
boolean checkFoundEnd = false;
for (MessageReference node : dispatched) {
if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId())) {
checkFoundStart = true;
}
if (checkFoundStart || firstAckedMsg == null)
checkCount++;
if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
checkFoundEnd = true;
break;
}
}
if (!checkFoundStart && firstAckedMsg != null)
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
if (!checkFoundEnd && lastAckedMsg != null)
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (end of ack)");
if (ack.getMessageCount() != checkCount) {
throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
") differs from count in dispatched-list ("+checkCount+")");
}
}
/**
* @param context
* @param node
@ -429,7 +468,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @return
*/
public boolean isFull() {
return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
return isSlave() || dispatched.size() - prefetchExtension >= info.getPrefetchSize();
}
/**

View File

@ -19,6 +19,7 @@ package org.apache.activemq.perf;
import java.io.File;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.kahadb.store.KahaDBPersistenceAdaptor;
/**
* @version $Revision: 1.3 $
@ -29,9 +30,14 @@ public class AMQStoreDurableTopicTest extends SimpleDurableTopicTest {
File dataFileDir = new File("target/test-amq-data/perfTest/amqdb");
dataFileDir.mkdirs();
answer.setDeleteAllMessagesOnStartup(true);
AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
adaptor.setArchiveDataLogs(true);
//AMQPersistenceAdapter adaptor = new AMQPersistenceAdapter();
//adaptor.setArchiveDataLogs(true);
//adaptor.setMaxFileLength(1024 * 64);
KahaDBPersistenceAdaptor adaptor = new KahaDBPersistenceAdaptor();
//adaptor.setDirectory(dataFileDir);
answer.setDataDirectoryFile(dataFileDir);
answer.setPersistenceAdapter(adaptor);
answer.addConnector(uri);

View File

@ -31,8 +31,8 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected void setUp() throws Exception {
numberOfDestinations=1;
numberOfConsumers = 4;
numberofProducers = 1;
numberOfConsumers = 2;
numberofProducers = 2;
sampleCount=1000;
playloadSize = 1024;
super.setUp();
@ -41,6 +41,8 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected void configureBroker(BrokerService answer,String uri) throws Exception {
AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
persistenceFactory.setMaxFileLength(1024*16);
persistenceFactory.setPersistentIndex(true);
persistenceFactory.setCleanupInterval(10000);
answer.setPersistenceFactory(persistenceFactory);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
@ -55,7 +57,7 @@ public class SimpleDurableTopicTest extends SimpleTopicTest {
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer result = new PerfConsumer(fac, dest, "subs:" + number);
result.setInitialDelay(2000);
result.setInitialDelay(0);
return result;
}

View File

@ -31,12 +31,14 @@ public class SimpleQueueTest extends SimpleTopicTest {
}
protected void setUp() throws Exception {
numberOfConsumers = 1;
super.setUp();
}
protected PerfConsumer createConsumer(ConnectionFactory fac, Destination dest, int number) throws JMSException {
PerfConsumer consumer = new PerfConsumer(fac, dest);
//consumer.setInitialDelay(2000);
//consumer.setSleepDuration(10);
boolean enableAudit = numberOfConsumers <= 1;
System.out.println("Enable Audit = " + enableAudit);
consumer.setEnableAudit(enableAudit);

View File

@ -0,0 +1,391 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.ServiceStopper;
import org.apache.log4j.Logger;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* TestCase showing the message-destroying described in AMQ-1925
*
* @version $Revision: 1.1 $
*/
public class AMQ1925Test extends TestCase {
private static final Logger log = Logger.getLogger(AMQ1925Test.class);
private static final String QUEUE_NAME = "test.amq1925";
private static final String PROPERTY_MSG_NUMBER = "NUMBER";
private static final int MESSAGE_COUNT = 10000;
private BrokerService bs;
private URI tcpUri;
private ActiveMQConnectionFactory cf;
public void testAMQ1925_TXInProgress() throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session
.createQueue(QUEUE_NAME));
// The runnable is likely to interrupt during the session#commit, since
// this takes the longest
final Object starter = new Object();
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
public void run() {
try {
synchronized (starter) {
starter.wait();
}
// Simulate broker failure & restart
bs.stop();
bs = new BrokerService();
bs.setPersistent(true);
bs.setUseJmx(true);
bs.addConnector(tcpUri);
bs.start();
restarted.set(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
synchronized (starter) {
starter.notifyAll();
}
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500);
assertNotNull("No Message " + i + " found", message);
if (i < 10)
assertFalse("Timing problem, restarted too soon", restarted
.get());
if (i == 10) {
synchronized (starter) {
starter.notifyAll();
}
}
if (i > MESSAGE_COUNT - 100) {
assertTrue("Timing problem, restarted too late", restarted
.get());
}
assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
session.commit();
}
assertNull(consumer.receive(500));
consumer.close();
session.close();
connection.close();
assertQueueEmpty();
}
public void XtestAMQ1925_TXInProgress_TwoConsumers() throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session1 = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer1 = session1.createConsumer(session1
.createQueue(QUEUE_NAME));
Session session2 = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer2 = session2.createConsumer(session2
.createQueue(QUEUE_NAME));
// The runnable is likely to interrupt during the session#commit, since
// this takes the longest
final Object starter = new Object();
final AtomicBoolean restarted = new AtomicBoolean();
new Thread(new Runnable() {
public void run() {
try {
synchronized (starter) {
starter.wait();
}
// Simulate broker failure & restart
bs.stop();
bs = new BrokerService();
bs.setPersistent(true);
bs.setUseJmx(true);
bs.addConnector(tcpUri);
bs.start();
restarted.set(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
synchronized (starter) {
starter.notifyAll();
}
Collection<Integer> results = new ArrayList<Integer>(MESSAGE_COUNT);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message1 = consumer1.receive(20);
Message message2 = consumer2.receive(20);
if (message1 == null && message2 == null) {
if (results.size() < MESSAGE_COUNT) {
message1 = consumer1.receive(500);
message2 = consumer2.receive(500);
if (message1 == null && message2 == null) {
// Missing messages
break;
}
}
break;
}
if (i < 10)
assertFalse("Timing problem, restarted too soon", restarted
.get());
if (i == 10) {
synchronized (starter) {
starter.notifyAll();
}
}
if (i > MESSAGE_COUNT - 50) {
assertTrue("Timing problem, restarted too late", restarted
.get());
}
if (message1 != null) {
results.add(message1.getIntProperty(PROPERTY_MSG_NUMBER));
session1.commit();
}
if (message2 != null) {
results.add(message2.getIntProperty(PROPERTY_MSG_NUMBER));
session2.commit();
}
}
assertNull(consumer1.receive(500));
assertNull(consumer2.receive(500));
consumer1.close();
session1.close();
consumer2.close();
session2.close();
connection.close();
int foundMissingMessages = 0;
if (results.size() < MESSAGE_COUNT) {
foundMissingMessages = tryToFetchMissingMessages();
}
for (int i = 0; i < MESSAGE_COUNT; i++) {
assertTrue("Message-Nr " + i + " not found (" + results.size()
+ " total, " + foundMissingMessages
+ " have been found 'lingering' in the queue)", results
.contains(i));
}
assertQueueEmpty();
}
private int tryToFetchMissingMessages() throws JMSException {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true, 0);
MessageConsumer consumer = session.createConsumer(session
.createQueue(QUEUE_NAME));
int count = 0;
while (true) {
Message message = consumer.receive(500);
if (message == null)
break;
log.info("Found \"missing\" message: " + message);
count++;
}
consumer.close();
session.close();
connection.close();
return count;
}
public void testAMQ1925_TXBegin() throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session
.createQueue(QUEUE_NAME));
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500);
assertNotNull(message);
if (i == 222) {
// Simulate broker failure & restart
bs.stop();
bs = new BrokerService();
bs.setPersistent(true);
bs.setUseJmx(true);
bs.addConnector(tcpUri);
bs.start();
}
assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
session.commit();
}
assertNull(consumer.receive(500));
consumer.close();
session.close();
connection.close();
assertQueueEmpty();
}
public void testAMQ1925_TXCommited() throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session
.createQueue(QUEUE_NAME));
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message message = consumer.receive(500);
assertNotNull(message);
assertEquals(i, message.getIntProperty(PROPERTY_MSG_NUMBER));
session.commit();
if (i == 222) {
// Simulate broker failure & restart
bs.stop();
bs = new BrokerService();
bs.setPersistent(true);
bs.setUseJmx(true);
bs.addConnector(tcpUri);
bs.start();
}
}
assertNull(consumer.receive(500));
consumer.close();
session.close();
connection.close();
assertQueueEmpty();
}
private void assertQueueEmpty() throws Exception {
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(session
.createQueue(QUEUE_NAME));
Message msg = consumer.receive(500);
if (msg != null) {
fail(msg.toString());
}
consumer.close();
session.close();
connection.close();
assertQueueLength(0);
}
private void assertQueueLength(int len) throws Exception, IOException {
Set<Destination> destinations = bs.getBroker().getDestinations(
new ActiveMQQueue(QUEUE_NAME));
Queue queue = (Queue) destinations.iterator().next();
assertEquals(len, queue.getMessageStore().getMessageCount());
}
private void sendMessagesToQueue() throws Exception {
Connection connection = cf.createConnection();
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session
.createQueue(QUEUE_NAME));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < MESSAGE_COUNT; i++) {
TextMessage message = session
.createTextMessage("Test message " + i);
message.setIntProperty(PROPERTY_MSG_NUMBER, i);
producer.send(message);
}
session.commit();
producer.close();
session.close();
connection.close();
assertQueueLength(MESSAGE_COUNT);
}
protected void setUp() throws Exception {
bs = new BrokerService();
bs.setPersistent(true);
bs.deleteAllMessages();
bs.setUseJmx(true);
TransportConnector connector = bs.addConnector("tcp://localhost:0");
bs.start();
tcpUri = connector.getConnectUri();
cf = new ActiveMQConnectionFactory("failover://(" + tcpUri + ")");
sendMessagesToQueue();
}
protected void tearDown() throws Exception {
new ServiceStopper().stop(bs);
}
}