Add support for exposing information about blocked sends through JMX - see https://issues.apache.org/jira/browse/AMQ-4635

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1504452 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2013-07-18 12:53:08 +00:00
parent 92b6bd23cf
commit 07369531ea
11 changed files with 235 additions and 83 deletions

View File

@ -50,27 +50,12 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.HealthView;
import org.apache.activemq.broker.jmx.HealthViewMBean;
import org.apache.activemq.broker.jmx.JmsConnectorView;
import org.apache.activemq.broker.jmx.JobSchedulerView;
import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.NetworkConnectorView;
import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
import org.apache.activemq.broker.jmx.ProxyConnectorView;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
@ -87,6 +72,7 @@ import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
@ -107,16 +93,7 @@ import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -1269,6 +1246,20 @@ public class BrokerService implements Service {
return answer;
}
public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
ProducerBrokerExchange result = null;
for (TransportConnector connector : transportConnectors) {
for (TransportConnection tc: connector.getConnections()){
result = tc.getProducerBrokerExchangeIfExists(producerInfo);
if (result !=null){
return result;
}
}
}
return result;
}
public String[] getTransportConnectorURIs() {
return transportConnectorURIs;
}

View File

@ -16,6 +16,10 @@
*/
package org.apache.activemq.broker;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.Message;
@ -24,13 +28,8 @@ import org.apache.activemq.state.ProducerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Holds internal state in the broker for a MessageProducer
*
*
*/
public class ProducerBrokerExchange {
@ -44,6 +43,7 @@ public class ProducerBrokerExchange {
private boolean auditProducerSequenceIds;
private boolean isNetworkProducer;
private BrokerService brokerService;
private final FlowControlInfo flowControlInfo = new FlowControlInfo();
public ProducerBrokerExchange() {
}
@ -131,7 +131,7 @@ public class ProducerBrokerExchange {
/**
* Enforce duplicate suppression using info from persistence adapter
* @param messageSend
*
* @return false if message should be ignored as a duplicate
*/
public boolean canDispatch(Message messageSend) {
@ -166,7 +166,7 @@ public class ProducerBrokerExchange {
try {
return brokerService.getPersistenceAdapter().getLastProducerSequenceId(messageId.getProducerId());
} catch (IOException ignored) {
LOG.debug("Failed to determine last producer sequence id for: " +messageId, ignored);
LOG.debug("Failed to determine last producer sequence id for: " + messageId, ignored);
}
return -1;
}
@ -180,4 +180,87 @@ public class ProducerBrokerExchange {
lastSendSequenceNumber.set(l);
LOG.debug("last stored sequence id set: " + l);
}
public void incrementSend() {
flowControlInfo.incrementSend();
}
public void blockingOnFlowControl(boolean blockingOnFlowControl) {
flowControlInfo.setBlockingOnFlowControl(blockingOnFlowControl);
}
public void incrementTimeBlocked(Destination destination, long timeBlocked) {
flowControlInfo.incrementTimeBlocked(timeBlocked);
}
public boolean isBlockedForFlowControl() {
return flowControlInfo.isBlockingOnFlowControl();
}
public void resetFlowControl() {
flowControlInfo.reset();
}
public long getTotalTimeBlocked() {
return flowControlInfo.getTotalTimeBlocked();
}
public int getPercentageBlocked() {
double value = flowControlInfo.getSendsBlocked() / flowControlInfo.getTotalSends();
return (int) value * 100;
}
public static class FlowControlInfo {
private AtomicBoolean blockingOnFlowControl = new AtomicBoolean();
private AtomicLong totalSends = new AtomicLong();
private AtomicLong sendsBlocked = new AtomicLong();
private AtomicLong totalTimeBlocked = new AtomicLong();
public boolean isBlockingOnFlowControl() {
return blockingOnFlowControl.get();
}
public void setBlockingOnFlowControl(boolean blockingOnFlowControl) {
this.blockingOnFlowControl.set(blockingOnFlowControl);
if (blockingOnFlowControl) {
incrementSendBlocked();
}
}
public long getTotalSends() {
return totalSends.get();
}
public void incrementSend() {
this.totalSends.incrementAndGet();
}
public long getSendsBlocked() {
return sendsBlocked.get();
}
public void incrementSendBlocked() {
this.sendsBlocked.incrementAndGet();
}
public long getTotalTimeBlocked() {
return totalTimeBlocked.get();
}
public void incrementTimeBlocked(long time) {
this.totalTimeBlocked.addAndGet(time);
}
public void reset() {
blockingOnFlowControl.set(false);
totalSends.set(0);
sendsBlocked.set(0);
totalTimeBlocked.set(0);
}
}
}

View File

@ -36,44 +36,10 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ControlCommand;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.FlushCommand;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.*;
import org.apache.activemq.network.DemandForwardingBridge;
import org.apache.activemq.network.MBeanNetworkListener;
import org.apache.activemq.network.NetworkBridgeConfiguration;
@ -94,10 +60,8 @@ import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@ -1407,6 +1371,16 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
}
public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){
ProducerBrokerExchange result = null;
if (producerInfo != null && producerInfo.getProducerId() != null){
synchronized (producerExchanges){
result = producerExchanges.get(producerInfo.getProducerId());
}
}
return result;
}
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException {
ProducerBrokerExchange result = producerExchanges.get(id);
if (result == null) {

View File

@ -486,4 +486,19 @@ public class DestinationView implements DestinationViewMBean {
return destination.isDLQ();
}
@Override
public long getBlockedSends() {
return destination.getDestinationStatistics().getBlockedSends().getCount();
}
@Override
public double getAverageBlockedTime() {
return destination.getDestinationStatistics().getBlockedTime().getAverageTime();
}
@Override
public long getTotalBlockedTime() {
return destination.getDestinationStatistics().getBlockedTime().getTotalTime();
}
}

View File

@ -371,4 +371,13 @@ public interface DestinationViewMBean {
@MBeanInfo("Dead Letter Queue")
boolean isDLQ();
@MBeanInfo("Get number of messages blocked for Flow Control")
long getBlockedSends();
@MBeanInfo("get the average time (ms) a message is blocked for Flow Control")
double getAverageBlockedTime();
@MBeanInfo("Get the total time (ms) messages are blocked for Flow Control")
long getTotalBlockedTime();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo;
@ -148,4 +149,39 @@ public class ProducerView implements ProducerViewMBean {
public String getUserName() {
return userName;
}
@Override
public boolean isProducerBlocked() {
ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
if (producerBrokerExchange != null){
return producerBrokerExchange.isBlockedForFlowControl();
}
return false;
}
@Override
public long getTotalTimeBlocked() {
ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
if (producerBrokerExchange != null){
return producerBrokerExchange.getTotalTimeBlocked();
}
return 0;
}
@Override
public int getPercentageBlocked() {
ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
if (producerBrokerExchange != null){
return producerBrokerExchange.getPercentageBlocked();
}
return 0;
}
@Override
public void resetFlowControlStats() {
ProducerBrokerExchange producerBrokerExchange = broker.getBrokerService().getProducerBrokerExchange(info);
if (producerBrokerExchange != null){
producerBrokerExchange.resetFlowControl();
}
}
}

View File

@ -86,4 +86,16 @@ public interface ProducerViewMBean {
*/
@MBeanInfo("User Name used to authorize creation of this Producer")
String getUserName();
@MBeanInfo("is the producer blocked for Flow Control")
boolean isProducerBlocked();
@MBeanInfo("total time (ms) Producer Blocked For Flow Control")
long getTotalTimeBlocked();
@MBeanInfo("percentage of sends Producer Blocked for Flow Control")
int getPercentageBlocked();
@MBeanInfo("reset flow control stata")
void resetFlowControlStats();
}

View File

@ -606,11 +606,11 @@ public abstract class BaseDestination implements Destination {
this.storeUsageHighWaterMark = storeUsageHighWaterMark;
}
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
waitForSpace(context, usage, 100, warning);
protected final void waitForSpace(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, String warning) throws IOException, InterruptedException, ResourceAllocationException {
waitForSpace(context, producerBrokerExchange, usage, 100, warning);
}
protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
protected final void waitForSpace(ConnectionContext context, ProducerBrokerExchange producerBrokerExchange, Usage<?> usage, int highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException {
if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) {
getLog().debug("sendFailIfNoSpace, forcing exception on send, usage: " + usage + ": " + warning);
throw new ResourceAllocationException(warning);
@ -623,6 +623,8 @@ public abstract class BaseDestination implements Destination {
} else {
long start = System.currentTimeMillis();
long nextWarn = start;
producerBrokerExchange.blockingOnFlowControl(true);
destinationStatistics.getBlockedSends().increment();
while (!usage.waitForSpace(1000, highWaterMark)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
@ -634,6 +636,11 @@ public abstract class BaseDestination implements Destination {
nextWarn = now + blockedProducerWarningInterval;
}
}
long finish = System.currentTimeMillis();
long totalTimeBlocked = finish - start;
destinationStatistics.getBlockedTime().addTime(totalTimeBlocked);
producerBrokerExchange.incrementTimeBlocked(this,totalTimeBlocked);
producerBrokerExchange.blockingOnFlowControl(false);
}
}

View File

@ -39,6 +39,9 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl inflight;
protected CountStatisticImpl expired;
protected TimeStatisticImpl processTime;
protected CountStatisticImpl blockedSends;
protected TimeStatisticImpl blockedTime;
public DestinationStatistics() {
@ -56,6 +59,8 @@ public class DestinationStatistics extends StatsImpl {
messages.setDoReset(false);
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
processTime = new TimeStatisticImpl("processTime", "information around length of time messages are held by a destination");
blockedSends = new CountStatisticImpl("blockedSends", "number of messages that have to wait for flow control");
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@ -66,6 +71,8 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
addStatistic("processTime", processTime);
addStatistic("blockedSends",blockedSends);
addStatistic("blockedTime",blockedTime);
}
public CountStatisticImpl getEnqueues() {
@ -112,6 +119,13 @@ public class DestinationStatistics extends StatsImpl {
return this.processTime;
}
public CountStatisticImpl getBlockedSends(){
return this.blockedSends;
}
public TimeStatisticImpl getBlockedTime(){
return this.blockedTime;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
@ -120,6 +134,8 @@ public class DestinationStatistics extends StatsImpl {
dispatched.reset();
inflight.reset();
expired.reset();
blockedSends.reset();
blockedTime.reset();
}
}
@ -135,6 +151,8 @@ public class DestinationStatistics extends StatsImpl {
messages.setEnabled(enabled);
messagesCached.setEnabled(enabled);
processTime.setEnabled(enabled);
blockedSends.setEnabled(enabled);
blockedTime.setEnabled(enabled);
}
@ -150,6 +168,8 @@ public class DestinationStatistics extends StatsImpl {
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
processTime.setParent(parent.processTime);
blockedSends.setParent(parent.blockedSends);
blockedTime.setParent(parent.blockedTime);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@ -161,6 +181,8 @@ public class DestinationStatistics extends StatsImpl {
messagesCached.setParent(null);
messages.setParent(null);
processTime.setParent(null);
blockedSends.setParent(null);
blockedTime.setParent(null);
}
}

View File

@ -716,7 +716,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} else {
if (memoryUsage.isFull()) {
waitForSpace(context, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
waitForSpace(context, producerExchange, memoryUsage, "Usage Manager Memory Limit reached. Producer ("
+ message.getProducerId() + ") stopped to prevent flooding "
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info");
@ -869,7 +869,8 @@ public class Queue extends BaseDestination implements Task, UsageListener {
final ConnectionContext context = producerExchange.getConnectionContext();
Future<Object> result = null;
checkUsage(context, message);
producerExchange.incrementSend();
checkUsage(context, producerExchange, message);
sendLock.lockInterruptibly();
try {
if (store != null && message.isPersistent()) {
@ -911,7 +912,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException, IOException, InterruptedException {
private void checkUsage(ConnectionContext context,ProducerBrokerExchange producerBrokerExchange, Message message) throws ResourceAllocationException, IOException, InterruptedException {
if (message.isPersistent()) {
if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
final String logMessage = "Persistent store is Full, " + getStoreUsageHighWaterMark() + "% of "
@ -920,7 +921,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
waitForSpace(context, producerBrokerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
} else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull()) {
final String logMessage = "Temp Store is Full ("
@ -929,7 +930,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ ") to prevent flooding " + getActiveMQDestination().getQualifiedName() + "."
+ " See http://activemq.apache.org/producer-flow-control.html for more info";
waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
waitForSpace(context, producerBrokerExchange, messages.getSystemUsage().getTempUsage(), logMessage);
}
}

View File

@ -316,6 +316,7 @@ public class Topic extends BaseDestination implements Task {
final ConnectionContext context = producerExchange.getConnectionContext();
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
producerExchange.incrementSend();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
&& !context.isInRecoveryMode();
@ -418,6 +419,7 @@ public class Topic extends BaseDestination implements Task {
} else {
waitForSpace(
context,
producerExchange,
memoryUsage,
"Usage Manager Memory Usage limit reached. Stopping producer ("
+ message.getProducerId()
@ -475,7 +477,7 @@ public class Topic extends BaseDestination implements Task {
throw new javax.jms.ResourceAllocationException(logMessage);
}
waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
waitForSpace(context,producerExchange, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(), logMessage);
}
result = topicStore.asyncAddTopicMessage(context, message,isOptimizeStorage());
}