git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@613830 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-01-21 10:31:22 +00:00
parent caf7a7c7c2
commit 83c3dca9e5
15 changed files with 255 additions and 25 deletions

View File

@ -104,6 +104,32 @@ public class BrokerView implements BrokerViewMBean {
public void setMemoryLimit(long limit) {
brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
}
public long getStoreLimit() {
return brokerService.getSystemUsage().getStoreUsage().getLimit();
}
public int getStorePercentageUsed() {
return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
}
public long getTmpLimit() {
return brokerService.getSystemUsage().getTempUsage().getLimit();
}
public int getTmpPercentageUsed() {
return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
}
public void setStoreLimit(long limit) {
brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
}
public void setTmpLimit(long limit) {
brokerService.getSystemUsage().getTempUsage().setLimit(limit);
}
public void resetStatistics() {
broker.getDestinationStatistics().reset();
@ -289,5 +315,4 @@ public class BrokerView implements BrokerViewMBean {
throw e.getTargetException();
}
}
}

View File

@ -20,6 +20,7 @@ import javax.management.ObjectName;
import org.apache.activemq.Service;
/**
* @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (for the reloadLog4jProperties method)
* @version $Revision$
@ -65,6 +66,18 @@ public interface BrokerViewMBean extends Service {
long getMemoryLimit();
void setMemoryLimit(long limit);
int getStorePercentageUsed();
long getStoreLimit();
void setStoreLimit(long limit);
int getTmpPercentageUsed();
long getTmpLimit();
void setTmpLimit(long limit);
boolean isPersistent();

View File

@ -94,15 +94,15 @@ public class DestinationView implements DestinationViewMBean {
}
public int getMemoryPercentageUsed() {
return destination.getBrokerMemoryUsage().getPercentUsage();
return destination.getMemoryUsage().getPercentUsage();
}
public long getMemoryLimit() {
return destination.getBrokerMemoryUsage().getLimit();
return destination.getMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
destination.getBrokerMemoryUsage().setLimit(limit);
destination.getMemoryUsage().setLimit(limit);
}
public double getAverageEnqueueTime() {
@ -267,4 +267,51 @@ public class DestinationView implements DestinationViewMBean {
}
}
public int getMaxAuditDepth() {
return destination.getMaxAuditDepth();
}
public int getMaxProducersToAudit() {
return destination.getMaxProducersToAudit();
}
public boolean isEnableAudit() {
return destination.isEnableAudit();
}
public void setEnableAudit(boolean enableAudit) {
destination.setEnableAudit(enableAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
destination.setMaxAuditDepth(maxAuditDepth);
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
destination.setMaxProducersToAudit(maxProducersToAudit);
}
public float getMemoryLimitPortion() {
return destination.getMemoryUsage().getUsagePortion();
}
public long getProducerCount() {
return destination.getDestinationStatistics().getProducers().getCount();
}
public boolean isProducerFlowControl() {
return destination.isProducerFlowControl();
}
public void setMemoryLimitPortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
}
}

View File

@ -67,6 +67,11 @@ public interface DestinationViewMBean {
* @return The number of consumers subscribed this destination.
*/
long getConsumerCount();
/**
* @return the number of producers publishing to the destination
*/
long getProducerCount();
/**
* Returns the number of messages in this destination which are yet to be
@ -119,11 +124,32 @@ public interface DestinationViewMBean {
*/
String sendTextMessage(Map headers, String body) throws Exception;
/**
* @return the percentage of amount of memory used
*/
int getMemoryPercentageUsed();
/**
* @return the amount of memory allocated to this destination
*/
long getMemoryLimit();
/**
* set the amount of memory allocated to this destination
* @param limit
*/
void setMemoryLimit(long limit);
/**
* @return the portion of memory from the broker memory limit for this destination
*/
float getMemoryLimitPortion();
/**
* set the portion of memory from the broker memory limit for this destination
* @param value
*/
void setMemoryLimitPortion(float value);
/**
* Browses the current destination returning a list of messages
@ -150,5 +176,34 @@ public interface DestinationViewMBean {
* @return average time a message is held by a destination
*/
double getAverageEnqueueTime();
/**
* @return the producerFlowControl
*/
boolean isProducerFlowControl();
/**
* @param producerFlowControl the producerFlowControl to set
*/
public void setProducerFlowControl(boolean producerFlowControl);
/**
* @return the maxProducersToAudit
*/
public int getMaxProducersToAudit();
/**
* @param maxProducersToAudit the maxProducersToAudit to set
*/
public void setMaxProducersToAudit(int maxProducersToAudit);
/**
* @return the maxAuditDepth
*/
public int getMaxAuditDepth();
/**
* @param maxAuditDepth the maxAuditDepth to set
*/
public void setMaxAuditDepth(int maxAuditDepth);
}

View File

@ -64,6 +64,12 @@ public abstract class BaseDestination implements Destination {
this.destinationStatistics.setEnabled(parentStats.isEnabled());
this.destinationStatistics.setParent(parentStats);
}
/**
* initialize the destination
* @throws Exception
*/
public abstract void initialize() throws Exception;
/**
* @return the producerFlowControl
*/
@ -121,7 +127,7 @@ public abstract class BaseDestination implements Destination {
destinationStatistics.getProducers().decrement();
}
public final MemoryUsage getBrokerMemoryUsage() {
public final MemoryUsage getMemoryUsage() {
return memoryUsage;
}
@ -144,6 +150,11 @@ public abstract class BaseDestination implements Destination {
public final MessageStore getMessageStore() {
return store;
}
public final boolean isActive() {
return destinationStatistics.getConsumers().getCount() != 0 ||
destinationStatistics.getProducers().getCount() != 0;
}

View File

@ -53,7 +53,7 @@ public interface Destination extends Service {
ActiveMQDestination getActiveMQDestination();
MemoryUsage getBrokerMemoryUsage();
MemoryUsage getMemoryUsage();
void dispose(ConnectionContext context) throws IOException;
@ -70,4 +70,20 @@ public interface Destination extends Service {
boolean isProducerFlowControl();
void setProducerFlowControl(boolean value);
int getMaxProducersToAudit();
void setMaxProducersToAudit(int maxProducersToAudit);
int getMaxAuditDepth();
void setMaxAuditDepth(int maxAuditDepth);
boolean isEnableAudit();
void setEnableAudit(boolean enableAudit);
boolean isActive();
}

View File

@ -115,6 +115,7 @@ public class DestinationFactoryImpl extends DestinationFactory {
}
Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory);
configureTopic(topic, destination);
topic.initialize();
return topic;
}
}

View File

@ -81,8 +81,8 @@ public class DestinationFilter implements Destination {
return next.getName();
}
public MemoryUsage getBrokerMemoryUsage() {
return next.getBrokerMemoryUsage();
public MemoryUsage getMemoryUsage() {
return next.getMemoryUsage();
}
public boolean lock(MessageReference node, LockOwner lockOwner) {
@ -141,6 +141,34 @@ public class DestinationFilter implements Destination {
throws Exception {
next.removeProducer(context, info);
}
public int getMaxAuditDepth() {
return next.getMaxAuditDepth();
}
public int getMaxProducersToAudit() {
return next.getMaxProducersToAudit();
}
public boolean isEnableAudit() {
return next.isEnableAudit();
}
public void setEnableAudit(boolean enableAudit) {
next.setEnableAudit(enableAudit);
}
public void setMaxAuditDepth(int maxAuditDepth) {
next.setMaxAuditDepth(maxAuditDepth);
}
public void setMaxProducersToAudit(int maxProducersToAudit) {
next.setMaxProducersToAudit(maxProducersToAudit);
}
public boolean isActive() {
return next.isActive();
}
}

View File

@ -17,11 +17,12 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
@ -31,9 +32,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,13 +49,23 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final boolean keepDurableSubsActive;
private boolean active;
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
public DurableTopicSubscription(Broker broker, Destination dest,SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
if (dest != null && dest.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore)dest.getMessageStore();
try {
this.enqueueCounter=store.getMessageCount(subscriptionKey.getClientId(),subscriptionKey.getSubscriptionName());
} catch (IOException e) {
JMSException jmsEx = new JMSException("Failed to retrieve eunqueueCount from store "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
}
}
public boolean isActive() {

View File

@ -150,6 +150,9 @@ public class Queue extends BaseDestination implements Task {
return true;
}
});
}else {
int messageCount = store.getMessageCount();
destinationStatistics.getMessages().setCount(messageCount);
}
}
}
@ -320,7 +323,8 @@ public class Queue extends BaseDestination implements Task {
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
if (message.isExpired()) {
broker.messageExpired(context, message);
destinationStatistics.getMessages().decrement();
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack);
@ -346,7 +350,8 @@ public class Queue extends BaseDestination implements Task {
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
destinationStatistics.getMessages().decrement();
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
}
@ -436,7 +441,8 @@ public class Queue extends BaseDestination implements Task {
// op, by that time the message could have expired..
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
destinationStatistics.getMessages().decrement();
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
return;
}
sendMessage(context, message);

View File

@ -56,8 +56,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;

View File

@ -99,6 +99,13 @@ public class Topic extends BaseDestination implements Task{
}
this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
}
public void initialize() throws Exception{
if (store != null) {
int messageCount = store.getMessageCount();
destinationStatistics.getMessages().setCount(messageCount);
}
}
public boolean lock(MessageReference node, LockOwner sub) {
return true;
@ -288,7 +295,8 @@ public class Topic extends BaseDestination implements Task{
// message may have expired.
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
destinationStatistics.getMessages().decrement();
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
} else {
doMessageSend(producerExchange, message);
}
@ -394,7 +402,8 @@ public class Topic extends BaseDestination implements Task{
if (broker.isExpired(message)) {
broker.messageExpired(context, message);
message.decrementReferenceCount();
destinationStatistics.getMessages().decrement();
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
return;
}
try {
@ -543,6 +552,7 @@ public class Topic extends BaseDestination implements Task{
// Implementation methods
// -------------------------------------------------------------------------
protected void dispatch(final ConnectionContext context, Message message) throws Exception {
destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();

View File

@ -229,9 +229,17 @@ public class TopicRegion extends AbstractRegion {
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
ActiveMQDestination destination = info.getDestination();
if (sub == null) {
sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
ActiveMQDestination destination = info.getDestination();
Destination dest=null;
try {
dest = lookup(context, destination);
} catch (Exception e) {
JMSException jmsEx = new JMSException("Failed to retrieve destination from region "+ e);
jmsEx.setLinkedException(e);
throw jmsEx;
}
sub = new DurableTopicSubscription(broker,dest, usageManager, context, info, keepDurableSubsActive);
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {

View File

@ -67,7 +67,7 @@ public class PolicyEntry extends DestinationMapEntry {
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
queue.getBrokerMemoryUsage().setLimit(memoryLimit);
queue.getMemoryUsage().setLimit(memoryLimit);
}
if (pendingQueuePolicy != null) {
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@ -91,7 +91,7 @@ public class PolicyEntry extends DestinationMapEntry {
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if (memoryLimit > 0) {
topic.getBrokerMemoryUsage().setLimit(memoryLimit);
topic.getMemoryUsage().setLimit(memoryLimit);
}
topic.setProducerFlowControl(isProducerFlowControl());
topic.setEnableAudit(isEnableAudit());

View File

@ -571,7 +571,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
this.regionDestination = destination;
if(this.memoryUsage==null) {
this.memoryUsage=regionDestination.getBrokerMemoryUsage();
this.memoryUsage=regionDestination.getMemoryUsage();
}
}