Use the store based cursor by default for Queues - which will enable very large queue support

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@490814 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-12-28 21:03:53 +00:00
parent 6964747912
commit d2e606208d
7 changed files with 142 additions and 73 deletions

View File

@ -38,6 +38,7 @@ import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import sun.security.x509.IssuerAlternativeNameExtension;
import java.util.concurrent.ConcurrentHashMap;
@ -60,6 +61,7 @@ abstract public class AbstractRegion implements Region {
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
protected final Map consumerChangeMutexMap = new HashMap();
protected boolean started = false;
public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
if (broker == null) {
@ -76,9 +78,15 @@ abstract public class AbstractRegion implements Region {
}
public void start() throws Exception {
started = true;
for (Iterator i = destinations.values().iterator();i.hasNext();) {
Destination dest = (Destination)i.next();
dest.start();
}
}
public void stop() throws Exception {
started = false;
for (Iterator i = destinations.values().iterator();i.hasNext();) {
Destination dest = (Destination)i.next();
dest.stop();

View File

@ -77,7 +77,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
if (destination.isQueue()) {
if (destination.isTemporary()) {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) {
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
@ -90,7 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
};
} else {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore());
configureQueue(queue, destination);
queue.initialize();
return queue;

View File

@ -19,22 +19,21 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DurableTopicSubscription extends PrefetchSubscription {
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey;
@ -72,6 +71,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Deactivating " + this);
if( !active ) {
this.active = true;
this.context = context;
@ -97,6 +97,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false;
synchronized(pending){
pending.stop();
@ -197,9 +198,12 @@ public class DurableTopicSubscription extends PrefetchSubscription {
"DurableTopicSubscription:" +
" consumer="+info.getConsumerId()+
", destinations="+destinations.size()+
", dispatched="+dispatched.size()+
", delivered="+this.prefetchExtension+
", pending="+getPendingQueueSize();
", total="+enqueueCounter+
", pending="+getPendingQueueSize()+
", dispatched="+dispatchCounter+
", inflight="+dispatched.size()+
", prefetchExtension="+this.prefetchExtension;
}
public String getClientId() {

View File

@ -327,6 +327,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
}
public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size();
}
public int getPendingQueueSize(){
synchronized(pending) {
return pending.size();
@ -396,9 +400,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
List toDispatch=null;
synchronized(pending){
try{
int numberToDispatch=countBeforeFull();
if(numberToDispatch>0){
int count=0;
pending.reset();
while(pending.hasNext()&&!isFull()){
while(pending.hasNext()&&!isFull()&&count<numberToDispatch){
MessageReference node=pending.next();
if(canDispatch(node)){
pending.remove();
// Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message.
@ -409,17 +418,22 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
toDispatch=new ArrayList();
}
toDispatch.add(node);
count++;
}
}
}
}finally{
pending.release();
}
}
if(toDispatch!=null){
synchronized(dispatched){
for(int i=0;i<toDispatch.size();i++){
MessageReference node=(MessageReference)toDispatch.get(i);
dispatch(node);
}
}
}
}finally{
dispatching.set(false);
}
@ -458,6 +472,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
}
return true;
}else{
QueueMessageReference n = (QueueMessageReference) node;
return false;
}
}

View File

@ -28,6 +28,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@ -44,6 +45,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
@ -74,7 +76,7 @@ public class Queue implements Destination, Task {
private final Valve dispatchValve = new Valve(true);
private final UsageManager usageManager;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
private PendingMessageCursor messages = new VMPendingMessageCursor();
private PendingMessageCursor messages;
private final LinkedList pagedInMessages = new LinkedList();
private LockOwner exclusiveOwner;
@ -92,13 +94,20 @@ public class Queue implements Destination, Task {
private final Object exclusiveLockMutex = new Object();
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
private boolean started = false;
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) throws Exception {
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store;
if(destination.isTemporary()){
this.messages=new VMPendingMessageCursor();
}else{
this.messages=new StoreQueueCursor(this,tmpStore);
}
this.taskRunner = taskFactory.createTaskRunner(this, "Queue "+destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
@ -118,17 +127,15 @@ public class Queue implements Destination, Task {
if(store!=null){
// Restore the persistent messages.
messages.setUsageManager(getUsageManager());
messages.start();
if(messages.isRecoveryRequired()){
store.recover(new MessageRecoveryListener(){
public void recoverMessage(Message message){
// Message could have expired while it was being loaded..
if(message.isExpired()){
// TODO: remove message from store.
// TODO remove from store
return;
}
message.setRegionDestination(Queue.this);
synchronized(messages){
try{
@ -157,10 +164,12 @@ public class Queue implements Destination, Task {
/**
* Lock a node
*
* @param node
* @param lockOwner
* @return true if can be locked
* @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.broker.region.LockOwner)
* @see org.apache.activemq.broker.region.Destination#lock(org.apache.activemq.broker.region.MessageReference,
* org.apache.activemq.broker.region.LockOwner)
*/
public boolean lock(MessageReference node,LockOwner lockOwner){
synchronized(exclusiveLockMutex){
@ -312,18 +321,22 @@ public class Queue implements Destination, Task {
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
return;
}
if(context.isProducerFlowControl()){
if(usageManager.isSendFailIfNoSpace()&&usageManager.isFull()){
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}else{
usageManager.waitForSpace();
// The usage manager could have delayed us by the time
// we unblock the message could have expired..
if(message.isExpired()){
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
return;
}
}
@ -336,19 +349,29 @@ public class Queue implements Destination, Task {
context.getTransaction().addSynchronization(new Synchronization(){
public void afterCommit() throws Exception{
//even though the message could be expired - it won't be from the store
//and it's important to keep the store/cursor in step
synchronized(messages){
messages.addMessageLast(message);
}
// It could take while before we receive the commit
// operration.. by that time the message could have expired..
if(message.isExpired()){
// TODO: remove message from store.
if (log.isDebugEnabled()) {
log.debug("Expired message: " + message);
}
return;
}
sendMessage(context,message);
}
});
}else{
synchronized(messages){
messages.addMessageLast(message);
}
sendMessage(context,message);
}
}
@ -432,12 +455,19 @@ public class Queue implements Destination, Task {
}
public void start() throws Exception {
started = true;
messages.start();
doPageIn(false);
}
public void stop() throws Exception {
started = false;
if( taskRunner!=null ) {
taskRunner.shutdown();
}
if(messages!=null){
messages.stop();
}
}
// Properties
@ -528,6 +558,11 @@ public class Queue implements Destination, Task {
public Message[] browse() {
ArrayList l = new ArrayList();
try{
doPageIn(true);
}catch(Exception e){
log.error("caught an exception browsing " + this,e);
}
synchronized(pagedInMessages) {
for (Iterator i = pagedInMessages.iterator();i.hasNext();) {
MessageReference r = (MessageReference)i.next();
@ -538,7 +573,7 @@ public class Queue implements Destination, Task {
l.add(m);
}
}catch(IOException e){
log.error("caught an exception brwsing " + this,e);
log.error("caught an exception browsing " + this,e);
}
finally {
r.decrementReferenceCount();
@ -850,11 +885,10 @@ public class Queue implements Destination, Task {
return answer;
}
private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
synchronized(messages){
messages.addMessageLast(msg);
}
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
pageInMessages(false);
@ -863,6 +897,7 @@ public class Queue implements Destination, Task {
private List doPageIn() throws Exception{
return doPageIn(true);
}
private List doPageIn(boolean force) throws Exception{
final int toPageIn=maximumPagedInMessages-pagedInMessages.size();
List result=null;
@ -877,9 +912,15 @@ public class Queue implements Destination, Task {
while(messages.hasNext()&&count<toPageIn){
MessageReference node=messages.next();
messages.remove();
if(!node.isExpired()){
node=createMessageReference(node.getMessage());
result.add(node);
count++;
}else{
if (log.isDebugEnabled()) {
log.debug("Expired message: " + node);
}
}
}
}finally{
messages.release();

View File

@ -208,7 +208,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
/**
*/
synchronized public void destroy() {
public void destroy() {
}
}

View File

@ -78,7 +78,7 @@ public class RegionBroker implements Broker {
private final Region tempQueueRegion;
private final Region tempTopicRegion;
private BrokerService brokerService;
private boolean stopped = false;
private boolean started = false;
private boolean keepDurableSubsActive=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@ -178,6 +178,7 @@ public class RegionBroker implements Broker {
public void start() throws Exception {
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
started = true;
queueRegion.start();
topicRegion.start();
tempQueueRegion.start();
@ -185,7 +186,7 @@ public class RegionBroker implements Broker {
}
public void stop() throws Exception {
stopped = true;
started = false;
ServiceStopper ss = new ServiceStopper();
doStop(ss);
ss.throwFirstException();
@ -245,7 +246,6 @@ public class RegionBroker implements Broker {
if( destinations.contains(destination) ){
throw new JMSException("Destination already exists: "+destination);
}
Destination answer = null;
switch(destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
@ -366,7 +366,8 @@ public class RegionBroker implements Broker {
}
public void send(ConnectionContext context, Message message) throws Exception {
message.getMessageId().setBrokerSequenceId(sequenceGenerator.getNextSequenceId());
long si = sequenceGenerator.getNextSequenceId();
message.getMessageId().setBrokerSequenceId(si);
if (message.getTimestamp() > 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) {
//timestamp not been disabled and has not passed through a network
message.setTimestamp(System.currentTimeMillis());
@ -541,7 +542,7 @@ public class RegionBroker implements Broker {
}
public boolean isStopped(){
return stopped;
return !started;
}
public Set getDurableDestinations(){