diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java index 37ee00448b..599ea46ebf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/TransportLoggerFactory.java @@ -27,6 +27,8 @@ import org.apache.activemq.util.LogWriterFinder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.TransportLoggerSupport.defaultJmxPort; + /** * Singleton class to create TransportLogger objects. * When the method getInstance() is called for the first time, @@ -61,10 +63,6 @@ public class TransportLoggerFactory { * This setting only has a meaning if */ private static boolean defaultInitialBehavior = true; - /** - * Default port to control the transport loggers through JMX - */ - private static int defaultJmxPort = 1099; private boolean transportLoggerControlCreated = false; private ManagementContext managementContext; @@ -137,7 +135,11 @@ public class TransportLoggerFactory { */ public TransportLogger createTransportLogger(Transport next, String logWriterName, boolean useJmx, boolean startLogging, int jmxport) throws IOException { - int id = getNextId(); + int id = -1; // new default to single logger + // allow old behaviour with incantation + if (!useJmx && jmxport != defaultJmxPort) { + id = getNextId(); + } return createTransportLogger(next, id, createLog(id), logWriterName, useJmx, startLogging, jmxport); } @@ -162,6 +164,9 @@ public class TransportLoggerFactory { String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxport) throws IOException { try { LogWriter logWriter = logWriterFinder.newInstance(logWriterName); + if (id == -1) { + logWriter.setPrefix(String.format("%08X: ", getNextId())); + } TransportLogger tl = new TransportLogger (next, log, startLogging, logWriter); if (dynamicManagement) { synchronized (this) { @@ -183,7 +188,7 @@ public class TransportLoggerFactory { } private static Logger createLog(int id) { - return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection:" + id); + return LoggerFactory.getLogger(TransportLogger.class.getName()+".Connection" + (id > 0 ? ":"+id : "" )); } /** diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java index 65b916205f..4387cdc119 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/CustomLogWriter.java @@ -36,7 +36,12 @@ import org.slf4j.Logger; * */ public class CustomLogWriter implements LogWriter { - + + @Override + public void setPrefix(String prefix) { + // for the custom case, revert to the logger per connection + } + // doc comment inherited from LogWriter public void initialMessage(Logger log) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java index b8261d9a43..4de3706336 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/logwriters/DefaultLogWriter.java @@ -30,6 +30,12 @@ import org.slf4j.Logger; */ public class DefaultLogWriter implements LogWriter { + String prefix = ""; + @Override + public void setPrefix(String prefix) { + this.prefix = prefix; + } + // doc comment inherited from LogWriter public void initialMessage(Logger log) { // Default log writer does nothing here @@ -37,32 +43,32 @@ public class DefaultLogWriter implements LogWriter { // doc comment inherited from LogWriter public void logRequest (Logger log, Object command) { - log.debug("SENDING REQUEST: "+command); + log.debug(prefix + "SENDING REQUEST: "+command); } // doc comment inherited from LogWriter public void logResponse (Logger log, Object response) { - log.debug("GOT RESPONSE: "+response); + log.debug(prefix + "GOT RESPONSE: "+response); } // doc comment inherited from LogWriter public void logAsyncRequest (Logger log, Object command) { - log.debug("SENDING ASNYC REQUEST: "+command); + log.debug(prefix + "SENDING ASNYC REQUEST: "+command); } // doc comment inherited from LogWriter public void logOneWay (Logger log, Object command) { - log.debug("SENDING: "+command); + log.debug(prefix + "SENDING: "+command); } // doc comment inherited from LogWriter public void logReceivedCommand (Logger log, Object command) { - log.debug("RECEIVED: " + command); + log.debug(prefix + "RECEIVED: " + command); } // doc comment inherited from LogWriter public void logReceivedException (Logger log, IOException error) { - log.debug("RECEIVED Exception: "+error, error); + log.debug(prefix + "RECEIVED Exception: "+error, error); } diff --git a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java index 2a4e600696..80ad17698e 100644 --- a/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/TransportLoggerSupport.java @@ -27,6 +27,12 @@ public class TransportLoggerSupport { public static String defaultLogWriterName = "default"; + /** + * Default port to control the transport loggers through JMX + */ + public static int defaultJmxPort = 1099; + + public static interface SPI { public Transport createTransportLogger(Transport transport) throws IOException; public Transport createTransportLogger(Transport transport, String logWriterName, boolean dynamicManagement, boolean startLogging, int jmxPort) throws IOException; diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java index afc2ca25d7..d92ccc6fb6 100644 --- a/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/LogWriter.java @@ -34,6 +34,13 @@ import org.slf4j.Logger; */ public interface LogWriter { + /** + * prefix each statement with this value. allows connections to be correlated + * when logger is shared + * @param prefix + */ + public void setPrefix(String prefix); + /** * Writes a header message to the log. * @param log The log to be written to. diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 5d623b6264..f3e225fe2e 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -112,6 +112,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements * TransportConnector URIs. */ protected boolean startLogging = true; + protected int jmxPort = TransportLoggerSupport.defaultJmxPort; protected final ServerSocketFactory serverSocketFactory; protected final BlockingQueue socketQueue = new LinkedBlockingQueue(); protected Thread socketHandlerThread; @@ -258,6 +259,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements this.dynamicManagement = useJmx; } + public void setJmxPort(int jmxPort) { + this.jmxPort = jmxPort; + } + + public int getJmxPort() { + return jmxPort; + } + public boolean isStartLogging() { return startLogging; } @@ -577,6 +586,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements options.put("logWriterName", logWriterName); options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); options.put("startLogging", Boolean.valueOf(startLogging)); + options.put("jmxPort", Integer.valueOf(jmxPort)); options.putAll(transportOptions); TransportInfo transportInfo = configureTransport(this, socket); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java index bd8152451d..74d2bb88ce 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java @@ -226,7 +226,7 @@ public class KahaDBTest extends TestCase { assertEquals("Expected to received all messages.", count, 100); broker.stop(); - Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().removeAppender(appender); assertFalse("Did not replay any records from the journal", didSomeRecovery.get()); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java new file mode 100644 index 0000000000..ecc570fe75 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6446Test.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AMQ6446Test { + + private BrokerService brokerService; + LinkedList connections = new LinkedList<>(); + + @Test + public void test2Connections() throws Exception { + final String urlTraceParam = "?trace=true"; + startBroker(urlTraceParam); + final HashSet loggers = new HashSet(); + final HashSet messages = new HashSet(); + + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + loggers.add(event.getLoggerName()); + messages.add(event.getRenderedMessage()); + } + }; + + Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().setLevel(Level.DEBUG); + + String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + urlTraceParam; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace); + + for (int i=0; i<2; i++) { + Connection c = factory.createConnection(); + c.start(); + connections.add(c); + } + + Logger.getRootLogger().removeAppender(appender); + + // no logger ends with :2 + assertFalse(foundMatch(loggers, ".*:2$")); + + // starts with 000000x: + assertTrue(foundMatch(messages, "^0+\\d:.*")); + } + + public boolean foundMatch(Collection values, String regex) { + boolean found = false; + Pattern p = Pattern.compile(regex); + + for (String input: values) { + Matcher m = p.matcher(input); + found = m.matches(); + if (found) { + break; + } + } + return found; + } + + @Test + public void test2ConnectionsLegacy() throws Exception { + final String legacySupportParam = "?trace=true&jmxPort=22"; + startBroker(legacySupportParam); + + final HashSet loggers = new HashSet(); + final HashSet messages = new HashSet(); + + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + loggers.add(event.getLoggerName()); + messages.add(event.getRenderedMessage()); + } + }; + + Logger.getRootLogger().addAppender(appender); + Logger.getRootLogger().setLevel(Level.TRACE); + + String brokerUrlWithTrace = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString() + + legacySupportParam; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlWithTrace); + + for (int i=0; i<2; i++) { + Connection c = factory.createConnection(); + c.start(); + connections.add(c); + } + + Logger.getRootLogger().removeAppender(appender); + + // logger ends with :2 + assertTrue(foundMatch(loggers, ".*:2$")); + + // starts with 000000x: + assertFalse(foundMatch(messages, "^0+\\d:.*")); + + } + + @After + public void tearDown() throws Exception { + for (Connection connection : connections) { + try { + connection.close(); + } catch (Exception ignored) {} + } + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + public void startBroker(String urlParam) throws Exception { + brokerService = BrokerFactory.createBroker("broker:(tcp://0.0.0.0:0" + urlParam + ")/localhost?useJmx=false&persistent=false"); + brokerService.start(); + brokerService.waitUntilStarted(); + } + +}