added enqueue/dequeue statistics at the Connection/Connector level in JMX

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@357683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2005-12-19 12:22:09 +00:00
parent 0bf5ab3724
commit 384e290b7f
21 changed files with 452 additions and 110 deletions

View File

@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.activemq.Service; import org.activemq.Service;
import org.activemq.broker.region.ConnectionStatistics;
import org.activemq.command.ActiveMQDestination; import org.activemq.command.ActiveMQDestination;
import org.activemq.command.BrokerInfo; import org.activemq.command.BrokerInfo;
import org.activemq.command.Command; import org.activemq.command.Command;
@ -78,11 +79,13 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
protected final TaskRunner taskRunner; protected final TaskRunner taskRunner;
protected final Connector connector; protected final Connector connector;
protected boolean demandForwardingBridge; protected boolean demandForwardingBridge;
private ConnectionStatistics statistics = new ConnectionStatistics();
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
private WireFormatInfo wireFormatInfo; private WireFormatInfo wireFormatInfo;
protected boolean disposed=false; protected boolean disposed=false;
static class ConnectionState extends org.activemq.state.ConnectionState { static class ConnectionState extends org.activemq.state.ConnectionState {
private final ConnectionContext context; private final ConnectionContext context;
@ -108,11 +111,15 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
this.connector = connector; this.connector = connector;
this.broker = broker; this.broker = broker;
this.statistics.setParent(connector.getStatistics());
if( taskRunnerFactory != null ) if( taskRunnerFactory != null ) {
taskRunner = taskRunnerFactory.createTaskRunner( this ); taskRunner = taskRunnerFactory.createTaskRunner( this );
else }
taskRunner = null; else {
taskRunner = null;
}
} }
/** /**
@ -561,4 +568,11 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
abstract protected void dispatch(Command command); abstract protected void dispatch(Command command);
/**
* Returns the statistics for this connection
*/
public ConnectionStatistics getStatistics() {
return statistics;
}
} }

View File

@ -19,6 +19,7 @@
package org.activemq.broker; package org.activemq.broker;
import org.activemq.Service; import org.activemq.Service;
import org.activemq.broker.region.ConnectionStatistics;
import org.activemq.command.Command; import org.activemq.command.Command;
import org.activemq.command.Response; import org.activemq.command.Response;
@ -86,4 +87,10 @@ public interface Connection extends Service {
* Returns the number of messages to be dispatched to this connection * Returns the number of messages to be dispatched to this connection
*/ */
public int getDispatchQueueSize(); public int getDispatchQueueSize();
/**
* Returns the statistics for this connection
*/
public ConnectionStatistics getStatistics();
} }

View File

@ -1,37 +1,42 @@
/** /**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a> * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
* *
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activemq.broker; package org.activemq.broker;
import org.activemq.Service; import org.activemq.Service;
import org.activemq.broker.region.ConnectorStatistics;
import org.activemq.command.BrokerInfo; import org.activemq.command.BrokerInfo;
/** /**
* A connector creates and manages client connections that talk to the Broker. * A connector creates and manages client connections that talk to the Broker.
* *
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
public interface Connector extends Service { public interface Connector extends Service {
/** /**
* *
* @return * @return
*/ */
public BrokerInfo getBrokerInfo(); public BrokerInfo getBrokerInfo();
/**
* @return the statistics for this connector
*/
public ConnectorStatistics getStatistics();
} }

View File

@ -66,7 +66,7 @@ public class TransportConnection extends AbstractConnection {
serviceTransportException(exception); serviceTransportException(exception);
} }
}); });
connected =true; connected = true;
} }
public void start() throws Exception { public void start() throws Exception {
@ -81,6 +81,7 @@ public class TransportConnection extends AbstractConnection {
super.stop(); super.stop();
} }
/** /**
* @return Returns the blockedCandidate. * @return Returns the blockedCandidate.
*/ */
@ -186,6 +187,7 @@ public class TransportConnection extends AbstractConnection {
try{ try{
setMarkedCandidate(true); setMarkedCandidate(true);
transport.oneway(command); transport.oneway(command);
getStatistics().onCommand(command);
}catch(IOException e){ }catch(IOException e){
serviceException(e); serviceException(e);
}finally{ }finally{

View File

@ -27,6 +27,7 @@ import javax.management.MBeanServer;
import javax.management.ObjectName; import javax.management.ObjectName;
import org.activemq.broker.jmx.ManagedTransportConnector; import org.activemq.broker.jmx.ManagedTransportConnector;
import org.activemq.broker.region.ConnectorStatistics;
import org.activemq.command.BrokerInfo; import org.activemq.command.BrokerInfo;
import org.activemq.command.ConnectionInfo; import org.activemq.command.ConnectionInfo;
import org.activemq.thread.TaskRunnerFactory; import org.activemq.thread.TaskRunnerFactory;
@ -60,8 +61,8 @@ public class TransportConnector implements Connector {
protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
protected TransportStatusDetector statusDector; protected TransportStatusDetector statusDector;
private DiscoveryAgent discoveryAgent; private DiscoveryAgent discoveryAgent;
private ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri; private URI discoveryUri;
private URI connectUri; private URI connectUri;
/** /**
@ -164,6 +165,13 @@ public class TransportConnector implements Connector {
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory = taskRunnerFactory;
} }
/**
* @return the statistics for this connector
*/
public ConnectorStatistics getStatistics() {
return statistics;
}
public void start() throws Exception { public void start() throws Exception {
getServer().start(); getServer().start();
log.info("Accepting connection on: "+getServer().getConnectURI()); log.info("Accepting connection on: "+getServer().getConnectURI());

View File

@ -72,4 +72,31 @@ public class ConnectionView implements ConnectionViewMBean {
public int getDispatchQueueSize() { public int getDispatchQueueSize() {
return connection.getDispatchQueueSize(); return connection.getDispatchQueueSize();
} }
/**
* Resets the statistics
*/
public void resetStatistics() {
connection.getStatistics().reset();
}
/**
* Returns the number of messages enqueued on this connection
*
* @return the number of messages enqueued on this connection
*/
public long getEnqueueCount() {
return connection.getStatistics().getEnqueues().getCount();
}
/**
* Returns the number of messages dequeued on this connection
*
* @return the number of messages dequeued on this connection
*/
public long getDequeueCount() {
return connection.getStatistics().getDequeues().getCount();
}
} }

View File

@ -45,4 +45,24 @@ public interface ConnectionViewMBean extends Service {
* Returns the number of messages to be dispatched to this connection * Returns the number of messages to be dispatched to this connection
*/ */
public int getDispatchQueueSize(); public int getDispatchQueueSize();
/**
* Resets the statistics
*/
public void resetStatistics();
/**
* Returns the number of messages enqueued on this connection
*
* @return the number of messages enqueued on this connection
*/
public long getEnqueueCount();
/**
* Returns the number of messages dequeued on this connection
*
* @return the number of messages dequeued on this connection
*/
public long getDequeueCount();
} }

View File

@ -90,5 +90,31 @@ public class ConnectorView implements ConnectorViewMBean {
} }
return redeliveryPolicy; return redeliveryPolicy;
} }
/**
* Resets the statistics
*/
public void resetStatistics() {
connector.getStatistics().reset();
}
/**
* Returns the number of messages enqueued on this connector
*
* @return the number of messages enqueued on this connector
*/
public long getEnqueueCount() {
return connector.getStatistics().getEnqueues().getCount();
}
/**
* Returns the number of messages dequeued on this connector
*
* @return the number of messages dequeued on this connector
*/
public long getDequeueCount() {
return connector.getStatistics().getDequeues().getCount();
}
} }

View File

@ -19,8 +19,6 @@
package org.activemq.broker.jmx; package org.activemq.broker.jmx;
import org.activemq.Service; import org.activemq.Service;
import org.activemq.command.BrokerInfo;
import org.activemq.command.RedeliveryPolicy;
public interface ConnectorViewMBean extends Service { public interface ConnectorViewMBean extends Service {
@ -39,5 +37,24 @@ public interface ConnectorViewMBean extends Service {
public void setMaximumRedeliveries(int maximumRedeliveries); public void setMaximumRedeliveries(int maximumRedeliveries);
public void setUseExponentialBackOff(boolean useExponentialBackOff); public void setUseExponentialBackOff(boolean useExponentialBackOff);
/**
* Resets the statistics
*/
public void resetStatistics();
/**
* Returns the number of messages enqueued on this connector
*
* @return the number of messages enqueued on this connector
*/
public long getEnqueueCount();
/**
* Returns the number of messages dequeued on this connector
*
* @return the number of messages dequeued on this connector
*/
public long getDequeueCount();
} }

View File

@ -0,0 +1,83 @@
/**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.broker.region;
import org.activemq.command.Command;
import org.activemq.command.Message;
import org.activemq.management.CountStatisticImpl;
import org.activemq.management.StatsImpl;
/**
* The J2EE Statistics for the Connection.
*
* @version $Revision$
*/
public class ConnectionStatistics extends StatsImpl {
private CountStatisticImpl enqueues;
private CountStatisticImpl dequeues;
public ConnectionStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the connection");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the connection");
addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues);
}
public CountStatisticImpl getEnqueues() {
return enqueues;
}
public CountStatisticImpl getDequeues() {
return dequeues;
}
public void reset() {
super.reset();
enqueues.reset();
dequeues.reset();
}
public void setParent(ConnectorStatistics parent) {
if (parent != null) {
enqueues.setParent(parent.getEnqueues());
dequeues.setParent(parent.getDequeues());
}
else {
enqueues.setParent(null);
dequeues.setParent(null);
}
}
/**
* Updates the statistics as a command is dispatched into the connection
*/
public void onCommand(Command command) {
if (command.isMessageDispatch()) {
enqueues.increment();
}
}
public void onMessageDequeue(Message message) {
dequeues.increment();
}
}

View File

@ -0,0 +1,94 @@
/**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
*
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.broker.region;
import org.activemq.management.CountStatisticImpl;
import org.activemq.management.PollCountStatisticImpl;
import org.activemq.management.StatsImpl;
/**
* The J2EE Statistics for the a Destination.
*
* @version $Revision$
*/
public class ConnectorStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl consumers;
protected CountStatisticImpl messages;
protected PollCountStatisticImpl messagesCached;
public ConnectorStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination");
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers);
addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached);
}
public CountStatisticImpl getEnqueues() {
return enqueues;
}
public CountStatisticImpl getDequeues() {
return dequeues;
}
public CountStatisticImpl getConsumers() {
return consumers;
}
public PollCountStatisticImpl getMessagesCached() {
return messagesCached;
}
public CountStatisticImpl getMessages() {
return messages;
}
public void reset() {
super.reset();
enqueues.reset();
dequeues.reset();
}
public void setParent(ConnectorStatistics parent) {
if( parent!=null ) {
enqueues.setParent(parent.enqueues);
dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages);
} else {
enqueues.setParent(null);
dequeues.setParent(null);
consumers.setParent(null);
messagesCached.setParent(null);
messages.setParent(null);
}
}
public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached;
}
}

View File

@ -1,23 +1,25 @@
/** /**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a> * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
* *
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activemq.broker.region; package org.activemq.broker.region;
import org.activemq.command.Message;
import org.activemq.management.CountStatisticImpl; import org.activemq.management.CountStatisticImpl;
import org.activemq.management.PollCountStatisticImpl; import org.activemq.management.PollCountStatisticImpl;
import org.activemq.management.StatsImpl; import org.activemq.management.StatsImpl;
@ -28,7 +30,7 @@ import org.activemq.management.StatsImpl;
* @version $Revision$ * @version $Revision$
*/ */
public class DestinationStatistics extends StatsImpl { public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl enqueues; protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues; protected CountStatisticImpl dequeues;
protected CountStatisticImpl consumers; protected CountStatisticImpl consumers;
@ -36,32 +38,36 @@ public class DestinationStatistics extends StatsImpl {
protected PollCountStatisticImpl messagesCached; protected PollCountStatisticImpl messagesCached;
public DestinationStatistics() { public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination"); dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been dispatched from the destination");
consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination"); consumers = new CountStatisticImpl("consumers", "The number of consumers that that are subscribing to messages from the destination");
messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination"); messages = new CountStatisticImpl("messages", "The number of messages that that are being held by the destination");
messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache"); messagesCached = new PollCountStatisticImpl("messagesCached", "The number of messages that are held in the destination's memory cache");
addStatistic("enqueues", enqueues); addStatistic("enqueues", enqueues);
addStatistic("dequeues", dequeues); addStatistic("dequeues", dequeues);
addStatistic("consumers", consumers); addStatistic("consumers", consumers);
addStatistic("messages", messages); addStatistic("messages", messages);
addStatistic("messagesCached", messagesCached); addStatistic("messagesCached", messagesCached);
} }
public CountStatisticImpl getEnqueues() { public CountStatisticImpl getEnqueues() {
return enqueues; return enqueues;
} }
public CountStatisticImpl getDequeues() { public CountStatisticImpl getDequeues() {
return dequeues; return dequeues;
} }
public CountStatisticImpl getConsumers() { public CountStatisticImpl getConsumers() {
return consumers; return consumers;
} }
public PollCountStatisticImpl getMessagesCached() { public PollCountStatisticImpl getMessagesCached() {
return messagesCached; return messagesCached;
} }
public CountStatisticImpl getMessages() { public CountStatisticImpl getMessages() {
return messages; return messages;
} }
@ -71,15 +77,16 @@ public class DestinationStatistics extends StatsImpl {
enqueues.reset(); enqueues.reset();
dequeues.reset(); dequeues.reset();
} }
public void setParent(DestinationStatistics parent) { public void setParent(DestinationStatistics parent) {
if( parent!=null ) { if (parent != null) {
enqueues.setParent(parent.enqueues); enqueues.setParent(parent.enqueues);
dequeues.setParent(parent.dequeues); dequeues.setParent(parent.dequeues);
consumers.setParent(parent.consumers); consumers.setParent(parent.consumers);
messagesCached.setParent(parent.messagesCached); messagesCached.setParent(parent.messagesCached);
messages.setParent(parent.messages); messages.setParent(parent.messages);
} else { }
else {
enqueues.setParent(null); enqueues.setParent(null);
dequeues.setParent(null); dequeues.setParent(null);
consumers.setParent(null); consumers.setParent(null);
@ -91,4 +98,16 @@ public class DestinationStatistics extends StatsImpl {
public void setMessagesCached(PollCountStatisticImpl messagesCached) { public void setMessagesCached(PollCountStatisticImpl messagesCached) {
this.messagesCached = messagesCached; this.messagesCached = messagesCached;
} }
/**
* Called when a message is enqueued to update the statistics.
*/
public void onMessageEnqueue(Message message) {
getEnqueues().increment();
getMessages().increment();
}
public void onMessageDequeue(Message message) {
getDequeues().increment();
}
} }

View File

@ -277,7 +277,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription {
node.decrementReferenceCount(); node.decrementReferenceCount();
if( node.getRegionDestination() !=null ) { if( node.getRegionDestination() !=null ) {
node.getRegionDestination().getDestinationStatistics().getDequeues().increment(); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
if( wasFull && !isFull() ) { if( wasFull && !isFull() ) {
try { try {

View File

@ -344,8 +344,7 @@ public class Queue implements Destination {
dispatchValve.increment(); dispatchValve.increment();
MessageEvaluationContext msgContext = context.getMessageEvaluationContext(); MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try { try {
destinationStatistics.getEnqueues().increment(); destinationStatistics.onMessageEnqueue(message);
destinationStatistics.getMessages().increment();
synchronized (messages) { synchronized (messages) {
messages.add(node); messages.add(node);
} }

View File

@ -60,6 +60,10 @@ abstract public class BaseCommand implements Command {
public boolean isMarshallAware() { public boolean isMarshallAware() {
return false; return false;
} }
public boolean isMessageAck() {
return false;
}
/** /**
* @openwire:property version=1 * @openwire:property version=1

View File

@ -43,6 +43,7 @@ public interface Command extends DataStructure {
boolean isBrokerInfo(); boolean isBrokerInfo();
boolean isWireFormatInfo(); boolean isWireFormatInfo();
boolean isMessage(); boolean isMessage();
boolean isMessageAck();
Response visit( CommandVisitor visitor) throws Throwable; Response visit( CommandVisitor visitor) throws Throwable;
} }

View File

@ -58,6 +58,10 @@ public class KeepAliveInfo implements Command {
return false; return false;
} }
public boolean isMessageAck() {
return false;
}
public boolean isBrokerInfo() { public boolean isBrokerInfo() {
return false; return false;
} }

View File

@ -84,6 +84,10 @@ public class MessageAck extends BaseCommand {
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
public boolean isMessageAck() {
return true;
}
public boolean isPoisonAck() { public boolean isPoisonAck() {
return ackType==POSION_ACK_TYPE; return ackType==POSION_ACK_TYPE;

View File

@ -1,50 +1,50 @@
/** /**
* <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a> * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
* *
* Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activemq.command; package org.activemq.command;
import java.util.Arrays;
import org.activemq.state.CommandVisitor; import org.activemq.state.CommandVisitor;
import java.util.Arrays;
/** /**
* *
* @openwire:marshaller * @openwire:marshaller
* @version $Revision$ * @version $Revision$
*/ */
public class WireFormatInfo implements Command { public class WireFormatInfo implements Command {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.WIREFORMAT_INFO; public static final byte DATA_STRUCTURE_TYPE = CommandTypes.WIREFORMAT_INFO;
static final private byte MAGIC[] = new byte[]{'A','c','t','i','v','e','M','Q'}; static final private byte MAGIC[] = new byte[] { 'A', 'c', 't', 'i', 'v', 'e', 'M', 'Q' };
static final public long STACK_TRACE_MASK = 0x00000001; static final public long STACK_TRACE_MASK = 0x00000001;
static final public long TCP_NO_DELAY_MASK = 0x00000002; static final public long TCP_NO_DELAY_MASK = 0x00000002;
static final public long CACHE_MASK = 0x00000004; static final public long CACHE_MASK = 0x00000004;
static final public long COMPRESSION_MASK = 0x00000008; static final public long COMPRESSION_MASK = 0x00000008;
protected int version; protected int version;
protected byte magic[] = MAGIC; protected byte magic[] = MAGIC;
protected int options; protected int options;
public byte getDataStructureType() { public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE; return DATA_STRUCTURE_TYPE;
} }
public boolean isWireFormatInfo() { public boolean isWireFormatInfo() {
return true; return true;
} }
@ -55,6 +55,7 @@ public class WireFormatInfo implements Command {
public byte[] getMagic() { public byte[] getMagic() {
return magic; return magic;
} }
public void setMagic(byte[] magic) { public void setMagic(byte[] magic) {
this.magic = magic; this.magic = magic;
} }
@ -71,7 +72,7 @@ public class WireFormatInfo implements Command {
} }
public boolean isValid() { public boolean isValid() {
return magic!=null && Arrays.equals(magic, MAGIC); return magic != null && Arrays.equals(magic, MAGIC);
} }
public void setCommandId(short value) { public void setCommandId(short value) {
@ -92,20 +93,24 @@ public class WireFormatInfo implements Command {
public boolean isBrokerInfo() { public boolean isBrokerInfo() {
return false; return false;
} }
public boolean isMessageDispatch() { public boolean isMessageDispatch() {
return false; return false;
} }
public boolean isMessage() { public boolean isMessage() {
return false; return false;
} }
public boolean isMessageAck() {
return false;
}
public void setResponseRequired(boolean responseRequired) { public void setResponseRequired(boolean responseRequired) {
} }
public String toString() { public String toString() {
return "WireFormatInfo {version="+version+"}"; return "WireFormatInfo {version=" + version + "}";
} }
/** /**
@ -118,47 +123,52 @@ public class WireFormatInfo implements Command {
public void setOptions(int options) { public void setOptions(int options) {
this.options = options; this.options = options;
} }
public boolean isStackTraceEnabled() { public boolean isStackTraceEnabled() {
return (options & STACK_TRACE_MASK)!=0; return (options & STACK_TRACE_MASK) != 0;
} }
public void setStackTraceEnabled(boolean enable) { public void setStackTraceEnabled(boolean enable) {
if( enable ) { if (enable) {
options |= STACK_TRACE_MASK; options |= STACK_TRACE_MASK;
} else { }
else {
options &= ~STACK_TRACE_MASK; options &= ~STACK_TRACE_MASK;
} }
} }
public boolean isTcpNoDelayEnabled() { public boolean isTcpNoDelayEnabled() {
return (options & TCP_NO_DELAY_MASK)!=0; return (options & TCP_NO_DELAY_MASK) != 0;
} }
public void setTcpNoDelayEnabled(boolean enable) { public void setTcpNoDelayEnabled(boolean enable) {
if( enable ) { if (enable) {
options |= TCP_NO_DELAY_MASK; options |= TCP_NO_DELAY_MASK;
} else { }
else {
options &= ~TCP_NO_DELAY_MASK; options &= ~TCP_NO_DELAY_MASK;
} }
} }
public boolean isCacheEnabled() { public boolean isCacheEnabled() {
return (options & CACHE_MASK)!=0; return (options & CACHE_MASK) != 0;
} }
public void setCacheEnabled(boolean enable) { public void setCacheEnabled(boolean enable) {
if( enable ) { if (enable) {
options |= CACHE_MASK; options |= CACHE_MASK;
} else { }
else {
options &= ~CACHE_MASK; options &= ~CACHE_MASK;
} }
} }
public Response visit(CommandVisitor visitor) throws Throwable { public Response visit(CommandVisitor visitor) throws Throwable {
return visitor.processWireFormat( this ); return visitor.processWireFormat(this);
} }
public boolean isMarshallAware() { public boolean isMarshallAware() {
return false; return false;
} }
} }

View File

@ -62,8 +62,6 @@ public class ActiveMQInitialContextFactory implements InitialContextFactory {
public Context getInitialContext(Hashtable environment) throws NamingException { public Context getInitialContext(Hashtable environment) throws NamingException {
// lets create a factory // lets create a factory
Map data = new ConcurrentHashMap(); Map data = new ConcurrentHashMap();
Broker broker = null;
String[] names = getConnectionFactoryNames(environment); String[] names = getConnectionFactoryNames(environment);
for (int i = 0; i < names.length; i++) { for (int i = 0; i < names.length; i++) {
ActiveMQConnectionFactory factory =null; ActiveMQConnectionFactory factory =null;

View File

@ -18,11 +18,10 @@
**/ **/
package org.activemq.management; package org.activemq.management;
import java.util.List;
import org.activemq.ActiveMQSession; import org.activemq.ActiveMQSession;
import org.activemq.util.IndentPrinter; import org.activemq.util.IndentPrinter;
import javax.management.j2ee.statistics.Statistic;
import java.util.List;
/** /**
* Statistics for a JMS connection * Statistics for a JMS connection
* *