fix AMQ-1970 - pagedInMessages in slave was being filled due to more than 200(pageSize) unacked messages and slave not modifying the inflight count which is used in the pageIn logic

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@704142 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-10-13 15:24:31 +00:00
parent ac2d263e24
commit ae1bb668d6
3 changed files with 63 additions and 10 deletions

View File

@ -163,6 +163,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
pending.remove(); pending.remove();
createMessageDispatch(node, node.getMessage()); createMessageDispatch(node, node.getMessage());
dispatched.add(node); dispatched.add(node);
onDispatch(node, node.getMessage());
} }
return; return;
} }
@ -173,7 +174,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
throw new JMSException( throw new JMSException(
"Slave broker out of sync with master: Dispatched message (" "Slave broker out of sync with master: Dispatched message ("
+ mdn.getMessageId() + ") was not in the pending list"); + mdn.getMessageId() + ") was not in the pending list for " + mdn.getDestination().getPhysicalName());
} }
public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception { public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
@ -205,9 +206,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (!this.getConsumerInfo().isBrowser()) { if (!this.getConsumerInfo().isBrowser()) {
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
} }
if (!isSlave()) { node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
}
} else { } else {
// setup a Synchronization to remove nodes from the // setup a Synchronization to remove nodes from the
// dispatched list. // dispatched list.

View File

@ -16,14 +16,26 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
String masterBindAddress = "tcp://localhost:61616"; String masterBindAddress = "tcp://localhost:61616";
String slaveBindAddress = "tcp://localhost:62616"; String slaveBindAddress = "tcp://localhost:62616";
BrokerService slave; BrokerService slave;
/* /*
* add a slave broker * add a slave broker
* @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker() * @see org.apache.activemq.EmbeddedBrokerTestSupport#createBroker()
@ -93,4 +105,46 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
masterRb.getDestinationStatistics().getDispatched().getCount()); masterRb.getDestinationStatistics().getDispatched().getCount());
} }
public void testMoreThanPageSizeUnacked() throws Exception {
final int messageCount = Queue.MAX_PAGE_SIZE + 10;
final CountDownLatch latch = new CountDownLatch(1);
serverSession = serverConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQSession s = (ActiveMQSession) serverSession;
s.setSessionAsyncDispatch(true);
MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
serverConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
latch.await(30L, TimeUnit.SECONDS);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
MessageProducer producer = clientSession.createProducer(serverDestination);
for (int i =0; i< messageCount; i++) {
Message msg = clientSession.createMessage();
producer.send(msg);
}
RegionBroker slaveRb = (RegionBroker) slave.getBroker().getAdaptor(
RegionBroker.class);
RegionBroker masterRb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
Thread.sleep(4000);
assertEquals("inflight match expected", messageCount, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
latch.countDown();
Thread.sleep(4000);
assertEquals("inflight match expected", 0, masterRb.getDestinationStatistics().getInflight().getCount());
assertEquals("inflight match on slave and master", slaveRb.getDestinationStatistics().getInflight().getCount(), masterRb.getDestinationStatistics().getInflight().getCount());
}
} }

View File

@ -32,11 +32,11 @@ import org.apache.activemq.command.ActiveMQQueue;
* @version $Revision: 397249 $ * @version $Revision: 397249 $
*/ */
public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
private Connection serverConnection; protected Connection serverConnection;
private Session serverSession; protected Session serverSession;
private Connection clientConnection; protected Connection clientConnection;
private Session clientSession; protected Session clientSession;
private Destination serverDestination; protected Destination serverDestination;
protected static final int COUNT = 2000; protected static final int COUNT = 2000;
public void testLoadRequestReply() throws Exception { public void testLoadRequestReply() throws Exception {