This commit is contained in:
gtully 2013-10-22 15:22:36 +01:00
parent 092bc61708
commit 85bc9ce9d0
7 changed files with 137 additions and 10 deletions

View File

@ -286,7 +286,7 @@ public abstract class BaseDestination implements Destination {
}
public int getMaxBrowsePageSize() {
return this.maxBrowsePageSize;
return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize();
}
public void setMaxBrowsePageSize(int maxPageSize) {

View File

@ -1605,7 +1605,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
boolean added = false;
for (QueueMessageReference node : alreadyDispatchedMessages) {
if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())) {
if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
msgContext.setMessageReference(node);
if (browser.matches(node, msgContext)) {
browser.add(node);
@ -1614,7 +1614,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
// are we done browsing? no new messages paged
if (!added) {
if (!added || browser.atMax()) {
browser.decrementQueueRef();
browserDispatches.remove(browserDispatch);
}

View File

@ -43,6 +43,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
boolean destinationsAdded;
private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>();
private long maxMessages;
public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
super(broker, usageManager, context, info);
@ -115,4 +116,12 @@ public class QueueBrowserSubscription extends QueueSubscription {
// in case of browser
return new ArrayList<MessageReference>();
}
public boolean atMax() {
return maxMessages > 0 && getEnqueueCounter() >= maxMessages;
}
public void setMaxMessages(long max) {
maxMessages = max;
}
}

View File

@ -275,6 +275,9 @@ public class PolicyEntry extends DestinationMapEntry {
// we can remove this and perform a more efficient dispatch.
sub.setMaxProducersToAudit(Integer.MAX_VALUE);
sub.setMaxAuditDepth(Short.MAX_VALUE);
// part solution - dispatching to browsers needs to be restricted
sub.setMaxMessages(getMaxBrowsePageSize());
}
public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) {

View File

@ -440,6 +440,7 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
PolicyMap policyMap = new PolicyMap();
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setUseCache(isUseCache);
policyEntry.setMaxBrowsePageSize(4096);
policyMap.setDefaultEntry(policyEntry);
brokerService.setDestinationPolicy(policyMap);
return brokerService;

View File

@ -34,6 +34,8 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
@ -55,13 +57,12 @@ public class AMQ4595Test {
TransportConnector connector = broker.addConnector("vm://localhost");
broker.deleteAllMessages();
// PolicyEntry policy = new PolicyEntry();
// policy.setQueue(">");
// policy.setMaxAuditDepth(16384);
// policy.setCursorMemoryHighWaterMark(95); // More breathing room.
// PolicyMap pMap = new PolicyMap();
// pMap.setDefaultEntry(policy);
// broker.setDestinationPolicy(pMap);
//PolicyMap pMap = new PolicyMap();
//PolicyEntry policyEntry = new PolicyEntry();
//policyEntry.setMaxBrowsePageSize(10000);
//pMap.put(new ActiveMQQueue(">"), policyEntry);
// when no policy match, browserSub has maxMessages==0
//broker.setDestinationPolicy(pMap);
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024);
broker.start();

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
public class QueueBrowsingLimitTest {
private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingLimitTest.class);
private BrokerService broker;
private URI connectUri;
private ActiveMQConnectionFactory factory;
private final int browserLimit = 300;
@Before
public void startBroker() throws Exception {
broker = createBroker();
TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
broker.deleteAllMessages();
broker.start();
broker.waitUntilStarted();
PolicyEntry policy = new PolicyEntry();
policy.setMaxBrowsePageSize(browserLimit);
broker.setDestinationPolicy(new PolicyMap());
broker.getDestinationPolicy().setDefaultEntry(policy);
connectUri = connector.getConnectUri();
factory = new ActiveMQConnectionFactory(connectUri);
}
public BrokerService createBroker() throws IOException {
return new BrokerService();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
@Test
public void testBrowsingLimited() throws Exception {
int messageToSend = 470;
ActiveMQQueue queue = new ActiveMQQueue("TEST");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
String data = "";
for( int i=0; i < 1024*2; i++ ) {
data += "x";
}
for( int i=0; i < messageToSend; i++ ) {
producer.send(session.createTextMessage(data));
}
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> enumeration = browser.getEnumeration();
int received = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
received++;
LOG.info("Browsed message " + received + ": " + m.getJMSMessageID());
}
browser.close();
assertEquals(browserLimit, received);
}
}