git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@650143 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-21 13:41:19 +00:00
parent 4a6699e33b
commit 16f2d753e8
23 changed files with 113 additions and 82 deletions

View File

@ -1235,6 +1235,10 @@ public class BrokerService implements Service {
return getBroker().addDestination(getAdminConnectionContext(), destination);
}
public void removeDestination(ActiveMQDestination destination) throws Exception {
getBroker().removeDestination(getAdminConnectionContext(), destination,0);
}
public int getProducerSystemUsagePortion() {
return producerSystemUsagePortion;
}

View File

@ -183,6 +183,10 @@ public abstract class AbstractRegion implements Region {
}
destinationMap.removeAll(destination);
dispose(context,dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
destinationInterceptor.remove(dest);
}
} else {
LOG.debug("Destination doesn't exist: " + dest);

View File

@ -17,15 +17,12 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
@ -44,7 +41,6 @@ public abstract class AbstractSubscription implements Subscription {
private static final Log LOG = LogFactory.getLog(AbstractSubscription.class);
protected Broker broker;
protected Destination destination;
protected ConnectionContext context;
protected ConsumerInfo info;
protected final DestinationFilter destinationFilter;
@ -53,9 +49,8 @@ public abstract class AbstractSubscription implements Subscription {
private ObjectName objectName;
public AbstractSubscription(Broker broker, Destination destination,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
this.destination=destination;
this.context = context;
this.info = info;
this.destinationFilter = DestinationFilter.parseFilter(info.getDestination());

View File

@ -35,5 +35,12 @@ public class CompositeDestinationInterceptor implements DestinationInterceptor {
}
return destination;
}
public void remove(Destination destination) {
for (int i = 0; i < interceptors.length; i++) {
interceptors[i].remove(destination);
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -30,7 +29,6 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
*

View File

@ -25,5 +25,7 @@ package org.apache.activemq.broker.region;
public interface DestinationInterceptor {
Destination intercept(Destination destination);
void remove(Destination destination);
}

View File

@ -49,23 +49,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final boolean keepDurableSubsActive;
private boolean active;
public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
super(broker,dest,usageManager, context, info);
super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(broker,context.getClientId(), info.getSubscriptionName(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
if (dest != null && dest.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
try {
this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
} catch (IOException e) {
JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
}
}
public boolean isActive() {
@ -82,6 +73,16 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination);
if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)destination.getMessageStore();
try {
this.enqueueCounter+=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
} catch (IOException e) {
JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
}
if (active || keepDurableSubsActive) {
Topic topic = (Topic)destination;
topic.activate(context, this);

View File

@ -66,14 +66,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
private final Object dispatchLock = new Object();
protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,destination, context, info);
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,context, info);
this.usageManager=usageManager;
pending = cursor;
}
public PrefetchSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this(broker,destination,usageManager,context, info, new VMPendingMessageCursor());
public PrefetchSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this(broker,usageManager,context, info, new VMPendingMessageCursor());
}
/**
@ -168,9 +168,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
+ mdn.getMessageId() + ") was not in the pending list");
}
public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
public final void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
Destination destination = null;
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
// Acknowledge all dispatched messages up till the message id of
@ -233,6 +234,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
prefetchExtension = Math.max(0,
prefetchExtension - (index + 1));
}
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@ -268,6 +270,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@ -294,6 +297,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (inAckRange) {
node.incrementRedeliveryCounter();
if (ack.getLastMessageId().equals(messageId)) {
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@ -335,6 +339,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (ack.getLastMessageId().equals(messageId)) {
prefetchExtension = Math.max(0, prefetchExtension
- (index + 1));
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
}
@ -350,7 +355,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
}
if (callDispatchMatched) {
if (callDispatchMatched && destination != null) {
if (destination.isLazyDispatch()) {
destination.wakeup();
}

View File

@ -17,9 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
@ -33,9 +31,9 @@ public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone;
boolean destinationsAdded;
public QueueBrowserSubscription(Broker broker,Destination destination, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
public QueueBrowserSubscription(Broker broker,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
super(broker,destination,usageManager, context, info);
super(broker,usageManager, context, info);
}
protected boolean canDispatch(MessageReference node) {

View File

@ -46,18 +46,11 @@ public class QueueRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws JMSException {
Destination dest = null;
try {
dest = lookup(context, info.getDestination());
} catch (Exception e) {
JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker,dest,usageManager, context, info);
return new QueueBrowserSubscription(broker,usageManager, context, info);
} else {
return new QueueSubscription(broker, dest,usageManager,context, info);
return new QueueSubscription(broker, usageManager,context, info);
}
}

View File

@ -17,10 +17,8 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap;
@ -28,7 +26,6 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,8 +34,8 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
public QueueSubscription(Broker broker, Destination destination,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,destination,usageManager, context, info);
public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
super(broker,usageManager, context, info);
}
/**

View File

@ -50,18 +50,10 @@ public class TempQueueRegion extends AbstractTempRegion {
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Destination dest=null;
try {
dest = lookup(context, info.getDestination());
} catch (Exception e) {
JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
if (info.isBrowser()) {
return new QueueBrowserSubscription(broker,dest,usageManager,context, info);
return new QueueBrowserSubscription(broker,usageManager,context, info);
} else {
return new QueueSubscription(broker,dest, usageManager,context, info);
return new QueueSubscription(broker,usageManager,context, info);
}
}

View File

@ -47,9 +47,7 @@ public class TempTopicRegion extends AbstractTempRegion {
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
}
try {
Destination dest = lookup(context, info.getDestination());
TopicSubscription answer = new TopicSubscription(broker, dest,context, info, usageManager);
TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {

View File

@ -228,14 +228,7 @@ public class TopicRegion extends AbstractRegion {
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
ActiveMQDestination destination = info.getDestination();
Destination dest=null;
try {
dest = lookup(context, destination);
} catch (Exception e) {
JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
@ -245,7 +238,7 @@ public class TopicRegion extends AbstractRegion {
if (sub == null) {
sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive);
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
@ -259,7 +252,7 @@ public class TopicRegion extends AbstractRegion {
return sub;
}
try {
TopicSubscription answer = new TopicSubscription(broker, dest,context, info, usageManager);
TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);

View File

@ -20,11 +20,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@ -65,8 +62,8 @@ public class TopicSubscription extends AbstractSubscription {
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
public TopicSubscription(Broker broker, Destination destination,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, destination,context, info);
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
if (info.getDestination().isTemporary() || broker == null || broker.getTempDataStore()==null ) {

View File

@ -34,6 +34,10 @@ public abstract class CompositeDestination implements VirtualDestination {
public Destination intercept(Destination destination) {
return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isCopyMessage());
}
public void remove(Destination destination) {
}
public String getName() {
return name;

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;

View File

@ -67,6 +67,22 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware
}
return destination;
}
public void remove(Destination destination) {
if (brokerService == null) {
throw new IllegalArgumentException("No brokerService injected!");
}
ActiveMQDestination topic = getMirrorTopic(destination.getActiveMQDestination());
if (topic != null) {
try {
brokerService.removeDestination(topic);
} catch (Exception e) {
LOG.error("Failed to remove mirror destination for " + destination + ". Reason: " + e,e);
}
}
}
// Properties
// -------------------------------------------------------------------------
@ -124,4 +140,5 @@ public class MirroredQueue implements DestinationInterceptor, BrokerServiceAware
protected ActiveMQDestination getMirrorTopic(ActiveMQDestination original) {
return new ActiveMQTopic(prefix + original.getPhysicalName() + postfix);
}
}

View File

@ -40,7 +40,7 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
private DestinationMap destinationMap = new DestinationMap();
private VirtualDestination[] virtualDestinations;
public Destination intercept(Destination destination) {
public synchronized Destination intercept(Destination destination) {
Set virtualDestinations = destinationMap.get(destination.getActiveMQDestination());
List<Destination> destinations = new ArrayList<Destination>();
for (Iterator iter = virtualDestinations.iterator(); iter.hasNext();) {
@ -58,6 +58,10 @@ public class VirtualDestinationInterceptor implements DestinationInterceptor {
}
return destination;
}
public synchronized void remove(Destination destination) {
}
public VirtualDestination[] getVirtualDestinations() {
return virtualDestinations;

View File

@ -46,6 +46,10 @@ public class VirtualTopic implements VirtualDestination {
return new VirtualTopicInterceptor(destination, getPrefix(), getPostfix());
}
public void remove(Destination destination) {
}
// Properties
// -------------------------------------------------------------------------

View File

@ -25,6 +25,7 @@ import javax.jms.TemporaryTopic;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
/**
* @version $Revision: 397249 $
@ -56,9 +57,9 @@ public class TempDestLoadTest extends EmbeddedBrokerTestSupport {
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
//there should be 3 destinations - advisories -
//1 for the connection + 2 generic ones
assertTrue(rb.getDestinationMap().size()==3);
//there should be 2 destinations - advisories -
//1 for the connection + 1 generic ones
assertTrue(rb.getDestinationMap().size()==2);
}
public void testLoadTempAdvisoryTopics() throws Exception {
@ -78,9 +79,9 @@ public class TempDestLoadTest extends EmbeddedBrokerTestSupport {
assertTrue(ab.getAdvisoryProducers().size() == 0);
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
//there should be 3 destinations - advisories -
//1 for the connection + 2 generic ones
assertTrue(rb.getDestinationMap().size()==3);
//there should be 2 destinations - advisories -
//1 for the connection + 1 generic ones
assertTrue(rb.getDestinationMap().size()==2);
}

View File

@ -86,7 +86,7 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
//serverDestination +
assertTrue(rb.getDestinationMap().size()==7);
assertTrue(rb.getDestinationMap().size()==6);
}
protected void setUp() throws Exception {

View File

@ -21,9 +21,11 @@ import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
@ -72,6 +74,20 @@ public class MirroredQueueTest extends EmbeddedBrokerTestSupport {
LOG.info("Received: " + messageList);
}
public void testTempMirroredQueuesClearDown() throws Exception{
if (connection == null) {
connection = createConnection();
}
connection.start();
Session session = connection.createSession(false, 0);
TemporaryQueue tempQueue = session.createTemporaryQueue();
RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor(
RegionBroker.class);
assertTrue(rb.getDestinationMap().size()==4);
tempQueue.delete();
assertTrue(rb.getDestinationMap().size()==3);
}
protected Destination createConsumeDestination() {
return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName());