Introduced AbstractTempRegion - to cater for temp destination

usage on  failover

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@631237 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-02-26 14:50:43 +00:00
parent 16b4009348
commit cd24f8099f
4 changed files with 177 additions and 16 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@ -182,8 +183,7 @@ public abstract class AbstractRegion implements Region {
}
destinationMap.removeAll(destination);
dest.dispose(context);
dest.stop();
dispose(context,dest);
} else {
LOG.debug("Destination doesn't exist: " + dest);
@ -334,8 +334,15 @@ public abstract class AbstractRegion implements Region {
Subscription sub = consumerExchange.getSubscription();
if (sub == null) {
sub = subscriptions.get(ack.getConsumerId());
if (sub == null) {
//networked subscriptions are going to acknowledge in flight messages
//on behalf a subscription that is no more ...
if (!consumerExchange.getConnectionContext().isNetworkConnection()) {
throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
}else {
return;
}
}
consumerExchange.setSubscription(sub);
}
@ -428,6 +435,8 @@ public abstract class AbstractRegion implements Region {
}
}
protected void dispose(ConnectionContext context,Destination dest) throws Exception {
dest.dispose(context);
dest.stop();
}
}

View File

@ -0,0 +1,137 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
*/
public abstract class AbstractTempRegion extends AbstractRegion {
private static int TIME_BEFORE_PURGE = 60000;
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
private Map<CachedDestination,Destination> cachedDestinations = new ConcurrentHashMap<CachedDestination,Destination>();
private final Timer purgeTimer;
private final TimerTask purgeTask;
/**
* @param broker
* @param destinationStatistics
* @param memoryManager
* @param taskRunnerFactory
* @param destinationFactory
*/
public AbstractTempRegion(RegionBroker broker,
DestinationStatistics destinationStatistics,
SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
destinationFactory);
this.purgeTimer = new Timer(true);
this.purgeTask = new TimerTask() {
public void run() {
doPurge();
}
};
this.purgeTimer.schedule(purgeTask, TIME_BEFORE_PURGE,TIME_BEFORE_PURGE);
}
public void stop() throws Exception {
super.stop();
if (purgeTimer != null) {
purgeTimer.cancel();
}
}
protected abstract Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
Destination result = cachedDestinations.remove(new CachedDestination(destination));
if (result==null) {
result = doCreateDestination(context, destination);
}
return result;
}
protected final void dispose(ConnectionContext context,Destination dest) throws Exception {
//add to cache
cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest);
}
private void doDispose(Destination dest) {
ConnectionContext context = new ConnectionContext();
try {
dest.dispose(context);
dest.stop();
} catch (Exception e) {
LOG.warn("Failed to dispose of " + dest,e);
}
}
private void doPurge() {
long currentTime = System.currentTimeMillis();
if (cachedDestinations.size() > 0) {
Set<CachedDestination> tmp = new HashSet<CachedDestination>(cachedDestinations.keySet());
for(CachedDestination key: tmp) {
if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) {
Destination dest = cachedDestinations.remove(key);
if (dest != null) {
doDispose(dest);
}
}
}
}
}
static class CachedDestination{
long timeStamp;
ActiveMQDestination destination;
CachedDestination(ActiveMQDestination destination){
this.destination=destination;
this.timeStamp=System.currentTimeMillis();
}
public int hashCode() {
return destination.hashCode();
}
public boolean equals(Object o) {
if (o instanceof ActiveMQDestination) {
CachedDestination other = (CachedDestination) o;
return other.destination.equals(this.destination);
}
return false;
}
}
}

View File

@ -19,17 +19,21 @@ package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.7 $
*/
public class TempQueueRegion extends AbstractRegion {
public class TempQueueRegion extends AbstractTempRegion {
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
@ -39,20 +43,26 @@ public class TempQueueRegion extends AbstractRegion {
// setAutoCreateDestinations(false);
}
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// Only consumers on the same connection can consume from
// the temporary destination
if (!context.isNetworkConnection() && !tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())) {
throw new JMSException("Cannot subscribe to remote temporary destination: " + tempDest);
// However, we could have failed over - and we do this
// check client side anyways ....
if (!context.isFaultTolerant()
&& (!context.isNetworkConnection() && !tempDest
.getConnectionId().equals(
sub.getConsumerInfo().getConsumerId()
.getConnectionId()))) {
tempDest.setConnectionId(sub.getConsumerInfo().getConsumerId().getConnectionId());
LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
};
};
}
@ -79,5 +89,4 @@ public class TempQueueRegion extends AbstractRegion {
super.removeDestination(context, destination, timeout);
}
}

View File

@ -30,7 +30,7 @@ import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.7 $
*/
public class TempTopicRegion extends AbstractRegion {
public class TempTopicRegion extends AbstractTempRegion {
private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
@ -81,4 +81,10 @@ public class TempTopicRegion extends AbstractRegion {
super.removeDestination(context, destination, timeout);
}
protected Destination doCreateDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
return destinationFactory.createDestination(context, destination, destinationStatistics);
}
}