Add test and supporting enhancements to the JUnit module to allow for
easy repetition of tests that don't always fail reliably
(cherry picked from commit 6ae169e275)
This commit is contained in:
Timothy Bish 2016-05-26 18:02:59 -04:00 committed by Christopher L. Shannon (cshannon)
parent fda982dccb
commit 508c12d948
9 changed files with 455 additions and 6 deletions

View File

@ -103,6 +103,11 @@
<artifactId>activemq-leveldb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -154,8 +154,11 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit();
LOG.trace("Flow: drain={} credit={}, remoteCredit={}",
getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit());
if (LOG.isTraceEnabled()) {
LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
currentCredit, draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
@ -166,6 +169,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(0);
LOG.trace("Flow: Pull case -> consumer control with prefetch (0)");
sendToActiveMQ(control, null);
// Now request dispatch of the drain amount, we request immediate
@ -177,6 +183,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(currentCredit);
LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit);
sendToActiveMQ(pullRequest, null);
} else if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
@ -184,7 +193,12 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(currentCredit);
LOG.trace("Flow: update -> consumer control with prefetch (0)");
sendToActiveMQ(control, null);
} else {
LOG.trace("Flow: no credit change -> no broker updates needed");
}
}
@ -403,6 +417,12 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
draining = false;
currentCredit = 0;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms);

View File

@ -182,7 +182,7 @@ public class JMSClientContext {
factory.setUsername(username);
factory.setPassword(password);
factory.setAlwaysSyncSend(syncPublish);
factory.setForceSyncSend(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.util.Enumeration;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
import org.apache.activemq.junit.Repeat;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for various QueueBrowser scenarios with an AMQP JMS client.
*/
@RunWith(ActiveMQTestRunner.class)
public class JMSQueueBrowserTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@Test(timeout = 60000)
@Repeat(repetitions = 1)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
final int MSG_COUNT = 5;
JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0");
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), MSG_COUNT, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(MSG_COUNT, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (count < MSG_COUNT && enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
LOG.debug("Received all expected message, checking that hasMoreElements returns false");
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
}

View File

@ -20,21 +20,25 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.broker.scheduler=TRACE
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO
# Configure various level of detail for Qpid JMS logs.
log4j.logger.org.apache.qpid.jms=INFO
log4j.logger.org.apache.qpid.jms.provider=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
# Console will only display warnnings
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d | %-5p | %t | %m%n
log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n
log4j.appender.console.threshold=TRACE
# File appender will contain all info messages
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n
log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
log4j.appender.file.file=target/test.log
log4j.appender.file.append=true

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.internal.runners.statements.FailOnTimeout;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Custom JUnit test runner for customizing JUnit tests run in ActiveMQ.
*/
public class ActiveMQTestRunner extends BlockJUnit4ClassRunner {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQTestRunner.class);
public ActiveMQTestRunner(Class<?> klass) throws InitializationError {
super(klass);
}
@Override
protected Statement methodBlock(final FrameworkMethod method) {
Statement statement = super.methodBlock(method);
// Check for repeats needed
statement = withPotentialRepeat(method, statement);
return statement;
}
/**
* Perform the same logic as
* {@link BlockJUnit4ClassRunner#withPotentialTimeout(FrameworkMethod, Object, Statement)}
* but with additional support for changing the coded timeout with an extended value.
*
* @return either a {@link FailOnTimeout}, or the supplied {@link Statement} as appropriate.
*/
@SuppressWarnings("deprecation")
@Override
protected Statement withPotentialTimeout(FrameworkMethod frameworkMethod, Object testInstance, Statement next) {
long testTimeout = getOriginalTimeout(frameworkMethod);
if (testTimeout > 0) {
String multiplierString = System.getProperty("org.apache.activemq.junit.testTimeoutMultiplier");
double multiplier = 0.0;
try {
multiplier = Double.parseDouble(multiplierString);
} catch (NullPointerException npe) {
} catch (NumberFormatException nfe) {
LOG.warn("Ignoring testTimeoutMultiplier not set to a valid value: " + multiplierString);
}
if (multiplier > 0.0) {
LOG.info("Test timeout multiple {} applied to test timeout {}ms: new timeout = {}",
multiplier, testTimeout, (long) (testTimeout * multiplier));
testTimeout = (long) (testTimeout * multiplier);
}
next = FailOnTimeout.builder().
withTimeout(testTimeout, TimeUnit.MILLISECONDS).build(next);
} else {
next = super.withPotentialTimeout(frameworkMethod, testInstance, next);
}
return next;
}
/**
* Check for the presence of a {@link Repeat} annotation and return a {@link RepeatStatement}
* to handle executing the test repeated or the original value if not repeating.
*
* @return either a {@link RepeatStatement}, or the supplied {@link Statement} as appropriate.
*/
protected Statement withPotentialRepeat(FrameworkMethod frameworkMethod, Statement next) {
Repeat repeatAnnotation = frameworkMethod.getAnnotation(Repeat.class);
if (repeatAnnotation != null) {
next = RepeatStatement.builder().build(repeatAnnotation, next);
}
return next;
}
/**
* Retrieve the original JUnit {@code timeout} from the {@link Test @Test}
* annotation on the incoming {@linkplain FrameworkMethod test method}.
*
* @return the timeout, or {@code 0} if none was specified
*/
protected long getOriginalTimeout(FrameworkMethod frameworkMethod) {
Test test = frameworkMethod.getAnnotation(Test.class);
return (test != null && test.timeout() > 0 ? test.timeout() : 0);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* A Custom Test annotation used to repeat a troublesome test multiple
* times when attempting to reproduce an intermittent failure.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ java.lang.annotation.ElementType.METHOD })
public @interface Repeat {
int repetitions() default 1;
boolean untilFailure() default false;
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* Test rule used to allow a test to have the Repeat annotation applied.
*/
public class RepeatRule implements TestRule {
@Override
public Statement apply(Statement statement, Description description) {
Repeat repeat = description.getAnnotation(Repeat.class);
if (repeat != null) {
statement = RepeatStatement.builder().build(repeat, statement);
}
return statement;
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class RepeatStatement extends Statement {
private static final Logger LOG = LoggerFactory.getLogger(RepeatStatement.class);
private final int repetitions;
private final boolean untilFailure;
private final Statement statement;
public static Builder builder() {
return new Builder();
}
public RepeatStatement(int times, boolean untilFailure, Statement statement) {
this.repetitions = times;
this.untilFailure = untilFailure;
this.statement = statement;
}
protected RepeatStatement(Builder builder, Statement next) {
this.repetitions = builder.getRepetitions();
this.untilFailure = builder.isUntilFailure();
this.statement = next;
}
@Override
public void evaluate() throws Throwable {
for (int i = 0; i < repetitions && !untilFailure; i++) {
if (untilFailure) {
LOG.info("Running test iteration: {}.", i + 1);
} else {
LOG.info("Running test iteration: {} of configured repetitions: {}", i + 1, repetitions);
}
statement.evaluate();
}
}
/**
* Builder for {@link Repeat}.
*/
public static class Builder {
private int repetitions = 1;
private boolean untilFailure = false;
protected Builder() {}
/**
* Specifies the number of times to run the test.
*
* @param repetitions
* The number of times to run the test.
*
* @return {@code this} for method chaining.
*/
public Builder withRepetitions(int repetitions) {
if (repetitions <= 0) {
throw new IllegalArgumentException("repetitions must be greater than zero");
}
this.repetitions = repetitions;
return this;
}
/**
* Specifies the number of times to run the test.
*
* @param untilFailure
* true if the test should run until a failure occurs.
*
* @return {@code this} for method chaining.
*/
public Builder withRunUntilFailure(boolean untilFailure) {
this.untilFailure = untilFailure;
return this;
}
protected int getRepetitions() {
return repetitions;
}
protected boolean isUntilFailure() {
return untilFailure;
}
/**
* Builds a {@link RepeatStatement} instance using the values in this builder.
*
* @param next
* The statement instance to wrap with the newly create repeat statement.
*
* @return a new {@link RepeatStatement} that wraps the given {@link Statement}.
*/
public RepeatStatement build(Statement next) {
if (next == null) {
throw new NullPointerException("statement cannot be null");
}
return new RepeatStatement(this, next);
}
/**
* Builds a {@link RepeatStatement} instance using the values in this builder.
*
* @param annotation
* The {@link Repeat} annotation that triggered this statement being created.
* @param next
* The statement instance to wrap with the newly create repeat statement.
*
* @return a new {@link RepeatStatement} that wraps the given {@link Statement}.
*/
public RepeatStatement build(Repeat annotation, Statement next) {
if (next == null) {
throw new NullPointerException("statement cannot be null");
}
if (annotation == null) {
throw new NullPointerException("annotation cannot be null");
}
withRepetitions(annotation.repetitions());
withRunUntilFailure(annotation.untilFailure());
return new RepeatStatement(this, next);
}
}
}