https://issues.apache.org/jira/browse/AMQ-3362 - revisit, place a limit on a browse such that the entire store is not recovered at once. Makes use of getMaxBrowsePageSize and getMaxExpirePageSize like the queue case, overhead is too large otherwise as durable sub test shows

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1159662 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-08-19 15:34:31 +00:00
parent 32c06a0b1b
commit 383a7acb05
5 changed files with 56 additions and 28 deletions

View File

@ -54,6 +54,7 @@ import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.*;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
@ -757,24 +758,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
// just track the insertion count
List<Message> browsedMessages = new AbstractList<Message>() {
int size = 0;
@Override
public void add(int index, Message element) {
size++;
}
@Override
public int size() {
return size;
}
@Override
public Message get(int index) {
return null;
}
};
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, this.getMaxExpirePageSize());
asyncWakeup();
if (LOG.isDebugEnabled()) {

View File

@ -33,6 +33,7 @@ import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
@ -532,10 +533,15 @@ public class Topic extends BaseDestination implements Task {
}
public Message[] browse() {
final ConnectionContext connectionContext = createConnectionContext();
final Set<Message> result = new CopyOnWriteArraySet<Message>();
final List<Message> result = new ArrayList<Message>();
doBrowse(result, getMaxBrowsePageSize());
return result.toArray(new Message[result.size()]);
}
private void doBrowse(final List<Message> browseList, final int max) {
try {
if (topicStore != null) {
final ConnectionContext connectionContext = createConnectionContext();
topicStore.recover(new MessageRecoveryListener() {
public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) {
@ -545,7 +551,7 @@ public class Topic extends BaseDestination implements Task {
}
}
}
result.add(message);
browseList.add(message);
return true;
}
@ -554,7 +560,7 @@ public class Topic extends BaseDestination implements Task {
}
public boolean hasSpace() {
return true;
return browseList.size() < max;
}
public boolean isDuplicate(MessageId id) {
@ -563,15 +569,14 @@ public class Topic extends BaseDestination implements Task {
});
Message[] msgs = subscriptionRecoveryPolicy.browse(getActiveMQDestination());
if (msgs != null) {
for (int i = 0; i < msgs.length; i++) {
result.add(msgs[i]);
for (int i = 0; i < msgs.length && browseList.size() < max; i++) {
browseList.add(msgs[i]);
}
}
}
} catch (Throwable e) {
LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
}
return result.toArray(new Message[result.size()]);
}
public boolean iterate() {
@ -656,7 +661,8 @@ public class Topic extends BaseDestination implements Task {
private final Runnable expireMessagesTask = new Runnable() {
public void run() {
browse();
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
}
};

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import java.util.AbstractList;
public class InsertionCountList<T> extends AbstractList<T> {
int size = 0;
@Override
public void add(int index, T element) {
size++;
}
@Override
public int size() {
return size;
}
@Override
public T get(int index) {
return null;
}
};

View File

@ -455,7 +455,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
.hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
if (ackedAndPrepared.contains(entry.getValue().messageId)) {

View File

@ -64,7 +64,7 @@ import org.slf4j.LoggerFactory;
*/
public class DurableConsumerTest extends CombinationTestSupport{
private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerTest.class);
private static int COUNT = 1024 * 10;
private static int COUNT = 1024 * 60;
private static String CONSUMER_NAME = "DURABLE_TEST";
protected BrokerService broker;