diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java index 76cc62e719..7718f17350 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/util/UDPTraceBrokerPlugin.java @@ -30,26 +30,43 @@ import java.net.UnknownHostException; import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DataStructure; +import org.apache.activemq.command.DestinationInfo; +import org.apache.activemq.command.JournalTrace; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.JournalTrace; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteSequence; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A Broker interceptor which allows you to trace all operations to a UDP socket. * - * @org.apache.xbean.XBean + * @org.apache.xbean.XBean element="udpTraceBrokerPlugin" * * @version $Revision: 427613 $ */ public class UDPTraceBrokerPlugin extends BrokerPluginSupport { + static final private Log log = LogFactory.getLog(UDPTraceBrokerPlugin.class); protected WireFormat wireFormat; protected WireFormatFactory wireFormatFactory; protected int maxTraceDatagramSize = 1024*4; @@ -93,15 +110,21 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport { super.stop(); } - private void trace(DataStructure command) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize); - DataOutputStream out = new DataOutputStream(baos); - wireFormat.marshal(brokerId, out); - wireFormat.marshal(command, out); - out.close(); - ByteSequence sequence = baos.toByteSequence(); - DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(), sequence.getLength(), address); - socket.send(datagram); + private void trace(DataStructure command) { + try { + + ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize); + DataOutputStream out = new DataOutputStream(baos); + wireFormat.marshal(brokerId, out); + wireFormat.marshal(command, out); + out.close(); + ByteSequence sequence = baos.toByteSequence(); + DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(), sequence.getLength(), address); + socket.send(datagram); + + } catch ( Throwable e) { + log.debug("Failed to trace: "+command, e); + } } public void send(ConnectionContext context, Message messageSend) throws Exception { @@ -113,6 +136,105 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport { trace(ack); super.acknowledge(context, ack); } + + public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { + trace(info); + super.addConnection(context, info); + } + + public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + trace(info); + return super.addConsumer(context, info); + } + + public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { + trace(info); + super.addDestinationInfo(context, info); + } + + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + trace(info); + super.addProducer(context, info); + } + + public void addSession(ConnectionContext context, SessionInfo info) throws Exception { + trace(info); + super.addSession(context, info); + } + + public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { + trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN)); + super.beginTransaction(context, xid); + } + + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE)); + super.commitTransaction(context, xid, onePhase); + } + + public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception { + trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET)); + super.forgetTransaction(context, xid); + } + + public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { + trace(pull); + return super.messagePull(context, pull); + } + + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { + trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE)); + return super.prepareTransaction(context, xid); + } + + public void processDispatch(MessageDispatch messageDispatch) { + trace(messageDispatch); + super.processDispatch(messageDispatch); + } + + public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception { + trace(messageDispatchNotification); + super.processDispatchNotification(messageDispatchNotification); + } + + public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { + trace(info.createRemoveCommand()); + super.removeConnection(context, info, error); + } + + public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { + trace(info.createRemoveCommand()); + super.removeConsumer(context, info); + } + + public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { + super.removeDestination(context, destination, timeout); + } + + public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception { + trace(info); + super.removeDestinationInfo(context, info); + } + + public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { + trace(info.createRemoveCommand()); + super.removeProducer(context, info); + } + + public void removeSession(ConnectionContext context, SessionInfo info) throws Exception { + trace(info.createRemoveCommand()); + super.removeSession(context, info); + } + + public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception { + trace(info); + super.removeSubscription(context, info); + } + + public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { + trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK)); + super.rollbackTransaction(context, xid); + } public WireFormat getWireFormat() { if( wireFormat == null ) {