git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@585967 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-10-18 14:08:44 +00:00
parent c513c859ac
commit 2cb2119814
6 changed files with 124 additions and 52 deletions

View File

@ -123,7 +123,7 @@ public class BrokerService implements Service {
private List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private List<ObjectName> registeredMBeanNames = new CopyOnWriteArrayList<ObjectName>();
private List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private Service[] services;
private List<Service> services = new ArrayList<Service>();
private MasterConnector masterConnector;
private String masterConnectorURI;
private transient Thread shutdownHook;
@ -449,8 +449,7 @@ public class BrokerService implements Service {
removeShutdownHook();
ServiceStopper stopper = new ServiceStopper();
if (services != null) {
for (int i = 0; i < services.length; i++) {
Service service = services[i];
for (Service service: services) {
stopper.stop(service);
}
}
@ -647,6 +646,7 @@ public class BrokerService implements Service {
systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default 64 Meg
systemUsage.getTempUsage().setLimit(1024 * 1024 * 1024 * 100); // 10 Gb
systemUsage.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); // 100 GB
addService(this.systemUsage);
}
return systemUsage;
} catch (IOException e) {
@ -656,7 +656,11 @@ public class BrokerService implements Service {
}
public void setSystemUsage(SystemUsage memoryManager) {
if (this.systemUsage != null) {
removeService(this.systemUsage);
}
this.systemUsage = memoryManager;
addService(this.systemUsage);
}
/**
@ -667,6 +671,7 @@ public class BrokerService implements Service {
if (consumerSystemUsage == null) {
consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
addService(consumerSystemUsage);
}
return consumerSystemUsage;
}
@ -675,7 +680,11 @@ public class BrokerService implements Service {
* @param consumerUsageManager the consumerUsageManager to set
*/
public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
if (this.consumerSystemUsage != null) {
removeService(this.consumerSystemUsage);
}
this.consumerSystemUsage = consumerUsageManager;
addService(this.producerSystemUsage);
}
/**
@ -686,6 +695,7 @@ public class BrokerService implements Service {
if (producerSystemUsage == null) {
producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
addService(producerSystemUsage);
}
return producerSystemUsage;
}
@ -694,7 +704,11 @@ public class BrokerService implements Service {
* @param producerUsageManager the producerUsageManager to set
*/
public void setProducerSystemUsage(SystemUsage producerUsageManager) {
if (this.producerSystemUsage != null) {
removeService(this.producerSystemUsage);
}
this.producerSystemUsage = producerUsageManager;
addService(this.producerSystemUsage);
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
@ -831,7 +845,7 @@ public class BrokerService implements Service {
}
public Service[] getServices() {
return services;
return (Service[]) services.toArray();
}
/**
@ -839,7 +853,12 @@ public class BrokerService implements Service {
* {@link MasterConnector}
*/
public void setServices(Service[] services) {
this.services = services;
this.services.clear();
if (services != null) {
for (int i=0; i < services.length;i++) {
this.services.add(services[i]);
}
}
}
/**
@ -847,15 +866,11 @@ public class BrokerService implements Service {
* lifecycle
*/
public void addService(Service service) {
if (services == null) {
services = new Service[] {service};
} else {
int length = services.length;
Service[] temp = new Service[length + 1];
System.arraycopy(services, 1, temp, 1, length);
temp[length] = service;
services = temp;
}
services.add(service);
}
public void removeService(Service service) {
services.remove(service);
}
public boolean isUseLoggingForShutdownErrors() {
@ -1676,13 +1691,9 @@ public class BrokerService implements Service {
JmsConnector connector = iter.next();
connector.start();
}
if (services != null) {
for (int i = 0; i < services.length; i++) {
Service service = services[i];
configureService(service);
service.start();
}
for (Service service:services) {
configureService(service);
service.start();
}
}
}

View File

@ -32,6 +32,9 @@ public class ProducerBrokerExchange {
private Region region;
private ProducerState producerState;
private boolean mutable = true;
public ProducerBrokerExchange() {
}
/**
* @return the connectionContext

View File

@ -46,6 +46,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
@ -613,6 +614,7 @@ public class Topic extends BaseDestination {
ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
producerExchange.setMutable(false);
producerExchange.setConnectionContext(context);
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
context.getBroker().send(producerExchange, message);
} finally {
context.setProducerFlowControl(originalFlowControl);

View File

@ -29,11 +29,11 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList {
/**
* Constructs an empty list.
* @param header
*/
public VMIndexLinkedList(IndexItem header) {
this.root = header;
this.root.next = root;
root.prev = root;
this.root.next=this.root.prev=this.root;
}
public synchronized IndexItem getRoot() {
@ -144,8 +144,7 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList {
* @see org.apache.activemq.kaha.impl.IndexLinkedList#clear()
*/
public synchronized void clear() {
root.next = root;
root.prev = root;
root.next=root.prev=root;
size = 0;
}
@ -258,12 +257,7 @@ public final class VMIndexLinkedList implements Cloneable, IndexLinkedList {
if (e == root || e.equals(root)) {
return;
}
if (e.prev==null){
e.prev=root;
}
if (e.next==null){
e.next=root;
}
e.prev.next = e.next;
e.next.prev = e.prev;
size--;

View File

@ -21,6 +21,12 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
@ -49,6 +55,8 @@ public abstract class Usage<T extends Usage> implements Service {
private List<T> children = new CopyOnWriteArrayList<T>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
private int pollingTime = 100;
private ThreadPoolExecutor executor;
private AtomicBoolean started=new AtomicBoolean();
public Usage(T parent, String name, float portion) {
this.parent = parent;
@ -233,25 +241,35 @@ public abstract class Usage<T extends Usage> implements Service {
return (int)((((retrieveUsage() * 100) / limiter.getLimit()) / percentUsageMinDelta) * percentUsageMinDelta);
}
private void fireEvent(int oldPercentUsage, int newPercentUsage) {
private void fireEvent(final int oldPercentUsage, final int newPercentUsage) {
if (debug) {
LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
}
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
callback.run();
if (started.get()) {
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
callback.run();
}
callbacks.clear();
}
callbacks.clear();
}
}
// Let the listeners know
for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
UsageListener l = iter.next();
l.onUsageChanged(this, oldPercentUsage, newPercentUsage);
// Let the listeners know on a separate thread
Runnable listenerNotifier = new Runnable() {
public void run() {
for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
UsageListener l = iter.next();
l.onUsageChanged(Usage.this, oldPercentUsage, newPercentUsage);
}
}
};
listenerNotifier.run();
//getExecutor().execute(listenerNotifier);
}
}
@ -264,21 +282,46 @@ public abstract class Usage<T extends Usage> implements Service {
}
@SuppressWarnings("unchecked")
public void start() {
if (parent != null) {
parent.addChild(this);
public synchronized void start() {
if (started.compareAndSet(false, true)){
if (parent != null) {
parent.addChild(this);
}
for (T t:children) {
t.start();
}
}
}
@SuppressWarnings("unchecked")
public void stop() {
if (parent != null) {
parent.removeChild(this);
public synchronized void stop() {
if (started.compareAndSet(true, false)){
if (parent != null) {
parent.removeChild(this);
}
if (this.executor != null){
this.executor.shutdownNow();
}
//clear down any callbacks
synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator<Runnable> iter = new ArrayList<Runnable>(this.callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
callback.run();
}
this.callbacks.clear();
}
for (T t:children) {
t.stop();
}
}
}
private void addChild(T child) {
children.add(child);
if (started.get()) {
child.start();
}
}
private void removeChild(T child) {
@ -357,4 +400,21 @@ public abstract class Usage<T extends Usage> implements Service {
public void setParent(T parent) {
this.parent = parent;
}
protected synchronized Executor getExecutor() {
if (this.executor == null) {
this.executor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.NANOSECONDS,
new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, getName()
+ " Usage Thread Pool");
thread.setDaemon(true);
return thread;
}
});
}
return this.executor;
}
}

View File

@ -34,7 +34,9 @@ public class VMIndexLinkedListTest extends TestCase {
protected void setUp() throws Exception {
super.setUp();
for (int i = 0; i < NUMBER; i++) {
testData.add(new IndexItem());
IndexItem item = new IndexItem();
item.setOffset(i);
testData.add(item);
}
root = new IndexItem();
list = new VMIndexLinkedList(root);