Timothy A. Bish 2013-06-05 17:34:50 +00:00
parent 66a99fb580
commit 1f5694ed60
3 changed files with 29 additions and 27 deletions

View File

@ -19,12 +19,11 @@ package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
@ -36,19 +35,20 @@ public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone; boolean browseDone;
boolean destinationsAdded; boolean destinationsAdded;
public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
throws JMSException { super(broker, usageManager, context, info);
super(broker,usageManager, context, info);
} }
@Override
protected boolean canDispatch(MessageReference node) { protected boolean canDispatch(MessageReference node) {
return !((QueueMessageReference)node).isAcked(); return !((QueueMessageReference) node).isAcked();
} }
@Override
public synchronized String toString() { public synchronized String toString() {
return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() +
+ destinations.size() + ", dispatched=" + dispatched.size() + ", delivered=" ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() +
+ this.prefetchExtension + ", pending=" + getPendingQueueSize(); ", delivered=" + this.prefetchExtension + ", pending=" + getPendingQueueSize();
} }
synchronized public void destinationsAdded() throws Exception { synchronized public void destinationsAdded() throws Exception {
@ -57,12 +57,13 @@ public class QueueBrowserSubscription extends QueueSubscription {
} }
private void checkDone() throws Exception { private void checkDone() throws Exception {
if( !browseDone && queueRefs == 0 && destinationsAdded) { if (!browseDone && queueRefs == 0 && destinationsAdded) {
browseDone=true; browseDone = true;
add(QueueMessageReference.NULL_MESSAGE); add(QueueMessageReference.NULL_MESSAGE);
} }
} }
@Override
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return !browseDone && super.matches(node, context); return !browseDone && super.matches(node, context);
} }
@ -70,8 +71,8 @@ public class QueueBrowserSubscription extends QueueSubscription {
/** /**
* Since we are a browser we don't really remove the message from the queue. * Since we are a browser we don't really remove the message from the queue.
*/ */
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) @Override
throws IOException { protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
if (info.isNetworkSubscription()) { if (info.isNetworkSubscription()) {
super.acknowledge(context, ack, n); super.acknowledge(context, ack, n);
} }
@ -88,7 +89,6 @@ public class QueueBrowserSubscription extends QueueSubscription {
checkDone(); checkDone();
} }
@Override @Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
super.remove(context, destination); super.remove(context, destination);

View File

@ -238,7 +238,13 @@ public class PolicyEntry extends DestinationMapEntry {
configurePrefetch(sub); configurePrefetch(sub);
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark()); sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
sub.setUsePrefetchExtension(isUsePrefetchExtension()); sub.setUsePrefetchExtension(isUsePrefetchExtension());
sub.setMaxProducersToAudit(getMaxProducersToAudit());
// TODO
// We currently need an infinite audit because of the way that browser dispatch
// is done. We should refactor the browsers to better handle message dispatch so
// we can remove this and perform a more efficient dispatch.
sub.setMaxProducersToAudit(Integer.MAX_VALUE);
sub.setMaxAuditDepth(Integer.MAX_VALUE);
} }
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {

View File

@ -19,7 +19,6 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.net.URI;
import java.util.Enumeration; import java.util.Enumeration;
import javax.jms.Connection; import javax.jms.Connection;
@ -31,7 +30,6 @@ import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After; import org.junit.After;
@ -46,14 +44,14 @@ public class AMQ4487Test {
private final String destinationName = "TEST.QUEUE"; private final String destinationName = "TEST.QUEUE";
private BrokerService broker; private BrokerService broker;
private URI connectUri;
private ActiveMQConnectionFactory factory; private ActiveMQConnectionFactory factory;
@Before @Before
public void startBroker() throws Exception { public void startBroker() throws Exception {
broker = new BrokerService(); broker = new BrokerService();
TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
broker.deleteAllMessages(); broker.deleteAllMessages();
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setQueue(">"); policy.setQueue(">");
@ -64,8 +62,7 @@ public class AMQ4487Test {
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();
connectUri = connector.getConnectUri(); factory = new ActiveMQConnectionFactory("vm://localhost");
factory = new ActiveMQConnectionFactory(connectUri);
} }
@After @After
@ -101,7 +98,7 @@ public class AMQ4487Test {
@Test @Test
public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception { public void testBrowsingWithMoreThanMaxAuditDepth() throws Exception {
doTestBrowsing(76); doTestBrowsing(300);
} }
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
@ -124,7 +121,6 @@ public class AMQ4487Test {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Browsed Message: {}", m.getJMSMessageID()); LOG.debug("Browsed Message: {}", m.getJMSMessageID());
} }
LOG.info("Browsed Message: {}", m.getJMSMessageID());
received++; received++;
if (received > messagesToSend) { if (received > messagesToSend) {