mirror of https://github.com/apache/activemq.git
Added a udp and multicast broker tracing plugins. If enabled on a broker they can be used to monitor the broker without much network overhead.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@433948 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
73ef5bea04
commit
4b6b61b820
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.MulticastSocket;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* A Broker interceptor which allows you to trace all operations to a Multicast socket.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*
|
||||
* @version $Revision: 427613 $
|
||||
*/
|
||||
public class MulticastTraceBrokerPlugin extends UDPTraceBrokerPlugin {
|
||||
|
||||
private int timeToLive = 1;
|
||||
|
||||
public MulticastTraceBrokerPlugin() {
|
||||
try {
|
||||
destination = new URI("multicast://224.1.2.3:61616");
|
||||
} catch (URISyntaxException wontHappen) {
|
||||
}
|
||||
}
|
||||
|
||||
protected DatagramSocket createSocket() throws IOException {
|
||||
MulticastSocket s = new MulticastSocket();
|
||||
s.setSendBufferSize(maxTraceDatagramSize);
|
||||
s.setBroadcast(broadcast);
|
||||
s.setLoopbackMode(true);
|
||||
s.setTimeToLive(timeToLive);
|
||||
return s;
|
||||
}
|
||||
|
||||
public int getTimeToLive() {
|
||||
return timeToLive;
|
||||
}
|
||||
|
||||
public void setTimeToLive(int timeToLive) {
|
||||
this.timeToLive = timeToLive;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,192 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.broker.util;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.activeio.command.WireFormat;
|
||||
import org.apache.activeio.command.WireFormatFactory;
|
||||
import org.apache.activeio.packet.ByteSequence;
|
||||
import org.apache.activeio.util.ByteArrayOutputStream;
|
||||
import org.apache.activemq.broker.BrokerPluginSupport;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.command.BrokerId;
|
||||
import org.apache.activemq.command.DataStructure;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.JournalTrace;
|
||||
import org.apache.activemq.openwire.OpenWireFormatFactory;
|
||||
|
||||
/**
|
||||
* A Broker interceptor which allows you to trace all operations to a UDP socket.
|
||||
*
|
||||
* @org.apache.xbean.XBean
|
||||
*
|
||||
* @version $Revision: 427613 $
|
||||
*/
|
||||
public class UDPTraceBrokerPlugin extends BrokerPluginSupport {
|
||||
|
||||
protected WireFormat wireFormat;
|
||||
protected WireFormatFactory wireFormatFactory;
|
||||
protected int maxTraceDatagramSize = 1024*4;
|
||||
protected URI destination;
|
||||
protected DatagramSocket socket;
|
||||
|
||||
protected BrokerId brokerId;
|
||||
protected SocketAddress address;
|
||||
protected boolean broadcast;
|
||||
|
||||
public UDPTraceBrokerPlugin() {
|
||||
try {
|
||||
destination = new URI("udp://127.0.0.1:61616");
|
||||
} catch (URISyntaxException wontHappen) {
|
||||
}
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
super.start();
|
||||
if( getWireFormat() == null )
|
||||
throw new IllegalArgumentException("Wireformat must be specifed.");
|
||||
if( address == null ) {
|
||||
address = createSocketAddress(destination);
|
||||
}
|
||||
socket = createSocket();
|
||||
|
||||
brokerId = super.getBrokerId();
|
||||
trace(new JournalTrace("START"));
|
||||
}
|
||||
|
||||
protected DatagramSocket createSocket() throws IOException {
|
||||
DatagramSocket s = new DatagramSocket();
|
||||
s.setSendBufferSize(maxTraceDatagramSize);
|
||||
s.setBroadcast(broadcast);
|
||||
return s;
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
trace(new JournalTrace("STOP"));
|
||||
socket.close();
|
||||
super.stop();
|
||||
}
|
||||
|
||||
private void trace(DataStructure command) throws IOException {
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxTraceDatagramSize);
|
||||
DataOutputStream out = new DataOutputStream(baos);
|
||||
wireFormat.marshal(brokerId, out);
|
||||
wireFormat.marshal(command, out);
|
||||
out.close();
|
||||
ByteSequence sequence = baos.toByteSequence();
|
||||
DatagramPacket datagram = new DatagramPacket( sequence.getData(), sequence.getOffset(), sequence.getLength(), address);
|
||||
socket.send(datagram);
|
||||
}
|
||||
|
||||
public void send(ConnectionContext context, Message messageSend) throws Exception {
|
||||
trace(messageSend);
|
||||
super.send(context, messageSend);
|
||||
}
|
||||
|
||||
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
|
||||
trace(ack);
|
||||
super.acknowledge(context, ack);
|
||||
}
|
||||
|
||||
public WireFormat getWireFormat() {
|
||||
if( wireFormat == null ) {
|
||||
wireFormat = createWireFormat();
|
||||
}
|
||||
return wireFormat;
|
||||
}
|
||||
|
||||
protected WireFormat createWireFormat() {
|
||||
return getWireFormatFactory().createWireFormat();
|
||||
}
|
||||
|
||||
public void setWireFormat(WireFormat wireFormat) {
|
||||
this.wireFormat = wireFormat;
|
||||
}
|
||||
|
||||
public WireFormatFactory getWireFormatFactory() {
|
||||
if( wireFormatFactory == null ) {
|
||||
wireFormatFactory = createWireFormatFactory();
|
||||
}
|
||||
return wireFormatFactory;
|
||||
}
|
||||
|
||||
protected OpenWireFormatFactory createWireFormatFactory() {
|
||||
OpenWireFormatFactory wf = new OpenWireFormatFactory();
|
||||
wf.setCacheEnabled(false);
|
||||
wf.setVersion(1);
|
||||
wf.setTightEncodingEnabled(true);
|
||||
wf.setSizePrefixDisabled(true);
|
||||
return wf;
|
||||
}
|
||||
|
||||
public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
|
||||
this.wireFormatFactory = wireFormatFactory;
|
||||
}
|
||||
|
||||
|
||||
protected SocketAddress createSocketAddress(URI location) throws UnknownHostException {
|
||||
InetAddress a = InetAddress.getByName(location.getHost());
|
||||
int port = location.getPort();
|
||||
return new InetSocketAddress(a, port);
|
||||
}
|
||||
|
||||
public URI getDestination() {
|
||||
return destination;
|
||||
}
|
||||
|
||||
public void setDestination(URI destination) {
|
||||
this.destination = destination;
|
||||
}
|
||||
|
||||
public int getMaxTraceDatagramSize() {
|
||||
return maxTraceDatagramSize;
|
||||
}
|
||||
|
||||
public void setMaxTraceDatagramSize(int maxTraceDatagramSize) {
|
||||
this.maxTraceDatagramSize = maxTraceDatagramSize;
|
||||
}
|
||||
|
||||
public boolean isBroadcast() {
|
||||
return broadcast;
|
||||
}
|
||||
|
||||
public void setBroadcast(boolean broadcast) {
|
||||
this.broadcast = broadcast;
|
||||
}
|
||||
|
||||
public SocketAddress getAddress() {
|
||||
return address;
|
||||
}
|
||||
|
||||
public void setAddress(SocketAddress address) {
|
||||
this.address = address;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -30,6 +30,13 @@ public class JournalTrace implements DataStructure {
|
|||
|
||||
private String message;
|
||||
|
||||
public JournalTrace() {
|
||||
|
||||
}
|
||||
public JournalTrace(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public byte getDataStructureType() {
|
||||
return DATA_STRUCTURE_TYPE;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue