From 2cb21198141c8980a0d84b697809053a6ad36f35 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Thu, 18 Oct 2007 14:08:44 +0000 Subject: [PATCH] fix for https://issues.apache.org/activemq/browse/AMQ-1467 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@585967 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 53 ++++++---- .../broker/ProducerBrokerExchange.java | 3 + .../apache/activemq/broker/region/Topic.java | 2 + .../kaha/impl/index/VMIndexLinkedList.java | 14 +-- .../java/org/apache/activemq/usage/Usage.java | 100 ++++++++++++++---- .../impl/index/VMIndexLinkedListTest.java | 4 +- 6 files changed, 124 insertions(+), 52 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 43f0620ff4..0e4477a3c9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -123,7 +123,7 @@ public class BrokerService implements Service { private List proxyConnectors = new CopyOnWriteArrayList(); private List registeredMBeanNames = new CopyOnWriteArrayList(); private List jmsConnectors = new CopyOnWriteArrayList(); - private Service[] services; + private List services = new ArrayList(); private MasterConnector masterConnector; private String masterConnectorURI; private transient Thread shutdownHook; @@ -449,8 +449,7 @@ public class BrokerService implements Service { removeShutdownHook(); ServiceStopper stopper = new ServiceStopper(); if (services != null) { - for (int i = 0; i < services.length; i++) { - Service service = services[i]; + for (Service service: services) { stopper.stop(service); } } @@ -647,6 +646,7 @@ public class BrokerService implements Service { systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg systemUsage.getTempUsage().setLimit(1024 * 1024 * 1024 * 100); // 10 Gb systemUsage.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); // 100 GB + addService(this.systemUsage); } return systemUsage; } catch (IOException e) { @@ -656,7 +656,11 @@ public class BrokerService implements Service { } public void setSystemUsage(SystemUsage memoryManager) { + if (this.systemUsage != null) { + removeService(this.systemUsage); + } this.systemUsage = memoryManager; + addService(this.systemUsage); } /** @@ -667,6 +671,7 @@ public class BrokerService implements Service { if (consumerSystemUsage == null) { consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer"); consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f); + addService(consumerSystemUsage); } return consumerSystemUsage; } @@ -675,7 +680,11 @@ public class BrokerService implements Service { * @param consumerUsageManager the consumerUsageManager to set */ public void setConsumerSystemUsage(SystemUsage consumerUsageManager) { + if (this.consumerSystemUsage != null) { + removeService(this.consumerSystemUsage); + } this.consumerSystemUsage = consumerUsageManager; + addService(this.producerSystemUsage); } /** @@ -686,6 +695,7 @@ public class BrokerService implements Service { if (producerSystemUsage == null) { producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer"); producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f); + addService(producerSystemUsage); } return producerSystemUsage; } @@ -694,7 +704,11 @@ public class BrokerService implements Service { * @param producerUsageManager the producerUsageManager to set */ public void setProducerSystemUsage(SystemUsage producerUsageManager) { + if (this.producerSystemUsage != null) { + removeService(this.producerSystemUsage); + } this.producerSystemUsage = producerUsageManager; + addService(this.producerSystemUsage); } public PersistenceAdapter getPersistenceAdapter() throws IOException { @@ -831,7 +845,7 @@ public class BrokerService implements Service { } public Service[] getServices() { - return services; + return (Service[]) services.toArray(); } /** @@ -839,7 +853,12 @@ public class BrokerService implements Service { * {@link MasterConnector} */ public void setServices(Service[] services) { - this.services = services; + this.services.clear(); + if (services != null) { + for (int i=0; i < services.length;i++) { + this.services.add(services[i]); + } + } } /** @@ -847,15 +866,11 @@ public class BrokerService implements Service { * lifecycle */ public void addService(Service service) { - if (services == null) { - services = new Service[] {service}; - } else { - int length = services.length; - Service[] temp = new Service[length + 1]; - System.arraycopy(services, 1, temp, 1, length); - temp[length] = service; - services = temp; - } + services.add(service); + } + + public void removeService(Service service) { + services.remove(service); } public boolean isUseLoggingForShutdownErrors() { @@ -1676,13 +1691,9 @@ public class BrokerService implements Service { JmsConnector connector = iter.next(); connector.start(); } - - if (services != null) { - for (int i = 0; i < services.length; i++) { - Service service = services[i]; - configureService(service); - service.start(); - } + for (Service service:services) { + configureService(service); + service.start(); } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java index 9075fb1f58..32b3db8655 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java @@ -32,6 +32,9 @@ public class ProducerBrokerExchange { private Region region; private ProducerState producerState; private boolean mutable = true; + + public ProducerBrokerExchange() { + } /** * @return the connectionContext diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 2883eac375..99cf98960b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -46,6 +46,7 @@ import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.state.ProducerState; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.TopicMessageStore; @@ -613,6 +614,7 @@ public class Topic extends BaseDestination { ProducerBrokerExchange producerExchange = new ProducerBrokerExchange(); producerExchange.setMutable(false); producerExchange.setConnectionContext(context); + producerExchange.setProducerState(new ProducerState(new ProducerInfo())); context.getBroker().send(producerExchange, message); } finally { context.setProducerFlowControl(originalFlowControl); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java index 1a14923f16..8b93236b7e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java @@ -29,11 +29,11 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList { /** * Constructs an empty list. + * @param header */ public VMIndexLinkedList(IndexItem header) { this.root = header; - this.root.next = root; - root.prev = root; + this.root.next=this.root.prev=this.root; } public synchronized IndexItem getRoot() { @@ -144,8 +144,7 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList { * @see org.apache.activemq.kaha.impl.IndexLinkedList#clear() */ public synchronized void clear() { - root.next = root; - root.prev = root; + root.next=root.prev=root; size = 0; } @@ -258,12 +257,7 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList { if (e == root || e.equals(root)) { return; } - if (e.prev==null){ - e.prev=root; - } - if (e.next==null){ - e.next=root; - } + e.prev.next = e.next; e.next.prev = e.prev; size--; diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java index 55b7d4258a..9d447845a6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java +++ b/activemq-core/src/main/java/org/apache/activemq/usage/Usage.java @@ -21,6 +21,12 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Service; import org.apache.commons.logging.Log; @@ -49,6 +55,8 @@ public abstract class Usage implements Service { private List children = new CopyOnWriteArrayList(); private final List callbacks = new LinkedList(); private int pollingTime = 100; + private ThreadPoolExecutor executor; + private AtomicBoolean started=new AtomicBoolean(); public Usage(T parent, String name, float portion) { this.parent = parent; @@ -233,25 +241,35 @@ public abstract class Usage implements Service { return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta); } - private void fireEvent(int oldPercentUsage, int newPercentUsage) { + private void fireEvent(final int oldPercentUsage, final int newPercentUsage) { if (debug) { LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage); } - // Switching from being full to not being full.. - if (oldPercentUsage >= 100 && newPercentUsage < 100) { - synchronized (usageMutex) { - usageMutex.notifyAll(); - for (Iterator iter = new ArrayList(callbacks).iterator(); iter.hasNext();) { - Runnable callback = iter.next(); - callback.run(); + if (started.get()) { + // Switching from being full to not being full.. + if (oldPercentUsage >= 100 && newPercentUsage < 100) { + synchronized (usageMutex) { + usageMutex.notifyAll(); + for (Iterator iter = new ArrayList(callbacks).iterator(); iter.hasNext();) { + Runnable callback = iter.next(); + callback.run(); + } + callbacks.clear(); } - callbacks.clear(); } - } - // Let the listeners know - for (Iterator iter = listeners.iterator(); iter.hasNext();) { - UsageListener l = iter.next(); - l.onUsageChanged(this, oldPercentUsage, newPercentUsage); + // Let the listeners know on a separate thread + Runnable listenerNotifier = new Runnable() { + + public void run() { + for (Iterator iter = listeners.iterator(); iter.hasNext();) { + UsageListener l = iter.next(); + l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage); + } + } + + }; + listenerNotifier.run(); + //getExecutor().execute(listenerNotifier); } } @@ -264,21 +282,46 @@ public abstract class Usage implements Service { } @SuppressWarnings("unchecked") - public void start() { - if (parent != null) { - parent.addChild(this); + public synchronized void start() { + if (started.compareAndSet(false, true)){ + if (parent != null) { + parent.addChild(this); + } + for (T t:children) { + t.start(); + } } } @SuppressWarnings("unchecked") - public void stop() { - if (parent != null) { - parent.removeChild(this); + public synchronized void stop() { + if (started.compareAndSet(true, false)){ + if (parent != null) { + parent.removeChild(this); + } + if (this.executor != null){ + this.executor.shutdownNow(); + } + //clear down any callbacks + synchronized (usageMutex) { + usageMutex.notifyAll(); + for (Iterator iter = new ArrayList(this.callbacks).iterator(); iter.hasNext();) { + Runnable callback = iter.next(); + callback.run(); + } + this.callbacks.clear(); + } + for (T t:children) { + t.stop(); + } } } private void addChild(T child) { children.add(child); + if (started.get()) { + child.start(); + } } private void removeChild(T child) { @@ -357,4 +400,21 @@ public abstract class Usage implements Service { public void setParent(T parent) { this.parent = parent; } + + protected synchronized Executor getExecutor() { + if (this.executor == null) { + this.executor = new ThreadPoolExecutor(1, 1, 0, + TimeUnit.NANOSECONDS, + new LinkedBlockingQueue(), new ThreadFactory() { + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable, getName() + + " Usage Thread Pool"); + thread.setDaemon(true); + return thread; + } + }); + + } + return this.executor; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java index 93cbee8a4a..2d0040b256 100644 --- a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java @@ -34,7 +34,9 @@ public class VMIndexLinkedListTest extends TestCase { protected void setUp() throws Exception { super.setUp(); for (int i = 0; i < NUMBER; i++) { - testData.add(new IndexItem()); + IndexItem item = new IndexItem(); + item.setOffset(i); + testData.add(item); } root = new IndexItem(); list = new VMIndexLinkedList(root);