HADOOP-14696. parallel tests don't work for Windows. Contributed by Allen Wittenauer

This commit is contained in:
Chris Douglas 2018-03-12 19:47:42 -07:00
parent 19292bc264
commit 45d1b0fdcc
5 changed files with 161 additions and 85 deletions

View File

@ -979,30 +979,13 @@
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-antrun-plugin</artifactId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions> <executions>
<execution> <execution>
<id>create-parallel-tests-dirs</id> <id>parallel-tests-createdir</id>
<phase>test-compile</phase>
<configuration>
<target>
<script language="javascript"><![CDATA[
var baseDirs = [
"${test.build.data}",
"${test.build.dir}",
"${hadoop.tmp.dir}" ];
for (var i in baseDirs) {
for (var j = 1; j <= ${testsThreadCount}; ++j) {
var mkdir = project.createTask("mkdir");
mkdir.setDir(new java.io.File(baseDirs[i], j));
mkdir.perform();
}
}
]]></script>
</target>
</configuration>
<goals> <goals>
<goal>run</goal> <goal>parallel-tests-createdir</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
@ -1015,6 +998,7 @@
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine> <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables> <systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data> <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir> <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir> <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>

View File

@ -286,7 +286,7 @@ public abstract class GenericTestUtils {
public static void assertExists(File f) { public static void assertExists(File f) {
Assert.assertTrue("File " + f + " should exist", f.exists()); Assert.assertTrue("File " + f + " should exist", f.exists());
} }
/** /**
* List all of the files in 'dir' that match the regex 'pattern'. * List all of the files in 'dir' that match the regex 'pattern'.
* Then check that this list is identical to 'expectedMatches'. * Then check that this list is identical to 'expectedMatches'.
@ -294,7 +294,7 @@ public abstract class GenericTestUtils {
*/ */
public static void assertGlobEquals(File dir, String pattern, public static void assertGlobEquals(File dir, String pattern,
String ... expectedMatches) throws IOException { String ... expectedMatches) throws IOException {
Set<String> found = Sets.newTreeSet(); Set<String> found = Sets.newTreeSet();
for (File f : FileUtil.listFiles(dir)) { for (File f : FileUtil.listFiles(dir)) {
if (f.getName().matches(pattern)) { if (f.getName().matches(pattern)) {
@ -349,7 +349,7 @@ public abstract class GenericTestUtils {
StringUtils.stringifyException(t)), StringUtils.stringifyException(t)),
t); t);
} }
} }
/** /**
* Wait for the specified test to return true. The test will be performed * Wait for the specified test to return true. The test will be performed
@ -499,18 +499,18 @@ public abstract class GenericTestUtils {
*/ */
public static class DelayAnswer implements Answer<Object> { public static class DelayAnswer implements Answer<Object> {
private final Log LOG; private final Log LOG;
private final CountDownLatch fireLatch = new CountDownLatch(1); private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1); private final CountDownLatch waitLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1); private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicInteger fireCounter = new AtomicInteger(0); private final AtomicInteger fireCounter = new AtomicInteger(0);
private final AtomicInteger resultCounter = new AtomicInteger(0); private final AtomicInteger resultCounter = new AtomicInteger(0);
// Result fields set after proceed() is called. // Result fields set after proceed() is called.
private volatile Throwable thrown; private volatile Throwable thrown;
private volatile Object returnValue; private volatile Object returnValue;
public DelayAnswer(Log log) { public DelayAnswer(Log log) {
this.LOG = log; this.LOG = log;
} }
@ -521,7 +521,7 @@ public abstract class GenericTestUtils {
public void waitForCall() throws InterruptedException { public void waitForCall() throws InterruptedException {
fireLatch.await(); fireLatch.await();
} }
/** /**
* Tell the method to proceed. * Tell the method to proceed.
* This should only be called after waitForCall() * This should only be called after waitForCall()
@ -529,7 +529,7 @@ public abstract class GenericTestUtils {
public void proceed() { public void proceed() {
waitLatch.countDown(); waitLatch.countDown();
} }
@Override @Override
public Object answer(InvocationOnMock invocation) throws Throwable { public Object answer(InvocationOnMock invocation) throws Throwable {
LOG.info("DelayAnswer firing fireLatch"); LOG.info("DelayAnswer firing fireLatch");
@ -558,7 +558,7 @@ public abstract class GenericTestUtils {
resultLatch.countDown(); resultLatch.countDown();
} }
} }
/** /**
* After calling proceed(), this will wait until the call has * After calling proceed(), this will wait until the call has
* completed and a result has been returned to the caller. * completed and a result has been returned to the caller.
@ -566,7 +566,7 @@ public abstract class GenericTestUtils {
public void waitForResult() throws InterruptedException { public void waitForResult() throws InterruptedException {
resultLatch.await(); resultLatch.await();
} }
/** /**
* After the call has gone through, return any exception that * After the call has gone through, return any exception that
* was thrown, or null if no exception was thrown. * was thrown, or null if no exception was thrown.
@ -574,7 +574,7 @@ public abstract class GenericTestUtils {
public Throwable getThrown() { public Throwable getThrown() {
return thrown; return thrown;
} }
/** /**
* After the call has gone through, return the call's return value, * After the call has gone through, return the call's return value,
* or null in case it was void or an exception was thrown. * or null in case it was void or an exception was thrown.
@ -582,20 +582,20 @@ public abstract class GenericTestUtils {
public Object getReturnValue() { public Object getReturnValue() {
return returnValue; return returnValue;
} }
public int getFireCount() { public int getFireCount() {
return fireCounter.get(); return fireCounter.get();
} }
public int getResultCount() { public int getResultCount() {
return resultCounter.get(); return resultCounter.get();
} }
} }
/** /**
* An Answer implementation that simply forwards all calls through * An Answer implementation that simply forwards all calls through
* to a delegate. * to a delegate.
* *
* This is useful as the default Answer for a mock object, to create * This is useful as the default Answer for a mock object, to create
* something like a spy on an RPC proxy. For example: * something like a spy on an RPC proxy. For example:
* <code> * <code>
@ -606,14 +606,14 @@ public abstract class GenericTestUtils {
* ... * ...
* </code> * </code>
*/ */
public static class DelegateAnswer implements Answer<Object> { public static class DelegateAnswer implements Answer<Object> {
private final Object delegate; private final Object delegate;
private final Log log; private final Log log;
public DelegateAnswer(Object delegate) { public DelegateAnswer(Object delegate) {
this(null, delegate); this(null, delegate);
} }
public DelegateAnswer(Log log, Object delegate) { public DelegateAnswer(Log log, Object delegate) {
this.log = log; this.log = log;
this.delegate = delegate; this.delegate = delegate;
@ -653,7 +653,7 @@ public abstract class GenericTestUtils {
this.minSleepTime = minSleepTime; this.minSleepTime = minSleepTime;
this.maxSleepTime = maxSleepTime; this.maxSleepTime = maxSleepTime;
} }
@Override @Override
public Object answer(InvocationOnMock invocation) throws Throwable { public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false; boolean interrupted = false;
@ -683,11 +683,11 @@ public abstract class GenericTestUtils {
" but got:\n" + output, " but got:\n" + output,
Pattern.compile(pattern).matcher(output).find()); Pattern.compile(pattern).matcher(output).find());
} }
public static void assertValueNear(long expected, long actual, long allowedError) { public static void assertValueNear(long expected, long actual, long allowedError) {
assertValueWithinRange(expected - allowedError, expected + allowedError, actual); assertValueWithinRange(expected - allowedError, expected + allowedError, actual);
} }
public static void assertValueWithinRange(long expectedMin, long expectedMax, public static void assertValueWithinRange(long expectedMin, long expectedMax,
long actual) { long actual) {
Assert.assertTrue("Expected " + actual + " to be in range (" + expectedMin + "," Assert.assertTrue("Expected " + actual + " to be in range (" + expectedMin + ","
@ -842,4 +842,28 @@ public abstract class GenericTestUtils {
failf(format, args); failf(format, args);
} }
} }
/**
* Retreive the max number of parallel test threads when running under maven.
* @return int number of threads
*/
public static int getTestsThreadCount() {
String propString = System.getProperty("testsThreadCount", "1");
int threadCount = 1;
if (propString != null) {
String trimProp = propString.trim();
if (trimProp.endsWith("C")) {
double multiplier = Double.parseDouble(
trimProp.substring(0, trimProp.length()-1));
double calculated = multiplier * ((double) Runtime
.getRuntime()
.availableProcessors());
threadCount = calculated > 0d ? Math.max((int) calculated, 1) : 0;
} else {
threadCount = Integer.parseInt(trimProp);
}
}
return threadCount;
}
} }

View File

@ -515,30 +515,13 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-antrun-plugin</artifactId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions> <executions>
<execution> <execution>
<id>create-parallel-tests-dirs</id> <id>parallel-tests-createdir</id>
<phase>test-compile</phase>
<configuration>
<target>
<script language="javascript"><![CDATA[
var baseDirs = [
"${test.build.data}",
"${test.build.dir}",
"${hadoop.tmp.dir}" ];
for (var i in baseDirs) {
for (var j = 1; j <= ${testsThreadCount}; ++j) {
var mkdir = project.createTask("mkdir");
mkdir.setDir(new java.io.File(baseDirs[i], j));
mkdir.perform();
}
}
]]></script>
</target>
</configuration>
<goals> <goals>
<goal>run</goal> <goal>parallel-tests-createdir</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
@ -551,6 +534,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine> <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables> <systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data> <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir> <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir> <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>

View File

@ -0,0 +1,100 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.maven.plugin.paralleltests;
import java.io.File;
import org.apache.maven.plugin.AbstractMojo;
import org.apache.maven.plugin.MojoExecutionException;
import org.apache.maven.plugins.annotations.LifecyclePhase;
import org.apache.maven.plugins.annotations.Mojo;
import org.apache.maven.plugins.annotations.Parameter;
/**
* Goal which creates the parallel-test directories.
*/
@Mojo(name="parallel-tests-createdir",
defaultPhase = LifecyclePhase.GENERATE_TEST_RESOURCES)
public class CreateDirsMojo extends AbstractMojo {
/**
* Location of the test.build.dir.
*/
@Parameter(defaultValue="${project.build.directory}/test-dir")
private File testBuildDir;
/**
* Location of the test.build.data.
*/
@Parameter(defaultValue="${project.build.directory}/test-dir")
private File testBuildData;
/**
* Location of the test.build.data.
*/
@Parameter(defaultValue="${project.build.directory}/tmp")
private File hadoopTmpDir;
/**
* Thread count.
*/
@Parameter(defaultValue="${testsThreadCount}")
private String testsThreadCount;
public void execute() throws MojoExecutionException {
int numDirs=getTestsThreadCount();
mkParallelDirs(testBuildDir, numDirs);
mkParallelDirs(testBuildData, numDirs);
mkParallelDirs(hadoopTmpDir, numDirs);
}
/**
* Get the real number of parallel threads.
* @return int number of threads
*/
public int getTestsThreadCount() {
int threadCount = 1;
if (testsThreadCount != null) {
String trimProp = testsThreadCount.trim();
if (trimProp.endsWith("C")) {
double multiplier = Double.parseDouble(
trimProp.substring(0, trimProp.length()-1));
double calculated = multiplier * ((double) Runtime
.getRuntime()
.availableProcessors());
threadCount = calculated > 0d ? Math.max((int) calculated, 1) : 0;
} else {
threadCount = Integer.parseInt(testsThreadCount);
}
}
return threadCount;
}
private void mkParallelDirs(File testDir, int numDirs)
throws MojoExecutionException {
for (int i=1; i<=numDirs; i++) {
File newDir = new File(testDir, String.valueOf(i));
if (!newDir.exists()) {
getLog().info("Creating " + newDir.toString());
if (!newDir.mkdirs()) {
throw new MojoExecutionException("Unable to create "
+ newDir.toString());
}
}
}
}
}

View File

@ -85,30 +85,13 @@
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<artifactId>maven-antrun-plugin</artifactId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions> <executions>
<execution> <execution>
<id>create-parallel-tests-dirs</id> <id>parallel-tests-createdir</id>
<phase>test-compile</phase>
<configuration>
<target>
<script language="javascript"><![CDATA[
var baseDirs = [
"${test.build.data}",
"${test.build.dir}",
"${hadoop.tmp.dir}" ];
for (var i in baseDirs) {
for (var j = 1; j <= ${testsThreadCount}; ++j) {
var mkdir = project.createTask("mkdir");
mkdir.setDir(new java.io.File(baseDirs[i], j));
mkdir.perform();
}
}
]]></script>
</target>
</configuration>
<goals> <goals>
<goal>run</goal> <goal>parallel-tests-createdir</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
@ -121,6 +104,7 @@
<reuseForks>false</reuseForks> <reuseForks>false</reuseForks>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine> <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
<systemPropertyVariables> <systemPropertyVariables>
<testsThreadCount>${testsThreadCount}</testsThreadCount>
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data> <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir> <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir> <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>