Fixed failing test cases: - a few problems had been there for a while

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@463646 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-10-13 11:17:41 +00:00
parent a20de10f4a
commit 3a9299bade
22 changed files with 257 additions and 120 deletions

View File

@ -21,6 +21,7 @@ import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -250,10 +251,19 @@ public interface Broker extends Region, Service {
/** /**
* Sets the default administration connection context used when configuring the broker on startup or via JMX * Sets the default administration connection context used when configuring the broker on startup or via JMX
* @param adminConnectionContext
*/ */
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext); public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
/**
* @return the pendingDurableSubscriberPolicy
*/
public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
/**
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
/** /**
* @return the broker's temp data store * @return the broker's temp data store
* @throws Exception * @throws Exception

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -233,6 +234,15 @@ public class BrokerFilter implements Broker {
next.setAdminConnectionContext(adminConnectionContext); next.setAdminConnectionContext(adminConnectionContext);
} }
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return next.getPendingDurableSubscriberPolicy();
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
public Store getTempDataStore() { public Store getTempDataStore() {
return next.getTempDataStore(); return next.getTempDataStore();
} }

View File

@ -45,7 +45,9 @@ import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationFactoryImpl; import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.broker.region.virtual.VirtualTopic;
@ -137,6 +139,7 @@ public class BrokerService implements Service, Serializable {
private ActiveMQDestination[] destinations; private ActiveMQDestination[] destinations;
private Store tempDataStore; private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY; private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
/** /**
@ -388,7 +391,13 @@ public class BrokerService implements Service, Serializable {
} }
getBroker().start(); getBroker().start();
/*
if(isUseJmx()){
// yes - this is orer dependent!
// register all destination in persistence store including inactive destinations as mbeans
this.startDestinationsInPersistenceStore(broker);
}
*/
startAllConnectors(); startAllConnectors();
if (isUseJmx() && masterConnector != null) { if (isUseJmx() && masterConnector != null) {
@ -988,6 +997,23 @@ public class BrokerService implements Service, Serializable {
this.persistenceThreadPriority=persistenceThreadPriority; this.persistenceThreadPriority=persistenceThreadPriority;
} }
/**
* @return the pendingDurableSubscriberPolicy
*/
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
return this.pendingDurableSubscriberPolicy;
}
/**
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
if (broker != null) {
broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
/** /**
@ -1199,8 +1225,6 @@ public class BrokerService implements Service, Serializable {
mbeanServer.registerMBean(adminView, objectName); mbeanServer.registerMBean(adminView, objectName);
registeredMBeanNames.add(objectName); registeredMBeanNames.add(objectName);
} }
//register all destination in persistence store including inactive destinations as mbeans
this.startDestinationsInPersistenceStore(broker);
} }
@ -1243,6 +1267,7 @@ public class BrokerService implements Service, Serializable {
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName()); regionBroker.setBrokerName(getBrokerName());
regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
return regionBroker; return regionBroker;
} }
@ -1516,7 +1541,4 @@ public class BrokerService implements Service, Serializable {
} }
} }
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -231,6 +232,13 @@ public class EmptyBroker implements Broker {
return null; return null;
} }
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return null;
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
}
public Store getTempDataStore() { public Store getTempDataStore() {
return null; return null;
} }

View File

@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -231,6 +232,14 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
throw new BrokerStoppedException(this.message);
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
throw new BrokerStoppedException(this.message);
}
public Store getTempDataStore() { public Store getTempDataStore() {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -245,6 +246,14 @@ public class MutableBrokerFilter implements Broker {
return getNext().messagePull(context, pull); return getNext().messagePull(context, pull);
} }
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return getNext().getPendingDurableSubscriberPolicy();
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
public Store getTempDataStore() { public Store getTempDataStore() {
return getNext().getTempDataStore(); return getNext().getTempDataStore();
} }

View File

@ -177,7 +177,6 @@ abstract public class AbstractRegion implements Region {
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Adding consumer: "+info.getConsumerId()); log.debug("Adding consumer: "+info.getConsumerId());
ActiveMQDestination destination = info.getDestination(); ActiveMQDestination destination = info.getDestination();
if (destination != null && ! destination.isPattern() && ! destination.isComposite()) { if (destination != null && ! destination.isPattern() && ! destination.isComposite()) {
// lets auto-create the destination // lets auto-create the destination
@ -260,7 +259,6 @@ abstract public class AbstractRegion implements Region {
} }
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Removing consumer: "+info.getConsumerId()); log.debug("Removing consumer: "+info.getConsumerId());
Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId()); Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId());

View File

@ -92,6 +92,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory); Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory);
configureQueue(queue, destination); configureQueue(queue, destination);
queue.initialize();
return queue; return queue;
} }
} else if (destination.isTemporary()){ } else if (destination.isTemporary()){

View File

@ -24,6 +24,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -40,10 +41,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private final boolean keepDurableSubsActive; private final boolean keepDurableSubsActive;
private boolean active=false; private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
//super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize())); super(broker,context,info,cursor);
//super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore()));
super(broker,context,info);
this.keepDurableSubsActive = keepDurableSubsActive; this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
} }
@ -192,7 +191,6 @@ public class DurableTopicSubscription extends PrefetchSubscription {
* Release any references that we are holding. * Release any references that we are holding.
*/ */
synchronized public void destroy() { synchronized public void destroy() {
synchronized(pending) { synchronized(pending) {
pending.reset(); pending.reset();
while(pending.hasNext()) { while(pending.hasNext()) {

View File

@ -124,8 +124,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized public void add(MessageReference node) throws Exception{ synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++; enqueueCounter++;
//if(!isFull()){
if(!isFull() && pending.isEmpty() && canDispatch(node)){ if(!isFull() && pending.isEmpty() ){
dispatch(node); dispatch(node);
}else{ }else{
optimizePrefetch(); optimizePrefetch();
@ -376,7 +376,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
if(canDispatch(node)&&!isSlaveBroker()){ if(canDispatch(node)&&!isSlaveBroker()){
MessageDispatch md=createMessageDispatch(node,message); MessageDispatch md=createMessageDispatch(node,message);
// NULL messages don't count... they don't get Acked. // NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) { if( node != QueueMessageReference.NULL_MESSAGE ) {
dispatchCounter++; dispatchCounter++;

View File

@ -20,12 +20,15 @@ package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -67,10 +70,10 @@ public class Queue implements Destination {
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final List consumers = new CopyOnWriteArrayList(); protected final List consumers = new CopyOnWriteArrayList();
private final LinkedList messages = new LinkedList();
protected final Valve dispatchValve = new Valve(true); protected final Valve dispatchValve = new Valve(true);
protected final UsageManager usageManager; protected final UsageManager usageManager;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected PendingMessageCursor messages = new VMPendingMessageCursor();
private LockOwner exclusiveOwner; private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
@ -100,6 +103,10 @@ public class Queue implements Destination {
destinationStatistics.setParent(parentStats); destinationStatistics.setParent(parentStats);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());
}
public void initialize() throws Exception {
if (store != null) { if (store != null) {
// Restore the persistent messages. // Restore the persistent messages.
store.recover(new MessageRecoveryListener() { store.recover(new MessageRecoveryListener() {
@ -107,7 +114,11 @@ public class Queue implements Destination {
message.setRegionDestination(Queue.this); message.setRegionDestination(Queue.this);
MessageReference reference = createMessageReference(message); MessageReference reference = createMessageReference(message);
synchronized (messages) { synchronized (messages) {
messages.add(reference); try{
messages.addMessageLast(reference);
}catch(Exception e){
log.fatal("Failed to add message to cursor",e);
}
} }
reference.decrementReferenceCount(); reference.decrementReferenceCount();
destinationStatistics.getMessages().increment(); destinationStatistics.getMessages().increment();
@ -158,9 +169,10 @@ public class Queue implements Destination {
synchronized (messages) { synchronized (messages) {
// Add all the matching messages in the queue to the // Add all the matching messages in the queue to the
// subscription. // subscription.
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
while(messages.hasNext()) {
QueueMessageReference node = (QueueMessageReference) iter.next(); QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) { if (node.isDropped()) {
continue; continue;
} }
@ -219,8 +231,9 @@ public class Queue implements Destination {
// lets copy the messages to dispatch to avoid deadlock // lets copy the messages to dispatch to avoid deadlock
List messagesToDispatch = new ArrayList(); List messagesToDispatch = new ArrayList();
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
QueueMessageReference node = (QueueMessageReference) iter.next(); while(messages.hasNext()) {
QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) { if (node.isDropped()) {
continue; continue;
} }
@ -314,12 +327,13 @@ public class Queue implements Destination {
public void gc() { public void gc() {
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.resetForGC();
while(messages.hasNext()) {
// Remove dropped messages from the queue. // Remove dropped messages from the queue.
QueueMessageReference node = (QueueMessageReference) iter.next(); QueueMessageReference node = (QueueMessageReference) messages.next();
if (node.isDropped()) { if (node.isDropped()) {
garbageSize--; garbageSize--;
iter.remove(); messages.remove();
continue; continue;
} }
} }
@ -456,6 +470,12 @@ public class Queue implements Destination {
public void setMemoryLimit(long limit) { public void setMemoryLimit(long limit) {
getUsageManager().setLimit(limit); getUsageManager().setLimit(limit);
} }
public PendingMessageCursor getMessages(){
return this.messages;
}
public void setMessages(PendingMessageCursor messages){
this.messages=messages;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -470,7 +490,7 @@ public class Queue implements Destination {
try { try {
destinationStatistics.onMessageEnqueue(message); destinationStatistics.onMessageEnqueue(message);
synchronized (messages) { synchronized (messages) {
messages.add(node); messages.addMessageLast(node);
} }
synchronized (consumers) { synchronized (consumers) {
@ -509,12 +529,12 @@ public class Queue implements Destination {
} }
public Message[] browse() { public Message[] browse() {
ArrayList l = new ArrayList(); ArrayList l = new ArrayList();
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
while(messages.hasNext()) {
try { try {
MessageReference r = (MessageReference) iter.next(); MessageReference r = (MessageReference) messages.next();
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
Message m = r.getMessage(); Message m = r.getMessage();
@ -536,9 +556,10 @@ public class Queue implements Destination {
public Message getMessage(String messageId) { public Message getMessage(String messageId) {
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
while(messages.hasNext()) {
try { try {
MessageReference r = (MessageReference) iter.next(); MessageReference r = (MessageReference) messages.next();
if (messageId.equals(r.getMessageId().toString())) { if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
@ -563,9 +584,10 @@ public class Queue implements Destination {
public void purge() { public void purge() {
synchronized (messages) { synchronized (messages) {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
while(messages.hasNext()) {
try { try {
QueueMessageReference r = (QueueMessageReference) iter.next(); QueueMessageReference r = (QueueMessageReference) messages.next();
// We should only delete messages that can be locked. // We should only delete messages that can be locked.
if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
@ -623,8 +645,9 @@ public class Queue implements Destination {
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (messages) {
ConnectionContext c = createConnectionContext(); ConnectionContext c = createConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
IndirectMessageReference r = (IndirectMessageReference) iter.next(); while(messages.hasNext()) {
IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(c, r)) { if (filter.evaluate(c, r)) {
// We should only delete messages that can be locked. // We should only delete messages that can be locked.
if (lockMessage(r)) { if (lockMessage(r)) {
@ -672,8 +695,9 @@ public class Queue implements Destination {
public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
MessageReference r = (MessageReference) iter.next(); while(messages.hasNext()) {
MessageReference r = (MessageReference) messages.next();
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
r.incrementReferenceCount(); r.incrementReferenceCount();
try { try {
@ -721,8 +745,9 @@ public class Queue implements Destination {
public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception {
int counter = 0; int counter = 0;
synchronized (messages) { synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) { messages.reset();
IndirectMessageReference r = (IndirectMessageReference) iter.next(); while(messages.hasNext()) {
IndirectMessageReference r = (IndirectMessageReference) messages.next();
if (filter.evaluate(context, r)) { if (filter.evaluate(context, r)) {
// We should only move messages that can be locked. // We should only move messages that can be locked.
if (lockMessage(r)) { if (lockMessage(r)) {
@ -789,5 +814,4 @@ public class Queue implements Destination {
answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination());
return answer; return answer;
} }
} }

View File

@ -79,7 +79,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
String groupId = node.getGroupID(); String groupId = node.getGroupID();
int sequence = node.getGroupSequence(); int sequence = node.getGroupSequence();
if( groupId!=null ) { if( groupId!=null ) {
MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners(); MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners();
// If we can own the first, then no-one else should own the rest. // If we can own the first, then no-one else should own the rest.

View File

@ -31,7 +31,9 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -93,6 +95,7 @@ public class RegionBroker implements Broker {
private ConnectionContext adminConnectionContext; private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory; protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService; this.brokerService = brokerService;
@ -584,4 +587,18 @@ public class RegionBroker implements Broker {
public Store getTempDataStore() { public Store getTempDataStore() {
return brokerService.getTempDataStore(); return brokerService.getTempDataStore();
} }
/**
* @return the pendingDurableSubscriberPolicy
*/
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
return this.pendingDurableSubscriberPolicy;
}
/**
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
}
} }

View File

@ -26,6 +26,7 @@ import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionId;
@ -63,26 +64,22 @@ public class TopicRegion extends AbstractRegion {
public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{ public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{
if(info.isDurable()){ if(info.isDurable()){
ActiveMQDestination destination=info.getDestination(); ActiveMQDestination destination=info.getDestination();
if(!destination.isPattern()){ if(!destination.isPattern()){
// Make sure the destination is created. // Make sure the destination is created.
lookup(context,destination); lookup(context,destination);
} }
String clientId=context.getClientId(); String clientId=context.getClientId();
String subcriptionName=info.getSubcriptionName(); String subcriptionName=info.getSubcriptionName();
SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName); SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName);
DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub!=null){ if(sub!=null){
if(sub.isActive()){ if(sub.isActive()){
throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subcriptionName); throw new JMSException("Durable consumer is in use for client: "+clientId+" and subscriptionName: "
+subcriptionName);
} }
// Has the selector changed?? // Has the selector changed??
if(hasDurableSubChanged(info,sub.getConsumerInfo())){ if(hasDurableSubChanged(info,sub.getConsumerInfo())){
// Remove the consumer first then add it. // Remove the consumer first then add it.
durableSubscriptions.remove(key); durableSubscriptions.remove(key);
for(Iterator iter=destinations.values().iterator();iter.hasNext();){ for(Iterator iter=destinations.values().iterator();iter.hasNext();){
@ -90,30 +87,26 @@ public class TopicRegion extends AbstractRegion {
topic.deleteSubscription(context,key); topic.deleteSubscription(context,key);
} }
super.removeConsumer(context,sub.getConsumerInfo()); super.removeConsumer(context,sub.getConsumerInfo());
super.addConsumer(context,info); super.addConsumer(context,info);
sub=(DurableTopicSubscription)durableSubscriptions.get(key); sub=(DurableTopicSubscription)durableSubscriptions.get(key);
} }else{
else {
// Change the consumer id key of the durable sub. // Change the consumer id key of the durable sub.
if(sub.getConsumerInfo().getConsumerId()!=null) if(sub.getConsumerInfo().getConsumerId()!=null)
subscriptions.remove(sub.getConsumerInfo().getConsumerId()); subscriptions.remove(sub.getConsumerInfo().getConsumerId());
subscriptions.put(info.getConsumerId(),sub); subscriptions.put(info.getConsumerId(),sub);
} }
} }else{
else {
super.addConsumer(context,info); super.addConsumer(context,info);
sub=(DurableTopicSubscription)durableSubscriptions.get(key); sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub==null){ if(sub==null){
throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " throw new JMSException("Cannot use the same consumerId: "+info.getConsumerId()
+ key.getClientId() + " subscriberName: " + key.getSubscriptionName()); +" for two different durable subscriptions clientID: "+key.getClientId()
+" subscriberName: "+key.getSubscriptionName());
} }
} }
sub.activate(context,info); sub.activate(context,info);
return sub; return sub;
} }else{
else {
return super.addConsumer(context,info); return super.addConsumer(context,info);
} }
} }
@ -223,7 +216,10 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if(sub==null){ if(sub==null){
sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive); PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),
info.getPrefetchSize());
sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
durableSubscriptions.put(key,sub); durableSubscriptions.put(key,sub);
} }
else { else {

View File

@ -1,19 +1,15 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with this * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* work for additional information regarding copyright ownership. The ASF * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* licenses this file to You under the Apache License, Version 2.0 (the * License. You may obtain a copy of the License at
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * specific language governing permissions and limitations under the License.
* License for the specific language governing permissions and limitations under
* the License.
*/ */
package org.apache.activemq.broker.region.cursors; package org.apache.activemq.broker.region.cursors;
@ -24,12 +20,12 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.MessageReference;
/** /**
* Abstract method holder for pending message (messages awaiting disptach to a * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor
* consumer) cursor
* *
* @version $Revision$ * @version $Revision$
*/ */
public class AbstractPendingMessageCursor implements PendingMessageCursor{ public class AbstractPendingMessageCursor implements PendingMessageCursor{
protected int maxBatchSize=100; protected int maxBatchSize=100;
public void start() throws Exception{ public void start() throws Exception{
@ -38,12 +34,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public void stop() throws Exception{ public void stop() throws Exception{
} }
public void add(ConnectionContext context,Destination destination) public void add(ConnectionContext context,Destination destination) throws Exception{
throws Exception{
} }
public void remove(ConnectionContext context,Destination destination) public void remove(ConnectionContext context,Destination destination) throws Exception{
throws Exception{
} }
public boolean isRecoveryRequired(){ public boolean isRecoveryRequired(){
@ -92,5 +86,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected void fillBatch() throws Exception{ protected void fillBatch() throws Exception{
} }
/**
* Give the cursor a hint that we are about to remove messages from memory only
*/
public void resetForGC(){
reset();
}
} }

View File

@ -113,4 +113,10 @@ public interface PendingMessageCursor extends Service{
* @param maxBatchSize * @param maxBatchSize
*/ */
public void setMaxBatchSize(int maxBatchSize); public void setMaxBatchSize(int maxBatchSize);
/**
* Give the cursor a hint that we are about to remove
* messages from memory only
*/
public void resetForGC();
} }

View File

@ -127,11 +127,6 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{
return false; return false;
} }
public synchronized void addMessageFirst(MessageReference node) throws IOException{
if(started){
throw new RuntimeException("This shouldn't be called!");
}
}
public synchronized void addMessageLast(MessageReference node) throws Exception{ public synchronized void addMessageLast(MessageReference node) throws Exception{
if(node!=null){ if(node!=null){

View File

@ -20,6 +20,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry; import org.apache.activemq.filter.DestinationMapEntry;
@ -46,6 +47,7 @@ public class PolicyEntry extends DestinationMapEntry {
private MessageEvictionStrategy messageEvictionStrategy; private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit; private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory; private MessageGroupMapFactory messageGroupMapFactory;
private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
public void configure(Queue queue) { public void configure(Queue queue) {
if (dispatchPolicy != null) { if (dispatchPolicy != null) {
@ -58,6 +60,10 @@ public class PolicyEntry extends DestinationMapEntry {
if( memoryLimit>0 ) { if( memoryLimit>0 ) {
queue.getUsageManager().setLimit(memoryLimit); queue.getUsageManager().setLimit(memoryLimit);
} }
if (pendingQueueMessageStoragePolicy != null) {
PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor();
queue.setMessages(messages);
}
} }
public void configure(Topic topic) { public void configure(Topic topic) {
@ -74,6 +80,7 @@ public class PolicyEntry extends DestinationMapEntry {
if( memoryLimit>0 ) { if( memoryLimit>0 ) {
topic.getUsageManager().setLimit(memoryLimit); topic.getUsageManager().setLimit(memoryLimit);
} }
} }
public void configure(TopicSubscription subscription) { public void configure(TopicSubscription subscription) {
@ -196,4 +203,20 @@ public class PolicyEntry extends DestinationMapEntry {
} }
/**
* @return the pendingQueueMessageStoragePolicy
*/
public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
return this.pendingQueueMessageStoragePolicy;
}
/**
* @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
*/
public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -218,6 +219,13 @@ public class StubBroker implements Broker {
} }
public void stop() throws Exception { public void stop() throws Exception {
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return null;
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
} }
public Store getTempDataStore() { public Store getTempDataStore() {

View File

@ -39,6 +39,7 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -51,7 +52,7 @@ public class CursorDurableTest extends TestCase{
protected static final Log log = LogFactory.getLog(CursorDurableTest.class); protected static final Log log = LogFactory.getLog(CursorDurableTest.class);
protected static final int MESSAGE_COUNT=50; protected static final int MESSAGE_COUNT=100;
protected static final int PREFETCH_SIZE = 5; protected static final int PREFETCH_SIZE = 5;
protected BrokerService broker; protected BrokerService broker;
protected String bindAddress="tcp://localhost:60706"; protected String bindAddress="tcp://localhost:60706";
@ -138,7 +139,10 @@ public class CursorDurableTest extends TestCase{
for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) { for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) {
TextMessage msg=session.createTextMessage("test"+i); TextMessage msg=session.createTextMessage("test"+i);
senderList.add(msg); senderList.add(msg);
producer.send(msg); producer.send(msg);
} }
@ -204,11 +208,13 @@ public class CursorDurableTest extends TestCase{
BrokerService answer=new BrokerService(); BrokerService answer=new BrokerService();
configureBroker(answer); configureBroker(answer);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
answer.start(); answer.start();
return answer; return answer;
} }
protected void configureBroker(BrokerService answer) throws Exception{ protected void configureBroker(BrokerService answer) throws Exception{
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
} }

View File

@ -35,6 +35,6 @@ public class KahaCursorDurableTest extends CursorDurableTest{
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest")); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor); answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); //answer.setDeleteAllMessagesOnStartup(true);
} }
} }