Add test and supporting enhancements to the JUnit module to allow for
easy repetition of tests that don't always fail reliably
This commit is contained in:
Timothy Bish 2016-05-26 18:02:59 -04:00
parent 3c5c5779e2
commit 6ae169e275
9 changed files with 455 additions and 6 deletions

View File

@ -103,6 +103,11 @@
<artifactId>activemq-leveldb-store</artifactId> <artifactId>activemq-leveldb-store</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>
<artifactId>junit</artifactId> <artifactId>junit</artifactId>

View File

@ -154,8 +154,11 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public void flow() throws Exception { public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit(); int updatedCredit = getEndpoint().getCredit();
LOG.trace("Flow: drain={} credit={}, remoteCredit={}", if (LOG.isTraceEnabled()) {
getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit()); LOG.trace("Flow: currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
currentCredit, draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) { if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0; currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
@ -166,6 +169,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setConsumerId(getConsumerId()); control.setConsumerId(getConsumerId());
control.setDestination(getDestination()); control.setDestination(getDestination());
control.setPrefetch(0); control.setPrefetch(0);
LOG.trace("Flow: Pull case -> consumer control with prefetch (0)");
sendToActiveMQ(control, null); sendToActiveMQ(control, null);
// Now request dispatch of the drain amount, we request immediate // Now request dispatch of the drain amount, we request immediate
@ -177,6 +183,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
pullRequest.setTimeout(-1); pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true); pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(currentCredit); pullRequest.setQuantity(currentCredit);
LOG.trace("Pull case -> consumer pull request quantity = {}", currentCredit);
sendToActiveMQ(pullRequest, null); sendToActiveMQ(pullRequest, null);
} else if (updatedCredit != currentCredit) { } else if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0; currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
@ -184,7 +193,12 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
control.setConsumerId(getConsumerId()); control.setConsumerId(getConsumerId());
control.setDestination(getDestination()); control.setDestination(getDestination());
control.setPrefetch(currentCredit); control.setPrefetch(currentCredit);
LOG.trace("Flow: update -> consumer control with prefetch (0)");
sendToActiveMQ(control, null); sendToActiveMQ(control, null);
} else {
LOG.trace("Flow: no credit change -> no broker updates needed");
} }
} }
@ -403,6 +417,12 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
draining = false; draining = false;
currentCredit = 0; currentCredit = 0;
} else { } else {
if (LOG.isTraceEnabled()) {
LOG.trace("Sender:[{}] msgId={} currentCredit={}, draining={}, drain={} credit={}, remoteCredit={}, queued={}",
getEndpoint().getName(), jms.getJMSMessageID(), currentCredit, draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
jms.setRedeliveryCounter(md.getRedeliveryCounter()); jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true); jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms); final EncodedMessage amqp = outboundTransformer.transform(jms);

View File

@ -182,7 +182,7 @@ public class JMSClientContext {
factory.setUsername(username); factory.setUsername(username);
factory.setPassword(password); factory.setPassword(password);
factory.setAlwaysSyncSend(syncPublish); factory.setForceSyncSend(syncPublish);
factory.setTopicPrefix("topic://"); factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://"); factory.setQueuePrefix("queue://");

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.util.Enumeration;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.junit.ActiveMQTestRunner;
import org.apache.activemq.junit.Repeat;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for various QueueBrowser scenarios with an AMQP JMS client.
*/
@RunWith(ActiveMQTestRunner.class)
public class JMSQueueBrowserTest extends JMSClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
@Test(timeout = 60000)
@Repeat(repetitions = 1)
public void testBrowseAllInQueueZeroPrefetch() throws Exception {
final int MSG_COUNT = 5;
JmsConnectionFactory cf = new JmsConnectionFactory(getBrokerURI() + "?jms.prefetchPolicy.all=0");
connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(getDestinationName());
sendMessages(name.getMethodName(), MSG_COUNT, false);
QueueViewMBean proxy = getProxyToQueue(getDestinationName());
assertEquals(MSG_COUNT, proxy.getQueueSize());
QueueBrowser browser = session.createBrowser(queue);
assertNotNull(browser);
Enumeration<?> enumeration = browser.getEnumeration();
int count = 0;
while (count < MSG_COUNT && enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
assertNotNull(msg);
LOG.debug("Recv: {}", msg);
count++;
}
LOG.debug("Received all expected message, checking that hasMoreElements returns false");
assertFalse(enumeration.hasMoreElements());
assertEquals(5, count);
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
}

View File

@ -20,21 +20,25 @@
# #
log4j.rootLogger=WARN, console, file log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.broker.scheduler=TRACE
log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO log4j.logger.org.fusesource=INFO
# Configure various level of detail for Qpid JMS logs.
log4j.logger.org.apache.qpid.jms=INFO log4j.logger.org.apache.qpid.jms=INFO
log4j.logger.org.apache.qpid.jms.provider=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp=INFO
log4j.logger.org.apache.qpid.jms.provider.amqp.FRAMES=INFO
# Console will only display warnnings # Console will only display warnnings
log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d | %-5p | %t | %m%n log4j.appender.console.layout.ConversionPattern=%d [%-15.15t] - %-5p %-25.30c{1} - %m%n
log4j.appender.console.threshold=TRACE log4j.appender.console.threshold=TRACE
# File appender will contain all info messages # File appender will contain all info messages
log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d | %-5p | %m | %c | %t%n log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] - %-5p %-30.30c{1} - %m%n
log4j.appender.file.file=target/test.log log4j.appender.file.file=target/test.log
log4j.appender.file.append=true log4j.appender.file.append=true

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.junit.internal.runners.statements.FailOnTimeout;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Custom JUnit test runner for customizing JUnit tests run in ActiveMQ.
*/
public class ActiveMQTestRunner extends BlockJUnit4ClassRunner {
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQTestRunner.class);
public ActiveMQTestRunner(Class<?> klass) throws InitializationError {
super(klass);
}
@Override
protected Statement methodBlock(final FrameworkMethod method) {
Statement statement = super.methodBlock(method);
// Check for repeats needed
statement = withPotentialRepeat(method, statement);
return statement;
}
/**
* Perform the same logic as
* {@link BlockJUnit4ClassRunner#withPotentialTimeout(FrameworkMethod, Object, Statement)}
* but with additional support for changing the coded timeout with an extended value.
*
* @return either a {@link FailOnTimeout}, or the supplied {@link Statement} as appropriate.
*/
@SuppressWarnings("deprecation")
@Override
protected Statement withPotentialTimeout(FrameworkMethod frameworkMethod, Object testInstance, Statement next) {
long testTimeout = getOriginalTimeout(frameworkMethod);
if (testTimeout > 0) {
String multiplierString = System.getProperty("org.apache.activemq.junit.testTimeoutMultiplier");
double multiplier = 0.0;
try {
multiplier = Double.parseDouble(multiplierString);
} catch (NullPointerException npe) {
} catch (NumberFormatException nfe) {
LOG.warn("Ignoring testTimeoutMultiplier not set to a valid value: " + multiplierString);
}
if (multiplier > 0.0) {
LOG.info("Test timeout multiple {} applied to test timeout {}ms: new timeout = {}",
multiplier, testTimeout, (long) (testTimeout * multiplier));
testTimeout = (long) (testTimeout * multiplier);
}
next = FailOnTimeout.builder().
withTimeout(testTimeout, TimeUnit.MILLISECONDS).build(next);
} else {
next = super.withPotentialTimeout(frameworkMethod, testInstance, next);
}
return next;
}
/**
* Check for the presence of a {@link Repeat} annotation and return a {@link RepeatStatement}
* to handle executing the test repeated or the original value if not repeating.
*
* @return either a {@link RepeatStatement}, or the supplied {@link Statement} as appropriate.
*/
protected Statement withPotentialRepeat(FrameworkMethod frameworkMethod, Statement next) {
Repeat repeatAnnotation = frameworkMethod.getAnnotation(Repeat.class);
if (repeatAnnotation != null) {
next = RepeatStatement.builder().build(repeatAnnotation, next);
}
return next;
}
/**
* Retrieve the original JUnit {@code timeout} from the {@link Test @Test}
* annotation on the incoming {@linkplain FrameworkMethod test method}.
*
* @return the timeout, or {@code 0} if none was specified
*/
protected long getOriginalTimeout(FrameworkMethod frameworkMethod) {
Test test = frameworkMethod.getAnnotation(Test.class);
return (test != null && test.timeout() > 0 ? test.timeout() : 0);
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* A Custom Test annotation used to repeat a troublesome test multiple
* times when attempting to reproduce an intermittent failure.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ java.lang.annotation.ElementType.METHOD })
public @interface Repeat {
int repetitions() default 1;
boolean untilFailure() default false;
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* Test rule used to allow a test to have the Repeat annotation applied.
*/
public class RepeatRule implements TestRule {
@Override
public Statement apply(Statement statement, Description description) {
Repeat repeat = description.getAnnotation(Repeat.class);
if (repeat != null) {
statement = RepeatStatement.builder().build(repeat, statement);
}
return statement;
}
}

View File

@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.junit;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class RepeatStatement extends Statement {
private static final Logger LOG = LoggerFactory.getLogger(RepeatStatement.class);
private final int repetitions;
private final boolean untilFailure;
private final Statement statement;
public static Builder builder() {
return new Builder();
}
public RepeatStatement(int times, boolean untilFailure, Statement statement) {
this.repetitions = times;
this.untilFailure = untilFailure;
this.statement = statement;
}
protected RepeatStatement(Builder builder, Statement next) {
this.repetitions = builder.getRepetitions();
this.untilFailure = builder.isUntilFailure();
this.statement = next;
}
@Override
public void evaluate() throws Throwable {
for (int i = 0; i < repetitions && !untilFailure; i++) {
if (untilFailure) {
LOG.info("Running test iteration: {}.", i + 1);
} else {
LOG.info("Running test iteration: {} of configured repetitions: {}", i + 1, repetitions);
}
statement.evaluate();
}
}
/**
* Builder for {@link Repeat}.
*/
public static class Builder {
private int repetitions = 1;
private boolean untilFailure = false;
protected Builder() {}
/**
* Specifies the number of times to run the test.
*
* @param repetitions
* The number of times to run the test.
*
* @return {@code this} for method chaining.
*/
public Builder withRepetitions(int repetitions) {
if (repetitions <= 0) {
throw new IllegalArgumentException("repetitions must be greater than zero");
}
this.repetitions = repetitions;
return this;
}
/**
* Specifies the number of times to run the test.
*
* @param untilFailure
* true if the test should run until a failure occurs.
*
* @return {@code this} for method chaining.
*/
public Builder withRunUntilFailure(boolean untilFailure) {
this.untilFailure = untilFailure;
return this;
}
protected int getRepetitions() {
return repetitions;
}
protected boolean isUntilFailure() {
return untilFailure;
}
/**
* Builds a {@link RepeatStatement} instance using the values in this builder.
*
* @param next
* The statement instance to wrap with the newly create repeat statement.
*
* @return a new {@link RepeatStatement} that wraps the given {@link Statement}.
*/
public RepeatStatement build(Statement next) {
if (next == null) {
throw new NullPointerException("statement cannot be null");
}
return new RepeatStatement(this, next);
}
/**
* Builds a {@link RepeatStatement} instance using the values in this builder.
*
* @param annotation
* The {@link Repeat} annotation that triggered this statement being created.
* @param next
* The statement instance to wrap with the newly create repeat statement.
*
* @return a new {@link RepeatStatement} that wraps the given {@link Statement}.
*/
public RepeatStatement build(Repeat annotation, Statement next) {
if (next == null) {
throw new NullPointerException("statement cannot be null");
}
if (annotation == null) {
throw new NullPointerException("annotation cannot be null");
}
withRepetitions(annotation.repetitions());
withRunUntilFailure(annotation.untilFailure());
return new RepeatStatement(this, next);
}
}
}