mirror of https://github.com/apache/activemq.git
trace more operations and don't let tracing errors cause operations to fail.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439886 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9da7781c49
commit
8029b8d557
|
@ -30,26 +30,43 @@ import java.net.UnknownHostException;
|
|||
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.DataStructure;
|
||||
import org.apache.activemq.command.DestinationInfo;
|
||||
import org.apache.activemq.command.JournalTrace;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.JournalTrace;
|
||||
import org.apache.activemq.command.MessageDispatch;
|
||||
import org.apache.activemq.command.MessageDispatchNotification;
|
||||
import org.apache.activemq.command.MessagePull;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||
import org.apache.activemq.command.Response;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.TransactionInfo;
|
||||
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
||||
import org.apache.activemq.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.apache.activemq.wireformat.WireFormatFactory;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* A Broker interceptor which allows you to trace all operations to a UDP socket.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
* @org.apache.xbean.XBean element="udpTraceBrokerPlugin"
|
||||
*
|
||||
* @version $Revision: 427613 $
|
||||
*/
|
||||
public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
|
||||
|
||||
static final private Log log = LogFactory.getLog(UDPTraceBrokerPlugin.class);
|
||||
protected WireFormat wireFormat;
|
||||
protected WireFormatFactory wireFormatFactory;
|
||||
protected int maxTraceDatagramSize = 1024*4;
|
||||
|
@ -93,15 +110,21 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
|
|||
super.stop();
|
||||
}
|
||||
|
||||
private void trace(DataStructure command) throws IOException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
|
||||
DataOutputStream out = new DataOutputStream(baos);
|
||||
wireFormat.marshal(brokerId, out);
|
||||
wireFormat.marshal(command, out);
|
||||
out.close();
|
||||
ByteSequence sequence = baos.toByteSequence();
|
||||
DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
|
||||
socket.send(datagram);
|
||||
private void trace(DataStructure command) {
|
||||
try {
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
|
||||
DataOutputStream out = new DataOutputStream(baos);
|
||||
wireFormat.marshal(brokerId, out);
|
||||
wireFormat.marshal(command, out);
|
||||
out.close();
|
||||
ByteSequence sequence = baos.toByteSequence();
|
||||
DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
|
||||
socket.send(datagram);
|
||||
|
||||
} catch ( Throwable e) {
|
||||
log.debug("Failed to trace: "+command, e);
|
||||
}
|
||||
}
|
||||
|
||||
public void send(ConnectionContext context, Message messageSend) throws Exception {
|
||||
|
@ -113,6 +136,105 @@ public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
|
|||
trace(ack);
|
||||
super.acknowledge(context, ack);
|
||||
}
|
||||
|
||||
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.addConnection(context, info);
|
||||
}
|
||||
|
||||
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
trace(info);
|
||||
return super.addConsumer(context, info);
|
||||
}
|
||||
|
||||
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.addDestinationInfo(context, info);
|
||||
}
|
||||
|
||||
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.addProducer(context, info);
|
||||
}
|
||||
|
||||
public void addSession(ConnectionContext context, SessionInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.addSession(context, info);
|
||||
}
|
||||
|
||||
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.BEGIN));
|
||||
super.beginTransaction(context, xid);
|
||||
}
|
||||
|
||||
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
|
||||
trace(new TransactionInfo(context.getConnectionId(), xid, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE));
|
||||
super.commitTransaction(context, xid, onePhase);
|
||||
}
|
||||
|
||||
public void forgetTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.FORGET));
|
||||
super.forgetTransaction(context, xid);
|
||||
}
|
||||
|
||||
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
|
||||
trace(pull);
|
||||
return super.messagePull(context, pull);
|
||||
}
|
||||
|
||||
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.PREPARE));
|
||||
return super.prepareTransaction(context, xid);
|
||||
}
|
||||
|
||||
public void processDispatch(MessageDispatch messageDispatch) {
|
||||
trace(messageDispatch);
|
||||
super.processDispatch(messageDispatch);
|
||||
}
|
||||
|
||||
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
|
||||
trace(messageDispatchNotification);
|
||||
super.processDispatchNotification(messageDispatchNotification);
|
||||
}
|
||||
|
||||
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
|
||||
trace(info.createRemoveCommand());
|
||||
super.removeConnection(context, info, error);
|
||||
}
|
||||
|
||||
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
|
||||
trace(info.createRemoveCommand());
|
||||
super.removeConsumer(context, info);
|
||||
}
|
||||
|
||||
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
|
||||
super.removeDestination(context, destination, timeout);
|
||||
}
|
||||
|
||||
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.removeDestinationInfo(context, info);
|
||||
}
|
||||
|
||||
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||
trace(info.createRemoveCommand());
|
||||
super.removeProducer(context, info);
|
||||
}
|
||||
|
||||
public void removeSession(ConnectionContext context, SessionInfo info) throws Exception {
|
||||
trace(info.createRemoveCommand());
|
||||
super.removeSession(context, info);
|
||||
}
|
||||
|
||||
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
|
||||
trace(info);
|
||||
super.removeSubscription(context, info);
|
||||
}
|
||||
|
||||
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
|
||||
trace(new TransactionInfo(context.getConnectionId(), xid, TransactionInfo.ROLLBACK));
|
||||
super.rollbackTransaction(context, xid);
|
||||
}
|
||||
|
||||
public WireFormat getWireFormat() {
|
||||
if( wireFormat == null ) {
|
||||
|
|
Loading…
Reference in New Issue