git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@963095 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2010-07-11 16:08:15 +00:00
parent 26430893ae
commit 2705e3606f
5 changed files with 190 additions and 64 deletions

View File

@ -195,6 +195,8 @@ public class BrokerService implements Service {
private Scheduler scheduler;
private ThreadPoolExecutor executor;
private boolean slave = true;
private int schedulePeriodForDestinationPurge=5000;
static {
String localHostName = "localhost";
@ -2303,5 +2305,13 @@ public class BrokerService implements Service {
public void setSchedulerDirectory(String schedulerDirectory) {
setSchedulerDirectoryFile(new File(schedulerDirectory));
}
public int getSchedulePeriodForDestinationPurge() {
return this.schedulePeriodForDestinationPurge;
}
public void setSchedulePeriodForDestinationPurge(int schedulePeriodForDestinationPurge) {
this.schedulePeriodForDestinationPurge = schedulePeriodForDestinationPurge;
}
}

View File

@ -48,6 +48,7 @@ public abstract class BaseDestination implements Destination {
public static final int MAX_PAGE_SIZE = 200;
public static final int MAX_BROWSE_PAGE_SIZE = MAX_PAGE_SIZE * 2;
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
protected final ActiveMQDestination destination;
protected final Broker broker;
protected final MessageStore store;
@ -82,6 +83,9 @@ public abstract class BaseDestination implements Destination {
protected int storeUsageHighWaterMark = 100;
private SlowConsumerStrategy slowConsumerStrategy;
private boolean prioritizedMessages;
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
private boolean gcIfInactive;
private long lastActiveTime=0l;
/**
* @param broker
@ -196,11 +200,22 @@ public abstract class BaseDestination implements Destination {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().increment();
this.lastActiveTime=0l;
}
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
destinationStatistics.getProducers().decrement();
}
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
destinationStatistics.getConsumers().increment();
this.lastActiveTime=0l;
}
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
destinationStatistics.getConsumers().decrement();
}
public final MemoryUsage getMemoryUsage() {
return memoryUsage;
@ -595,5 +610,50 @@ public abstract class BaseDestination implements Destination {
public void setPrioritizedMessages(boolean prioritizedMessages) {
this.prioritizedMessages = prioritizedMessages;
}
/**
* @return the inactiveTimoutBeforeGC
*/
public long getInactiveTimoutBeforeGC() {
return this.inactiveTimoutBeforeGC;
}
/**
* @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
*/
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
}
/**
* @return the gcIfInactive
*/
public boolean isGcIfInactive() {
return this.gcIfInactive;
}
/**
* @param gcIfInactive the gcIfInactive to set
*/
public void setGcIfInactive(boolean gcIfInactive) {
this.gcIfInactive = gcIfInactive;
}
public void markForGC(long timeStamp) {
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
this.lastActiveTime = timeStamp;
}
}
public boolean canGC() {
boolean result = false;
if (isGcIfInactive()&& this.lastActiveTime != 0l) {
if ((System.currentTimeMillis() - this.lastActiveTime) > getInactiveTimoutBeforeGC()) {
result = true;
}
}
return result;
}
}

View File

@ -355,6 +355,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
super.addSubscription(context, sub);
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
@ -362,8 +363,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
try {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
// needs to be synchronized - so no contention with dispatching
// consumersLock.
consumersLock.writeLock().lock();
@ -423,7 +423,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
throws Exception {
destinationStatistics.getConsumers().decrement();
super.removeSubscription(context, sub, lastDeiveredSequenceId);
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
pagedInPendingDispatchLock.writeLock().lock();

View File

@ -102,6 +102,12 @@ public class RegionBroker extends EmptyBroker {
private ConnectionContext adminConnectionContext;
private final Scheduler scheduler;
private final ThreadPoolExecutor executor;
private final Runnable purgeInactiveDestinationsTask = new Runnable() {
public void run() {
purgeInactiveDestinations();
}
};
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
@ -191,11 +197,16 @@ public class RegionBroker extends EmptyBroker {
topicRegion.start();
tempQueueRegion.start();
tempTopicRegion.start();
int period = this.brokerService.getSchedulePeriodForDestinationPurge();
if (period > 0) {
this.scheduler.executePeriodically(purgeInactiveDestinationsTask, period);
}
}
@Override
public void stop() throws Exception {
started = false;
this.scheduler.cancel(purgeInactiveDestinationsTask);
ServiceStopper ss = new ServiceStopper();
doStop(ss);
ss.throwFirstException();
@ -351,26 +362,28 @@ public class RegionBroker extends EmptyBroker {
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
if (destination != null) {
synchronized (purgeInactiveDestinationsTask) {
if (destination != null) {
// This seems to cause the destination to be added but without advisories firing...
context.getBroker().addDestination(context, destination,false);
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.addProducer(context, info);
break;
// This seems to cause the destination to be added but without
// advisories firing...
context.getBroker().addDestination(context, destination, false);
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.addProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.addProducer(context, info);
break;
}
}
}
}
@ -378,20 +391,22 @@ public class RegionBroker extends EmptyBroker {
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
if (destination != null) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeProducer(context, info);
break;
synchronized (purgeInactiveDestinationsTask) {
if (destination != null) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeProducer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeProducer(context, info);
break;
}
}
}
}
@ -399,48 +414,55 @@ public class RegionBroker extends EmptyBroker {
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.addConsumer(context, info);
synchronized (purgeInactiveDestinationsTask) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.addConsumer(context, info);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.addConsumer(context, info);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.addConsumer(context, info);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.addConsumer(context, info);
default:
throw createUnknownDestinationTypeException(destination);
default:
throw createUnknownDestinationTypeException(destination);
}
}
}
@Override
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
ActiveMQDestination destination = info.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeConsumer(context, info);
break;
default:
throw createUnknownDestinationTypeException(destination);
synchronized (purgeInactiveDestinationsTask) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
queueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TOPIC_TYPE:
topicRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_QUEUE_TYPE:
tempQueueRegion.removeConsumer(context, info);
break;
case ActiveMQDestination.TEMP_TOPIC_TYPE:
tempTopicRegion.removeConsumer(context, info);
break;
default:
throw createUnknownDestinationTypeException(destination);
}
}
}
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
topicRegion.removeSubscription(context, info);
synchronized (purgeInactiveDestinationsTask) {
topicRegion.removeSubscription(context, info);
}
}
@Override
@ -868,4 +890,38 @@ public class RegionBroker extends EmptyBroker {
}
}
}
protected void purgeInactiveDestinations() {
synchronized (purgeInactiveDestinationsTask) {
List<BaseDestination> list = new ArrayList<BaseDestination>();
Map<ActiveMQDestination, Destination> map = getDestinationMap();
long timeStamp = System.currentTimeMillis();
for (Destination d : map.values()) {
if (d instanceof BaseDestination) {
BaseDestination bd = (BaseDestination) d;
bd.markForGC(timeStamp);
if (bd.canGC()) {
list.add(bd);
}
}
}
if (list.isEmpty() == false) {
ConnectionContext context = new ConnectionContext();
context.setBroker(this);
for (BaseDestination dest : list) {
dest.getLog().info(
dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC()
+ " ms - removing ...");
try {
getRoot().removeDestination(context, dest.getActiveMQDestination(), 0);
} catch (Exception e) {
LOG.error("Failed to remove inactive destination " + dest, e);
}
}
}
}
}
}

View File

@ -112,7 +112,7 @@ public class Topic extends BaseDestination implements Task {
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
destinationStatistics.getConsumers().increment();
super.addSubscription(context, sub);
if (!sub.getConsumerInfo().isDurable()) {
@ -152,7 +152,7 @@ public class Topic extends BaseDestination implements Task {
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId)
throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
destinationStatistics.getConsumers().decrement();
super.removeSubscription(context, sub, lastDeliveredSequenceId);
synchronized (consumers) {
consumers.remove(sub);
}