mirror of https://github.com/apache/activemq.git
Added properties for caching of temp destinations
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@636724 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e4621e3aa7
commit
c451951148
|
@ -31,14 +31,18 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractTempRegion extends AbstractRegion {
|
||||
private static int TIME_BEFORE_PURGE = 60000;
|
||||
private static final Log LOG = LogFactory.getLog(TempQueueRegion.class);
|
||||
private Map<CachedDestination,Destination> cachedDestinations = new HashMap<CachedDestination,Destination>();
|
||||
private final Timer purgeTimer;
|
||||
private final TimerTask purgeTask;
|
||||
|
||||
private Map<CachedDestination, Destination> cachedDestinations = new HashMap<CachedDestination, Destination>();
|
||||
private final boolean doCacheTempDestinations;
|
||||
private final int purgeTime;
|
||||
private Timer purgeTimer;
|
||||
private TimerTask purgeTask;
|
||||
|
||||
|
||||
/**
|
||||
* @param broker
|
||||
* @param destinationStatistics
|
||||
|
@ -52,56 +56,70 @@ public abstract class AbstractTempRegion extends AbstractRegion {
|
|||
DestinationFactory destinationFactory) {
|
||||
super(broker, destinationStatistics, memoryManager, taskRunnerFactory,
|
||||
destinationFactory);
|
||||
this.purgeTimer = new Timer(true);
|
||||
this.purgeTask = new TimerTask() {
|
||||
public void run() {
|
||||
doPurge();
|
||||
}
|
||||
|
||||
};
|
||||
this.purgeTimer.schedule(purgeTask, TIME_BEFORE_PURGE,TIME_BEFORE_PURGE);
|
||||
}
|
||||
this.doCacheTempDestinations=broker.getBrokerService().isCacheTempDestinations();
|
||||
this.purgeTime = broker.getBrokerService().getTimeBeforePurgeTempDestinations();
|
||||
if (this.doCacheTempDestinations) {
|
||||
this.purgeTimer = new Timer(true);
|
||||
this.purgeTask = new TimerTask() {
|
||||
public void run() {
|
||||
doPurge();
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
this.purgeTimer.schedule(purgeTask, purgeTime, purgeTime);
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
super.stop();
|
||||
if (purgeTimer != null) {
|
||||
purgeTimer.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception;
|
||||
|
||||
protected synchronized Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
|
||||
Destination result = cachedDestinations.remove(new CachedDestination(destination));
|
||||
if (result==null) {
|
||||
protected abstract Destination doCreateDestination(
|
||||
ConnectionContext context, ActiveMQDestination destination)
|
||||
throws Exception;
|
||||
|
||||
protected synchronized Destination createDestination(
|
||||
ConnectionContext context, ActiveMQDestination destination)
|
||||
throws Exception {
|
||||
Destination result = cachedDestinations.remove(new CachedDestination(
|
||||
destination));
|
||||
if (result == null) {
|
||||
result = doCreateDestination(context, destination);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
protected final synchronized void dispose(ConnectionContext context,Destination dest) throws Exception {
|
||||
//add to cache
|
||||
cachedDestinations.put(new CachedDestination(dest.getActiveMQDestination()), dest);
|
||||
|
||||
protected final synchronized void dispose(ConnectionContext context,
|
||||
Destination dest) throws Exception {
|
||||
// add to cache
|
||||
if (this.doCacheTempDestinations) {
|
||||
cachedDestinations.put(new CachedDestination(dest
|
||||
.getActiveMQDestination()), dest);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void doDispose(Destination dest) {
|
||||
ConnectionContext context = new ConnectionContext();
|
||||
try {
|
||||
dest.dispose(context);
|
||||
dest.stop();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to dispose of " + dest,e);
|
||||
LOG.warn("Failed to dispose of " + dest, e);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
private synchronized void doPurge() {
|
||||
long currentTime = System.currentTimeMillis();
|
||||
if (cachedDestinations.size() > 0) {
|
||||
Set<CachedDestination> tmp = new HashSet<CachedDestination>(cachedDestinations.keySet());
|
||||
for(CachedDestination key: tmp) {
|
||||
if ((key.timeStamp + TIME_BEFORE_PURGE) < currentTime) {
|
||||
Set<CachedDestination> tmp = new HashSet<CachedDestination>(
|
||||
cachedDestinations.keySet());
|
||||
for (CachedDestination key : tmp) {
|
||||
if ((key.timeStamp + purgeTime) < currentTime) {
|
||||
Destination dest = cachedDestinations.remove(key);
|
||||
if (dest != null) {
|
||||
doDispose(dest);
|
||||
|
@ -110,20 +128,21 @@ public abstract class AbstractTempRegion extends AbstractRegion {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class CachedDestination{
|
||||
|
||||
static class CachedDestination {
|
||||
long timeStamp;
|
||||
|
||||
ActiveMQDestination destination;
|
||||
|
||||
CachedDestination(ActiveMQDestination destination){
|
||||
this.destination=destination;
|
||||
this.timeStamp=System.currentTimeMillis();
|
||||
|
||||
CachedDestination(ActiveMQDestination destination) {
|
||||
this.destination = destination;
|
||||
this.timeStamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
|
||||
public int hashCode() {
|
||||
return destination.hashCode();
|
||||
}
|
||||
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof CachedDestination) {
|
||||
CachedDestination other = (CachedDestination) o;
|
||||
|
@ -131,7 +150,7 @@ public abstract class AbstractTempRegion extends AbstractRegion {
|
|||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue