ARTEMIS-1623 ActiveMQServerPlugin impl for logging various broker events

Similar concept to the ActiveMQ 5.x loggingBrokerPlugin
This commit is contained in:
Pat Fox 2018-01-06 14:06:39 +01:00 committed by Clebert Suconic
parent f92df7ff67
commit e350c58a46
9 changed files with 1837 additions and 8 deletions

View File

@ -0,0 +1,682 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.plugin.impl;
import java.io.Serializable;
import java.util.Map;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.utils.critical.CriticalComponent;
import org.jboss.logging.Logger;
/**
* plugin to log various events within the broker, configured with the following booleans
*
* LOG_CONNECTION_EVENTS - connections creation/destroy
* LOG_SESSION_EVENTS - sessions creation/close
* LOG_CONSUMER_EVENTS - consumers creation/close
* LOG_DELIVERING_EVENTS - messages delivered to consumer, acked by consumer
* LOG_SENDING_EVENTS - messaged is sent, message is routed
* LOG_INTERNAL_EVENTS - critical failures, bridge deployments, queue creation/destroyed, message expired
*/
public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serializable {
private static final Logger logger = Logger.getLogger(LoggingActiveMQServerPlugin.class);
private static final long serialVersionUID = 1L;
public static final String LOG_ALL_EVENTS = "LOG_ALL_EVENTS";
public static final String LOG_CONNECTION_EVENTS = "LOG_CONNECTION_EVENTS";
public static final String LOG_SESSION_EVENTS = "LOG_SESSION_EVENTS";
public static final String LOG_CONSUMER_EVENTS = "LOG_CONSUMER_EVENTS";
public static final String LOG_DELIVERING_EVENTS = "LOG_DELIVERING_EVENTS";
public static final String LOG_SENDING_EVENTS = "LOG_SENDING_EVENTS";
public static final String LOG_INTERNAL_EVENTS = "LOG_INTERNAL_EVENTS";
public static final String UNAVAILABLE = "UNAVAILABLE";
private boolean logAll = false;
private boolean logConnectionEvents = false;
private boolean logSessionEvents = false;
private boolean logConsumerEvents = false;
private boolean logDeliveringEvents = false;
private boolean logSendingEvents = false;
private boolean logInternalEvents = false;
public boolean isLogAll() {
return logAll;
}
public boolean isLogConnectionEvents() {
return logConnectionEvents;
}
public boolean isLogSessionEvents() {
return logSessionEvents;
}
public boolean isLogConsumerEvents() {
return logConsumerEvents;
}
public boolean isLogDeliveringEvents() {
return logDeliveringEvents;
}
public boolean isLogSendingEvents() {
return logSendingEvents;
}
public boolean isLogInternalEvents() {
return logInternalEvents;
}
/**
* used to pass configured properties to Plugin
*
* @param properties
*/
@Override
public void init(Map<String, String> properties) {
logAll = Boolean.parseBoolean(properties.getOrDefault(LOG_ALL_EVENTS, "false"));
logConnectionEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_CONNECTION_EVENTS, "false"));
logSessionEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_SESSION_EVENTS, "false"));
logConsumerEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_CONSUMER_EVENTS, "false"));
logDeliveringEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_DELIVERING_EVENTS, "false"));
logSendingEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_SENDING_EVENTS, "false"));
logInternalEvents = Boolean.parseBoolean(properties.getOrDefault(LOG_INTERNAL_EVENTS, "false"));
if (logger.isDebugEnabled()) {
dumpConfiguration();
}
}
/**
* A connection has been created.
*
* @param connection The newly created connection
* @throws ActiveMQException
*/
@Override
public void afterCreateConnection(RemotingConnection connection) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logConnectionEvents)) {
logger.infof("created connection: %s", connection);
}
}
/**
* A connection has been destroyed.
*
* @param connection
* @throws ActiveMQException
*/
@Override
public void afterDestroyConnection(RemotingConnection connection) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logConnectionEvents)) {
logger.infof("destroyed connection: %s", connection);
}
}
/**
* Before a session is created.
*
* @param name
* @param username
* @param minLargeMessageSize
* @param connection
* @param autoCommitSends
* @param autoCommitAcks
* @param preAcknowledge
* @param xa
* @param publicAddress
* @param callback
* @param autoCreateQueues
* @param context
* @param prefixes
* @throws ActiveMQException
*/
@Override
public void beforeCreateSession(String name,
String username,
int minLargeMessageSize,
RemotingConnection connection,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
boolean xa,
String publicAddress,
SessionCallback callback,
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSessionEvents)) {
logger.debugf("beforeCreateSession called with name: %s , username: %s, minLargeMessageSize: %s , connection: %s"
+ ", autoCommitSends: %s, autoCommitAcks: %s, preAcknowledge: %s, xa: %s, publicAddress: %s"
+ ", , autoCreateQueues: %s, context: %s ", name, username, minLargeMessageSize, connection,
autoCommitSends, autoCommitAcks, preAcknowledge, xa, publicAddress, autoCreateQueues, context);
}
}
/**
* After a session has been created.
*
* @param session The newly created session
* @throws ActiveMQException
*/
@Override
public void afterCreateSession(ServerSession session) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logSessionEvents)) {
logger.infof("created session name: %s, session connectionID: %s",
(session == null ? UNAVAILABLE : session.getName()),
(session == null ? UNAVAILABLE : session.getConnectionID()));
}
}
/**
* Before a session is closed
*
* @param session
* @param failed
* @throws ActiveMQException
*/
@Override
public void beforeCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSessionEvents)) {
logger.debugf("beforeCloseSession called with session name : %s, session: %s, failed: %s",
(session == null ? UNAVAILABLE : session.getName()), session, failed);
}
}
/**
* After a session is closed
*
* @param session
* @param failed
* @throws ActiveMQException
*/
@Override
public void afterCloseSession(ServerSession session, boolean failed) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logSessionEvents)) {
logger.infof("closed session with session name: %s, failed: %s",
(session == null ? UNAVAILABLE : session.getName()), failed);
}
}
/**
* Before session metadata is added to the session
*
* @param session
* @param key
* @param data
* @throws ActiveMQException
*/
@Override
public void beforeSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSessionEvents)) {
logger.debugf("beforeSessionMetadataAdded called with session name: %s , session: %s, key: %s, data: %s",
(session == null ? UNAVAILABLE : session.getName()), session, key, data);
}
}
/**
* After session metadata is added to the session
*
* @param session
* @param key
* @param data
* @throws ActiveMQException
*/
@Override
public void afterSessionMetadataAdded(ServerSession session, String key, String data) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSessionEvents)) {
logger.debugf("added session metadata for session name : %s, session: %s, key: %s, data: %s",
(session == null ? UNAVAILABLE : session.getName()), session, key, data);
} else if (logger.isInfoEnabled() && (logAll || logSessionEvents)) {
logger.infof("added session metadata for session name : %s, key: %s, data: %s",
(session == null ? UNAVAILABLE : session.getName()), key, data);
}
}
/**
* Before a consumer is created
*
* @param consumerID
* @param queueBinding
* @param filterString
* @param browseOnly
* @param supportLargeMessage
* @throws ActiveMQException
*/
@Override
public void beforeCreateConsumer(long consumerID,
QueueBinding queueBinding,
SimpleString filterString,
boolean browseOnly,
boolean supportLargeMessage) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logConsumerEvents)) {
logger.debugf("beforeCreateConsumer called with ConsumerID: %s, QueueBinding: %s, filterString: %s," +
" browseOnly: %s, supportLargeMessage: %s", consumerID, queueBinding, filterString, browseOnly,
supportLargeMessage);
}
}
/**
* After a consumer has been created
*
* @param consumer the created consumer
* @throws ActiveMQException
*/
@Override
public void afterCreateConsumer(ServerConsumer consumer) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logConsumerEvents)) {
logger.infof("created consumer with ID: %s, with session name: %s",
(consumer == null ? UNAVAILABLE : consumer.getID()),
(consumer == null ? UNAVAILABLE : consumer.getSessionID()));
}
}
/**
* Before a consumer is closed
*
* @param consumer
* @param failed
* @throws ActiveMQException
*/
@Override
public void beforeCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logConsumerEvents)) {
logger.debugf("beforeCloseConsumer called with consumer: %s, consumer sessionID: %s, failed: %s",
consumer, (consumer == null ? UNAVAILABLE : consumer.getSessionID()), failed);
}
}
/**
* After a consumer is closed
*
* @param consumer
* @param failed
* @throws ActiveMQException
*/
@Override
public void afterCloseConsumer(ServerConsumer consumer, boolean failed) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logConsumerEvents)) {
logger.infof("closed consumer ID: %s, with consumer Session: %s, failed: %s",
(consumer == null ? UNAVAILABLE : consumer.getID()),
(consumer == null ? UNAVAILABLE : consumer.getSessionID()), failed);
}
}
/**
* Before a queue is created
*
* @param queueConfig
* @throws ActiveMQException
*/
@Override
public void beforeCreateQueue(QueueConfig queueConfig) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logInternalEvents)) {
logger.debugf("beforeCreateQueue called with queueConfig: %s", queueConfig);
}
}
/**
* After a queue has been created
*
* @param queue The newly created queue
* @throws ActiveMQException
*/
@Override
public void afterCreateQueue(Queue queue) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logInternalEvents)) {
logger.infof("created queue: %s", queue);
}
}
/**
* Before a queue is destroyed
*
* @param queueName
* @param session
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
* @throws ActiveMQException
*/
@Override
public void beforeDestroyQueue(SimpleString queueName,
final SecurityAuth session,
boolean checkConsumerCount,
boolean removeConsumers,
boolean autoDeleteAddress) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logInternalEvents)) {
logger.debugf("beforeDestroyQueue called with queueName: %s, session: %s, checkConsumerCount: %s," +
" removeConsumers: %s, autoDeleteAddress: %s", queueName, session, checkConsumerCount,
removeConsumers, autoDeleteAddress);
}
}
/**
* After a queue has been destroyed
*
* @param queue
* @param address
* @param session
* @param checkConsumerCount
* @param removeConsumers
* @param autoDeleteAddress
* @throws ActiveMQException
*/
@Override
public void afterDestroyQueue(Queue queue,
SimpleString address,
final SecurityAuth session,
boolean checkConsumerCount,
boolean removeConsumers,
boolean autoDeleteAddress) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logInternalEvents)) {
logger.infof("destroyed queue: %s, with args address: %s, session: %s, checkConsumerCount: %s," +
" removeConsumers: %s, autoDeleteAddress: %s", queue, address, session, checkConsumerCount,
removeConsumers, autoDeleteAddress);
}
}
/**
* Before a message is sent
*
* @param session the session that sends the message
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @throws ActiveMQException
*/
@Override
public void beforeSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSendingEvents)) {
logger.debugf("beforeSend called with message: %s, tx: %s, session: %s, direct: %s, noAutoCreateQueue: %s",
message, tx, session, direct, noAutoCreateQueue);
}
}
/**
* After a message is sent
*
* @param session the session that sends the message
* @param tx
* @param message
* @param direct
* @param noAutoCreateQueue
* @param result
* @throws ActiveMQException
*/
@Override
public void afterSend(ServerSession session,
Transaction tx,
Message message,
boolean direct,
boolean noAutoCreateQueue,
RoutingStatus result) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSendingEvents)) {
logger.debugf("sent message: %s, session name : %s, session connectionID: %s, with tx: %s, session: %s," +
" direct: %s, noAutoCreateQueue: %s, result: %s", message,
(session == null ? UNAVAILABLE : session.getName()),
(session == null ? UNAVAILABLE : session.getConnectionID()),
tx, session, direct, noAutoCreateQueue, result);
} else if (logger.isInfoEnabled() && (logAll || logSendingEvents)) {
logger.infof("sent message with ID: %s, session name: %s, session connectionID: %s, result: %s",
(message == null ? UNAVAILABLE : message.getMessageID()),
(session == null ? UNAVAILABLE : session.getName()),
(session == null ? UNAVAILABLE : session.getConnectionID()), result);
}
}
/**
* Before a message is routed
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
* @throws ActiveMQException
*/
@Override
public void beforeMessageRoute(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSendingEvents)) {
logger.debugf("beforeMessageRoute called with message: %s, context: %s, direct: %s, rejectDuplicates: %s",
message, context, direct, rejectDuplicates);
}
}
/**
* After a message is routed
*
* @param message
* @param context
* @param direct
* @param rejectDuplicates
* @param result
* @throws ActiveMQException
*/
@Override
public void afterMessageRoute(Message message,
RoutingContext context,
boolean direct,
boolean rejectDuplicates,
RoutingStatus result) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logSendingEvents)) {
logger.debugf("routed message: %s, with context: %s, direct: %s, rejectDuplicates: %s, result: %s",
message, context, direct, rejectDuplicates, result);
} else if (logger.isInfoEnabled() && (logAll || logSendingEvents)) {
logger.infof("routed message with ID: %s, result: %s",
(message == null ? UNAVAILABLE : message.getMessageID()), result);
}
}
/**
* Before a message is delivered to a client consumer
*
* @param consumer the consumer the message will be delivered to
* @param reference message reference
* @throws ActiveMQException
*/
@Override
public void beforeDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logDeliveringEvents)) {
logger.debugf("beforeDeliver called with consumer: %s, reference: %s", consumer, reference);
}
}
/**
* After a message is delivered to a client consumer
*
* @param consumer the consumer the message was delivered to
* @param reference message reference
* @throws ActiveMQException
*/
@Override
public void afterDeliver(ServerConsumer consumer, MessageReference reference) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logDeliveringEvents)) {
Message message = (reference == null ? null : reference.getMessage());
if (consumer != null) {
logger.debugf("delivered message with message ID: %s to consumer on address: %s, queue: %s, consumer sessionID: %s,"
+ " consumerID: %s, full message: %s, full consumer: %s", (message == null ? UNAVAILABLE : message.getMessageID()),
consumer.getQueueAddress(), consumer.getQueueName(), consumer.getSessionID(), consumer.getID(), reference, consumer);
} else {
logger.debugf("delivered message with message ID: %s, consumer info UNAVAILABLE", (message == null ? UNAVAILABLE : message.getMessageID()));
}
} else if (logger.isInfoEnabled() && (logAll || logDeliveringEvents)) {
Message message = (reference == null ? null : reference.getMessage());
if (consumer != null) {
logger.infof("delivered message with message ID: %s, to consumer on address: %s, queue: %s, consumer sessionID: %s, consumerID: %s",
(message == null ? UNAVAILABLE : message.getMessageID()), consumer.getQueueAddress(), consumer.getQueueName(),
consumer.getSessionID(), consumer.getID());
} else {
logger.infof("delivered message with message ID: %s, consumer info UNAVAILABLE", (message == null ? UNAVAILABLE : message.getMessageID()));
}
}
}
/**
* A message has been expired
*
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
* @throws ActiveMQException
*/
@Override
public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logInternalEvents)) {
logger.infof("expired message: %s, messageExpiryAddress: %s", message, messageExpiryAddress);
}
}
/**
* A message has been acknowledged
*
* @param ref The acked message
* @param reason The ack reason
* @throws ActiveMQException
*/
@Override
public void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logDeliveringEvents)) {
logger.debugf("acknowledged message: %s, with ackReason: %s", ref, reason);
} else if (logger.isInfoEnabled() && (logAll || logDeliveringEvents)) {
Message message = (ref == null ? null : ref.getMessage());
Queue queue = (ref == null ? null : ref.getQueue());
logger.infof("acknowledged message ID: %s, with messageRef consumerID: %s, messageRef QueueName: %s, with ackReason: %s",
(message == null ? UNAVAILABLE : message.getMessageID()), (ref == null ? UNAVAILABLE : ref.getConsumerId()),
(queue == null ? UNAVAILABLE : queue.getName()), reason);
}
}
/**
* Before a bridge is deployed
*
* @param config The bridge configuration
* @throws ActiveMQException
*/
@Override
public void beforeDeployBridge(BridgeConfiguration config) throws ActiveMQException {
if (logger.isDebugEnabled() && (logAll || logInternalEvents)) {
logger.debugf("beforeDeployBridge called with bridgeConfiguration: %s", config);
}
}
/**
* After a bridge has been deployed
*
* @param bridge The newly deployed bridge
* @throws ActiveMQException
*/
@Override
public void afterDeployBridge(Bridge bridge) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logInternalEvents)) {
logger.infof("deployed bridge: %s", bridge);
}
}
/**
* A Critical failure has been detected.
* This will be called before the broker is stopped
*
* @param components
* @throws ActiveMQException
*/
@Override
public void criticalFailure(CriticalComponent components) throws ActiveMQException {
if (logger.isInfoEnabled() && (logAll || logInternalEvents)) {
logger.infof("criticalFailure called with criticalComponent: %s", components);
}
}
/**
* dump the configuration of the logging Plugin
*/
private void dumpConfiguration() {
if (logger.isDebugEnabled()) {
logger.debug("LoggingPlugin logAll=" + logAll);
logger.debug("LoggingPlugin logConnectionEvents=" + logConnectionEvents);
logger.debug("LoggingPlugin logSessionEvents=" + logSessionEvents);
logger.debug("LoggingPlugin logConsumerEvents=" + logConsumerEvents);
logger.debug("LoggingPlugin logSendingEvents=" + logSendingEvents);
logger.debug("LoggingPlugin logDeliveringEvents=" + logDeliveringEvents);
logger.debug("LoggingPlugin logInternalEvents=" + logInternalEvents);
}
}
}

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
@ -469,6 +470,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
conf.setClusterPassword(s); conf.setClusterPassword(s);
Assert.assertEquals(s, conf.getClusterPassword()); Assert.assertEquals(s, conf.getClusterPassword());
conf.registerBrokerPlugin(new LoggingActiveMQServerPlugin());
Assert.assertEquals("ensure one plugin registered", 1, conf.getBrokerPlugins().size());
// This will use serialization to perform a deep copy of the object // This will use serialization to perform a deep copy of the object
Configuration conf2 = conf.copy(); Configuration conf2 = conf.copy();

View File

@ -53,4 +53,80 @@ Configuration config = new ConfigurationImpl();
config.registerBrokerPlugin(new UserPlugin()); config.registerBrokerPlugin(new UserPlugin());
``` ```
## Using the LoggingActiveMQServerPlugin
The LoggingActiveMQServerPlugin logs specific broker events.
You can select which events are logged by setting the following configuration properties to `true`.
<table summary="LoggingActiveMQServerPlugin configuration" border="1">
<colgroup>
<col/>
<col/>
</colgroup>
<thead>
<tr>
<th>Property</th>
<th>Property Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>LOG_CONNECTION_EVENTS</td>
<td>Log info when a Connection is created/destroy. Default `false`.</td>
</tr>
<tr>
<td>LOG_SESSION_EVENTS</td>
<td>Log info when a Session is created/closed. Default `false`.</td>
</tr>
<tr>
<td>LOG_CONSUMER_EVENTS</td>
<td>Logs info when a Consumer is created/closed. Default `false`.</td>
</tr>
<tr>
<td>LOG_DELIVERING_EVENTS</td>
<td>Logs info when message is delivered to a consumer and when a message is acknowledged by a consumer.
Default `false`</td>
</tr>
<tr>
<td>LOG_SENDING_EVENTS</td>
<td>Logs info when a message has been sent to an address and when a message has been routed within the broker.
Default `false`</td>
</tr>
<tr>
<td>LOG_INTERNAL_EVENTS</td>
<td>Logs info when a queue created/destroyed, when a message is expired, when a bridge is deployed and when a critical
failure occurs. Default `false`</td>
</tr>
<tr>
<td>LOG_ALL_EVENTS</td>
<td>Logs info for all the above events. Default `false`</td>
</tr>
</tbody>
</table>
By default the LoggingActiveMQServerPlugin wil not log any information. The logging is activated by setting one (or a selection)
of the above configuration properties to `true`.
To configure the plugin, you can add the following configuration to the broker. In the example below both LOG_DELIVERING_EVENTS
and LOG_SENDING_EVENTS will be logged by the broker.
```xml
<configuration ...>
...
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_DELIVERING_EVENTS" value="true" />
<property key="LOG_SENDING_EVENTS" value="true" />
</broker-plugin>
</broker-plugins>
...
</configuration>
```
The LoggingActiveMQServerPlugin logs information at Log Level `INFO`. By setting the Logger
"org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin" to `DEBUG` more information for each
event will be logged.

View File

@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* NOTE: this test should be run at log level INFO
*
* This test checks the LoggingActiveMQServerPlugin is logging expected data with specific property configurations
* when client using AMQP
*/
@RunWith(BMUnitRunner.class)
public class LoggingActiveMQServerPluginAMQPTest extends LoggingActiveMQServerPluginTest {
/**
* Aim: test all events are logged when plugin configured with
* LOG_ALL_EVENTS
*
* Overridden as behaviour slightly different for AMQP - consumer closed plugin point called twice
*
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test logAll EVENT",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogAll() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_ALL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "log ALL Message_1", 0);
sendAndReceive(true, true, "log ALL Message_2", 0);
Thread.sleep(500);
assertEquals("created connections", 2, createdConnectionLogs.size());
assertEquals("destroyed connections", 2, destroyedConnectionLogs.size());
assertEquals("created consumer", 2, createdConsumerLogs.size());
assertEquals("closed consumer", 4, closedConsumerLogs.size());
assertEquals("delivered message", 2, deliveredLogs.size());
assertEquals("acked message", 2, ackedLogs.size());
assertEquals("sending message", 2, sentLogs.size());
assertEquals("routing message", 2, routedLogs.size());
assertEquals("queue created", 2, createdQueueLogs.size());
assertEquals("queue destroyed", 2, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test the consumer create/close events are logged when plugin configured with
* LOG_CONSUMER_EVENTS
*
* Overridden as behaviour slightly different for AMQP - consumer closed plugin point seems to be called twice
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test LOG_CONSUMER_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogConsumerEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_CONSUMER_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "txtMessage", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 1, createdConsumerLogs.size());
assertEquals("closed consumer", 2, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 0, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test the session create/close events are logged when plugin configured with
* LOG_SESSION_EVENTS
*
* Overriden as addedMetaData does not seem to be invoked for AMQP
*
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test LOG_SESSION_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogSessionEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_SESSION_EVENTS);
activeMQServer.start();
try {
sendAndReceive(false,false,"test_message",0);
Thread.sleep(500);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 2, createdSessionLogs.size());
assertEquals("closed sessions", 2, closedSessionLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
@Override
protected Connection createActiveMQConnection() throws JMSException {
JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
return factory.createConnection();
}
}

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
*
* NOTE: this test should be run at log level INFO
*
* This test checks the LoggingActiveMQServerPlugin is logging expected data with specific property configurations
* when client using OpenWire
*
*/
@RunWith(BMUnitRunner.class)
public class LoggingActiveMQServerPluginOpenWireTest extends LoggingActiveMQServerPluginTest {
/**
* Aim: test queue creation are logged when plugin configured with
* LOG_INTERNAL_EVENTS
*
* Overiden as Openwire does not seem to destroy the queue.
*
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test queue creation log",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testQueueCreationLog() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_INTERNAL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(false, true, "NO_MESSAGE", 0);
Thread.sleep(500);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 1, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test the session create/close events are logged when plugin configured with
* LOG_SESSION_EVENTS
*
* Overriden as
* - ceated/closed sessions is invoke 3 times for openwire
* - no meta data added to the session .
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test LOG_SESSION_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogSessionEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_SESSION_EVENTS);
activeMQServer.start();
try {
sendAndReceive(false,false,"test_message",0);
Thread.sleep(500);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 3, createdSessionLogs.size());
assertEquals("closed sessions", 3, closedSessionLogs.size());
assertEquals("added Metadata logs", 0, addedSessionMetaData.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test all events are logged when plugin configured with
* LOG_ALL_EVENTS
*
* Overridden as behaviour slightly different for queue create/destroy with Openwire
*
* @throws Exception
*/
@Override
@Test
@BMRules(rules = {@BMRule(name = "test logAll EVENT",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogAll() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_ALL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "log ALL Message_1", 0);
sendAndReceive(true, true, "log ALL Message_2", 0);
Thread.sleep(500);
for (String log : unexpectedLogs) {
System.out.println(" others events logged >>>>" + log);
}
assertEquals("created connections", 2, createdConnectionLogs.size());
assertEquals("destroyed connections", 2, destroyedConnectionLogs.size());
//assertEquals("created sessions", 0, createdSessionLogs.size());
//assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 2, createdConsumerLogs.size());
assertEquals("closed consumer", 2, closedConsumerLogs.size());
assertEquals("delivered message", 2, deliveredLogs.size());
assertEquals("acked message", 2, ackedLogs.size());
assertEquals("sending message", 2, sentLogs.size());
assertEquals("routing message", 2, routedLogs.size());
assertEquals("queue created", 1, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
@Override
protected Connection createActiveMQConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.watchTopicAdvisories=false");
return factory.createConnection();
}
}

View File

@ -0,0 +1,532 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* NOTE: this test should be run at log level INFO
*
* This test checks the LoggingActiveMQServerPlugin is logging expected data with specific property configurations
* when using CORE protocol
*
* Byteman is used to intercept the logged message and uses a helper method to put the messages into seperate lists
* based on its content. The test subsequently checks to ensure correct number of logs in each list.
*/
@RunWith(BMUnitRunner.class)
public class LoggingActiveMQServerPluginTest extends ActiveMQTestBase {
//buckets for logging
protected static List<String> createdConnectionLogs = new ArrayList<>();
protected static List<String> destroyedConnectionLogs = new ArrayList<>();
protected static List<String> createdSessionLogs = new ArrayList<>();
protected static List<String> closedSessionLogs = new ArrayList<>();
protected static List<String> createdConsumerLogs = new ArrayList<>();
protected static List<String> closedConsumerLogs = new ArrayList<>();
protected static List<String> deliveredLogs = new ArrayList<>();
protected static List<String> ackedLogs = new ArrayList<>();
protected static List<String> sentLogs = new ArrayList<>();
protected static List<String> routedLogs = new ArrayList<>();
protected static List<String> createdQueueLogs = new ArrayList<>();
protected static List<String> destroyedQueueLogs = new ArrayList<>();
protected static List<String> messageExpiredLogs = new ArrayList<>();
protected static List<String> unexpectedLogs = new ArrayList<>();
protected static List<String> addedSessionMetaData = new ArrayList<>();
/**
* Aim: verify connection creation/destroy is logged when LOG_CONNECTION_EVENTS is set
*
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test LOG_CONNECTION_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogConnectEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_CONNECTION_EVENTS);
activeMQServer.start();
try {
Connection connection = createActiveMQConnection();
connection.start();
Thread.sleep(100);
connection.close();
//ensure logs are collected.
Thread.sleep(500);
assertEquals("created connections", 1, createdConnectionLogs.size());
assertEquals("destroyed connections", 1, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 0, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test the session create/close events are logged when plugin configured with
* LOG_SESSION_EVENTS
*
* NOTE: Session plugin points seem to be invoked twice - did not expect that.
*
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test LOG_SESSION_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogSessionEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_SESSION_EVENTS);
activeMQServer.start();
try {
sendAndReceive(false, false, "test_message", 0);
Thread.sleep(500);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 2, createdSessionLogs.size());
assertEquals("closed sessions", 2, closedSessionLogs.size());
assertEquals("added Metadata logs", 2, addedSessionMetaData.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test the consumer create/close events are logged when plugin configured with
* LOG_CONSUMER_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test LOG_CONSUMER_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogConsumerEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_CONSUMER_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "txtMessage", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 1, createdConsumerLogs.size());
assertEquals("closed consumer", 1, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 0, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test delivering events are logged when plugin configured with
* LOG_DELIVERING_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test LOG_DELIVERING_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogDeliveringEvents() throws Exception {
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_DELIVERING_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "txtMessage 1", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 1, deliveredLogs.size());
assertEquals("acked message", 1, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 0, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test sending events are logged when plugin configured with
* LOG_SENDING_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test LOG_SENDING_EVENTS",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogSendingEvents() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_SENDING_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "txtMessage 1", 0);
sendAndReceive(true, true, "txtMessage 2", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 2, sentLogs.size());
assertEquals("routing message", 2, routedLogs.size());
assertEquals("queue created", 0, createdQueueLogs.size());
assertEquals("queue destroyed", 0, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test queue creation/destroy are logged when plugin configured with
* LOG_INTERNAL_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test queue creation log",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testQueueCreationLog() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_INTERNAL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(false, true, "NO_MESSAGE", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 1, createdQueueLogs.size());
assertEquals("queue destroyed", 1, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test message expiry is logged when plugin configured with
* LOG_INTERNAL_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test queue creation log",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testExpireMessageLog() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_INTERNAL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, false, "message to expiry", 100);
Thread.sleep(1000);
sendAndReceive(false, true, "NO_MESSAGE", 0);
assertEquals("created connections", 0, createdConnectionLogs.size());
assertEquals("destroyed connections", 0, destroyedConnectionLogs.size());
assertEquals("created sessions", 0, createdSessionLogs.size());
assertEquals("closed sessions", 0, createdSessionLogs.size());
assertEquals("created consumer", 0, createdConsumerLogs.size());
assertEquals("closed consumer", 0, closedConsumerLogs.size());
assertEquals("delivered message", 0, deliveredLogs.size());
assertEquals("acked message", 0, ackedLogs.size());
assertEquals("sending message", 0, sentLogs.size());
assertEquals("routing message", 0, routedLogs.size());
assertEquals("queue created", 1, createdQueueLogs.size());
assertEquals("queue destroyed", 1, destroyedQueueLogs.size());
assertEquals("expired message", 1, messageExpiredLogs.size());
assertEquals("unexpected logs", 0, unexpectedLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
/**
* Aim: test all events are logged when plugin configured with
* LOG_ALL_EVENTS
* @throws Exception
*/
@Test
@BMRules(rules = {@BMRule(name = "test logAll EVENT",
targetClass = "org.jboss.logging.Logger",
targetMethod = "infof",
targetLocation = "ENTRY",
action = "org.apache.activemq.artemis.tests.extras.byteman.LoggingActiveMQServerPluginTest.infoLog($1, $0)")})
public void testLogAll() throws Exception {
//initial plugin
ActiveMQServer activeMQServer = createServerWithLoggingPlugin(LoggingActiveMQServerPlugin.LOG_ALL_EVENTS);
activeMQServer.start();
try {
sendAndReceive(true, true, "log ALL Message_1", 0);
sendAndReceive(true, true, "log ALL Message_2", 0);
Thread.sleep(500);
assertEquals("created connections", 2, createdConnectionLogs.size());
assertEquals("destroyed connections", 2, destroyedConnectionLogs.size());
assertEquals("created consumer", 2, createdConsumerLogs.size());
assertEquals("closed consumer", 2, closedConsumerLogs.size());
assertEquals("delivered message", 2, deliveredLogs.size());
assertEquals("acked message", 2, ackedLogs.size());
assertEquals("sending message", 2, sentLogs.size());
assertEquals("routing message", 2, routedLogs.size());
assertEquals("queue created", 2, createdQueueLogs.size());
assertEquals("queue destroyed", 2, destroyedQueueLogs.size());
assertEquals("expired message", 0, messageExpiredLogs.size());
} finally {
activeMQServer.stop();
//reset the logs lists
clearLogLists();
}
}
//collect the log invocation from LoggingActiveMQServerPlugin
public static void infoLog(Object message, org.jboss.logging.Logger logger) {
//not interested
if (!logger.getName().equals(LoggingActiveMQServerPlugin.class.getName())) {
return;
}
String stringMessage = (String) message;
if (stringMessage.startsWith("created connection")) {
createdConnectionLogs.add(stringMessage);
} else if (stringMessage.startsWith("destroyed connection")) {
destroyedConnectionLogs.add(stringMessage);
} else if (stringMessage.startsWith("created session")) {
createdSessionLogs.add(stringMessage);
} else if (stringMessage.startsWith("closed session")) {
closedSessionLogs.add(stringMessage);
} else if (stringMessage.startsWith("added session metadata")) {
addedSessionMetaData.add(stringMessage);
} else if (stringMessage.startsWith("created consumer")) {
createdConsumerLogs.add(stringMessage);
} else if (stringMessage.startsWith("closed consumer")) {
closedConsumerLogs.add(stringMessage);
} else if (stringMessage.startsWith("delivered message")) {
deliveredLogs.add(stringMessage);
} else if (stringMessage.startsWith("acknowledged message")) {
ackedLogs.add(stringMessage);
} else if (stringMessage.startsWith("sent message")) {
sentLogs.add(stringMessage);
} else if (stringMessage.startsWith("routed message")) {
routedLogs.add(stringMessage);
} else if (stringMessage.startsWith("created queue")) {
createdQueueLogs.add(stringMessage);
} else if (stringMessage.startsWith("destroyed queue")) {
destroyedQueueLogs.add(stringMessage);
} else if (stringMessage.startsWith("expired")) {
messageExpiredLogs.add(stringMessage);
} else {
unexpectedLogs.add(stringMessage);
}
}
//----- helper methods ---
protected void clearLogLists() {
createdConnectionLogs.clear();
destroyedConnectionLogs.clear();
createdSessionLogs.clear();
closedSessionLogs.clear();
createdConsumerLogs.clear();
closedConsumerLogs.clear();
deliveredLogs.clear();
ackedLogs.clear();
sentLogs.clear();
routedLogs.clear();
createdQueueLogs.clear();
destroyedQueueLogs.clear();
messageExpiredLogs.clear();
unexpectedLogs.clear();
addedSessionMetaData.clear();
}
protected ActiveMQServer createServerWithLoggingPlugin(String loggingPluginEventType) throws Exception {
//initial plugin
LoggingActiveMQServerPlugin loggingActiveMQServerPlugin = new LoggingActiveMQServerPlugin();
Map<String, String> properties = new HashMap<>();
properties.put(loggingPluginEventType, "true");
loggingActiveMQServerPlugin.init(properties);
//register
Configuration config = createDefaultConfig(true);
config.registerBrokerPlugin(loggingActiveMQServerPlugin);
return createServer(false, config);
}
protected void sendAndReceive(boolean send,
boolean receive,
String txtMessage,
long expiry) throws JMSException, InterruptedException {
Connection connection = createActiveMQConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
Queue queue = session.createQueue("TEST.QUEUE");
MessageConsumer messageConsumer = null;
if (receive) {
messageConsumer = session.createConsumer(queue);
Thread.sleep(1000);
}
if (send) {
MessageProducer messageProducer = session.createProducer(queue);
if (expiry > 0) {
messageProducer.setTimeToLive(expiry);
}
messageProducer.send(session.createTextMessage(txtMessage));
}
if (receive) {
messageConsumer.receive(100);
messageConsumer.close();
}
session.close();
connection.close();
}
protected Connection createActiveMQConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
return (ActiveMQConnection) factory.createConnection();
}
}

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin;
import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration; import org.apache.activemq.artemis.jms.server.config.impl.FileJMSConfiguration;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Test; import org.junit.Test;
@ -28,14 +29,7 @@ public class XmlConfigPluginTest extends ActiveMQTestBase {
@Test @Test
public void testStopStart1() throws Exception { public void testStopStart1() throws Exception {
FileConfiguration fc = new FileConfiguration(); ActiveMQServer server = createServerFromConfig("broker-plugins-config.xml");
FileJMSConfiguration fileConfiguration = new FileJMSConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager("broker-plugins-config.xml");
deploymentManager.addDeployable(fc);
deploymentManager.addDeployable(fileConfiguration);
deploymentManager.readConfiguration();
ActiveMQServer server = addServer(new ActiveMQServerImpl(fc));
try { try {
server.start(); server.start();
assertEquals(2, server.getBrokerPlugins().size()); assertEquals(2, server.getBrokerPlugins().size());
@ -51,4 +45,70 @@ public class XmlConfigPluginTest extends ActiveMQTestBase {
} }
} }
} }
/**
* Ensure the configuration is bring picked up correctly by LoggingActiveMQServerPlugin
* @throws Exception
*/
@Test
public void testLoggingActiveMQServerPlugin() throws Exception {
ActiveMQServer server = createServerFromConfig("broker-logging-plugin.xml");
try {
server.start();
assertEquals("only one plugin should be registered",1, server.getBrokerPlugins().size());
assertTrue("ensure LoggingActiveMQServerPlugin is registered",server.getBrokerPlugins().get(0) instanceof LoggingActiveMQServerPlugin);
LoggingActiveMQServerPlugin loggingActiveMQServerPlugin = (LoggingActiveMQServerPlugin) server.getBrokerPlugins().get(0);
assertEquals("check logAll", true, loggingActiveMQServerPlugin.isLogAll());
assertEquals("check logConnectionEvents", true, loggingActiveMQServerPlugin.isLogConnectionEvents());
assertEquals("check logSessionEvents", true, loggingActiveMQServerPlugin.isLogSessionEvents());
assertEquals("check logConsumerEvents", true, loggingActiveMQServerPlugin.isLogConsumerEvents());
assertEquals("check logDeliveringEvents", true, loggingActiveMQServerPlugin.isLogDeliveringEvents());
assertEquals("check logSendingEvents", true, loggingActiveMQServerPlugin.isLogSendingEvents());
assertEquals("check logInternalEvents", true, loggingActiveMQServerPlugin.isLogInternalEvents());
} finally {
if (server != null) {
server.stop();
}
}
}
/**
* ensure the LoggingActiveMQServerPlugin uses default values when configured with incorrect values
* @throws Exception
*/
@Test
public void testLoggingActiveMQServerPluginWrongValue() throws Exception {
ActiveMQServer server = createServerFromConfig("broker-logging-plugin-wrong.xml");
try {
server.start();
assertEquals("only one plugin should be registered",1, server.getBrokerPlugins().size());
assertTrue("ensure LoggingActiveMQServerPlugin is registered",server.getBrokerPlugins().get(0) instanceof LoggingActiveMQServerPlugin);
LoggingActiveMQServerPlugin loggingActiveMQServerPlugin = (LoggingActiveMQServerPlugin) server.getBrokerPlugins().get(0);
assertEquals("check logAll", false, loggingActiveMQServerPlugin.isLogAll());
assertEquals("check logConnectionEvents", false, loggingActiveMQServerPlugin.isLogConnectionEvents());
assertEquals("check logSessionEvents", false, loggingActiveMQServerPlugin.isLogSessionEvents());
assertEquals("check logConsumerEvents", false, loggingActiveMQServerPlugin.isLogConsumerEvents());
assertEquals("check logDeliveringEvents", false, loggingActiveMQServerPlugin.isLogDeliveringEvents());
assertEquals("check logSendingEvents", false, loggingActiveMQServerPlugin.isLogSendingEvents());
assertEquals("check logInternalEvents", false, loggingActiveMQServerPlugin.isLogInternalEvents());
} finally {
if (server != null) {
server.stop();
}
}
}
private ActiveMQServer createServerFromConfig(String configFileName) throws Exception {
FileConfiguration fc = new FileConfiguration();
FileJMSConfiguration fileConfiguration = new FileJMSConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(configFileName);
deploymentManager.addDeployable(fc);
deploymentManager.addDeployable(fileConfiguration);
deploymentManager.readConfiguration();
return addServer(new ActiveMQServerImpl(fc));
}
} }

View File

@ -0,0 +1,51 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<queue name="myJMSQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<journal-directory>./target/tmp/activemq-unit-test/broker-plugin-test/</journal-directory>
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<security-enabled>false</security-enabled>
<queues>
<queue name="myQueue">
<address>myAddress</address>
</queue>
</queues>
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="wrong_value"/>
<property key="LOG_CONSUMER_EVENTS" value="wrong_value"/>
</broker-plugin>
</broker-plugins>
</core>
</configuration>

View File

@ -0,0 +1,56 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
<jms xmlns="urn:activemq:jms">
<queue name="myJMSQueue"/>
</jms>
<core xmlns="urn:activemq:core">
<connectors>
<connector name="netty-connector">tcp://localhost:61616</connector>
</connectors>
<journal-directory>./target/tmp/activemq-unit-test/broker-plugin-test/</journal-directory>
<acceptors>
<acceptor name="netty-acceptor">tcp://localhost:61616</acceptor>
</acceptors>
<security-enabled>false</security-enabled>
<queues>
<queue name="myQueue">
<address>myAddress</address>
</queue>
</queues>
<broker-plugins>
<broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
<property key="LOG_ALL_EVENTS" value="true"/>
<property key="LOG_CONNECTION_EVENTS" value="true"/>
<property key="LOG_SESSION_EVENTS" value="true"/>
<property key="LOG_CONSUMER_EVENTS" value="true"/>
<property key="LOG_DELIVERING_EVENTS" value="true"/>
<property key="LOG_SENDING_EVENTS" value="true"/>
<property key="LOG_INTERNAL_EVENTS" value="true"/>
</broker-plugin>
</broker-plugins>
</core>
</configuration>