mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-6446 - use shared logger and pepend statements with connection counter. Old behaviour or per connection logger can be obtained with trace=true&jmxPort=0
This commit is contained in:
parent
c60d716968
commit
5385fd1bb3
|
@ -27,6 +27,8 @@ import org.apache.activemq.util.LogWriterFinder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.apache.activemq.TransportLoggerSupport.defaultJmxPort;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Singleton class to create TransportLogger objects.
|
* Singleton class to create TransportLogger objects.
|
||||||
* When the method getInstance() is called for the first time,
|
* When the method getInstance() is called for the first time,
|
||||||
|
@ -61,10 +63,6 @@ public class TransportLoggerFactory {
|
||||||
* This setting only has a meaning if
|
* This setting only has a meaning if
|
||||||
*/
|
*/
|
||||||
private static boolean defaultInitialBehavior = true;
|
private static boolean defaultInitialBehavior = true;
|
||||||
/**
|
|
||||||
* Default port to control the transport loggers through JMX
|
|
||||||
*/
|
|
||||||
private static int defaultJmxPort = 1099;
|
|
||||||
|
|
||||||
private boolean transportLoggerControlCreated = false;
|
private boolean transportLoggerControlCreated = false;
|
||||||
private ManagementContext managementContext;
|
private ManagementContext managementContext;
|
||||||
|
@ -137,7 +135,11 @@ public class TransportLoggerFactory {
|
||||||
*/
|
*/
|
||||||
public TransportLogger createTransportLogger(Transport next, String logWriterName,
|
public TransportLogger createTransportLogger(Transport next, String logWriterName,
|
||||||
boolean useJmx, boolean startLogging, int jmxport) throws IOException {
|
boolean useJmx, boolean startLogging, int jmxport) throws IOException {
|
||||||
int id = getNextId();
|
int id = -1; // new default to single logger
|
||||||
|
// allow old behaviour with incantation
|
||||||
|
if (!useJmx && jmxport != defaultJmxPort) {
|
||||||
|
id = getNextId();
|
||||||
|
}
|
||||||
return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport);
|
return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,6 +164,9 @@ public class TransportLoggerFactory {
|
||||||
String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException {
|
String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException {
|
||||||
try {
|
try {
|
||||||
LogWriter logWriter = logWriterFinder.newInstance(logWriterName);
|
LogWriter logWriter = logWriterFinder.newInstance(logWriterName);
|
||||||
|
if (id == -1) {
|
||||||
|
logWriter.setPrefix(String.format("%08X: ", getNextId()));
|
||||||
|
}
|
||||||
TransportLogger tl = new TransportLogger (next, log, startLogging, logWriter);
|
TransportLogger tl = new TransportLogger (next, log, startLogging, logWriter);
|
||||||
if (dynamicManagement) {
|
if (dynamicManagement) {
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -183,7 +188,7 @@ public class TransportLoggerFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Logger createLog(int id) {
|
private static Logger createLog(int id) {
|
||||||
return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection:" + id);
|
return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection" + (id > 0 ? ":"+id : "" ));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -37,6 +37,11 @@ import org.slf4j.Logger;
|
||||||
*/
|
*/
|
||||||
public class CustomLogWriter implements LogWriter {
|
public class CustomLogWriter implements LogWriter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setPrefix(String prefix) {
|
||||||
|
// for the custom case, revert to the logger per connection
|
||||||
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void initialMessage(Logger log) {
|
public void initialMessage(Logger log) {
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,12 @@ import org.slf4j.Logger;
|
||||||
*/
|
*/
|
||||||
public class DefaultLogWriter implements LogWriter {
|
public class DefaultLogWriter implements LogWriter {
|
||||||
|
|
||||||
|
String prefix = "";
|
||||||
|
@Override
|
||||||
|
public void setPrefix(String prefix) {
|
||||||
|
this.prefix = prefix;
|
||||||
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void initialMessage(Logger log) {
|
public void initialMessage(Logger log) {
|
||||||
// Default log writer does nothing here
|
// Default log writer does nothing here
|
||||||
|
@ -37,32 +43,32 @@ public class DefaultLogWriter implements LogWriter {
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logRequest (Logger log, Object command) {
|
public void logRequest (Logger log, Object command) {
|
||||||
log.debug("SENDING REQUEST: "+command);
|
log.debug(prefix + "SENDING REQUEST: "+command);
|
||||||
}
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logResponse (Logger log, Object response) {
|
public void logResponse (Logger log, Object response) {
|
||||||
log.debug("GOT RESPONSE: "+response);
|
log.debug(prefix + "GOT RESPONSE: "+response);
|
||||||
}
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logAsyncRequest (Logger log, Object command) {
|
public void logAsyncRequest (Logger log, Object command) {
|
||||||
log.debug("SENDING ASNYC REQUEST: "+command);
|
log.debug(prefix + "SENDING ASNYC REQUEST: "+command);
|
||||||
}
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logOneWay (Logger log, Object command) {
|
public void logOneWay (Logger log, Object command) {
|
||||||
log.debug("SENDING: "+command);
|
log.debug(prefix + "SENDING: "+command);
|
||||||
}
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logReceivedCommand (Logger log, Object command) {
|
public void logReceivedCommand (Logger log, Object command) {
|
||||||
log.debug("RECEIVED: " + command);
|
log.debug(prefix + "RECEIVED: " + command);
|
||||||
}
|
}
|
||||||
|
|
||||||
// doc comment inherited from LogWriter
|
// doc comment inherited from LogWriter
|
||||||
public void logReceivedException (Logger log, IOException error) {
|
public void logReceivedException (Logger log, IOException error) {
|
||||||
log.debug("RECEIVED Exception: "+error, error);
|
log.debug(prefix + "RECEIVED Exception: "+error, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,12 @@ public class TransportLoggerSupport {
|
||||||
|
|
||||||
public static String defaultLogWriterName = "default";
|
public static String defaultLogWriterName = "default";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default port to control the transport loggers through JMX
|
||||||
|
*/
|
||||||
|
public static int defaultJmxPort = 1099;
|
||||||
|
|
||||||
|
|
||||||
public static interface SPI {
|
public static interface SPI {
|
||||||
public Transport createTransportLogger(Transport transport) throws IOException;
|
public Transport createTransportLogger(Transport transport) throws IOException;
|
||||||
public Transport createTransportLogger(Transport transport, String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxPort) throws IOException;
|
public Transport createTransportLogger(Transport transport, String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxPort) throws IOException;
|
||||||
|
|
|
@ -34,6 +34,13 @@ import org.slf4j.Logger;
|
||||||
*/
|
*/
|
||||||
public interface LogWriter {
|
public interface LogWriter {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* prefix each statement with this value. allows connections to be correlated
|
||||||
|
* when logger is shared
|
||||||
|
* @param prefix
|
||||||
|
*/
|
||||||
|
public void setPrefix(String prefix);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes a header message to the log.
|
* Writes a header message to the log.
|
||||||
* @param log The log to be written to.
|
* @param log The log to be written to.
|
||||||
|
|
|
@ -112,6 +112,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
|
||||||
* TransportConnector URIs.
|
* TransportConnector URIs.
|
||||||
*/
|
*/
|
||||||
protected boolean startLogging = true;
|
protected boolean startLogging = true;
|
||||||
|
protected int jmxPort = TransportLoggerSupport.defaultJmxPort;
|
||||||
protected final ServerSocketFactory serverSocketFactory;
|
protected final ServerSocketFactory serverSocketFactory;
|
||||||
protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
|
protected final BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
|
||||||
protected Thread socketHandlerThread;
|
protected Thread socketHandlerThread;
|
||||||
|
@ -258,6 +259,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
|
||||||
this.dynamicManagement = useJmx;
|
this.dynamicManagement = useJmx;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setJmxPort(int jmxPort) {
|
||||||
|
this.jmxPort = jmxPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getJmxPort() {
|
||||||
|
return jmxPort;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isStartLogging() {
|
public boolean isStartLogging() {
|
||||||
return startLogging;
|
return startLogging;
|
||||||
}
|
}
|
||||||
|
@ -577,6 +586,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
|
||||||
options.put("logWriterName", logWriterName);
|
options.put("logWriterName", logWriterName);
|
||||||
options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
|
options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
|
||||||
options.put("startLogging", Boolean.valueOf(startLogging));
|
options.put("startLogging", Boolean.valueOf(startLogging));
|
||||||
|
options.put("jmxPort", Integer.valueOf(jmxPort));
|
||||||
options.putAll(transportOptions);
|
options.putAll(transportOptions);
|
||||||
|
|
||||||
TransportInfo transportInfo = configureTransport(this, socket);
|
TransportInfo transportInfo = configureTransport(this, socket);
|
||||||
|
|
|
@ -226,7 +226,7 @@ public class KahaDBTest extends TestCase {
|
||||||
assertEquals("Expected to received all messages.", count, 100);
|
assertEquals("Expected to received all messages.", count, 100);
|
||||||
broker.stop();
|
broker.stop();
|
||||||
|
|
||||||
Logger.getRootLogger().addAppender(appender);
|
Logger.getRootLogger().removeAppender(appender);
|
||||||
assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
|
assertFalse("Did not replay any records from the journal", didSomeRecovery.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.util.DefaultTestAppender;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class AMQ6446Test {
|
||||||
|
|
||||||
|
private BrokerService brokerService;
|
||||||
|
LinkedList<Connection> connections = new LinkedList<>();
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test2Connections() throws Exception {
|
||||||
|
final String urlTraceParam = "?trace=true";
|
||||||
|
startBroker(urlTraceParam);
|
||||||
|
final HashSet<String> loggers = new HashSet<String>();
|
||||||
|
final HashSet<String> messages = new HashSet<String>();
|
||||||
|
|
||||||
|
DefaultTestAppender appender = new DefaultTestAppender() {
|
||||||
|
@Override
|
||||||
|
public void doAppend(LoggingEvent event) {
|
||||||
|
loggers.add(event.getLoggerName());
|
||||||
|
messages.add(event.getRenderedMessage());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Logger.getRootLogger().addAppender(appender);
|
||||||
|
Logger.getRootLogger().setLevel(Level.DEBUG);
|
||||||
|
|
||||||
|
String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() +
|
||||||
|
urlTraceParam;
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace);
|
||||||
|
|
||||||
|
for (int i=0; i<2; i++) {
|
||||||
|
Connection c = factory.createConnection();
|
||||||
|
c.start();
|
||||||
|
connections.add(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
Logger.getRootLogger().removeAppender(appender);
|
||||||
|
|
||||||
|
// no logger ends with :2
|
||||||
|
assertFalse(foundMatch(loggers, ".*:2$"));
|
||||||
|
|
||||||
|
// starts with 000000x:
|
||||||
|
assertTrue(foundMatch(messages, "^0+\\d:.*"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean foundMatch(Collection<String> values, String regex) {
|
||||||
|
boolean found = false;
|
||||||
|
Pattern p = Pattern.compile(regex);
|
||||||
|
|
||||||
|
for (String input: values) {
|
||||||
|
Matcher m = p.matcher(input);
|
||||||
|
found = m.matches();
|
||||||
|
if (found) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return found;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test2ConnectionsLegacy() throws Exception {
|
||||||
|
final String legacySupportParam = "?trace=true&jmxPort=22";
|
||||||
|
startBroker(legacySupportParam);
|
||||||
|
|
||||||
|
final HashSet<String> loggers = new HashSet<String>();
|
||||||
|
final HashSet<String> messages = new HashSet<String>();
|
||||||
|
|
||||||
|
DefaultTestAppender appender = new DefaultTestAppender() {
|
||||||
|
@Override
|
||||||
|
public void doAppend(LoggingEvent event) {
|
||||||
|
loggers.add(event.getLoggerName());
|
||||||
|
messages.add(event.getRenderedMessage());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Logger.getRootLogger().addAppender(appender);
|
||||||
|
Logger.getRootLogger().setLevel(Level.TRACE);
|
||||||
|
|
||||||
|
String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() +
|
||||||
|
legacySupportParam;
|
||||||
|
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace);
|
||||||
|
|
||||||
|
for (int i=0; i<2; i++) {
|
||||||
|
Connection c = factory.createConnection();
|
||||||
|
c.start();
|
||||||
|
connections.add(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
Logger.getRootLogger().removeAppender(appender);
|
||||||
|
|
||||||
|
// logger ends with :2
|
||||||
|
assertTrue(foundMatch(loggers, ".*:2$"));
|
||||||
|
|
||||||
|
// starts with 000000x:
|
||||||
|
assertFalse(foundMatch(messages, "^0+\\d:.*"));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
for (Connection connection : connections) {
|
||||||
|
try {
|
||||||
|
connection.close();
|
||||||
|
} catch (Exception ignored) {}
|
||||||
|
}
|
||||||
|
brokerService.stop();
|
||||||
|
brokerService.waitUntilStopped();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void startBroker(String urlParam) throws Exception {
|
||||||
|
brokerService = BrokerFactory.createBroker("broker:(tcp://0.0.0.0:0" + urlParam + ")/localhost?useJmx=false&persistent=false");
|
||||||
|
brokerService.start();
|
||||||
|
brokerService.waitUntilStarted();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue