ARTEMIS-4391 tests: rework AssertionLoggerHandler

This commit is contained in:
Alexey Markevich 2023-08-04 15:08:59 +02:00 committed by clebertsuconic
parent 999789bdc5
commit 67f9c9d92d
54 changed files with 823 additions and 1120 deletions

View File

@ -94,28 +94,22 @@ public class DelegateCallbackTest {
}
@Test
public void shouldLogOnDoneForEachExceptions() {
AssertionLoggerHandler.startCapture();
try {
public void shouldLogOnDoneForEachExceptions() throws Exception {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
callback.done();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142024"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(loggerHandler.findText("AMQ142024"));
}
}
@Test
public void shouldLogOnErrorForEachExceptions() {
AssertionLoggerHandler.startCapture();
try {
public void shouldLogOnErrorForEachExceptions() throws Exception {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
final CountingIOCallback countingIOCallback = new CountingIOCallback(true);
final DelegateCallback callback = new DelegateCallback(Collections.singleton(countingIOCallback));
callback.onError(0, "not a real error");
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ142025"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(loggerHandler.findText("AMQ142025"));
}
}

View File

@ -97,47 +97,39 @@ public class SimpleBundleTest {
}
@Test
public void longList() {
AssertionLoggerHandler.startCapture(false, true);
try {
public void longList() throws Exception {
try (AssertionLoggerHandler logHandler = new AssertionLoggerHandler()) {
SimpleBundle.MESSAGES.longParameters("1", "2", "3", "4", "5");
Assert.assertTrue("parameter not found", AssertionLoggerHandler.findText("p1"));
Assert.assertTrue("parameter not found", AssertionLoggerHandler.findText("p2"));
Assert.assertTrue("parameter not found", AssertionLoggerHandler.findText("p3"));
Assert.assertTrue("parameter not found", AssertionLoggerHandler.findText("p4"));
Assert.assertTrue("parameter not found", AssertionLoggerHandler.findText("p5"));
Assert.assertFalse("{}", AssertionLoggerHandler.findText("{}"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue("parameter not found", logHandler.findText("p1"));
Assert.assertTrue("parameter not found", logHandler.findText("p2"));
Assert.assertTrue("parameter not found", logHandler.findText("p3"));
Assert.assertTrue("parameter not found", logHandler.findText("p4"));
Assert.assertTrue("parameter not found", logHandler.findText("p5"));
Assert.assertFalse("{}", logHandler.findText("{}"));
}
}
@Test
public void onlyException() {
try {
AssertionLoggerHandler.startCapture(false, false);
public void onlyException() throws Exception {
try (AssertionLoggerHandler logHandler = new AssertionLoggerHandler()) {
SimpleBundle.MESSAGES.onlyException(createMyExceptionBreadcrumbMethod("MSG7777"));
Assert.assertTrue(AssertionLoggerHandler.findText("TST14"));
Assert.assertFalse(AssertionLoggerHandler.findText("MSG7777"));
Assert.assertTrue(logHandler.findText("TST14"));
Assert.assertFalse(logHandler.findText("MSG7777"));
}
AssertionLoggerHandler.clear();
AssertionLoggerHandler.startCapture(false, true);
try (AssertionLoggerHandler logHandler = new AssertionLoggerHandler(true)) {
SimpleBundle.MESSAGES.onlyException(createMyExceptionBreadcrumbMethod("MSG7777"));
Assert.assertTrue(AssertionLoggerHandler.findText("TST14"));
Assert.assertTrue(AssertionLoggerHandler.findText("MSG7777"));
Assert.assertTrue(AssertionLoggerHandler.findText("createMyExceptionBreadcrumbMethod"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(logHandler.findText("TST14"));
Assert.assertTrue(logHandler.findTrace("MSG7777"));
Assert.assertTrue(logHandler.findTrace("createMyExceptionBreadcrumbMethod"));
}
}
// I'm doing it on a method just to assert if this method will appear on the stack trace
private MyException createMyExceptionBreadcrumbMethod(String message) {
private static MyException createMyExceptionBreadcrumbMethod(String message) {
return new MyException(message);
}

View File

@ -1219,8 +1219,8 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
@Test
public void testAddressSettingsPageLimitInvalidConfiguration1() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
@ -1245,14 +1245,14 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
Assert.assertTrue(loggerHandler.findText("AMQ224125"));
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration2() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
@ -1276,13 +1276,13 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
Assert.assertTrue(loggerHandler.findText("AMQ224125"));
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration3() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
@ -1306,13 +1306,13 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224125"));
Assert.assertTrue(loggerHandler.findText("AMQ224125"));
}
@Test
public void testAddressSettingsPageLimitInvalidConfiguration4() throws Throwable {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
@ -1336,7 +1336,7 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
Assert.assertEquals(null, storeImpl.getPageLimitMessages());
Assert.assertEquals(null, storeImpl.getPageLimitBytes());
Assert.assertEquals(null, storeImpl.getPageFullMessagePolicy());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224124"));
Assert.assertTrue(loggerHandler.findText("AMQ224124"));
}

View File

@ -967,16 +967,13 @@ public class FileConfigurationTest extends ConfigurationImplTest {
@Test
public void testValidateCache() throws Exception {
AssertionLoggerHandler.startCapture(true);
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
FileConfiguration fc = new FileConfiguration();
FileDeploymentManager deploymentManager = new FileDeploymentManager(getConfigurationName());
deploymentManager.addDeployable(fc);
deploymentManager.readConfiguration();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224117"));
Assert.assertEquals(1, AssertionLoggerHandler.countText("AMQ224117"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(loggerHandler.findText("AMQ224117"));
Assert.assertEquals(1, loggerHandler.countText("AMQ224117"));
}
}

View File

@ -22,8 +22,6 @@ import java.nio.charset.StandardCharsets;
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
@ -32,30 +30,17 @@ import org.junit.Test;
*/
public class WrongRoleFileConfigurationParserTest extends ActiveMQTestBase {
@BeforeClass
public static void prepareLogger() {
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
}
/**
*
*
*
*/
@Test
public void testParsingDefaultServerConfig() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configuration.getBytes(StandardCharsets.UTF_8));
parser.parseMainConfig(input);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
FileConfigurationParser parser = new FileConfigurationParser();
ByteArrayInputStream input = new ByteArrayInputStream(configuration.getBytes(StandardCharsets.UTF_8));
parser.parseMainConfig(input);
// Using the code only because I don't want a test failing just for someone editing Log text
assertTrue(AssertionLoggerHandler.findText("AMQ222177", "create-durable-queue"));
assertTrue(AssertionLoggerHandler.findText("AMQ222177", "delete-durable-queue"));
// Using the code only because I don't want a test failing just for someone editing Log text
assertTrue(loggerHandler.findText("AMQ222177", "create-durable-queue"));
assertTrue(loggerHandler.findText("AMQ222177", "delete-durable-queue"));
}
}
private static final String configuration = "<configuration xmlns=\"urn:activemq\"\n" +

View File

@ -293,10 +293,9 @@ public class FileMoveManagerTest {
@Test
public void testMoveOverPaging() throws Exception {
AssertionLoggerHandler.startCapture();
ExecutorService threadPool = Executors.newCachedThreadPool();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
manager.setMaxFolders(3);
for (int i = 1; i <= 10; i++) {
HierarchicalRepository<AddressSettings> addressSettings = new HierarchicalObjectRepository<>();
@ -325,9 +324,8 @@ public class FileMoveManagerTest {
Assert.assertEquals(Math.min(i, manager.getMaxFolders()), manager.getNumberOfFolders());
}
Assert.assertFalse("The loggers are complaining about address.txt", AssertionLoggerHandler.findText("address.txt"));
Assert.assertFalse("The loggers are complaining about address.txt", loggerHandler.findText("address.txt"));
} finally {
AssertionLoggerHandler.stopCapture();
threadPool.shutdown();
}

View File

@ -45,8 +45,7 @@ public class ActiveMQServerStartupTest extends ActiveMQTestBase {
private void testTooLongToStart(CriticalAnalyzerPolicy policy) throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setCriticalAnalyzerPolicy(policy);
@ -58,11 +57,9 @@ public class ActiveMQServerStartupTest extends ActiveMQTestBase {
// this will be faking a condition
server.setState(ActiveMQServer.SERVER_STATE.STARTING);
CriticalAnalyzerAccessor.fireActions(server.getCriticalAnalyzer(), new CriticalComponentImpl(server.getCriticalAnalyzer(), 2));
Assert.assertTrue(AssertionLoggerHandler.findText("224116"));
Assert.assertTrue(loggerHandler.findText("224116"));
Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTING, server.getState()); // should not be changed
server.stop();
} finally {
AssertionLoggerHandler.stopCapture();
}
}

View File

@ -101,21 +101,6 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
}
}
private LeaseLock lock(long acquireMillis, long queryTimeoutMillis, long allowedTimeDiff) {
try {
return JdbcSharedStateManager
.createLiveLock(
UUID.randomUUID().toString(),
jdbcSharedStateManager.getJdbcConnectionProvider(),
sqlProvider,
acquireMillis,
queryTimeoutMillis,
allowedTimeDiff);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Before
public void createLockTable() throws Exception {
dbConf = createDefaultDatabaseStorageConfiguration();
@ -418,9 +403,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
}
@Test
public void validateTimeDiffsOnLeaseLock() {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
public void validateTimeDiffsOnLeaseLock() throws Exception {
AtomicInteger diff = new AtomicInteger(0);
@ -435,18 +418,22 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
}
};
diff.set(10_000);
hackLock.dbCurrentTimeMillis();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224118"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
diff.set(10_000);
hackLock.dbCurrentTimeMillis();
Assert.assertTrue(loggerHandler.findText("AMQ224118"));
}
diff.set(-10_000);
AssertionLoggerHandler.clear();
hackLock.dbCurrentTimeMillis();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224118"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
diff.set(-10_000);
hackLock.dbCurrentTimeMillis();
Assert.assertTrue(loggerHandler.findText("AMQ224118"));
}
diff.set(0);
AssertionLoggerHandler.clear();
hackLock.dbCurrentTimeMillis();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224118"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
diff.set(0);
hackLock.dbCurrentTimeMillis();
Assert.assertFalse(loggerHandler.findText("AMQ224118"));
}
}
}

View File

@ -16,76 +16,62 @@
*/
package org.apache.activemq.artemis.logs;
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.regex.Pattern;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
/**
* This class contains a tool where programs could intercept for LogMessage given an interval of time between {@link #startCapture()}
* and {@link #stopCapture()}
* This class contains a tool where programs could intercept for LogMessage
*
* Be careful with this use as this is intended for testing only (such as testcases)
*/
@Plugin(name = "AssertionLoggerHandler", category = "Core", elementType = "appender")
public class AssertionLoggerHandler extends AbstractAppender {
public class AssertionLoggerHandler extends AbstractAppender implements Closeable {
private static final Map<String, LogEvent> messages = new ConcurrentHashMap<>();
private static List<String> traceMessages;
private static volatile boolean capture = false;
private static volatile boolean captureStackTrace = false;
private final Deque<LogEntry> messages = new ConcurrentLinkedDeque<>();
private final boolean captureStackTrace;
public static class Builder<B extends Builder<B>> extends AbstractAppender.Builder<B> implements org.apache.logging.log4j.core.util.Builder<AssertionLoggerHandler> {
@Override
public AssertionLoggerHandler build() {
return new AssertionLoggerHandler(getName(), getFilter(), getOrCreateLayout(), isIgnoreExceptions(), getPropertyArray());
}
public AssertionLoggerHandler() {
this(false);
}
@PluginBuilderFactory
public static <B extends Builder<B>> B newBuilder() {
return new Builder<B>().asBuilder();
}
protected AssertionLoggerHandler(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions, Property[] properties) {
super(name, filter, layout, ignoreExceptions, properties);
public AssertionLoggerHandler(boolean captureStackTrace) {
super("AssertionLoggerHandler" + System.currentTimeMillis(), null, null, true, Property.EMPTY_ARRAY);
this.captureStackTrace = captureStackTrace;
org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger) LogManager.getRootLogger();
rootLogger.addAppender(this);
super.start();
}
@Override
public void append(LogEvent event) {
if (capture) {
String formattedMessage = event.getMessage().getFormattedMessage();
if (captureStackTrace && event.getThrown() != null) {
StringWriter stackOutput = new StringWriter();
event.getThrown().printStackTrace(new PrintWriter(stackOutput));
formattedMessage += stackOutput.toString();
}
messages.put(formattedMessage, event);
if (traceMessages != null) {
traceMessages.add(formattedMessage);
}
LogEntry logEntry = new LogEntry();
logEntry.message = event.getMessage().getFormattedMessage();
logEntry.level = event.getLevel();
if (captureStackTrace && event.getThrown() != null) {
StringWriter stackOutput = new StringWriter();
event.getThrown().printStackTrace(new PrintWriter(stackOutput));
logEntry.stackTrace = stackOutput.toString();
}
messages.addFirst(logEntry);
}
@Override
public void close() throws IOException {
org.apache.logging.log4j.core.Logger rootLogger = (org.apache.logging.log4j.core.Logger) LogManager.getRootLogger();
rootLogger.removeAppender(this);
}
/**
@ -94,11 +80,10 @@ public class AssertionLoggerHandler extends AbstractAppender {
* @param level
* @return
*/
public static boolean hasLevel(LogLevel level) {
public boolean hasLevel(LogLevel level) {
Level implLevel = level.toImplLevel();
for (LogEvent event : messages.values()) {
if (implLevel.equals(event.getLevel())) {
for (LogEntry logEntry : messages) {
if (implLevel == logEntry.level) {
return true;
}
}
@ -119,44 +104,20 @@ public class AssertionLoggerHandler extends AbstractAppender {
return LogLevel.fromImplLevel(existingLevel);
}
public static boolean findText(long mstimeout, String... text) {
long timeMax = System.currentTimeMillis() + mstimeout;
do {
if (findText(text)) {
return true;
}
}
while (timeMax > System.currentTimeMillis());
return false;
}
/**
* Find a line that contains the parameters passed as an argument
*
* @param text
* @return
*/
public static boolean findText(final String... text) {
for (Map.Entry<String, LogEvent> entry : messages.entrySet()) {
String key = entry.getKey();
public boolean findText(final String... text) {
for (LogEntry logEntry : messages) {
boolean found = true;
for (String txtCheck : text) {
found = key.contains(txtCheck);
if (!found) {
// If the main log message doesn't contain what we're looking for let's look in the message from the exception (if there is one).
Throwable throwable = entry.getValue().getThrown();
if (throwable != null && throwable.getMessage() != null) {
found = throwable.getMessage().contains(txtCheck);
if (!found) {
break;
}
} else {
break;
}
found = logEntry.message.contains(txtCheck);
if (found) {
continue;
}
}
@ -168,87 +129,44 @@ public class AssertionLoggerHandler extends AbstractAppender {
return false;
}
public static int countText(final String... text) {
int found = 0;
if (traceMessages != null) {
for (String str : traceMessages) {
for (String txtCheck : text) {
if (str.contains(txtCheck)) {
found++;
}
}
}
} else {
for (Map.Entry<String, LogEvent> entry : messages.entrySet()) {
String key = entry.getKey();
for (String txtCheck : text) {
if (key.contains(txtCheck)) {
found++;
}
}
}
}
return found;
}
public static boolean matchText(final String pattern) {
Pattern r = Pattern.compile(pattern);
for (Map.Entry<String, LogEvent> entry : messages.entrySet()) {
if (r.matcher(entry.getKey()).matches()) {
/**
* Find a stacktrace that contains the parameters passed as an argument
*
* @param trace
* @return
*/
public boolean findTrace(final String trace) {
for (LogEntry logEntry : messages) {
if (logEntry.stackTrace != null && logEntry.stackTrace.contains(trace)) {
return true;
} else {
Throwable throwable = entry.getValue().getThrown();
if (throwable != null && throwable.getMessage() != null) {
if (r.matcher(throwable.getMessage()).matches()) {
return true;
}
}
}
}
return false;
}
public static final void clear() {
messages.clear();
if (traceMessages != null) {
traceMessages.clear();
public int countText(final String... text) {
int found = 0;
for (LogEntry logEntry : messages) {
for (String txtCheck : text) {
if (logEntry.message.contains(txtCheck)) {
found++;
}
}
}
return found;
}
public static final void startCapture() {
startCapture(false);
}
public boolean matchText(final String pattern) {
Pattern r = Pattern.compile(pattern);
/**
*
* @param individualMessages enables counting individual messages.
*/
public static final void startCapture(boolean individualMessages) {
startCapture(individualMessages, captureStackTrace);
}
/**
*
* @param individualMessages enables counting individual messages.
*/
public static final void startCapture(boolean individualMessages, boolean captureStackTrace) {
clear();
if (individualMessages) {
traceMessages = new LinkedList<>();
for (LogEntry logEntry : messages) {
if (r.matcher(logEntry.message).matches()) {
return true;
}
}
capture = true;
AssertionLoggerHandler.captureStackTrace = captureStackTrace;
}
public static final void stopCapture() {
capture = false;
captureStackTrace = false;
clear();
traceMessages = null;
return false;
}
public enum LogLevel {
@ -271,24 +189,18 @@ public class AssertionLoggerHandler extends AbstractAppender {
}
private static LogLevel fromImplLevel(Level implLevel) {
if (Level.FATAL.equals(implLevel)) {
return FATAL;
} else if (Level.ERROR.equals(implLevel)) {
return ERROR;
} else if (Level.WARN.equals(implLevel)) {
return WARN;
} else if (Level.INFO.equals(implLevel)) {
return INFO;
} else if (Level.DEBUG.equals(implLevel)) {
return DEBUG;
} else if (Level.TRACE.equals(implLevel)) {
return TRACE;
} else if (Level.OFF.equals(implLevel)) {
return OFF;
} else {
throw new IllegalArgumentException("Unexpected level:" + implLevel);
for (LogLevel logLevel : LogLevel.values()) {
if (logLevel.implLevel == implLevel) {
return logLevel;
}
}
throw new IllegalArgumentException("Unexpected level:" + implLevel);
}
}
private static class LogEntry {
String message;
String stackTrace;
Level level;
}
}

View File

@ -21,7 +21,6 @@ import javax.jms.ConnectionFactory;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Test;
@ -48,9 +47,7 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
}
public void testReconnectMultipleTimesWithSameClientID() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
connection = connectionFactory.createConnection();
useConnection(connection);
@ -72,9 +69,7 @@ public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport {
connection.close();
connection = connectionFactory.createConnection();
useConnection(connection);
Assert.assertFalse(AssertionLoggerHandler.findText("Failed to register MBean"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("Failed to register MBean"));
}
}

View File

@ -14,7 +14,7 @@
# limitations under the License.
# Log4J 2 configuration used in tests.
rootLogger = INFO, Console, Test
rootLogger = INFO, Console
logger.activemq.name=org.apache.activemq
logger.activemq.level=INFO
@ -37,9 +37,3 @@ appender.console.type=Console
appender.console.name=Console
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=[%t] %d{HH:mm:ss,SSS} %-5level [%logger] %msg%n
# Test appender org.apache.activemq.artemis.logs.AssertionLoggerHandler
appender.test.type=AssertionLoggerHandler
appender.test.name=Test
appender.test.layout.type=PatternLayout
appender.test.layout.pattern=[%t] %d{HH:mm:ss,SSS} %-5level [%logger] %msg%n

View File

@ -49,6 +49,8 @@ public class MultiThreadedAuditLoggingTest extends ActiveMQTestBase {
private static LogLevel previousLevel = null;
private static AssertionLoggerHandler loggerHandler;
@Override
@Before
public void setUp() throws Exception {
@ -69,13 +71,13 @@ public class MultiThreadedAuditLoggingTest extends ActiveMQTestBase {
@BeforeClass
public static void prepareLogger() {
previousLevel = AssertionLoggerHandler.setLevel(MESSAGE_AUDIT_LOGGER_NAME, LogLevel.INFO);
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@AfterClass
public static void clearLogger() {
public static void clearLogger() throws Exception {
try {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
} finally {
AssertionLoggerHandler.setLevel(MESSAGE_AUDIT_LOGGER_NAME, previousLevel);
}
@ -232,10 +234,10 @@ public class MultiThreadedAuditLoggingTest extends ActiveMQTestBase {
Assert.assertFalse(producer.failed);
}
assertFalse(AssertionLoggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue2.*"));
assertFalse(AssertionLoggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue1.*"));
assertTrue(AssertionLoggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue2.*"));
assertTrue(AssertionLoggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue1.*"));
assertFalse(loggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue2.*"));
assertFalse(loggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue1.*"));
assertTrue(loggerHandler.matchText(".*User queue2\\(queue2\\).* is consuming a message from queue2.*"));
assertTrue(loggerHandler.matchText(".*User queue1\\(queue1\\).* is consuming a message from queue1.*"));
}
}
}

View File

@ -1105,11 +1105,10 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
server.stop();
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
server.start();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
Assert.assertTrue(loggerHandler.findText("AMQ221019"));
}
validateNoFilesOnLargeDir();
runAfter(server::stop);
@ -1134,12 +1133,13 @@ public class AmqpLargeMessageTest extends AmqpClientTestSupport {
server.stop();
AssertionLoggerHandler.startCapture();
server.start();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
// These two should not happen as the consumer will receive them
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221019")); // unferenced record
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221018")); // unferenced large message
// These two should not happen as the consumer will receive them
Assert.assertFalse(loggerHandler.findText("AMQ221019")); // unferenced record
Assert.assertFalse(loggerHandler.findText("AMQ221018")); // unferenced large message
}
connection = addConnection(client.connect());
try {

View File

@ -139,8 +139,8 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
}
public void internalQueueRedistribution(String protocol) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter((AssertionLoggerHandler::stopCapture));
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
@ -184,8 +184,8 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
// if you see this message, most likely the notifications are being copied to the mirror
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
Assert.assertFalse(loggerHandler.findText("AMQ222196"));
Assert.assertFalse(loggerHandler.findText("AMQ224037"));
}
@Test
@ -200,8 +200,8 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
public void internalTopicRedistribution(String protocol) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter((AssertionLoggerHandler::stopCapture));
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
final int numMessages = 100;
@ -276,8 +276,8 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
}
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
Assert.assertFalse(loggerHandler.findText("AMQ222196"));
Assert.assertFalse(loggerHandler.findText("AMQ224037"));
Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
@ -315,8 +315,8 @@ public class AMQPRedistributeClusterTest extends AmqpTestSupport {
}
// if you see this message, most likely the notifications are being copied to the mirror
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
Assert.assertFalse(loggerHandler.findText("AMQ222196"));
Assert.assertFalse(loggerHandler.findText("AMQ224037"));
assertEmptyQueue(a1TopicSubscription);
assertEmptyQueue(a2TopicSubscription);

View File

@ -78,18 +78,19 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
ActiveMQServer server_2;
private AssertionLoggerHandler loggerHandler;
@Before
public void startLogging() {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void stopLogging() {
public void stopLogging() throws Exception {
try {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}
@ -155,22 +156,21 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
Assert.assertNotNull(mirrorQueue);
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
// Adding some PostAck event that will never be found on the target for an expiry
org.apache.activemq.artemis.api.core.Message message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3333L, AckReason.EXPIRED).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
// Adding some PostAck event that will never be found on the target for an expiry
org.apache.activemq.artemis.api.core.Message message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3333L, AckReason.EXPIRED).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
// Adding some PostAck event that will never be found on the target for a regular ack
message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3334L, AckReason.NORMAL).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
// Adding some PostAck event that will never be found on the target for a regular ack
message = AMQPMirrorMessageFactory.createMessage(mirrorQueue.getAddress().toString(), SimpleString.toSimpleString("sometest"), SimpleString.toSimpleString("sometest"), AMQPMirrorControllerSource.POST_ACK, "0000", 3334L, AckReason.NORMAL).setDurable(true);
message.setMessageID(server_2.getStorageManager().generateID());
server_2.getPostOffice().route(message, false);
Wait.assertEquals(0L, mirrorQueue::getMessageCount, 2000, 100);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224041"));
AssertionLoggerHandler.stopCapture();
Wait.assertEquals(0L, mirrorQueue::getMessageCount, 2000, 100);
Assert.assertFalse(loggerHandler.findText("AMQ224041"));
}
server_2.stop();
server.stop();
@ -424,7 +424,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = locateQueue(server_2, getQueueName());
@ -494,7 +494,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
server_2.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
{ // sender
AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null);
@ -724,7 +724,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
queueOnServer2.getPagingStore().startPaging();
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
@ -733,7 +733,7 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
producer.send(message);
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
Queue queueOnServer1;
if (deferredStart) {

View File

@ -67,6 +67,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
protected static final int AMQP_PORT_2 = 5673;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
ActiveMQServer server_2;
private AssertionLoggerHandler loggerHandler;
@After
public void stopServer1() throws Exception {
@ -84,15 +85,16 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
@Before
public void startLogging() {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void stopLogging() {
public void stopLogging() throws Exception {
try {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
Assert.assertFalse(loggerHandler.findText("AMQ222214"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}
@ -467,8 +469,8 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
@Test
public void testLVQ() throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
server.setIdentity("Server1");
{
@ -541,7 +543,7 @@ public class BrokerInSyncTest extends AmqpClientTestSupport {
server_2.stop();
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153"));
Assert.assertFalse(loggerHandler.findText("AMQ222153"));
}

View File

@ -91,22 +91,18 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
@After
public void stop() throws Exception {
try {
if (mockServer != null) {
mockServer.close();
mockServer = null;
if (mockServer != null) {
mockServer.close();
mockServer = null;
}
if (vertx != null) {
try {
CountDownLatch latch = new CountDownLatch(1);
vertx.close((x) -> latch.countDown());
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
} finally {
vertx = null;
}
if (vertx != null) {
try {
CountDownLatch latch = new CountDownLatch(1);
vertx.close((x) -> latch.countDown());
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
} finally {
vertx = null;
}
}
} finally {
AssertionLoggerHandler.stopCapture(); // Just in case startCapture was called in any of the tests here
}
}
@ -121,8 +117,7 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
*/
@Test
public void testConnectItself() throws Exception {
try {
AssertionLoggerHandler.startCapture();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(10).setRetryInterval(1);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
@ -132,20 +127,19 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
Assert.assertEquals(1, server.getBrokerConnections().size());
server.getBrokerConnections().forEach((t) -> Wait.assertFalse(t::isStarted));
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001")); // max retry
AssertionLoggerHandler.clear();
Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); // max retry
}
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Thread.sleep(100);
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111002")); // there shouldn't be a retry after the last failure
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111003")); // there shouldn't be a retry after the last failure
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("AMQ111002")); // there shouldn't be a retry after the last failure
Assert.assertFalse(loggerHandler.findText("AMQ111003")); // there shouldn't be a retry after the last failure
}
}
@Test
public void testCloseLinkOnMirror() throws Exception {
try {
AssertionLoggerHandler.startCapture();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
@ -155,7 +149,7 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
server.start();
Assert.assertEquals(1, server.getBrokerConnections().size());
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002"));
Wait.assertTrue(() -> loggerHandler.findText("AMQ111002"));
server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting()));
server2.start();
@ -204,8 +198,6 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
}
}
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@ -222,63 +214,64 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
public void testCloseLink(boolean isSender) throws Exception {
AtomicInteger errors = new AtomicInteger(0);
AssertionLoggerHandler.startCapture(true);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
ActiveMQServer server2 = createServer(AMQP_PORT_2, false);
if (isSender) {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10);
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
server.getConfiguration().addAMQPConnection(amqpConnection);
} else {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(10);
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.RECEIVER));
server2.getConfiguration().addAMQPConnection(amqpConnection);
}
if (isSender) {
server.start();
Assert.assertEquals(1, server.getBrokerConnections().size());
} else {
server2.start();
Assert.assertEquals(1, server2.getBrokerConnections().size());
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002"));
server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting()));
if (isSender) {
server2.start();
} else {
server.start();
}
server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting()));
createAddressAndQueues(server);
createAddressAndQueues(server2);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null);
ActiveMQServer serverReceivingConnections = isSender ? server2 : server;
Wait.assertEquals(1, serverReceivingConnections.getRemotingService()::getConnectionCount);
serverReceivingConnections.getRemotingService().getConnections().forEach((t) -> {
try {
ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t;
ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection();
Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null);
connection.getAmqpConnection().runNow(() -> {
Link theLink = protonConnection.linkHead(of(ACTIVE), of(ACTIVE));
theLink.close();
connection.flush();
});
} catch (Exception e) {
errors.incrementAndGet();
e.printStackTrace();
if (isSender) {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10);
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
server.getConfiguration().addAMQPConnection(amqpConnection);
} else {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(10);
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.RECEIVER));
server2.getConfiguration().addAMQPConnection(amqpConnection);
}
});
Wait.assertEquals(1, () -> AssertionLoggerHandler.countText("AMQ119021"));
if (isSender) {
server.start();
Assert.assertEquals(1, server.getBrokerConnections().size());
} else {
server2.start();
Assert.assertEquals(1, server2.getBrokerConnections().size());
}
Wait.assertTrue(() -> loggerHandler.findText("AMQ111002"));
server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting()));
if (isSender) {
server2.start();
} else {
server.start();
}
server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting()));
createAddressAndQueues(server);
createAddressAndQueues(server2);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null);
ActiveMQServer serverReceivingConnections = isSender ? server2 : server;
Wait.assertEquals(1, serverReceivingConnections.getRemotingService()::getConnectionCount);
serverReceivingConnections.getRemotingService().getConnections().forEach((t) -> {
try {
ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t;
ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection();
Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null);
connection.getAmqpConnection().runNow(() -> {
Link theLink = protonConnection.linkHead(of(ACTIVE), of(ACTIVE));
theLink.close();
connection.flush();
});
} catch (Exception e) {
errors.incrementAndGet();
// e.printStackTrace();
}
});
Wait.assertEquals(1, () -> loggerHandler.countText("AMQ119021"));
}
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
@ -329,8 +322,7 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
});
});
try {
AssertionLoggerHandler.startCapture(true);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=20").setReconnectAttempts(5).setRetryInterval(10);
amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
@ -338,8 +330,8 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001"));
Wait.assertEquals(6, () -> AssertionLoggerHandler.countText("AMQ119020")); // 0..5 == 6
Wait.assertTrue(() -> loggerHandler.findText("AMQ111001"));
Wait.assertEquals(6, () -> loggerHandler.countText("AMQ119020")); // 0..5 == 6
} finally {
mockServer.close();
@ -351,97 +343,98 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
startVerx();
AssertionLoggerHandler.startCapture(true);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ProtonServerOptions serverOptions = new ProtonServerOptions();
ProtonServerOptions serverOptions = new ProtonServerOptions();
AtomicInteger countOpen = new AtomicInteger(0);
CyclicBarrier startFlag = new CyclicBarrier(2);
CountDownLatch blockBeforeOpen = new CountDownLatch(1);
AtomicInteger disconnects = new AtomicInteger(0);
AtomicInteger messagesReceived = new AtomicInteger(0);
AtomicInteger errors = new AtomicInteger(0);
AtomicInteger countOpen = new AtomicInteger(0);
CyclicBarrier startFlag = new CyclicBarrier(2);
CountDownLatch blockBeforeOpen = new CountDownLatch(1);
AtomicInteger disconnects = new AtomicInteger(0);
AtomicInteger messagesReceived = new AtomicInteger(0);
AtomicInteger errors = new AtomicInteger(0);
ConcurrentHashSet<ProtonConnection> connections = new ConcurrentHashSet<>();
ConcurrentHashSet<ProtonConnection> connections = new ConcurrentHashSet<>();
mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> {
serverConnection.disconnectHandler(c -> {
disconnects.incrementAndGet(); // number of retries
connections.remove(c);
});
serverConnection.openHandler(serverSender -> {
serverConnection.closeHandler(x -> {
serverConnection.close();
connections.remove(serverConnection);
mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> {
serverConnection.disconnectHandler(c -> {
disconnects.incrementAndGet(); // number of retries
connections.remove(c);
});
serverConnection.open();
connections.add(serverConnection);
});
serverConnection.sessionOpenHandler((s) -> {
s.open();
});
serverConnection.senderOpenHandler((x) -> {
x.open();
});
serverConnection.receiverOpenHandler((x) -> {
if (countOpen.incrementAndGet() > 2) {
if (countOpen.get() == 3) {
try {
startFlag.await(10, TimeUnit.SECONDS);
blockBeforeOpen.await(10, TimeUnit.SECONDS);
return;
} catch (Throwable ignored) {
}
}
HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");
x.setProperties(brokerIDProperties);
x.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
x.setTarget(x.getRemoteTarget());
serverConnection.openHandler(serverSender -> {
serverConnection.closeHandler(x -> {
serverConnection.close();
connections.remove(serverConnection);
});
serverConnection.open();
connections.add(serverConnection);
});
serverConnection.sessionOpenHandler((s) -> {
s.open();
});
serverConnection.senderOpenHandler((x) -> {
x.open();
x.handler((del, msg) -> {
if (msg.getApplicationProperties() != null) {
Map map = msg.getApplicationProperties().getValue();
Object value = map.get("sender");
if (value != null) {
if (messagesReceived.get() != ((Integer) value).intValue()) {
logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), value);
errors.incrementAndGet();
}
messagesReceived.incrementAndGet();
});
serverConnection.receiverOpenHandler((x) -> {
if (countOpen.incrementAndGet() > 2) {
if (countOpen.get() == 3) {
try {
startFlag.await(10, TimeUnit.SECONDS);
blockBeforeOpen.await(10, TimeUnit.SECONDS);
return;
} catch (Throwable ignored) {
}
}
});
}
HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");
x.setProperties(brokerIDProperties);
x.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
x.setTarget(x.getRemoteTarget());
x.open();
x.handler((del, msg) -> {
if (msg.getApplicationProperties() != null) {
Map map = msg.getApplicationProperties().getValue();
Object value = map.get("sender");
if (value != null) {
if (messagesReceived.get() != ((Integer) value).intValue()) {
logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), value);
errors.incrementAndGet();
}
messagesReceived.incrementAndGet();
}
}
});
}
});
});
});
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000").setReconnectAttempts(10).setRetryInterval(10);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000").setReconnectAttempts(10).setRetryInterval(10);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
startFlag.await(10, TimeUnit.SECONDS);
blockBeforeOpen.countDown();
startFlag.await(10, TimeUnit.SECONDS);
blockBeforeOpen.countDown();
Wait.assertEquals(2, disconnects::intValue);
Wait.assertEquals(1, connections::size);
Wait.assertEquals(2, disconnects::intValue);
Wait.assertEquals(1, connections::size);
Wait.assertEquals(3, () -> AssertionLoggerHandler.countText("AMQ119020"));
Wait.assertEquals(3, () -> loggerHandler.countText("AMQ119020"));
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("hello");
message.setIntProperty("sender", i);
producer.send(message);
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
for (int i = 0; i < 100; i++) {
TextMessage message = session.createTextMessage("hello");
message.setIntProperty("sender", i);
producer.send(message);
}
}
}
Wait.assertEquals(100, messagesReceived::intValue, 5000);
Assert.assertEquals(0, errors.get(), 5000);
Wait.assertEquals(100, messagesReceived::intValue, 5000);
Assert.assertEquals(0, errors.get(), 5000);
}
}
@Test
@ -464,15 +457,16 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
});
});
AssertionLoggerHandler.startCapture(true);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100").setReconnectAttempts(5).setRetryInterval(10);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100").setReconnectAttempts(5).setRetryInterval(10);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001"));
Assert.assertEquals(6, AssertionLoggerHandler.countText("AMQ119018")); // 0..5 = 6
Wait.assertTrue(() -> loggerHandler.findText("AMQ111001"));
Assert.assertEquals(6, loggerHandler.countText("AMQ119018")); // 0..5 = 6
}
}
/**
@ -599,53 +593,54 @@ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {
@Test
public void testNoClientDesiredMirrorCapability() throws Exception {
AssertionLoggerHandler.startCapture();
server.start();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null);
client.setValidator(new AmqpValidator() {
AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null);
client.setValidator(new AmqpValidator() {
@Override
public void inspectOpenedResource(Sender sender) {
ErrorCondition condition = sender.getRemoteCondition();
@Override
public void inspectOpenedResource(Sender sender) {
ErrorCondition condition = sender.getRemoteCondition();
if (condition != null && condition.getCondition() != null) {
if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) {
markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition);
if (condition != null && condition.getCondition() != null) {
if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) {
markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition);
}
if (!condition.getDescription().contains("AMQ119024")) {
markAsInvalid("should have indicated the error code about missing a desired capability");
}
if (!condition.getDescription().contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
markAsInvalid("should have indicated the error code about missing a desired capability");
}
} else {
markAsInvalid("Sender should have been detached with an error");
}
if (!condition.getDescription().contains("AMQ119024")) {
markAsInvalid("should have indicated the error code about missing a desired capability");
}
if (!condition.getDescription().contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
markAsInvalid("should have indicated the error code about missing a desired capability");
}
} else {
markAsInvalid("Sender should have been detached with an error");
}
}
});
});
String address = ProtonProtocolManager.getMirrorAddress(getTestName());
AmqpConnection connection = client.connect();
try {
AmqpSession session = connection.createSession();
String address = ProtonProtocolManager.getMirrorAddress(getTestName());
AmqpConnection connection = client.connect();
try {
session.createSender(address);
fail("Link should have been refused.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("AMQ119024"));
logger.debug("Caught expected exception");
AmqpSession session = connection.createSession();
try {
session.createSender(address);
fail("Link should have been refused.");
} catch (Exception ex) {
Assert.assertTrue(ex.getMessage().contains("AMQ119024"));
logger.debug("Caught expected exception");
}
connection.getStateInspector().assertValid();
} finally {
connection.close();
}
connection.getStateInspector().assertValid();
} finally {
connection.close();
Wait.assertTrue(() -> loggerHandler.findText("AMQ119024"));
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ119024"));
}
}

View File

@ -86,8 +86,8 @@ public class BridgeRetryFullFailureTest extends ActiveMQTestBase {
int NUMBER_OF_MESSAGES = 1000;
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
try (Connection connection = factory0.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -100,7 +100,7 @@ public class BridgeRetryFullFailureTest extends ActiveMQTestBase {
}
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ229102"));
Wait.assertTrue(() -> loggerHandler.findText("AMQ229102"));
// the reconnects and failure may introduce out of order issues. so we just check if they were all received
HashSet<Integer> receivedIntegers = new HashSet<>();

View File

@ -45,7 +45,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -61,11 +60,6 @@ public class AutoCreateTest extends ActiveMQTestBase {
private ActiveMQServer server;
@After
public void clearLogg() {
AssertionLoggerHandler.stopCapture();
}
@Override
@Before
public void setUp() throws Exception {
@ -139,7 +133,6 @@ public class AutoCreateTest extends ActiveMQTestBase {
@Test
public void testSweep() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = "autoCreateAndRecreate";
@ -168,18 +161,19 @@ public class AutoCreateTest extends ActiveMQTestBase {
Assert.assertNotNull(info);
Assert.assertTrue(info.isAutoCreated());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertTrue("Queue name should be mentioned on logs", AssertionLoggerHandler.findText(QUEUE_NAME));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(loggerHandler.findText("AMQ224112"));
Assert.assertTrue("Queue name should be mentioned on logs", loggerHandler.findText(QUEUE_NAME));
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(loggerHandler.findText("AMQ224113")); // we need another sweep to remove it
}
}
@Test
public void testSweepAddress() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
server.getConfiguration().getAddressSettings().clear();
@ -203,20 +197,21 @@ public class AutoCreateTest extends ActiveMQTestBase {
Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Thread.sleep(50);
Assert.assertFalse(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Assert.assertFalse(loggerHandler.findText("AMQ224113"));
Thread.sleep(50);
Assert.assertFalse(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(loggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertTrue(loggerHandler.findText("AMQ224113"));
}
}
@Test
public void testNegativeSweepAddress() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
AddressSettings settings = new AddressSettings().setAutoDeleteQueues(true).setAutoDeleteAddresses(true).setAutoDeleteAddressesDelay(10).setAutoDeleteQueuesDelay(10);
server.getConfiguration().getAddressSettings().clear();
@ -240,25 +235,26 @@ public class AutoCreateTest extends ActiveMQTestBase {
Wait.assertTrue(() -> infoRef.getBindingRemovedTimestamp() != -1);
}
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Thread.sleep(50);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ADDRESS_NAME);
session.createConsumer(topic);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Assert.assertFalse(loggerHandler.findText("AMQ224113"));
Thread.sleep(50);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertFalse(info.isSwept()); // it should be cleared because there is a consumer now
Assert.assertFalse(loggerHandler.findText("AMQ224113"));
Assert.assertTrue(info.isSwept());
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(ADDRESS_NAME);
session.createConsumer(topic);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(loggerHandler.findText("AMQ224113"));
Assert.assertFalse(info.isSwept()); // it should be cleared because there is a consumer now
}
}
}
@Test
public void testNegativeSweepBecauseOfConsumer() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = getName();
@ -275,27 +271,26 @@ public class AutoCreateTest extends ActiveMQTestBase {
Assert.assertNotNull(info);
Assert.assertTrue(info.isAutoCreated());
try (Connection connection = cf.createConnection()) {
try (Connection connection = cf.createConnection();
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
Assert.assertTrue(serverQueue.isSwept());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
MessageConsumer consumer = session.createConsumer(queue);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(serverQueue.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertFalse(loggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
}
}
@Test
public void testNegativeSweepBecauseOfSend() throws Exception {
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = getName();
@ -313,21 +308,22 @@ public class AutoCreateTest extends ActiveMQTestBase {
Assert.assertTrue(info.isAutoCreated());
try (Connection connection = cf.createConnection()) {
try (Connection connection = cf.createConnection();
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(QUEUE_NAME);
Assert.assertTrue(serverQueue.isSwept());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello"));
Wait.assertEquals(1, serverQueue::getMessageCount);
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(serverQueue.isSwept());
PostOfficeTestAccessor.reapAddresses((PostOfficeImpl) server.getPostOffice());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertFalse(loggerHandler.findText("AMQ224113")); // we need another sweep to remove it
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
}
}
@ -355,7 +351,6 @@ public class AutoCreateTest extends ActiveMQTestBase {
server.getAddressSettingsRepository().addMatch(getName(), new AddressSettings().setAutoCreateAddresses(true).setAutoDeleteAddressesDelay(TimeUnit.DAYS.toMillis(1)).setAutoDeleteQueuesDelay(TimeUnit.DAYS.toMillis(1)));
}
AssertionLoggerHandler.startCapture();
server.getConfiguration().setAddressQueueScanPeriod(-1); // disabling scanner, we will perform it manually
server.start();
String QUEUE_NAME = getName();
@ -373,12 +368,13 @@ public class AutoCreateTest extends ActiveMQTestBase {
Assert.assertTrue(info.isAutoCreated());
server.stop();
server.start();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224113"));
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224112"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
AssertionLoggerHandler.clear();
Assert.assertTrue(loggerHandler.findText("AMQ224113"));
Assert.assertTrue(loggerHandler.findText("AMQ224112"));
}
String randomString = "random " + RandomUtil.randomString();
@ -394,10 +390,12 @@ public class AutoCreateTest extends ActiveMQTestBase {
Assert.assertTrue(info.isAutoCreated());
server.stop();
server.start();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224113")); // this time around the queue had messages, it has to exist
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224112"));
Assert.assertFalse(loggerHandler.findText("AMQ224113")); // this time around the queue had messages, it has to exist
Assert.assertFalse(loggerHandler.findText("AMQ224112"));
}
info = server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(QUEUE_NAME));
Assert.assertNotNull(info);

View File

@ -90,8 +90,7 @@ public class ForceDeleteQueue extends ActiveMQTestBase {
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, uri);
Connection conn = factory.createConnection();
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName.toString());
MessageProducer producer = session.createProducer(queue);
@ -139,11 +138,10 @@ public class ForceDeleteQueue extends ActiveMQTestBase {
}
Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find add info"));
Assert.assertFalse(loggerHandler.findText("Cannot find add info"));
} finally {
AssertionLoggerHandler.stopCapture();
try {
conn.close();
} catch (Throwable ignored) {

View File

@ -410,11 +410,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
});
server.stop();
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
server.start();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ221019"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
Assert.assertTrue(loggerHandler.findText("AMQ221019"));
}
validateNoFilesOnLargeDir();
runAfter(server::stop);
@ -2087,8 +2086,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
@Test
public void testPageOnLargeMessageMultipleQueues() throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
Configuration config = createDefaultConfig(isNetty());
@ -2213,8 +2212,8 @@ public class LargeMessageTest extends LargeMessageTestBase {
session.close();
}
// Reference Counting negative errrors
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ214034"));
// Reference Counting negative errors
Assert.assertFalse(loggerHandler.findText("AMQ214034"));
}
// JBPAPP-6237

View File

@ -30,9 +30,9 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -45,11 +45,6 @@ public class PendingDeliveriesTest extends ClientTestBase {
server.createQueue(new QueueConfiguration("queue1").setRoutingType(RoutingType.ANYCAST));
}
@After
public void clearLogger() throws Exception {
AssertionLoggerHandler.stopCapture();
}
private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
@ -173,17 +168,19 @@ public class PendingDeliveriesTest extends ClientTestBase {
@Test
public void testCleanShutdownNoLogger() throws Exception {
AssertionLoggerHandler.startCapture();
startClient(CORE_URI_NO_RECONNECT, "queue1", false, true);
Thread.sleep(500);
Assert.assertFalse(AssertionLoggerHandler.findText("clearing up resources"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
startClient(CORE_URI_NO_RECONNECT, "queue1", false, true);
Thread.sleep(500);
Assert.assertFalse(loggerHandler.findText("clearing up resources"));
}
}
@Test
public void testBadShutdownLogger() throws Exception {
AssertionLoggerHandler.startCapture();
startClient(CORE_URI_NO_RECONNECT, "queue1", false, false);
Assert.assertTrue(AssertionLoggerHandler.findText(1000, "clearing up resources"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
startClient(CORE_URI_NO_RECONNECT, "queue1", false, false);
Wait.assertTrue(() -> loggerHandler.findText("clearing up resources"), 1000);
}
}
@Test

View File

@ -95,9 +95,7 @@ public class AutoDeleteDistributedTest extends ClusterTestBase {
@Test
public void testAutoDelete() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
AtomicBoolean error = new AtomicBoolean(false);
@ -153,10 +151,8 @@ public class AutoDeleteDistributedTest extends ClusterTestBase {
Assert.assertFalse(error.get());
Thread.sleep(100); // I couldn't make it to fail without a minimal sleep here
Assert.assertFalse(AssertionLoggerHandler.findText("java.lang.IllegalStateException"));
Assert.assertFalse(AssertionLoggerHandler.findText("Cannot find binding"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("java.lang.IllegalStateException"));
Assert.assertFalse(loggerHandler.findText("Cannot find binding"));
}
}

View File

@ -83,22 +83,19 @@ public class NetworkIsolationTest extends FailoverTestBase {
@Test
public void testDoNotActivateOnIsolation() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ServerLocator locator = getServerLocator();
// this block here is just to validate if ignoring loopback addresses logic is in place
{
backupServer.getServer().getNetworkHealthCheck().addAddress("127.0.0.1");
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ202001"));
AssertionLoggerHandler.clear();
Assert.assertTrue(loggerHandler.findText("AMQ202001"));
backupServer.getServer().getNetworkHealthCheck().setIgnoreLoopback(true).addAddress("127.0.0.1");
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ202001"));
Assert.assertEquals(1, loggerHandler.countText("AMQ202001"));
backupServer.getServer().getNetworkHealthCheck().clearAddresses();
}
@ -133,8 +130,6 @@ public class NetworkIsolationTest extends FailoverTestBase {
// This will make sure the backup got synchronized after the network was activated again
Assert.assertTrue(getReplicationEndpoint(backupServer.getServer()).isStarted());
} finally {
AssertionLoggerHandler.stopCapture();
}
}

View File

@ -136,102 +136,99 @@ public class ReplicaTimeoutTest extends ActiveMQTestBase {
@Test//(timeout = 120000)
public void testFailbackTimeout() throws Exception {
AssertionLoggerHandler.startCapture();
TestableServer backupServer = null;
TestableServer liveServer = null;
ClientSessionFactory sf = null;
try {
TestableServer backupServer = null;
TestableServer liveServer = null;
ClientSessionFactory sf = null;
try {
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
Configuration backupConfig = createDefaultInVMConfig();
Configuration liveConfig = createDefaultInVMConfig();
Configuration backupConfig = createDefaultInVMConfig();
Configuration liveConfig = createDefaultInVMConfig();
configureReplicationPair(backupConfig, liveConfig, backupConnector, backupAcceptor, liveConnector);
configureReplicationPair(backupConfig, liveConfig, backupConnector, backupAcceptor, liveConnector);
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).
setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)).
setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false);
backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).
setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
liveConfig.setBindingsDirectory(getBindingsDir(0, false)).setJournalDirectory(getJournalDir(0, false)).
setPagingDirectory(getPageDir(0, false)).setLargeMessagesDirectory(getLargeMessagesDir(0, false)).setSecurityEnabled(false);
NodeManager replicatedBackupNodeManager = createReplicatedBackupNodeManager(backupConfig);
NodeManager replicatedBackupNodeManager = createReplicatedBackupNodeManager(backupConfig);
backupServer = createTestableServer(backupConfig, replicatedBackupNodeManager);
backupServer = createTestableServer(backupConfig, replicatedBackupNodeManager);
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
NodeManager nodeManager = createReplicatedBackupNodeManager(liveConfig);
liveServer = createTestableServer(liveConfig, nodeManager);
NodeManager nodeManager = createReplicatedBackupNodeManager(liveConfig);
liveServer = createTestableServer(liveConfig, nodeManager);
final TestableServer theBackup = backupServer;
final TestableServer theBackup = backupServer;
liveServer.start();
backupServer.start();
liveServer.start();
backupServer.start();
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
sf = createSessionFactory();
sf = createSessionFactory();
ClientSession session = createSession(sf, true, true);
ClientSession session = createSession(sf, true, true);
session.createQueue(new QueueConfiguration(ADDRESS));
session.createQueue(new QueueConfiguration(ADDRESS));
crash(liveServer, backupServer, session);
crash(liveServer, backupServer, session);
Wait.assertTrue(backupServer.getServer()::isActive);
Wait.assertTrue(backupServer.getServer()::isActive);
((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() {
@Override
public void run() {
final Activation backupActivation = theBackup.getServer().getActivation();
if (backupActivation instanceof SharedNothingBackupActivation) {
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupActivation;
ReplicationEndpoint repEnd = activation.getReplicationEndpoint();
repEnd.addOutgoingInterceptorForReplication((packet, connection) -> {
((ActiveMQServerImpl) backupServer.getServer()).setAfterActivationCreated(new Runnable() {
@Override
public void run() {
final Activation backupActivation = theBackup.getServer().getActivation();
if (backupActivation instanceof SharedNothingBackupActivation) {
SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupActivation;
ReplicationEndpoint repEnd = activation.getReplicationEndpoint();
repEnd.addOutgoingInterceptorForReplication((packet, connection) -> {
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
return false;
}
return true;
});
} else if (backupActivation instanceof ReplicationBackupActivation) {
ReplicationBackupActivation activation = (ReplicationBackupActivation) backupActivation;
activation.spyReplicationEndpointCreation(replicationEndpoint -> {
replicationEndpoint.addOutgoingInterceptorForReplication((packet, connection) -> {
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
return false;
}
return true;
});
} else if (backupActivation instanceof ReplicationBackupActivation) {
ReplicationBackupActivation activation = (ReplicationBackupActivation) backupActivation;
activation.spyReplicationEndpointCreation(replicationEndpoint -> {
replicationEndpoint.addOutgoingInterceptorForReplication((packet, connection) -> {
if (packet.getType() == PacketImpl.REPLICATION_RESPONSE_V2) {
return false;
}
return true;
});
});
}
});
}
});
}
});
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
liveServer.start();
Assert.assertTrue(Wait.waitFor(() -> AssertionLoggerHandler.findText("AMQ229114")));
if (expectLiveSuicide()) {
Wait.assertFalse(liveServer.getServer()::isStarted);
}
} finally {
if (sf != null) {
sf.close();
}
try {
liveServer.getServer().stop();
} catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
} catch (Throwable ignored) {
}
Assert.assertTrue(Wait.waitFor(() -> loggerHandler.findTrace("AMQ229114")));
}
if (expectLiveSuicide()) {
Wait.assertFalse(liveServer.getServer()::isStarted);
}
} finally {
AssertionLoggerHandler.stopCapture();
if (sf != null) {
sf.close();
}
try {
liveServer.getServer().stop();
} catch (Throwable ignored) {
}
try {
backupServer.getServer().stop();
} catch (Throwable ignored) {
}
}
}

View File

@ -42,8 +42,8 @@ public class NettyFederatedQueueTest extends FederatedTestBase {
@Test
public void testFederatedQueueBiDirectionalUpstream() throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
String queueName = getName();
FederationConfiguration federationConfiguration0 = FederatedTestUtil.createQueueUpstreamFederationConfiguration("server1", queueName);
@ -89,6 +89,6 @@ public class NettyFederatedQueueTest extends FederatedTestBase {
Assert.assertNotNull(consumer2.receive(5000));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222153"));
Assert.assertFalse(loggerHandler.findText("AMQ222153"));
}
}

View File

@ -129,9 +129,7 @@ public class CreateSubscriptionTest extends JMSTestBase {
public void testCreateManyConsumers(String queueType) throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("myTopic")).addRoutingType(RoutingType.MULTICAST));
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
@ -191,9 +189,7 @@ public class CreateSubscriptionTest extends JMSTestBase {
}
Assert.assertEquals(0, errors.get());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ229018"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("AMQ229018"));
}
}

View File

@ -51,7 +51,6 @@ import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncodin
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestBase;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -279,18 +278,13 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.appendUpdateRecordTransactional(4, -1, recordType, "upd".getBytes());
journal.appendCommitRecord(3, true);
journal.testCompact();
int cnt = loggerHandler.countText("must be");
Assert.assertTrue(cnt > 0);
Assert.assertFalse(failed.get());
try {
AssertionLoggerHandler.startCapture();
journal.testCompact();
Assert.assertTrue(AssertionLoggerHandler.findText("must be"));
Assert.assertFalse(failed.get());
AssertionLoggerHandler.clear();
journal.testCompact();
Assert.assertFalse(AssertionLoggerHandler.findText("must be")); // the invalid records should be cleared during a compact
} finally {
AssertionLoggerHandler.stopCapture();
}
journal.testCompact();
Assert.assertEquals(cnt, loggerHandler.countText("must be")); // the invalid records should be cleared during a compact
}
@Test
@ -416,7 +410,6 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
journal.start();
ArrayList<RecordInfo> list = new ArrayList<>();
journal.load(new LoaderCallback() {
@Override
public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction) {

View File

@ -507,8 +507,7 @@ public class MQTTTest extends MQTTTestSupport {
@Test(timeout = 60 * 1000)
public void testSendAndReceiveRetainedLargeMessage() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
byte[] payload = new byte[ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 2];
for (int i = 0; i < payload.length; i++) {
payload[i] = '2';
@ -554,9 +553,7 @@ public class MQTTTest extends MQTTTestSupport {
subscriber3.disconnect();
publisher.disconnect();
Assert.assertFalse(AssertionLoggerHandler.findText("Exception"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("Exception"));
}
}

View File

@ -144,9 +144,7 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
@Test
public void testCoreHierarchicalTopic() throws Exception {
try {
AssertionLoggerHandler.startCapture();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ConnectionFactory cf = new ActiveMQConnectionFactory();
Connection connection = cf.createConnection();
@ -228,9 +226,7 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
connection.close();
Assert.assertFalse(AssertionLoggerHandler.findText("222295"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findText("222295"));
}
}

View File

@ -217,8 +217,7 @@ public class MQTT5Test extends MQTT5TestSupport {
@Test(timeout = DEFAULT_TIMEOUT)
public void testRecursiveWill() throws Exception {
AssertionLoggerHandler.startCapture(true);
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
final String WILL_QUEUE = "will";
server.createQueue(new QueueConfiguration(WILL_QUEUE).setRoutingType(RoutingType.ANYCAST));
PagingManagerImplAccessor.setDiskFull((PagingManagerImpl) server.getPagingManager(), true);
@ -226,9 +225,7 @@ public class MQTT5Test extends MQTT5TestSupport {
MqttConnectionOptions options = new MqttConnectionOptionsBuilder().will(WILL_QUEUE, new MqttMessage(RandomUtil.randomBytes())).build();
client.connect(options);
client.disconnectForcibly(0, 0, false);
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ229119"), 2000, 100);
} finally {
AssertionLoggerHandler.stopCapture();
Wait.assertTrue(() -> loggerHandler.findText("AMQ229119"), 2000, 100);
}
}

View File

@ -101,8 +101,7 @@ public class CompactingOpenWireTest extends BasicOpenWireTest {
compactDone.countDown();
});
CountDownLatch latchDone = new CountDownLatch(THREADS);
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
String space1k = new String(new char[5]).replace('\0', ' ');
for (int i = 0; i < THREADS; i++) {
@ -153,11 +152,10 @@ public class CompactingOpenWireTest extends BasicOpenWireTest {
compactDone.await(10, TimeUnit.MINUTES);
executorService.shutdownNow();
Assert.assertEquals(0, errors.get());
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144003")); // error compacting
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222055")); // records not found
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222302")); // string conversion issue
Assert.assertFalse(loggerHandler.findText("AMQ144003")); // error compacting
Assert.assertFalse(loggerHandler.findText("AMQ222055")); // records not found
Assert.assertFalse(loggerHandler.findText("AMQ222302")); // string conversion issue
} finally {
AssertionLoggerHandler.stopCapture();
running.set(false);
executorService.shutdownNow();
}

View File

@ -142,8 +142,7 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
@Test
public void testFastLargeMessageProducerDropOnPaging() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
// Create 200K Message
int size = 200 * 1024;
@ -182,10 +181,8 @@ public class OpenWireLargeMessageTest extends BasicOpenWireTest {
}
}
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException"));
Assert.assertFalse(AssertionLoggerHandler.findText("It was not possible to delete message"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertFalse(loggerHandler.findTrace("NullPointerException"));
Assert.assertFalse(loggerHandler.findText("It was not possible to delete message"));
}
}

View File

@ -30,25 +30,14 @@ import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.SessionState;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
@BeforeClass
public static void prepareLogger() {
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
}
@Test
public void testInternalSessionHandling() throws Exception {
try (Connection conn = factory.createConnection()) {
try (Connection conn = factory.createConnection();
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
conn.start();
try (Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination dest = createDestination(session,ActiveMQDestination.QUEUE_TYPE);
@ -57,14 +46,15 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
Message m = consumer.receive(2000);
assertNotNull(m);
}
assertFalse(loggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(loggerHandler.findText("Cleared up resources for session"));
}
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}
@Test
public void testInternalSessionHandlingNoSessionClose() throws Exception {
try (Connection conn = factory.createConnection()) {
try (Connection conn = factory.createConnection();
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
conn.start();
for (int i = 0; i < 100; i++) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -82,9 +72,9 @@ public class SessionHandlingOpenWireTest extends BasicOpenWireTest {
}
}
assertFalse(loggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(loggerHandler.findText("Cleared up resources for session"));
}
assertFalse(AssertionLoggerHandler.findText("Client connection failed, clearing up resources for session"));
assertFalse(AssertionLoggerHandler.findText("Cleared up resources for session"));
}

View File

@ -48,22 +48,16 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
private static final String SERVER_LOGGER_NAME = ActiveMQServerLogger.class.getPackage().getName();
private static LogLevel previousLevel = null;
private static LogLevel previousLevel;
@BeforeClass
public static void prepareLogger() {
previousLevel = AssertionLoggerHandler.setLevel(SERVER_LOGGER_NAME, LogLevel.INFO);
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
try {
AssertionLoggerHandler.stopCapture();
} finally {
AssertionLoggerHandler.setLevel(SERVER_LOGGER_NAME, previousLevel);
}
public static void clearLogger() throws Exception {
AssertionLoggerHandler.setLevel(SERVER_LOGGER_NAME, previousLevel);
}
@Test
@ -150,16 +144,18 @@ public class AddressFullLoggingTest extends ActiveMQTestBase {
break;
msg.acknowledge();
}
//this is needed to allow to kick-in at least once disk scan
TimeUnit.MILLISECONDS.sleep(server.getConfiguration().getDiskScanPeriod() * 2);
session.close();
locator.close();
server.stop();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
//this is needed to allow to kick-in at least once disk scan
TimeUnit.MILLISECONDS.sleep(server.getConfiguration().getDiskScanPeriod() * 2);
session.close();
locator.close();
server.stop();
// Using the code only so the test doesn't fail just because someone edits the log text
Assert.assertTrue("Expected to find AMQ222183", AssertionLoggerHandler.findText("AMQ222183", "myAddress"));
Assert.assertTrue("Expected to find AMQ221046", AssertionLoggerHandler.findText("AMQ221046", "myAddress"));
Assert.assertFalse("Expected to not find AMQ222211", AssertionLoggerHandler.findText("AMQ222211"));
// Using the code only so the test doesn't fail just because someone edits the log text
Assert.assertTrue("Expected to find AMQ222183", loggerHandler.findText("AMQ222183", MY_ADDRESS));
Assert.assertTrue("Expected to find AMQ221046", loggerHandler.findText("AMQ221046", MY_ADDRESS));
Assert.assertFalse("Expected to not find AMQ222211", loggerHandler.findText("AMQ222211"));
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.paging;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
@ -141,18 +140,15 @@ public class AnonymousProducerPageTest extends ActiveMQTestBase {
connection.close();
}
private void validatePolicyMismatch(Session session, MessageProducer producer) throws JMSException {
AssertionLoggerHandler.startCapture();
try {
private void validatePolicyMismatch(Session session, MessageProducer producer) throws Exception {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
producer.send(session.createQueue("blockedQueue"), session.createMessage());
session.commit();
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ111004"));
AssertionLoggerHandler.clear();
Assert.assertTrue(loggerHandler.findText("AMQ111004"));
producer.send(session.createQueue(getName()), session.createMessage());
session.commit();
Assert.assertFalse("The warning should be printed only once", AssertionLoggerHandler.findText("AMQ111004"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertEquals("The warning should be printed only once", 1, loggerHandler.countText("AMQ111004"));
}
}

View File

@ -380,9 +380,6 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
Queue queue = server.locateQueue(ADDRESS);
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
for (int repeat = 0; repeat < 5; repeat++) {
boolean durable = repeat % 2 == 0;
@ -397,15 +394,13 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
Wait.assertEquals(MESSAGE_COUNT, queue::getMessageCount);
AssertionLoggerHandler.clear();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
producer.send(session.createTextMessage("should fail"));
if (durable) {
Assert.fail(("supposed to fail"));
} else {
// in case of async send, the exception will not propagate to the client, and we should still check the logger on that case
Wait.assertTrue(() -> AssertionLoggerHandler.findText("is full")); // my intention was to assert for "AMQ229102" howerver openwire is not using the code here
Wait.assertTrue(() -> loggerHandler.findTrace("is full")); // my intention was to assert for "AMQ229102" howerver openwire is not using the code here
}
} catch (Exception expected) {
}
@ -492,46 +487,45 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version
runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
ExecutorService executorService = Executors.newSingleThreadExecutor();
runAfter(executorService::shutdownNow);
CountDownLatch done = new CountDownLatch(1);
executorService.execute(() -> {
try {
Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < MESSAGES; i++) {
producer.send(session.createTextMessage("OK!" + i));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
CountDownLatch done = new CountDownLatch(1);
executorService.execute(() -> {
try {
Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < MESSAGES; i++) {
producer.send(session.createTextMessage("OK!" + i));
}
session.close();
} catch (Exception e) {
// e.printStackTrace();
}
session.close();
} catch (Exception e) {
e.printStackTrace();
}
done.countDown();
});
done.countDown();
});
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222183"), 5000, 10); //unblock
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ221046")); // should not been unblocked
Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10); //unblock
Assert.assertFalse(loggerHandler.findText("AMQ221046")); // should not been unblocked
AssertionLoggerHandler.clear();
Assert.assertFalse(done.await(100, TimeUnit.MILLISECONDS));
Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS));
for (int i = 0; i < MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("OK!" + i, message.getText());
Assert.assertFalse(done.await(100, TimeUnit.MILLISECONDS));
}
sessionReceive.close();
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ221046"), 5000, 10); // unblock
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS));
for (int i = 0; i < MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("OK!" + i, message.getText());
}
sessionReceive.close();
Wait.assertTrue(() -> loggerHandler.findText("AMQ221046"), 5000, 10); // unblock
}
}
@ -600,12 +594,8 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
runAfter(connSend::close); // not using closeable because OPENWIRE might not support it depending on the version
runAfter(connReceive::close); // not using closeable because OPENWIRE might not support it depending on the version
AssertionLoggerHandler.startCapture();
runAfter(() -> AssertionLoggerHandler.stopCapture());
for (int repeat = 0; repeat < 5; repeat++) {
AssertionLoggerHandler.clear();
{
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Session session = connSend.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(ADDRESS));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@ -613,14 +603,12 @@ public class MaxMessagesPagingTest extends ActiveMQTestBase {
producer.send(session.createTextMessage("OK!" + i));
}
session.close();
}
if (repeat == 0) {
// the server will only log it on the first repeat as expected
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ222039")); // dropped messages
}
if (repeat == 0) {
// the server will only log it on the first repeat as expected
Assert.assertTrue(loggerHandler.findText("AMQ222039")); // dropped messages
}
{
Session sessionReceive = connReceive.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = sessionReceive.createConsumer(sessionReceive.createQueue(ADDRESS));
for (int i = 0; i < 10; i++) {

View File

@ -57,19 +57,21 @@ public class PageCountSyncOnNonTXTest extends SpawnedTestBase {
Process process;
private AssertionLoggerHandler loggerHandler;
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
Assert.assertFalse(loggerHandler.findText("222214"));
Assert.assertFalse(loggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}

View File

@ -60,21 +60,22 @@ public class PagingCounterTest extends ActiveMQTestBase {
private ServerLocator sl;
private AssertionLoggerHandler loggerHandler;
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
Assert.assertFalse(loggerHandler.findText("222214"));
Assert.assertFalse(loggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}

View File

@ -110,8 +110,6 @@ public class PagingLimitTest extends ActiveMQTestBase {
String protocol,
boolean transacted,
boolean drop) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName);
Assert.assertNotNull(serverQueue);
@ -168,7 +166,7 @@ public class PagingLimitTest extends ActiveMQTestBase {
session.commit();
}
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
producer.send(session.createTextMessage("should not complete"));
if (transacted) {
session.commit();
@ -176,7 +174,7 @@ public class PagingLimitTest extends ActiveMQTestBase {
if (!drop) {
Assert.fail("an Exception was expected");
}
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ224120"));
Assert.assertTrue(loggerHandler.findText("AMQ224120"));
} catch (JMSException e) {
logger.debug("Expected exception, ok!", e);
}
@ -226,11 +224,9 @@ public class PagingLimitTest extends ActiveMQTestBase {
if (transacted) {
session.commit();
}
AssertionLoggerHandler.clear();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
producer.send(session.createTextMessage("should not complete"));
if (transacted) {
session.commit();
@ -238,7 +234,7 @@ public class PagingLimitTest extends ActiveMQTestBase {
if (!drop) {
Assert.fail("an Exception was expected");
} else {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224120"));
Assert.assertFalse(loggerHandler.findText("AMQ224120"));
}
} catch (JMSException e) {
logger.debug("Expected exception, ok!", e);

View File

@ -89,8 +89,8 @@ public class PagingMaxReadLimitTest extends ActiveMQTestBase {
Assert.assertTrue(serverQueue.getPagingStore().isPaging());
}
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
AtomicInteger errorCounter = new AtomicInteger(0);
CountDownLatch done = new CountDownLatch(1);
@ -120,7 +120,7 @@ public class PagingMaxReadLimitTest extends ActiveMQTestBase {
});
Assert.assertTrue(done.await(5, TimeUnit.SECONDS));
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ224127"), 2000, 10);
Wait.assertTrue(() -> loggerHandler.findText("AMQ224127"), 2000, 10);
Assert.assertEquals(0, errorCounter.get());
}

View File

@ -154,6 +154,8 @@ public class PagingTest extends ActiveMQTestBase {
static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
private AssertionLoggerHandler loggerHandler;
public PagingTest(StoreConfiguration.StoreType storeType) {
this.storeType = storeType;
}
@ -166,17 +168,17 @@ public class PagingTest extends ActiveMQTestBase {
@Before
public void checkLoggerStart() throws Exception {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void checkLoggerEnd() throws Exception {
try {
// These are the message errors for the negative size address size
Assert.assertFalse(AssertionLoggerHandler.findText("222214"));
Assert.assertFalse(AssertionLoggerHandler.findText("222215"));
Assert.assertFalse(loggerHandler.findText("222214"));
Assert.assertFalse(loggerHandler.findText("222215"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}
@ -287,9 +289,7 @@ public class PagingTest extends ActiveMQTestBase {
@Test
public void testPageTX() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
Configuration config = createDefaultInVMConfig();
final int PAGE_MAX = 20 * 1024;
@ -366,12 +366,10 @@ public class PagingTest extends ActiveMQTestBase {
}
session.close();
Assert.assertFalse(AssertionLoggerHandler.findText("Could not locate page"));
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222029"));
Assert.assertFalse(loggerHandler.findText("Could not locate page"));
Assert.assertFalse(loggerHandler.findText("AMQ222029"));
server.getPagingManager().getPageStore(ADDRESS).enableCleanup();
Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging);
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@ -2679,9 +2677,7 @@ public class PagingTest extends ActiveMQTestBase {
// this test only applies to file-based stores
Assume.assumeTrue(storeType == StoreConfiguration.StoreType.FILE);
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false).setPagingDirectory("/" + UUID.randomUUID().toString());
@ -2732,12 +2728,8 @@ public class PagingTest extends ActiveMQTestBase {
sf.close();
locator.close();
} finally {
try {
if (storeType != StoreConfiguration.StoreType.DATABASE) {
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ144010"));
}
} finally {
AssertionLoggerHandler.stopCapture();
if (storeType != StoreConfiguration.StoreType.DATABASE) {
Assert.assertTrue(loggerHandler.findText("AMQ144010"));
}
}
}
@ -6350,84 +6342,72 @@ public class PagingTest extends ActiveMQTestBase {
*/
@Test
public void testFailMessagesNonDurable() throws Exception {
AssertionLoggerHandler.startCapture();
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
HashMap<String, AddressSettings> settings = new HashMap<>();
AddressSettings set = new AddressSettings();
set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
settings.put(PagingTest.ADDRESS.toString(), set);
server = createServer(true, config, 1024, 5 * 1024, settings);
server.start();
locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = session.createMessage(false);
int biggerMessageSize = 1024;
byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) {
bb.put(getSamplebyte(j));
}
message.getBodyBuffer().writeBytes(body);
// Send enough messages to fill up the address, but don't test for an immediate exception because we do not block
// on non-durable send. Instead of receiving an exception immediately the exception will be logged on the server.
for (int i = 0; i < 32; i++) {
producer.send(message);
}
// allow time for the logging to actually happen on the server
Wait.assertTrue("Expected to find AMQ224016", () -> loggerHandler.findText("AMQ224016"), 100);
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
// Once the destination is full and the client has run out of credits then it will receive an exception
for (int i = 0; i < 10; i++) {
validateExceptionOnSending(producer, message);
}
// Receive a message.. this should release credits
ClientMessage msgReceived = consumer.receive(5000);
assertNotNull(msgReceived);
msgReceived.acknowledge();
session.commit(); // to make sure it's on the server (roundtrip)
try {
clearDataRecreateServerDirs();
Configuration config = createDefaultInVMConfig();
HashMap<String, AddressSettings> settings = new HashMap<>();
AddressSettings set = new AddressSettings();
set.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
settings.put(PagingTest.ADDRESS.toString(), set);
server = createServer(true, config, 1024, 5 * 1024, settings);
server.start();
locator.setBlockOnNonDurableSend(false).setBlockOnDurableSend(false).setBlockOnAcknowledge(true);
sf = createSessionFactory(locator);
ClientSession session = sf.createSession(true, true, 0);
session.createQueue(new QueueConfiguration(PagingTest.ADDRESS));
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = session.createMessage(false);
int biggerMessageSize = 1024;
byte[] body = new byte[biggerMessageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
for (int j = 1; j <= biggerMessageSize; j++) {
bb.put(getSamplebyte(j));
}
message.getBodyBuffer().writeBytes(body);
// Send enough messages to fill up the address, but don't test for an immediate exception because we do not block
// on non-durable send. Instead of receiving an exception immediately the exception will be logged on the server.
for (int i = 0; i < 32; i++) {
for (int i = 0; i < 1000; i++) {
// this send will succeed on the server
producer.send(message);
}
// allow time for the logging to actually happen on the server
Thread.sleep(100);
Assert.assertTrue("Expected to find AMQ224016", AssertionLoggerHandler.findText("AMQ224016"));
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
// Once the destination is full and the client has run out of credits then it will receive an exception
for (int i = 0; i < 10; i++) {
validateExceptionOnSending(producer, message);
}
// Receive a message.. this should release credits
ClientMessage msgReceived = consumer.receive(5000);
assertNotNull(msgReceived);
msgReceived.acknowledge();
session.commit(); // to make sure it's on the server (roundtrip)
boolean exception = false;
try {
for (int i = 0; i < 1000; i++) {
// this send will succeed on the server
producer.send(message);
}
} catch (Exception e) {
exception = true;
}
assertTrue("Expected to throw an exception", exception);
} finally {
AssertionLoggerHandler.stopCapture();
fail("Expected to throw an exception");
} catch (Exception expected) {
}
}

View File

@ -111,8 +111,8 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
}
private void pendingSend(String protocol, boolean rollback, boolean restart) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS);
@ -135,7 +135,7 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
}
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000);
Wait.assertTrue(() -> loggerHandler.findText("AMQ222038"), 2000);
Wait.assertEquals(INITIAL_NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
@ -224,8 +224,8 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
}
private void pendingACKTXRollback(String protocol, boolean rollback, boolean restart) throws Exception {
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler();
runAfter(() -> loggerHandler.close());
org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS);
@ -243,7 +243,7 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
}
}
Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ222038"), 2000);
Wait.assertTrue(() -> loggerHandler.findText("AMQ222038"), 2000);
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);

View File

@ -30,9 +30,7 @@ import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,16 +38,6 @@ public class FileLockTimeoutTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@BeforeClass
public static void prepareLogger() {
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
}
protected void doTest(final boolean useAIO) throws Exception {
if (useAIO) {
Assert.assertTrue(String.format("libAIO is not loaded on %s %s %s", System.getProperty("os.name"), System.getProperty("os.arch"), System.getProperty("os.version")), LibaioContext.isLoaded());
@ -85,17 +73,19 @@ public class FileLockTimeoutTest extends ActiveMQTestBase {
}
};
Future<?> f = service.submit(r);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
Future<?> f = service.submit(r);
try {
f.get(15, TimeUnit.SECONDS);
} catch (Exception e) {
logger.warn("aborting test because server is taking too long to start");
try {
f.get(15, TimeUnit.SECONDS);
} catch (Exception e) {
logger.warn("aborting test because server is taking too long to start");
}
service.shutdown();
assertTrue("Expected to find AMQ224000", loggerHandler.findText("AMQ224000"));
assertTrue("Expected to find \"Timed out waiting for lock\"", loggerHandler.findTrace("Timed out waiting for lock"));
}
service.shutdown();
assertTrue("Expected to find AMQ224000", AssertionLoggerHandler.findText("AMQ224000"));
assertTrue("Expected to find \"Timed out waiting for lock\"", AssertionLoggerHandler.findText("Timed out waiting for lock"));
}
}

View File

@ -30,21 +30,16 @@ import org.junit.Test;
public class NetworkHealthCheckTest extends ActiveMQTestBase {
private static final String HEALTH_CHECK_LOGGER_NAME = NetworkHealthCheck.class.getName();
private static LogLevel previousLevel = null;
private static LogLevel previousLevel;
@BeforeClass
public static void prepareLogger() {
previousLevel = AssertionLoggerHandler.setLevel(HEALTH_CHECK_LOGGER_NAME, LogLevel.DEBUG);
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
try {
AssertionLoggerHandler.stopCapture();
} finally {
AssertionLoggerHandler.setLevel(HEALTH_CHECK_LOGGER_NAME, previousLevel);
}
AssertionLoggerHandler.setLevel(HEALTH_CHECK_LOGGER_NAME, previousLevel);
}
@ -62,10 +57,10 @@ public class NetworkHealthCheckTest extends ActiveMQTestBase {
ActiveMQServer server = createServer(false, config);
server.start();
try {
Assert.assertTrue(AssertionLoggerHandler.findText("executing ping:: " + String.format(customIpv4Command, checkingTimeout, checkingHost)));
Assert.assertFalse(AssertionLoggerHandler.findText(String.format(NetworkHealthCheck.IPV4_DEFAULT_COMMAND, checkingTimeout, checkingHost)));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
Assert.assertTrue(loggerHandler.findText("executing ping:: " + String.format(customIpv4Command, checkingTimeout, checkingHost)));
Assert.assertFalse(loggerHandler.findText(String.format(NetworkHealthCheck.IPV4_DEFAULT_COMMAND, checkingTimeout, checkingHost)));
} finally {
server.stop();
}

View File

@ -23,23 +23,11 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class PotentialOOMELoggingTest extends ActiveMQTestBase {
@BeforeClass
public static void prepareLogger() {
AssertionLoggerHandler.startCapture();
}
@AfterClass
public static void clearLogger() {
AssertionLoggerHandler.stopCapture();
}
/**
* When running this test from an IDE add this to the test command line so that the AssertionLoggerHandler works properly:
*
@ -48,14 +36,17 @@ public class PotentialOOMELoggingTest extends ActiveMQTestBase {
@Test
public void testBlockLogging() throws Exception {
ActiveMQServer server = createServer(false, createDefaultInVMConfig());
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 200; i++) {
server.getConfiguration().addQueueConfiguration(new QueueConfiguration(UUID.randomUUID().toString()));
}
server.getConfiguration().setGlobalMaxSize(-1);
server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setMaxSizeBytes(10485760 * 10));
server.start();
// Using the code only so the test doesn't fail just because someone edits the log text
Assert.assertTrue("Expected to find 222205", AssertionLoggerHandler.findText("AMQ222205"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.start();
// Using the code only so the test doesn't fail just because someone edits the log text
Assert.assertTrue("Expected to find 222205", loggerHandler.findText("AMQ222205"));
}
}
}

View File

@ -39,8 +39,7 @@ public class SimpleStartStopTest extends ActiveMQTestBase {
*/
@Test
public void testStartStopAndCleanupIDs() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ActiveMQServer server = null;
for (int i = 0; i < 50; i++) {
@ -50,8 +49,8 @@ public class SimpleStartStopTest extends ActiveMQTestBase {
}
// There shouldn't be any error from starting / stopping the server
assertFalse("There shouldn't be any error for just starting / stopping the server", AssertionLoggerHandler.hasLevel(LogLevel.ERROR));
assertFalse(AssertionLoggerHandler.findText("AMQ224008"));
assertFalse("There shouldn't be any error for just starting / stopping the server", loggerHandler.hasLevel(LogLevel.ERROR));
assertFalse(loggerHandler.findText("AMQ224008"));
HashMap<Integer, AtomicInteger> records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
@ -75,8 +74,6 @@ public class SimpleStartStopTest extends ActiveMQTestBase {
server.stop();
} finally {
AssertionLoggerHandler.stopCapture();
}
}
}

View File

@ -77,6 +77,8 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase {
private TransportConfiguration tc;
private AssertionLoggerHandler loggerHandler;
@Parameterized.Parameters(name = "storeProvider={0}, storeType={1}, generateWarning={2}, useKeystoreAlias={3}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{
@ -107,19 +109,19 @@ public class CoreClientOverOneWaySSLTest extends ActiveMQTestBase {
@Before
public void validateLogging() {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void afterValidateLogging() {
public void afterValidateLogging() throws Exception {
try {
if (this.generateWarning) {
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ212080"));
Assert.assertTrue(loggerHandler.findText("AMQ212080"));
} else {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ212080"));
Assert.assertFalse(loggerHandler.findText("AMQ212080"));
}
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}

View File

@ -207,8 +207,7 @@ public class StompTest extends StompTestBase {
@Test
public void testSendOverDiskFull() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
MessageConsumer consumer = session.createConsumer(queue);
conn.connect(defUser, defPass);
@ -235,9 +234,7 @@ public class StompTest extends StompTestBase {
}
assertNotNull(e);
// It should encounter the exception on logs
AssertionLoggerHandler.findText("AMQ119119");
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.findText("AMQ119119");
}
}

View File

@ -184,12 +184,11 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
session.commit();
connection.close();
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
server.stop();
Assert.assertFalse(loggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(loggerHandler.findText("AMQ222010")); // critical IO Error
}
}
@ -243,13 +242,13 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
Assert.assertNull(consumer.receiveNoWait());
connection.close();
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
store.getCursorProvider().resumeCleanup();
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
store.getCursorProvider().resumeCleanup();
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
server.stop();
Assert.assertFalse(loggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(loggerHandler.findText("AMQ222010")); // critical IO Error
}
}
public void internalTest(boolean midstream) throws Throwable {
@ -334,16 +333,16 @@ public class MegaCleanerPagingTest extends ActiveMQTestBase {
connection.close();
slowConnection.close();
AssertionLoggerHandler.startCapture();
runAfter(AssertionLoggerHandler::stopCapture);
store.enableCleanup();
store.getCursorProvider().scheduleCleanup();
if (!midstream) {
Wait.assertFalse(store::isPaging);
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
store.enableCleanup();
store.getCursorProvider().scheduleCleanup();
if (!midstream) {
Wait.assertFalse(store::isPaging);
}
server.stop();
Assert.assertFalse(loggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(loggerHandler.findText("AMQ222010")); // critical IO Error
}
server.stop();
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222023")); // error associated with OME
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222010")); // critical IO Error
}

View File

@ -53,17 +53,19 @@ import java.lang.invoke.MethodHandles;
public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected AssertionLoggerHandler loggerHandler;
@Before
public void startLogger() {
AssertionLoggerHandler.startCapture();
loggerHandler = new AssertionLoggerHandler();
}
@After
public void stopLogger() {
public void stopLogger() throws Exception {
try {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ144009"));
Assert.assertFalse(loggerHandler.findText("AMQ144009"));
} finally {
AssertionLoggerHandler.stopCapture();
loggerHandler.close();
}
}

View File

@ -2128,8 +2128,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testDoubleDelete() throws Exception {
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {
setup(10, 10 * 1024, true);
createJournal();
startJournal();
@ -2154,7 +2153,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
journal.appendDeleteRecord(2, false);
} catch (java.lang.IllegalStateException expected) {
} catch (Exception e) {
e.printStackTrace();
// e.printStackTrace();
}
});
threads[i].start();
@ -2169,13 +2168,11 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
journal.flush();
Assert.assertFalse(AssertionLoggerHandler.findText("NullPointerException"));
Assert.assertFalse(loggerHandler.findTrace("NullPointerException"));
stopJournal();
createJournal();
startJournal();
loadAndCheck();
} finally {
AssertionLoggerHandler.stopCapture();
}
}

View File

@ -28,8 +28,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
@ -88,8 +86,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import static org.apache.activemq.artemis.logs.AssertionLoggerHandler.findText;
public class PagingStoreImplTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -99,7 +95,6 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
}
private static final SimpleString destinationTestName = new SimpleString("test");
private final ReadLock lock = new ReentrantReadWriteLock().readLock();
protected ExecutorService executor;
@ -1090,13 +1085,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
store.start();
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
store.startPaging();
store.stopPaging();
Assert.assertTrue(findText("AMQ222038"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(loggerHandler.findText("AMQ222038"));
}
}
@ -1113,13 +1105,10 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE),
getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(), true);
store.start();
AssertionLoggerHandler.startCapture();
try {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
store.startPaging();
store.stopPaging();
Assert.assertTrue(findText("AMQ224108"));
} finally {
AssertionLoggerHandler.stopCapture();
Assert.assertTrue(loggerHandler.findText("AMQ224108"));
}
}

View File

@ -21,9 +21,7 @@ import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,16 +33,6 @@ import org.slf4j.LoggerFactory;
*/
public class AssertionLoggerTest {
@Before
public void prepare() {
AssertionLoggerHandler.startCapture(true);
}
@After
public void cleanup() {
AssertionLoggerHandler.stopCapture();
}
@Test
public void testHandlingOnAMQP() throws Exception {
validateLogging(ActiveMQProtonRemotingConnection.class);
@ -57,29 +45,32 @@ public class AssertionLoggerTest {
@Test
public void testInfoAMQP() throws Exception {
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection("test", "test", 1, 1);
Assert.assertTrue(AssertionLoggerHandler.findText("AMQ111002"));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
ActiveMQAMQPProtocolLogger.LOGGER.retryConnection("test", "test", 1, 1);
Assert.assertTrue(loggerHandler.findText("AMQ111002"));
}
}
private void validateLogging(Class<?> clazz) {
private void validateLogging(Class<?> clazz) throws Exception {
String randomLogging = RandomUtil.randomString();
Logger logging = LoggerFactory.getLogger(clazz);
logging.warn(randomLogging);
Assert.assertTrue(AssertionLoggerHandler.findText(randomLogging));
AssertionLoggerHandler.clear();
for (int i = 0; i < 10; i++) {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
logging.warn(randomLogging);
}
Assert.assertEquals(10, AssertionLoggerHandler.countText(randomLogging));
AssertionLoggerHandler.clear();
for (int i = 0; i < 10; i++) {
logging.info(randomLogging);
Assert.assertTrue(loggerHandler.findText(randomLogging));
}
Assert.assertEquals(10, AssertionLoggerHandler.countText(randomLogging));
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
for (int i = 0; i < 10; i++) {
logging.warn(randomLogging);
}
Assert.assertEquals(10, loggerHandler.countText(randomLogging));
}
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
for (int i = 0; i < 10; i++) {
logging.info(randomLogging);
}
Assert.assertEquals(10, loggerHandler.countText(randomLogging));
}
}
}