diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 38f69b2317..77caa721a8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -83,7 +84,7 @@ public abstract class AbstractRegion implements Region { this.destinationFactory = destinationFactory; } - public final void start() throws Exception { + public final void start() throws Exception { started = true; Set inactiveDests = getInactiveDestinations(); @@ -182,8 +183,7 @@ public abstract class AbstractRegion implements Region { } destinationMap.removeAll(destination); - dest.dispose(context); - dest.stop(); + dispose(context,dest); } else { LOG.debug("Destination doesn't exist: " + dest); @@ -334,8 +334,15 @@ public abstract class AbstractRegion implements Region { Subscription sub = consumerExchange.getSubscription(); if (sub == null) { sub = subscriptions.get(ack.getConsumerId()); + if (sub == null) { - throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); + //networked subscriptions are going to acknowledge in flight messages + //on behalf a subscription that is no more ... + if (!consumerExchange.getConnectionContext().isNetworkConnection()) { + throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId()); + }else { + return; + } } consumerExchange.setSubscription(sub); } @@ -427,7 +434,9 @@ public abstract class AbstractRegion implements Region { dest.removeProducer(context, info); } } - - - + + protected void dispose(ConnectionContext context,Destination dest) throws Exception { + dest.dispose(context); + dest.stop(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java new file mode 100644 index 0000000000..e0f7f15ebe --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractTempRegion.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.usage.SystemUsage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * + */ +public abstract class AbstractTempRegion extends AbstractRegion { + private static int TIME_BEFORE_PURGE = 60000; + private static final Log LOG = LogFactory.getLog(TempQueueRegion.class); + private Map cachedDestinations = new ConcurrentHashMap(); + private final Timer purgeTimer; + private final TimerTask purgeTask; + /** + * @param broker + * @param destinationStatistics + * @param memoryManager + * @param taskRunnerFactory + * @param destinationFactory + */ + public AbstractTempRegion(RegionBroker broker, + DestinationStatistics destinationStatistics, + SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + DestinationFactory destinationFactory) { + super(broker, destinationStatistics, memoryManager, taskRunnerFactory, + destinationFactory); + this.purgeTimer = new Timer(true); + this.purgeTask = new TimerTask() { + public void run() { + doPurge(); + } + + }; + this.purgeTimer.schedule(purgeTask, TIME_BEFORE_PURGE,TIME_BEFORE_PURGE); + } + + + public void stop() throws Exception { + super.stop(); + if (purgeTimer != null) { + purgeTimer.cancel(); + } + } + + protected abstract Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception; + + protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + Destination result = cachedDestinations.remove(new CachedDestination(destination)); + if (result==null) { + result = doCreateDestination(context, destination); + } + return result; + } + + protected final void dispose(ConnectionContext context,Destination dest) throws Exception { + //add to cache + cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest); + } + + private void doDispose(Destination dest) { + ConnectionContext context = new ConnectionContext(); + try { + dest.dispose(context); + dest.stop(); + } catch (Exception e) { + LOG.warn("Failed to dispose of " + dest,e); + } + + } + + private void doPurge() { + long currentTime = System.currentTimeMillis(); + if (cachedDestinations.size() > 0) { + Set tmp = new HashSet(cachedDestinations.keySet()); + for(CachedDestination key: tmp) { + if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) { + Destination dest = cachedDestinations.remove(key); + if (dest != null) { + doDispose(dest); + } + } + } + } + } + + static class CachedDestination{ + long timeStamp; + ActiveMQDestination destination; + + CachedDestination(ActiveMQDestination destination){ + this.destination=destination; + this.timeStamp=System.currentTimeMillis(); + } + + public int hashCode() { + return destination.hashCode(); + } + + public boolean equals(Object o) { + if (o instanceof ActiveMQDestination) { + CachedDestination other = (CachedDestination) o; + return other.destination.equals(this.destination); + } + return false; + } + + } + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java index 2bc61492cb..e9aed1cc21 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java @@ -19,18 +19,22 @@ package org.apache.activemq.broker.region; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; +import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.usage.SystemUsage; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.7 $ */ -public class TempQueueRegion extends AbstractRegion { - +public class TempQueueRegion extends AbstractTempRegion { + private static final Log LOG = LogFactory.getLog(TempQueueRegion.class); + public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); @@ -39,20 +43,26 @@ public class TempQueueRegion extends AbstractRegion { // setAutoCreateDestinations(false); } - protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { + protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { - // Only consumers on the same connection can consume from // the temporary destination - if (!context.isNetworkConnection() && !tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) { - throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest); + // However, we could have failed over - and we do this + // check client side anyways .... + if (!context.isFaultTolerant() + && (!context.isNetworkConnection() && !tempDest + .getConnectionId().equals( + sub.getConsumerInfo().getConsumerId() + .getConnectionId()))) { + + tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId()); + LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId()); } super.addSubscription(context, sub); }; - }; } @@ -79,5 +89,4 @@ public class TempQueueRegion extends AbstractRegion { super.removeDestination(context, destination, timeout); } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java index b077ff4786..9ce31efffd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java @@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFactory; /** * @version $Revision: 1.7 $ */ -public class TempTopicRegion extends AbstractRegion { +public class TempTopicRegion extends AbstractTempRegion { private static final Log LOG = LogFactory.getLog(TempTopicRegion.class); @@ -81,4 +81,10 @@ public class TempTopicRegion extends AbstractRegion { super.removeDestination(context, destination, timeout); } + + + protected Destination doCreateDestination(ConnectionContext context, + ActiveMQDestination destination) throws Exception { + return destinationFactory.createDestination(context, destination, destinationStatistics); + } }