HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.

This commit is contained in:
Anu Engineer 2016-10-20 12:47:57 -07:00
parent 6f4192d77d
commit 42f8a1d6eb
6 changed files with 707 additions and 70 deletions

View File

@ -27,12 +27,16 @@
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList; import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@ -45,6 +49,7 @@
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase { public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class); LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
@Override @Override
public void setup() throws Exception { public void setup() throws Exception {
@ -79,23 +84,34 @@ public void testRmEmptyRootDirNonRecursive() throws Throwable {
// extra sanity checks here to avoid support calls about complete loss // extra sanity checks here to avoid support calls about complete loss
// of data // of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/"); final Path root = new Path("/");
assertIsDirectory(root); assertIsDirectory(root);
// make sure it is clean // make sure the directory is clean. This includes some retry logic
FileSystem fs = getFileSystem(); // to forgive blobstores whose listings can be out of sync with the file
deleteChildren(fs, root, true); // status;
final FileSystem fs = getFileSystem();
final AtomicInteger iterations = new AtomicInteger(0);
final FileStatus[] originalChildren = listChildren(fs, root);
LambdaTestUtils.evaluate(
new Callable<Void>() {
@Override
public Void call() throws Exception {
FileStatus[] deleted = deleteChildren(fs, root, true);
FileStatus[] children = listChildren(fs, root); FileStatus[] children = listChildren(fs, root);
if (children.length > 0) { if (children.length > 0) {
StringBuilder error = new StringBuilder(); fail(String.format(
error.append("Deletion of child entries failed, still have") "After %d attempts: listing after rm /* not empty"
.append(children.length) + "\n%s\n%s\n%s",
.append(System.lineSeparator()); iterations.incrementAndGet(),
for (FileStatus child : children) { dumpStats("final", children),
error.append(" ").append(child.getPath()) dumpStats("deleted", deleted),
.append(System.lineSeparator()); dumpStats("original", originalChildren)));
} }
fail(error.toString()); return null;
} }
},
OBJECTSTORE_RETRY_TIMEOUT,
new LambdaTestUtils.LinearRetryInterval(50, 1000));
// then try to delete the empty one // then try to delete the empty one
boolean deleted = fs.delete(root, false); boolean deleted = fs.delete(root, false);
LOG.info("rm / of empty dir result is {}", deleted); LOG.info("rm / of empty dir result is {}", deleted);

View File

@ -400,18 +400,18 @@ public static void rejectRootOperation(Path path) throws IOException {
* @param fileSystem filesystem * @param fileSystem filesystem
* @param path path to delete * @param path path to delete
* @param recursive flag to indicate child entry deletion should be recursive * @param recursive flag to indicate child entry deletion should be recursive
* @return the number of child entries found and deleted (not including * @return the immediate child entries found and deleted (not including
* any recursive children of those entries) * any recursive children of those entries)
* @throws IOException problem in the deletion process. * @throws IOException problem in the deletion process.
*/ */
public static int deleteChildren(FileSystem fileSystem, public static FileStatus[] deleteChildren(FileSystem fileSystem,
Path path, Path path,
boolean recursive) throws IOException { boolean recursive) throws IOException {
FileStatus[] children = listChildren(fileSystem, path); FileStatus[] children = listChildren(fileSystem, path);
for (FileStatus entry : children) { for (FileStatus entry : children) {
fileSystem.delete(entry.getPath(), recursive); fileSystem.delete(entry.getPath(), recursive);
} }
return children.length; return children;
} }
/** /**

View File

@ -0,0 +1,428 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.Time;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
/**
* Class containing methods and associated classes to make the most of Lambda
* expressions in Hadoop tests.
*
* The code has been designed from the outset to be Java-8 friendly, but still
* be usable in Java 7.
* In particular: support for waiting for a condition to be met.
* This is to avoid tests having hard-coded sleeps in them.
*
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
* but also lifts concepts from Scalatest's {@code awaitResult} and
* its notion of pluggable retry logic (simple, backoff, maybe even things
* with jitter: test author gets to choose).
* The {@code intercept} method is also all credit due Scalatest, though
* it's been extended to also support a string message check; useful when
* checking the contents of the exception.
*/
public final class LambdaTestUtils {
public static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class);
private LambdaTestUtils() {
}
/**
* This is the string included in the assertion text in
* {@link #intercept(Class, Callable)} if
* the closure returned a null value.
*/
public static final String NULL_RESULT = "(null)";
/**
* Interface to implement for converting a timeout into some form
* of exception to raise.
*/
public interface TimeoutHandler {
/**
* Create an exception (or throw one, if desired).
* @param timeoutMillis timeout which has arisen
* @param caught any exception which was caught; may be null
* @return an exception which will then be thrown
* @throws Exception if the handler wishes to raise an exception
* that way.
*/
Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
}
/**
* Wait for a condition to be met.
* @param check predicate to evaluate
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param retry retry escalation logic
* @param failure handler invoked on failure; the returned exception
* will be thrown
* @return the number of iterations before the condition was satisfied
* @throws Exception returned by {@code failure} on timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int eventually(Callable<Boolean> check,
int timeoutMillis,
Callable<Integer> retry,
TimeoutHandler failure)
throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be > 0");
long endTime = Time.now() + timeoutMillis;
Exception ex = null;
boolean running = true;
int iterations = 0;
while (running) {
iterations++;
try {
if (check.call()) {
return iterations;
}
} catch (InterruptedException | FailFastException e) {
throw e;
} catch (Exception e) {
LOG.debug("eventually() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running) {
int sleeptime = retry.call();
if (sleeptime >= 0) {
Thread.sleep(sleeptime);
} else {
running = false;
}
}
}
// timeout
throw failure.evaluate(timeoutMillis, ex);
}
/**
* Simplified {@code eventually()} clause; fixed interval
* and {@link GenerateTimeout} used to generate the timeout.
* @param check predicate to evaluate
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param intervalMillis interval in milliseconds between checks
* @return the number of iterations before the condition was satisfied
* @throws Exception returned by {@code failure} on timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int eventually(Callable<Boolean> check,
int timeoutMillis,
int intervalMillis) throws Exception {
return eventually(check,
timeoutMillis,
new FixedRetryInterval(intervalMillis),
new GenerateTimeout());
}
/**
* Await a result; exceptions are caught and, with one exception,
* trigger a sleep and retry. This is similar of ScalaTest's
* {@code Await.result()} operation, though that lacks the ability to
* fail fast if the inner closure has determined that a failure condition
* is non-recoverable.
* @param eval expression to evaluate
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param retry retry interval generator
* @param <T> return type
* @return result of the first successful eval call
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static <T> T evaluate(Callable<T> eval,
int timeoutMillis,
Callable<Integer> retry) throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be >= 0");
long endTime = Time.now() + timeoutMillis;
Exception ex;
boolean running;
int sleeptime;
int iterations = 0;
do {
iterations++;
try {
return eval.call();
} catch (InterruptedException | FailFastException e) {
throw e;
} catch (Exception e) {
LOG.debug("evaluate() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running && (sleeptime = retry.call()) >= 0) {
Thread.sleep(sleeptime);
}
} while (running);
// timeout. Throw the last exception raised
throw ex;
}
/**
* Simplified {@code evaluate()} clause; fixed interval.
* @param check predicate to evaluate
* @param timeoutMillis wait interval between check failures
* @param intervalMillis interval in milliseconds
* @return result of the first successful eval call
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static <T> T evaluate(Callable<T> eval,
int timeoutMillis,
int intervalMillis) throws Exception {
return evaluate(eval,
timeoutMillis,
new FixedRetryInterval(intervalMillis));
}
/**
* Intercept an exception; raise an exception if it was not raised.
* Exceptions of the wrong class are also rethrown.
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
*/
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
Callable<T> eval)
throws Exception {
try {
T result = eval.call();
throw new AssertionError("Expected an exception, got "
+ robustToString(result));
} catch (Throwable e) {
if (clazz.isAssignableFrom(e.getClass())) {
return (E)e;
} else {
throw e;
}
}
}
/**
* Intercept an exception; raise an exception if it was not raised.
* Exceptions of the wrong class are also rethrown.
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param contained string which must be in the {@code toString()} value
* of the exception
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type and contents
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
*/
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
String contained,
Callable<T> eval)
throws Exception {
E ex = intercept(clazz, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
}
/**
* Robust string converter for exception messages; if the {@code toString()}
* method throws an exception then that exception is caught and logged,
* then a simple string of the classname logged.
* This stops a toString() failure hiding underlying problems in the code.
* @param o object to stringify
* @return a string for exception messages
*/
private static String robustToString(Object o) {
if (o == null) {
return NULL_RESULT;
} else {
try {
return o.toString();
} catch (Exception e) {
LOG.info("Exception calling toString()", e);
return o.getClass().toString();
}
}
}
/**
* Returns {@code TimeoutException} on a timeout. If
* there was a inner class passed in, includes it as the
* inner failure.
*/
public static class GenerateTimeout implements TimeoutHandler {
private final String message;
public GenerateTimeout(String message) {
this.message = message;
}
public GenerateTimeout() {
this("timeout");
}
/**
* Evaluate by creating a new Timeout Exception.
* @param timeoutMillis timeout in millis
* @param caught optional caught exception
* @return TimeoutException
*/
@Override
public Exception evaluate(int timeoutMillis, Exception caught)
throws Exception {
String s = String.format("%s: after %d millis", message,
timeoutMillis);
String caughtText = caught != null
? ("; " + robustToString(caught)) : "";
return (TimeoutException) (new TimeoutException(s + caughtText)
.initCause(caught));
}
}
/**
* Retry at a fixed time period between calls.
*/
public static class FixedRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private int invocationCount = 0;
public FixedRetryInterval(int intervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
this.intervalMillis = intervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
return intervalMillis;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"FixedRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* Gradually increase the sleep time by the initial interval, until
* the limit set by {@code maxIntervalMillis} is reached.
*/
public static class LinearRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private final int maxIntervalMillis;
private int current;
private int invocationCount = 0;
public LinearRetryInterval(int intervalMillis, int maxIntervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
Preconditions.checkArgument(maxIntervalMillis > 0);
this.intervalMillis = intervalMillis;
this.current = intervalMillis;
this.maxIntervalMillis = maxIntervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
int last = current;
if (last < maxIntervalMillis) {
current += intervalMillis;
}
return last;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"LinearRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", current=").append(current);
sb.append(", limit=").append(maxIntervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* An exception which triggers a fast exist from the
* {@link #evaluate(Callable, int, Callable)} and
* {@link #eventually(Callable, int, Callable, TimeoutHandler)} loops.
*/
public static class FailFastException extends Exception {
public FailFastException(String detailMessage) {
super(detailMessage);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
/**
* Instantiate from a format string.
* @param format format string
* @param args arguments to format
* @return an instance with the message string constructed.
*/
public static FailFastException newInstance(String format, Object...args) {
return new FailFastException(String.format(format, args));
}
}
}

View File

@ -0,0 +1,240 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import org.junit.Assert;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.apache.hadoop.test.GenericTestUtils.*;
/**
* Test the logic in {@link LambdaTestUtils}.
* This test suite includes Java 8 and Java 7 code; the Java 8 code exists
* to verify that the API is easily used with Lambda expressions.
*/
public class TestLambdaTestUtils extends Assert {
public static final int INTERVAL = 10;
public static final int TIMEOUT = 50;
private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
// counter for lambda expressions to use
private int count;
/**
* Always evaluates to true.
*/
public static final Callable<Boolean> ALWAYS_TRUE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return true;
}
};
/**
* Always evaluates to false.
*/
public static final Callable<Boolean> ALWAYS_FALSE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return false;
}
};
/**
* Text in the raised FNFE.
*/
public static final String MISSING = "not found";
/**
* A predicate that always throws a FileNotFoundException.
*/
public static final Callable<Boolean> ALWAYS_FNFE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
throw new FileNotFoundException(MISSING);
}
};
/**
* reusable timeout handler.
*/
public static final GenerateTimeout
TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
/**
* Always evaluates to 3L.
*/
public static final Callable<Long> EVAL_3L = new Callable<Long>() {
@Override
public Long call() throws Exception {
return 3L;
}
};
/**
* Always raises a {@code FileNotFoundException}.
*/
public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
@Override
public Long call() throws Exception {
throw new FileNotFoundException(MISSING);
}
};
@Test
public void testEventuallyAlwaysTrue() throws Throwable {
eventually(
ALWAYS_TRUE,
TIMEOUT,
new FixedRetryInterval(INTERVAL),
TIMEOUT_FAILURE_HANDLER);
}
@Test
public void testEventuallyAlwaysFalse() throws Throwable {
try {
eventually(
ALWAYS_FALSE,
TIMEOUT,
retry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
assertTrue(retry.getInvocationCount() > 4);
}
}
@Test
public void testEventuallyLinearRetry() throws Throwable {
LinearRetryInterval linearRetry = new LinearRetryInterval(
INTERVAL * 2,
TIMEOUT * 2);
try {
eventually(
ALWAYS_FALSE,
TIMEOUT,
linearRetry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
assertEquals(linearRetry.toString(),
2, linearRetry.getInvocationCount());
}
}
@Test
public void testEventuallyFNFE() throws Throwable {
try {
eventually(
ALWAYS_FNFE,
TIMEOUT,
retry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
// inner clause is included
assertTrue(retry.getInvocationCount() > 0);
assertTrue(e.getCause() instanceof FileNotFoundException);
assertExceptionContains(MISSING, e);
}
}
@Test
public void testEvaluate() throws Throwable {
long result = evaluate(EVAL_3L,
TIMEOUT,
retry);
assertEquals(3, result);
assertEquals(0, retry.getInvocationCount());
}
@Test
public void testEvalFailuresRetry() throws Throwable {
try {
evaluate(EVAL_FNFE,
TIMEOUT,
retry);
fail("should not have got here");
} catch (IOException expected) {
// expected
assertMinRetryCount(1);
}
}
@Test
public void testLinearRetryInterval() throws Throwable {
LinearRetryInterval interval = new LinearRetryInterval(200, 1000);
assertEquals(200, (int) interval.call());
assertEquals(400, (int) interval.call());
assertEquals(600, (int) interval.call());
assertEquals(800, (int) interval.call());
assertEquals(1000, (int) interval.call());
assertEquals(1000, (int) interval.call());
assertEquals(1000, (int) interval.call());
}
@Test
public void testInterceptSuccess() throws Throwable {
IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
assertExceptionContains(MISSING, ioe);
}
@Test
public void testInterceptContainsSuccess() throws Throwable {
intercept(IOException.class, MISSING, ALWAYS_FNFE);
}
@Test
public void testInterceptContainsWrongString() throws Throwable {
try {
FileNotFoundException e =
intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
throw e;
} catch (AssertionError expected) {
assertExceptionContains(MISSING, expected);
}
}
/**
* Assert the retry count is as expected.
* @param expected expected value
*/
protected void assertRetryCount(int expected) {
assertEquals(retry.toString(), expected, retry.getInvocationCount());
}
/**
* Assert the retry count is as expected.
* @param minCount minimum value
*/
protected void assertMinRetryCount(int minCount) {
assertTrue("retry count of " + retry
+ " is not >= " + minCount,
minCount <= retry.getInvocationCount());
}
}

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,8 +36,6 @@
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/** /**
* Test S3A Failure translation, including a functional test * Test S3A Failure translation, including a functional test
@ -68,13 +68,13 @@ public void testReadFileChanged() throws Throwable {
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
// here the file length is less. Probe the file to see if this is true, // here the file length is less. Probe the file to see if this is true,
// with a spin and wait // with a spin and wait
eventually(30 *1000, new Callable<Void>() { LambdaTestUtils.evaluate(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
return null; return null;
} }
}); }, 30 * 1000, 1000);
// here length is shorter. Assuming it has propagated to all replicas, // here length is shorter. Assuming it has propagated to all replicas,
// the position of the input stream is now beyond the EOF. // the position of the input stream is now beyond the EOF.
// An attempt to seek backwards to a position greater than the // An attempt to seek backwards to a position greater than the

View File

@ -135,32 +135,6 @@ public static FileContext createTestFileContext(Configuration conf)
return fc; return fc;
} }
/**
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
* @param timeout timeout
* @param callback callback to invoke
* @throws FailFastException any fast-failure
* @throws Exception the exception which caused the iterator to fail
*/
public static void eventually(int timeout, Callable<Void> callback)
throws Exception {
Exception lastException;
long endtime = System.currentTimeMillis() + timeout;
do {
try {
callback.call();
return;
} catch (InterruptedException | FailFastException e) {
throw e;
} catch (Exception e) {
lastException = e;
}
Thread.sleep(500);
} while (endtime > System.currentTimeMillis());
throw lastException;
}
/** /**
* patch the endpoint option so that irrespective of where other tests * patch the endpoint option so that irrespective of where other tests
* are working, the IO performance tests can work with the landsat * are working, the IO performance tests can work with the landsat
@ -290,27 +264,6 @@ public static String getTestProperty(Configuration conf,
? propval : confVal; ? propval : confVal;
} }
/**
* The exception to raise so as to exit fast from
* {@link #eventually(int, Callable)}.
*/
public static class FailFastException extends Exception {
public FailFastException() {
}
public FailFastException(String message) {
super(message);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
public FailFastException(Throwable cause) {
super(cause);
}
}
/** /**
* Verify the class of an exception. If it is not as expected, rethrow it. * Verify the class of an exception. If it is not as expected, rethrow it.
* Comparison is on the exact class, not subclass-of inference as * Comparison is on the exact class, not subclass-of inference as