git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@565381 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-08-13 15:44:44 +00:00
parent 540dd5c987
commit df6b54240c
19 changed files with 1104 additions and 35 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -29,6 +30,8 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
public class BrokerView implements BrokerViewMBean { public class BrokerView implements BrokerViewMBean {
@ -228,4 +231,15 @@ public class BrokerView implements BrokerViewMBean {
return context; return context;
} }
// doc comment inherited from BrokerViewMBean
public void reloadLog4jProperties() throws Exception {
LogManager.resetConfiguration();
ClassLoader cl = this.getClass().getClassLoader();
URL log4jprops = cl.getResource("log4j.properties");
if (log4jprops != null) {
PropertyConfigurator.configure(log4jprops);
}
}
} }

View File

@ -141,4 +141,11 @@ public interface BrokerViewMBean extends Service {
*/ */
void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception; void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception;
/**
* Reloads log4j.properties from the classpath.
* This methods calls org.apache.activemq.transport.TransportLoggerControl.reloadLog4jProperties
* @throws Exception
*/
public void reloadLog4jProperties() throws Exception;
} }

View File

@ -0,0 +1,66 @@
package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.commons.logging.Log;
/**
* Interface for classes that will be called by the TransportLogger
* class to actually write to a log file.
* Every class that implements this interface has do be declared in
* the resources/META-INF/services/org/apache/activemq/transport/logwriters
* directory, by creating a file with the name of the writer (for example
* "default") and including the line
* class=org.apache.activemq.transport.logwriters.(Name of the LogWriter class)
*/
public interface LogWriter {
/**
* Writes a header message to the log.
* @param log The log to be written to.
*/
public void initialMessage(Log log);
/**
* Writes a message to a log when a request command is sent.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logRequest (Log log, Object command);
/**
* Writes a message to a log when a response command is received.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logResponse (Log log, Object response);
/**
* Writes a message to a log when an asynchronous equest command is sent.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logAsyncRequest (Log log, Object command);
/**
* Writes a message to a log when message is sent.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logOneWay (Log log, Object command);
/**
* Writes a message to a log when message is received.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logReceivedCommand (Log log, Object command);
/**
* Writes a message to a log when an exception is received.
* @param log The log to be written to.
* @param command The command to be logged.
*/
public void logReceivedException (Log log, IOException error);
}

View File

@ -19,71 +19,136 @@ package org.apache.activemq.transport;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/** /**
* @version $Revision$ * @version $Revision$
*/ */
public class TransportLogger extends TransportFilter { public class TransportLogger extends TransportFilter {
private static int lastId;
private final Log log; private final Log log;
private boolean logging;
private final LogWriter logWriter;
private TransportLoggerView view;
public TransportLogger(Transport next) { public TransportLogger(Transport next, Log log, boolean startLogging, LogWriter logWriter) {
this(next, LogFactory.getLog(TransportLogger.class.getName() + ".Connection:" + getNextId())); // Changed constructor to pass the implementation of the LogWriter interface
} // that will be used to write the messages.
public TransportLogger(Transport next, Log log) {
super(next); super(next);
this.log = log; this.log = log;
this.logging = startLogging;
this.logWriter = logWriter;
} }
private static synchronized int getNextId() { /**
return ++lastId; * Returns true if logging is activated for this TransportLogger, false otherwise.
* @return true if logging is activated for this TransportLogger, false otherwise.
*/
public boolean isLogging() {
return logging;
}
/**
* Sets if logging should be activated for this TransportLogger.
* @param logging true to activate logging, false to deactivate.
*/
public void setLogging(boolean logging) {
this.logging = logging;
} }
public Object request(Object command) throws IOException { public Object request(Object command) throws IOException {
log.debug("SENDING REQUEST: " + command); // Changed this method to use a LogWriter object to actually
// print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if (logging)
logWriter.logRequest(log, command);
Object rc = super.request(command); Object rc = super.request(command);
log.debug("GOT RESPONSE: " + rc); if (logging)
logWriter.logResponse(log, command);
return rc; return rc;
} }
public Object request(Object command, int timeout) throws IOException { public Object request(Object command, int timeout) throws IOException {
log.debug("SENDING REQUEST: " + command); // Changed this method to use a LogWriter object to actually
// print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if (logging)
logWriter.logRequest(log, command);
Object rc = super.request(command, timeout); Object rc = super.request(command, timeout);
log.debug("GOT RESPONSE: " + rc); if (logging)
logWriter.logResponse(log, command);
return rc; return rc;
} }
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
log.debug("SENDING ASNYC REQUEST: " + command); // Changed this method to use a LogWriter object to actually
// print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if (logging)
logWriter.logAsyncRequest(log, command);
FutureResponse rc = next.asyncRequest(command, responseCallback); FutureResponse rc = next.asyncRequest(command, responseCallback);
return rc; return rc;
} }
public void oneway(Object command) throws IOException { public void oneway(Object command) throws IOException {
if (log.isDebugEnabled()) { // Changed this method to use a LogWriter object to actually
log.debug("SENDING: " + command); // print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if( logging && log.isDebugEnabled() ) {
logWriter.logOneWay(log, command);
} }
next.oneway(command); next.oneway(command);
} }
public void onCommand(Object command) { public void onCommand(Object command) {
if (log.isDebugEnabled()) { // Changed this method to use a LogWriter object to actually
log.debug("RECEIVED: " + command); // print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if( logging && log.isDebugEnabled() ) {
logWriter.logReceivedCommand(log, command);
} }
getTransportListener().onCommand(command); getTransportListener().onCommand(command);
} }
public void onException(IOException error) { public void onException(IOException error) {
if (log.isDebugEnabled()) { // Changed this method to use a LogWriter object to actually
log.debug("RECEIVED Exception: " + error, error); // print the messages to the log, and only in case of logging
// being active, instead of logging the message directly.
if( logging && log.isDebugEnabled() ) {
logWriter.logReceivedException(log, error);
} }
getTransportListener().onException(error); getTransportListener().onException(error);
} }
/**
* Gets the associated MBean for this TransportLogger.
* @return the associated MBean for this TransportLogger.
*/
public TransportLoggerView getView() {
return view;
}
/**
* Sets the associated MBean for this TransportLogger.
* @param view the associated MBean for this TransportLogger.
*/
public void setView(TransportLoggerView view) {
this.view = view;
}
public String toString() { public String toString() {
return next.toString(); return next.toString();
} }
/**
* We need to override this method
* so that we can unregister the associated
* MBean to avoid a memory leak.
*/
public void finalize() throws Throwable {
if (view != null) {
view.unregister();
}
}
} }

View File

@ -0,0 +1,33 @@
package org.apache.activemq.transport;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ManagementContext;
/**
* Implementation of the TransportLoggerControlMBean interface,
* which is an MBean used to control all TransportLoggers at once.
*/
public class TransportLoggerControl implements TransportLoggerControlMBean {
/**
* Constructor
*/
public TransportLoggerControl(ManagementContext managementContext) {
}
// doc comment inherited from TransportLoggerControlMBean
public void disableAllTransportLoggers() {
TransportLoggerView.disableAllTransportLoggers();
}
// doc comment inherited from TransportLoggerControlMBean
public void enableAllTransportLoggers() {
TransportLoggerView.enableAllTransportLoggers();
}
// doc comment inherited from TransportLoggerControlMBean
public void reloadLog4jProperties() throws Exception {
new BrokerView(null, null).reloadLog4jProperties();
}
}

View File

@ -0,0 +1,27 @@
package org.apache.activemq.transport;
/**
* MBean used to manage all of the TransportLoggers at once.
* Avalaible operations:
* -Enable logging for all TransportLoggers at once.
* -Disable logging for all TransportLoggers at once.
*/
public interface TransportLoggerControlMBean {
/**
* Enable logging for all Transport Loggers at once.
*/
public void enableAllTransportLoggers();
/**
* Disable logging for all Transport Loggers at once.
*/
public void disableAllTransportLoggers();
/**
* Reloads log4j.properties from the classpath
* @throws Exception
*/
public void reloadLog4jProperties() throws Exception;
}

View File

@ -0,0 +1,194 @@
package org.apache.activemq.transport;
import java.io.IOException;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.LogWriterFinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Singleton class to create TransportLogger objects.
* When the method getInstance() is called for the first time,
* a TransportLoggerControlMBean is created and registered.
* This MBean permits enabling and disabling the logging for
* all TransportLogger objects at once.
* @see TransportLoggerControlMBean
*/
public class TransportLoggerFactory {
private static final Log log = LogFactory.getLog(TransportLoggerFactory.class);
private static TransportLoggerFactory instance;
private static int lastId=0;
private static final LogWriterFinder logWriterFinder = new LogWriterFinder("META-INF/services/org/apache/activemq/transport/logwriters/");
/**
* LogWriter that will be used if none is specified.
*/
public static String defaultLogWriterName = "default";
/**
* If transport logging is enabled, it will be possible to control
* the transport loggers or not based on this value
*/
private static boolean defaultDynamicManagement = false;
/**
* If transport logging is enabled, the transport loggers will initially
* output or not depending on this value.
* This setting only has a meaning if
*/
private static boolean defaultInitialBehavior = true;
/**
*
*/
private static int defaultJmxPort = 1099;
private boolean transportLoggerControlCreated = false;
private ManagementContext managementContext;
private ObjectName objectName;
/**
* Private constructor.
*/
private TransportLoggerFactory() {
}
/**
* Returns a TransportLoggerFactory object which can be used to create TransportLogger objects.
* @return a TransportLoggerFactory object
*/
public static synchronized TransportLoggerFactory getInstance() {
if (instance == null) {
instance = new TransportLoggerFactory();
}
return instance;
}
public void stop() {
try {
if (this.transportLoggerControlCreated) {
this.managementContext.unregisterMBean(this.objectName);
this.managementContext.stop();
this.managementContext = null;
}
} catch (Exception e) {
log.error("TransportLoggerFactory could not be stopped, reason: " + e, e);
}
}
/**
* Creates a TransportLogger object, that will be inserted in the Transport Stack.
* Uses the default initial behavior, the default log writer, and creates a new
* log4j object to be used by the TransportLogger.
* @param next The next Transport layer in the Transport stack.
* @return A TransportLogger object.
* @throws IOException
*/
public TransportLogger createTransportLogger(Transport next) throws IOException {
int id = getNextId();
return createTransportLogger(next, id, createLog(id), defaultLogWriterName, defaultDynamicManagement, defaultInitialBehavior, defaultJmxPort);
}
/**
* Creates a TransportLogger object, that will be inserted in the Transport Stack.
* Uses the default initial behavior and the default log writer.
* @param next The next Transport layer in the Transport stack.
* @param log The log4j log that will be used by the TransportLogger.
* @return A TransportLogger object.
* @throws IOException
*/
public TransportLogger createTransportLogger(Transport next, Log log) throws IOException {
return createTransportLogger(next, getNextId(), log, defaultLogWriterName, defaultDynamicManagement, defaultInitialBehavior,defaultJmxPort);
}
/**
* Creates a TransportLogger object, that will be inserted in the Transport Stack.
* Creates a new log4j object to be used by the TransportLogger.
* @param next The next Transport layer in the Transport stack.
* @param startLogging Specifies if this TransportLogger should be initially active or not.
* @param logWriterName The name or the LogWriter to be used. Different log writers can output
* logs with a different format.
* @return A TransportLogger object.
* @throws IOException
*/
public TransportLogger createTransportLogger(Transport next, String logWriterName,
boolean useJmx, boolean startLogging, int jmxport) throws IOException {
int id = getNextId();
return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport);
}
/**
* Creates a TransportLogger object, that will be inserted in the Transport Stack.
* @param next The next Transport layer in the Transport stack.
* @param id The id of the transport logger.
* @param log The log4j log that will be used by the TransportLogger.
* @param logWriterName The name or the LogWriter to be used. Different log writers can output
* @param dynamicManagement Specifies if JMX will be used to switch on/off the TransportLogger to be created.
* @param startLogging Specifies if this TransportLogger should be initially active or not. Only has a meaning if
* dynamicManagement = true.
* @param jmxPort the port to be used by the JMX server. It should only be different from 1099 (broker's default JMX port)
* when it's a client that is using Transport Logging. In a broker, if the port is different from 1099, 2 JMX servers will
* be created, both identical, with all the MBeans.
* @return A TransportLogger object.
* @throws IOException
*/
public TransportLogger createTransportLogger(Transport next, int id, Log log,
String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException {
try {
LogWriter logWriter = logWriterFinder.newInstance(logWriterName);
TransportLogger tl = new TransportLogger (next, log, startLogging, logWriter);
if (dynamicManagement) {
synchronized (this) {
if (!this.transportLoggerControlCreated) {
this.createTransportLoggerControl(jmxport);
}
}
TransportLoggerView tlv = new TransportLoggerView(tl, next.toString(), id, this.managementContext);
tl.setView(tlv);
}
return tl;
} catch (Throwable e) {
throw IOExceptionSupport.create("Could not create log writer object for: " + logWriterName + ", reason: " + e, e);
}
}
synchronized private static int getNextId() {
return ++lastId;
}
private static Log createLog(int id) {
return LogFactory.getLog(TransportLogger.class.getName()+".Connection:" + id);
}
/**
* Starts the management context.
* Creates and registers a TransportLoggerControl MBean which enables the user
* to enable/disable logging for all transport loggers at once.
*/
private void createTransportLoggerControl(int port) {
try {
this.managementContext = new ManagementContext();
this.managementContext.setConnectorPort(port);
this.managementContext.start();
} catch (Exception e) {
log.error("Management context could not be started, reason: " + e, e);
}
try {
this.objectName = new ObjectName(this.managementContext.getJmxDomainName()+":"+ "Type=TransportLoggerControl");
this.managementContext.getMBeanServer().registerMBean(new TransportLoggerControl(this.managementContext),this.objectName);
this.transportLoggerControlCreated = true;
} catch (Exception e) {
log.error("TransportLoggerControlMBean could not be registered, reason: " + e, e);
}
}
}

View File

@ -0,0 +1,154 @@
package org.apache.activemq.transport;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.JMXSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Class implementing the TransportLoggerViewMBean interface.
* When an object of this class is created, it registers itself in
* the MBeanServer of the management context provided.
* When a TransportLogger object is finalized because the Transport Stack
* where it resides is no longer in use, the method unregister() will be called.
* @see TransportLoggerViewMBean.
*/
public class TransportLoggerView implements TransportLoggerViewMBean {
private static final Log log = LogFactory.getLog(TransportLoggerView.class);
/**
* Set with the TransportLoggerViews objects created.
* Used by the methods enableAllTransportLoggers and diablellTransportLoggers.
* The method unregister() removes objects from this set.
*/
private static Set<TransportLoggerView> transportLoggerViews = Collections.synchronizedSet(new HashSet<TransportLoggerView>());
private final WeakReference<TransportLogger> transportLogger;
private final String nextTransportName;
private final int id;
private final ManagementContext managementContext;
private final ObjectName name;
/**
* Constructor.
* @param transportLogger The TransportLogger object which is to be managed by this MBean.
* @param nextTransportName The name of the next TransportLayer. This is used to give a unique
* name for each MBean of the TransportLoggerView class.
* @param id The id of the TransportLogger to be watched.
* @param managementContext The management context who has the MBeanServer where this MBean will be registered.
*/
public TransportLoggerView (TransportLogger transportLogger, String nextTransportName, int id, ManagementContext managementContext) {
this.transportLogger = new WeakReference<TransportLogger>(transportLogger);
this.nextTransportName = nextTransportName;
this.id = id;
this.managementContext = managementContext;
this.name = this.createTransportLoggerObjectName();
TransportLoggerView.transportLoggerViews.add(this);
this.register();
}
/**
* Enable logging for all Transport Loggers at once.
*/
public static void enableAllTransportLoggers() {
for (TransportLoggerView view : transportLoggerViews) {
view.enableLogging();
}
}
/**
* Disable logging for all Transport Loggers at once.
*/
public static void disableAllTransportLoggers() {
for (TransportLoggerView view : transportLoggerViews) {
view.disableLogging();
}
}
// doc comment inherited from TransportLoggerViewMBean
public void enableLogging() {
this.setLogging(true);
}
// doc comment inherited from TransportLoggerViewMBean
public void disableLogging() {
this.setLogging(false);
}
// doc comment inherited from TransportLoggerViewMBean
public boolean isLogging() {
return transportLogger.get().isLogging();
}
// doc comment inherited from TransportLoggerViewMBean
public void setLogging(boolean logging) {
transportLogger.get().setLogging(logging);
}
/**
* Registers this MBean in the MBeanServer of the management context
* provided at creation time. This method is only called by the constructor.
*/
private void register() {
try {
this.managementContext.getMBeanServer().registerMBean(this, this.name);
} catch (Exception e) {
log.error("Could not register MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}
}
/**
* Unregisters the MBean from the MBeanServer of the management context
* provided at creation time.
* This method is called by the TransportLogger object being managed when
* the TransportLogger object is finalized, to avoid the memory leak that
* would be caused if MBeans were not unregistered.
*/
public void unregister() {
TransportLoggerView.transportLoggerViews.remove(this);
try {
this.managementContext.getMBeanServer().unregisterMBean(this.name);
} catch (Exception e) {
log.error("Could not unregister MBean for TransportLoggerView " + id + "with name " + this.name.toString() + ", reason: " + e, e);
}
}
/**
* Creates the ObjectName to be used when registering the MBean.
* @return the ObjectName to be used when registering the MBean.
*/
private ObjectName createTransportLoggerObjectName() {
try {
return new ObjectName(
createTransportLoggerObjectNameRoot(this.managementContext)
+ JMXSupport.encodeObjectNamePart(TransportLogger.class.getSimpleName()
+ " " + this.id + ";" + this.nextTransportName));
} catch (Exception e) {
log.error("Could not create ObjectName for TransportLoggerView " + id + ", reason: " + e, e);
return null;
}
}
/**
* Creates the part of the ObjectName that will be used by all MBeans.
* This method is public so it can be used by the TransportLoggerControl class.
* @param managementContext
* @return A String with the part of the ObjectName common to all the TransportLoggerView MBeans.
*/
public static String createTransportLoggerObjectNameRoot(ManagementContext managementContext) {
return managementContext.getJmxDomainName()+":"+"Type=TransportLogger,"+"TransportLoggerName=";
}
}

View File

@ -0,0 +1,37 @@
package org.apache.activemq.transport;
/**
* MBean to manage a single Transport Logger.
* It can inform if the logger is currently writing to a log file or not,
* by setting the logging property or by using the operations
* enableLogging() and disableLogging()
*/
public interface TransportLoggerViewMBean {
/**
* Returns if the managed TransportLogger is currently active
* (writing to a log) or not.
* @return if the managed TransportLogger is currently active
* (writing to a log) or not.
*/
public boolean isLogging();
/**
* Enables or disables logging for the managed TransportLogger.
* @param logging Boolean value to enable or disable logging for
* the managed TransportLogger.
* true to enable logging, false to disable logging.
*/
public void setLogging(boolean logging);
/**
* Enables logging for the managed TransportLogger.
*/
public void enableLogging();
/**
* Disables logging for the managed TransportLogger.
*/
public void disableLogging();
}

View File

@ -0,0 +1,142 @@
package org.apache.activemq.transport.logwriters;
import java.io.IOException;
import org.apache.activemq.command.BaseCommand;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.transport.LogWriter;
import org.apache.commons.logging.Log;
/**
* Custom implementation of LogWriter interface.
*/
public class CustomLogWriter implements LogWriter {
// doc comment inherited from LogWriter
public void initialMessage(Log log) {
}
// doc comment inherited from LogWriter
public void logRequest (Log log, Object command) {
log.debug("$$ SENDREQ: " + CustomLogWriter.commandToString(command));
}
// doc comment inherited from LogWriter
public void logResponse (Log log, Object response) {
log.debug("$$ GOT_RESPONSE: "+response);
}
// doc comment inherited from LogWriter
public void logAsyncRequest (Log log, Object command) {
log.debug("$$ SENDING_ASNYC_REQUEST: "+command);
}
// doc comment inherited from LogWriter
public void logOneWay (Log log, Object command) {
log.debug("$$ SENDING: " + CustomLogWriter.commandToString(command));
}
// doc comment inherited from LogWriter
public void logReceivedCommand (Log log, Object command) {
log.debug("$$ RECEIVED: " + CustomLogWriter.commandToString(command));
}
// doc comment inherited from LogWriter
public void logReceivedException (Log log, IOException error) {
log.debug("$$ RECEIVED_EXCEPTION: "+error, error);
}
/**
* Transforms a command into a String
* @param command An object (hopefully of the BaseCommand class or subclass)
* to be transformed into String.
* @return A String which will be written by the CustomLogWriter.
* If the object is not a BaseCommand, the String
* "Unrecognized_object " + command.toString()
* will be returned.
*/
private static String commandToString(Object command) {
StringBuilder sb = new StringBuilder();
if (command instanceof BaseCommand) {
BaseCommand bc = (BaseCommand)command;
sb.append(command.getClass().getSimpleName());
sb.append(' ');
sb.append(bc.isResponseRequired() ? 'T' : 'F');
Message m = null;
if (bc instanceof Message) {
m = (Message)bc;
}
if (bc instanceof MessageDispatch){
m = ((MessageDispatch)bc).getMessage();
}
if (m != null) {
sb.append(' ');
sb.append(m.getMessageId());
sb.append(',');
sb.append(m.getCommandId());
ProducerId pid = m.getProducerId();
long sid = pid.getSessionId();
sb.append(',');
sb.append(pid.getConnectionId());
sb.append(',');
sb.append(sid);
sb.append(',');
sb.append(pid.getValue());
sb.append(',');
sb.append(m.getCorrelationId());
sb.append(',');
sb.append(m.getType());
}
if (bc instanceof MessageDispatch){
sb.append(" toConsumer:");
sb.append(((MessageDispatch)bc).getConsumerId());
}
if (bc instanceof ProducerAck) {
sb.append(" ProducerId:");
sb.append(((ProducerAck)bc).getProducerId());
}
if (bc instanceof MessageAck) {
MessageAck ma = (MessageAck)bc;
sb.append(" ConsumerID:");
sb.append(ma.getConsumerId());
sb.append(" ack:");
sb.append(ma.getFirstMessageId());
sb.append('-');
sb.append(ma.getLastMessageId());
}
if (bc instanceof ConnectionInfo) {
ConnectionInfo ci = (ConnectionInfo)bc;
sb.append(' ');
sb.append(ci.getConnectionId());
}
} else if (command instanceof WireFormatInfo){
sb.append("WireFormatInfo");
} else {
sb.append("Unrecognized_object ");
sb.append(command.toString());
}
return sb.toString();
}
}

View File

@ -0,0 +1,50 @@
package org.apache.activemq.transport.logwriters;
import java.io.IOException;
import org.apache.activemq.transport.LogWriter;
import org.apache.commons.logging.Log;
/**
* Implementation of LogWriter interface to keep ActiveMQ's
* old logging format.
*/
public class DefaultLogWriter implements LogWriter {
// doc comment inherited from LogWriter
public void initialMessage(Log log) {
// Default log writer does nothing here
}
// doc comment inherited from LogWriter
public void logRequest (Log log, Object command) {
log.debug("SENDING REQUEST: "+command);
}
// doc comment inherited from LogWriter
public void logResponse (Log log, Object response) {
log.debug("GOT RESPONSE: "+response);
}
// doc comment inherited from LogWriter
public void logAsyncRequest (Log log, Object command) {
log.debug("SENDING ASNYC REQUEST: "+command);
}
// doc comment inherited from LogWriter
public void logOneWay (Log log, Object command) {
log.debug("SENDING: "+command);
}
// doc comment inherited from LogWriter
public void logReceivedCommand (Log log, Object command) {
log.debug("RECEIVED: " + command);
}
// doc comment inherited from LogWriter
public void logReceivedException (Log log, IOException error) {
log.debug("RECEIVED Exception: "+error, error);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLogger;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -104,7 +105,12 @@ public class SslTransportFactory extends TcpTransportFactory {
sslTransport.setSocketOptions(socketOptions); sslTransport.setSocketOptions(socketOptions);
if (sslTransport.isTrace()) { if (sslTransport.isTrace()) {
transport = new TransportLogger(transport); try {
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport,
sslTransport.getLogWriterName(), sslTransport.isDynamicManagement(), sslTransport.isStartLogging(), sslTransport.getJmxPort());
} catch (Throwable e) {
LOG.error("Could not create TransportLogger object for: " + sslTransport.getLogWriterName() + ", reason: " + e, e);
}
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport);

View File

@ -36,6 +36,7 @@ import javax.net.SocketFactory;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportThreadSupport; import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
@ -62,7 +63,45 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected Socket socket; protected Socket socket;
protected DataOutputStream dataOut; protected DataOutputStream dataOut;
protected DataInputStream dataIn; protected DataInputStream dataIn;
protected boolean trace; /**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
* trace=false -> the Transport stack where this TcpTransport
* object will be, will NOT have a TransportLogger layer, and therefore
* will never be able to print logging messages.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean trace = false;
/**
* Name of the LogWriter implementation to use.
* Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
/**
* Specifies if the TransportLogger will be manageable by JMX or not.
* Also, as long as there is at least 1 TransportLogger which is manageable,
* a TransportLoggerControl MBean will me created.
*/
protected boolean dynamicManagement = false;
/**
* startLogging=true -> the TransportLogger object of the Transport stack
* will initially write messages to the log.
* startLogging=false -> the TransportLogger object of the Transport stack
* will initially NOT write messages to the log.
* This parameter only has an effect if trace == true.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean startLogging = true;
/**
* Specifies the port that will be used by the JMX server to manage
* the TransportLoggers.
* This should only be set in an URI by a client (producer or consumer) since
* a broker will already create a JMX server.
* It is useful for people who test a broker and clients in the same machine
* and want to control both via JMX; a different port will be needed.
*/
protected int jmxPort = 1099;
protected boolean useLocalHost = true; protected boolean useLocalHost = true;
protected int minmumWireFormatVersion; protected int minmumWireFormatVersion;
protected SocketFactory socketFactory; protected SocketFactory socketFactory;
@ -168,6 +207,45 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.trace = trace; this.trace = trace;
} }
public String getLogWriterName() {
return logWriterName;
}
public void setLogWriterName(String logFormat) {
this.logWriterName = logFormat;
}
public boolean isDynamicManagement() {
return dynamicManagement;
}
public void setDynamicManagement(boolean useJmx) {
this.dynamicManagement = useJmx;
}
public boolean isStartLogging() {
return startLogging;
}
public void setStartLogging(boolean startLogging) {
this.startLogging = startLogging;
}
public int getJmxPort() {
return jmxPort;
}
public void setJmxPort(int jmxPort) {
this.jmxPort = jmxPort;
}
public int getMinmumWireFormatVersion() { public int getMinmumWireFormatVersion() {
return minmumWireFormatVersion; return minmumWireFormatVersion;
} }

View File

@ -30,7 +30,7 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator; import org.apache.activemq.transport.WireFormatNegotiator;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -84,7 +84,12 @@ public class TcpTransportFactory extends TransportFactory {
tcpTransport.setSocketOptions(socketOptions); tcpTransport.setSocketOptions(socketOptions);
if (tcpTransport.isTrace()) { if (tcpTransport.isTrace()) {
transport = new TransportLogger(transport); try {
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport, tcpTransport.getLogWriterName(),
tcpTransport.isDynamicManagement(), tcpTransport.isStartLogging(), tcpTransport.getJmxPort());
} catch (Throwable e) {
LOG.error("Could not create TransportLogger object for: " + tcpTransport.getLogWriterName() + ", reason: " + e, e);
}
} }
if (isUseInactivityMonitor(transport)) { if (isUseInactivityMonitor(transport)) {

View File

@ -33,6 +33,7 @@ import javax.net.ServerSocketFactory;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -57,7 +58,36 @@ public class TcpTransportServer extends TransportServerThreadSupport {
protected final TcpTransportFactory transportFactory; protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000; protected long maxInactivityDuration = 30000;
protected int minmumWireFormatVersion; protected int minmumWireFormatVersion;
protected boolean trace; /**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
* trace=false -> the Transport stack where this TcpTransport
* object will be, will NOT have a TransportLogger layer, and therefore
* will never be able to print logging messages.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean trace = false;
/**
* Name of the LogWriter implementation to use.
* Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected String logWriterName = TransportLoggerFactory.defaultLogWriterName;
/**
* Specifies if the TransportLogger will be manageable by JMX or not.
* Also, as long as there is at least 1 TransportLogger which is manageable,
* a TransportLoggerControl MBean will me created.
*/
protected boolean dynamicManagement = false;
/**
* startLogging=true -> the TransportLogger object of the Transport stack
* will initially write messages to the log.
* startLogging=false -> the TransportLogger object of the Transport stack
* will initially NOT write messages to the log.
* This parameter only has an effect if trace == true.
* This parameter is most probably set in Connection or TransportConnector URIs.
*/
protected boolean startLogging = true;
protected Map<String, Object> transportOptions; protected Map<String, Object> transportOptions;
protected final ServerSocketFactory serverSocketFactory; protected final ServerSocketFactory serverSocketFactory;
@ -147,6 +177,32 @@ public class TcpTransportServer extends TransportServerThreadSupport {
this.trace = trace; this.trace = trace;
} }
public String getLogWriterName() {
return logWriterName;
}
public void setLogWriterName(String logFormat) {
this.logWriterName = logFormat;
}
public boolean isDynamicManagement() {
return dynamicManagement;
}
public void setDynamicManagement(boolean useJmx) {
this.dynamicManagement = useJmx;
}
public boolean isStartLogging() {
return startLogging;
}
public void setStartLogging(boolean startLogging) {
this.startLogging = startLogging;
}
/** /**
* pull Sockets from the ServerSocket * pull Sockets from the ServerSocket
*/ */
@ -163,6 +219,10 @@ public class TcpTransportServer extends TransportServerThreadSupport {
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace)); options.put("trace", Boolean.valueOf(trace));
options.put("logWriterName", logWriterName);
options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions); options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat(); WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format); Transport transport = createTransport(socket, format);

View File

@ -28,20 +28,23 @@ import org.apache.activemq.transport.CommandJoiner;
import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.InactivityMonitor;
import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportLogger; import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.reliable.DefaultReplayStrategy; import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
import org.apache.activemq.transport.reliable.ReliableTransport; import org.apache.activemq.transport.reliable.ReliableTransport;
import org.apache.activemq.transport.reliable.ReplayStrategy; import org.apache.activemq.transport.reliable.ReplayStrategy;
import org.apache.activemq.transport.reliable.Replayer; import org.apache.activemq.transport.reliable.Replayer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport; import org.apache.activemq.util.URISupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class UdpTransportFactory extends TransportFactory { public class UdpTransportFactory extends TransportFactory {
private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
public TransportServer doBind(String brokerId, final URI location) throws IOException { public TransportServer doBind(String brokerId, final URI location) throws IOException {
try { try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location)); Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(location));
@ -75,7 +78,11 @@ public class UdpTransportFactory extends TransportFactory {
transport = new CommandJoiner(transport, asOpenWireFormat(format)); transport = new CommandJoiner(transport, asOpenWireFormat(format));
if (udpTransport.isTrace()) { if (udpTransport.isTrace()) {
transport = new TransportLogger(transport); try {
transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
} catch (Throwable e) {
log.error("Could not create TransportLogger object for: " + TransportLoggerFactory.defaultLogWriterName + ", reason: " + e, e);
}
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport);
@ -107,7 +114,7 @@ public class UdpTransportFactory extends TransportFactory {
OpenWireFormat openWireFormat = asOpenWireFormat(format); OpenWireFormat openWireFormat = asOpenWireFormat(format);
if (udpTransport.isTrace()) { if (udpTransport.isTrace()) {
transport = new TransportLogger(transport); transport = TransportLoggerFactory.getInstance().createTransportLogger(transport);
} }
transport = new InactivityMonitor(transport); transport = new InactivityMonitor(transport);

View File

@ -0,0 +1,122 @@
package org.apache.activemq.util;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.transport.LogWriter;
import org.apache.activemq.transport.TransportLoggerView;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Class used to find a LogWriter implementation, and returning
* a LogWriter object, taking as argument the name of a log writer.
* The mapping between the log writer names and the classes
* implementing LogWriter is specified by the files in the
* resources/META-INF/services/org/apache/activemq/transport/logwriters
* directory.
*/
public class LogWriterFinder {
private static final Log log = LogFactory.getLog(TransportLoggerView.class);
private final String path;
private final ConcurrentHashMap classMap = new ConcurrentHashMap();
/**
* Builds a LogWriterFinder that will look for the mappings between
* LogWriter names and classes in the directory "path".
* @param path The directory where the files that map log writer names to
* LogWriter classes are.
*/
public LogWriterFinder(String path) {
this.path = path;
}
/**
* Returns a LogWriter object, given a log writer name (for example "default", or "detailed").
* Uses a ConcurrentHashMap to cache the Class objects that have already been loaded.
* @param logWriterName a log writer name (for example "default", or "detailed").
* @return a LogWriter object to be used by the TransportLogger class.
* @throws IllegalAccessException
* @throws InstantiationException
* @throws IOException
* @throws ClassNotFoundException
*/
public LogWriter newInstance(String logWriterName)
throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException
{
Class clazz = (Class) classMap.get(logWriterName);
if (clazz == null) {
clazz = newInstance(doFindLogWriterProperties(logWriterName));
classMap.put(logWriterName, clazz);
}
return (LogWriter)clazz.newInstance();
}
/**
* Loads and returns a class given a Properties object with a "class" property.
* @param properties a Properties object with a "class" property.
* @return a Class object.
* @throws ClassNotFoundException
* @throws IOException
*/
private Class newInstance(Properties properties) throws ClassNotFoundException, IOException {
String className = properties.getProperty("class");
if (className == null) {
throw new IOException("Expected property is missing: " + "class");
}
Class clazz;
try {
clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
} catch (ClassNotFoundException e) {
clazz = LogWriterFinder.class.getClassLoader().loadClass(className);
}
return clazz;
}
/**
* Given a log writer name, returns a Properties object with a "class" property
* whose value is a String with the name of the class to be loaded.
* @param logWriterName a log writer name.
* @return a Properties object with a "class" property
* @throws IOException
*/
protected Properties doFindLogWriterProperties (String logWriterName) throws IOException {
String uri = path + logWriterName;
// lets try the thread context class loader first
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) classLoader = getClass().getClassLoader();
InputStream in = classLoader.getResourceAsStream(uri);
if (in == null) {
in = LogWriterFinder.class.getClassLoader().getResourceAsStream(uri);
if (in == null) {
log.error("Could not find log writer for resource: " + uri);
throw new IOException("Could not find log writer for resource: " + uri);
}
}
// lets load the file
BufferedInputStream reader = null;
Properties properties = new Properties();
try {
reader = new BufferedInputStream(in);
properties.load(reader);
return properties;
} finally {
try {
reader.close();
} catch (Exception e) {
}
}
}
}

View File

@ -0,0 +1 @@
class=org.apache.activemq.transport.logwriters.CustomLogWriter

View File

@ -0,0 +1 @@
class=org.apache.activemq.transport.logwriters.DefaultLogWriter