HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.

This commit is contained in:
Steve Loughran 2016-10-21 19:11:31 +01:00
parent 5754772bd6
commit 4b56954fea
6 changed files with 816 additions and 76 deletions

View File

@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList; import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase { public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class); LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
@Override @Override
public void setup() throws Exception { public void setup() throws Exception {
@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
// extra sanity checks here to avoid support calls about complete loss // extra sanity checks here to avoid support calls about complete loss
// of data // of data
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
Path root = new Path("/"); final Path root = new Path("/");
assertIsDirectory(root); assertIsDirectory(root);
// make sure it is clean // make sure the directory is clean. This includes some retry logic
FileSystem fs = getFileSystem(); // to forgive blobstores whose listings can be out of sync with the file
deleteChildren(fs, root, true); // status;
FileStatus[] children = listChildren(fs, root); final FileSystem fs = getFileSystem();
if (children.length > 0) { final AtomicInteger iterations = new AtomicInteger(0);
StringBuilder error = new StringBuilder(); final FileStatus[] originalChildren = listChildren(fs, root);
error.append("Deletion of child entries failed, still have") LambdaTestUtils.eventually(
.append(children.length) OBJECTSTORE_RETRY_TIMEOUT,
.append(System.lineSeparator()); new Callable<Void>() {
for (FileStatus child : children) { @Override
error.append(" ").append(child.getPath()) public Void call() throws Exception {
.append(System.lineSeparator()); FileStatus[] deleted = deleteChildren(fs, root, true);
} FileStatus[] children = listChildren(fs, root);
fail(error.toString()); if (children.length > 0) {
} fail(String.format(
"After %d attempts: listing after rm /* not empty"
+ "\n%s\n%s\n%s",
iterations.incrementAndGet(),
dumpStats("final", children),
dumpStats("deleted", deleted),
dumpStats("original", originalChildren)));
}
return null;
}
},
new LambdaTestUtils.ProportionalRetryInterval(50, 1000));
// then try to delete the empty one // then try to delete the empty one
boolean deleted = fs.delete(root, false); boolean deleted = fs.delete(root, false);
LOG.info("rm / of empty dir result is {}", deleted); LOG.info("rm / of empty dir result is {}", deleted);

View File

@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
* @param fileSystem filesystem * @param fileSystem filesystem
* @param path path to delete * @param path path to delete
* @param recursive flag to indicate child entry deletion should be recursive * @param recursive flag to indicate child entry deletion should be recursive
* @return the number of child entries found and deleted (not including * @return the immediate child entries found and deleted (not including
* any recursive children of those entries) * any recursive children of those entries)
* @throws IOException problem in the deletion process. * @throws IOException problem in the deletion process.
*/ */
public static int deleteChildren(FileSystem fileSystem, public static FileStatus[] deleteChildren(FileSystem fileSystem,
Path path, Path path,
boolean recursive) throws IOException { boolean recursive) throws IOException {
FileStatus[] children = listChildren(fileSystem, path); FileStatus[] children = listChildren(fileSystem, path);
for (FileStatus entry : children) { for (FileStatus entry : children) {
fileSystem.delete(entry.getPath(), recursive); fileSystem.delete(entry.getPath(), recursive);
} }
return children.length; return children;
} }
/** /**

View File

@ -0,0 +1,521 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.util.Time;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
/**
* Class containing methods and associated classes to make the most of Lambda
* expressions in Hadoop tests.
*
* The code has been designed from the outset to be Java-8 friendly, but
* to still be usable in Java 7.
*
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
* but also lifts concepts from Scalatest's {@code awaitResult} and
* its notion of pluggable retry logic (simple, backoff, maybe even things
* with jitter: test author gets to choose).
* The {@link #intercept(Class, Callable)} method is also all credit due
* Scalatest, though it's been extended to also support a string message
* check; useful when checking the contents of the exception.
*/
public final class LambdaTestUtils {
public static final Logger LOG =
LoggerFactory.getLogger(LambdaTestUtils.class);
private LambdaTestUtils() {
}
/**
* This is the string included in the assertion text in
* {@link #intercept(Class, Callable)} if
* the closure returned a null value.
*/
public static final String NULL_RESULT = "(null)";
/**
* Interface to implement for converting a timeout into some form
* of exception to raise.
*/
public interface TimeoutHandler {
/**
* Create an exception (or throw one, if desired).
* @param timeoutMillis timeout which has arisen
* @param caught any exception which was caught; may be null
* @return an exception which will then be thrown
* @throws Exception if the handler wishes to raise an exception
* that way.
*/
Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
}
/**
* Wait for a condition to be met, with a retry policy returning the
* sleep time before the next attempt is made. If, at the end
* of the timeout period, the condition is still false (or failing with
* an exception), the timeout handler is invoked, passing in the timeout
* and any exception raised in the last invocation. The exception returned
* by this timeout handler is then rethrown.
* <p>
* Example: Wait 30s for a condition to be met, with a sleep of 30s
* between each probe.
* If the operation is failing, then, after 30s, the timeout handler
* is called. This returns the exception passed in (if any),
* or generates a new one.
* <pre>
* await(
* 30 * 1000,
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); },
* () -> 500),
* (timeout, ex) -> ex != null ? ex : new TimeoutException("timeout"));
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param check predicate to evaluate
* @param retry retry escalation logic
* @param timeoutHandler handler invoked on timeout;
* the returned exception will be thrown
* @return the number of iterations before the condition was satisfied
* @throws Exception the exception returned by {@code timeoutHandler} on
* timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int await(int timeoutMillis,
Callable<Boolean> check,
Callable<Integer> retry,
TimeoutHandler timeoutHandler)
throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be >= 0");
Preconditions.checkNotNull(timeoutHandler);
long endTime = Time.now() + timeoutMillis;
Exception ex = null;
boolean running = true;
int iterations = 0;
while (running) {
iterations++;
try {
if (check.call()) {
return iterations;
}
// the probe failed but did not raise an exception. Reset any
// exception raised by a previous probe failure.
ex = null;
} catch (InterruptedException | FailFastException e) {
throw e;
} catch (Exception e) {
LOG.debug("eventually() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running) {
int sleeptime = retry.call();
if (sleeptime >= 0) {
Thread.sleep(sleeptime);
} else {
running = false;
}
}
}
// timeout
Exception evaluate = timeoutHandler.evaluate(timeoutMillis, ex);
if (evaluate == null) {
// bad timeout handler logic; fall back to GenerateTimeout so the
// underlying problem isn't lost.
LOG.error("timeout handler {} did not throw an exception ",
timeoutHandler);
evaluate = new GenerateTimeout().evaluate(timeoutMillis, ex);
}
throw evaluate;
}
/**
* Simplified {@link #await(int, Callable, Callable, TimeoutHandler)}
* operation with a fixed interval
* and {@link GenerateTimeout} handler to generate a {@code TimeoutException}.
* <p>
* Example: await for probe to succeed:
* <pre>
* await(
* 30 * 1000, 500,
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); });
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made.
* @param intervalMillis interval in milliseconds between checks
* @param check predicate to evaluate
* @return the number of iterations before the condition was satisfied
* @throws Exception returned by {@code failure} on timeout
* @throws FailFastException immediately if the evaluated operation raises it
* @throws InterruptedException if interrupted.
*/
public static int await(int timeoutMillis,
int intervalMillis,
Callable<Boolean> check) throws Exception {
return await(timeoutMillis, check,
new FixedRetryInterval(intervalMillis),
new GenerateTimeout());
}
/**
* Repeatedly execute a closure until it returns a value rather than
* raise an exception.
* Exceptions are caught and, with one exception,
* trigger a sleep and retry. This is similar of ScalaTest's
* {@code eventually(timeout, closure)} operation, though that lacks
* the ability to fail fast if the inner closure has determined that
* a failure condition is non-recoverable.
* <p>
* Example: spin until an the number of files in a filesystem is non-zero,
* returning the files found.
* The sleep interval backs off by 500 ms each iteration to a maximum of 5s.
* <pre>
* FileStatus[] files = eventually( 30 * 1000,
* () -> {
* FileStatus[] f = filesystem.listFiles(new Path("/"));
* assertEquals(0, f.length);
* return f;
* },
* new ProportionalRetryInterval(500, 5000));
* </pre>
* This allows for a fast exit, yet reduces probe frequency over time.
*
* @param <T> return type
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param eval expression to evaluate
* @param retry retry interval generator
* @return result of the first successful eval call
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static <T> T eventually(int timeoutMillis,
Callable<T> eval,
Callable<Integer> retry) throws Exception {
Preconditions.checkArgument(timeoutMillis >= 0,
"timeoutMillis must be >= 0");
long endTime = Time.now() + timeoutMillis;
Exception ex;
boolean running;
int sleeptime;
int iterations = 0;
do {
iterations++;
try {
return eval.call();
} catch (InterruptedException | FailFastException e) {
// these two exceptions trigger an immediate exit
throw e;
} catch (Exception e) {
LOG.debug("evaluate() iteration {}", iterations, e);
ex = e;
}
running = Time.now() < endTime;
if (running && (sleeptime = retry.call()) >= 0) {
Thread.sleep(sleeptime);
}
} while (running);
// timeout. Throw the last exception raised
throw ex;
}
/**
* Simplified {@link #eventually(int, Callable, Callable)} method
* with a fixed interval.
* <p>
* Example: wait 30s until an assertion holds, sleeping 1s between each
* check.
* <pre>
* eventually( 30 * 1000, 1000,
* () -> { assertEquals(0, filesystem.listFiles(new Path("/")).length); }
* );
* </pre>
*
* @param timeoutMillis timeout in milliseconds.
* Can be zero, in which case only one attempt is made before failing.
* @param intervalMillis interval in milliseconds
* @param eval expression to evaluate
* @return result of the first successful invocation of {@code eval()}
* @throws Exception the last exception thrown before timeout was triggered
* @throws FailFastException if raised -without any retry attempt.
* @throws InterruptedException if interrupted during the sleep operation.
*/
public static <T> T eventually(int timeoutMillis,
int intervalMillis,
Callable<T> eval) throws Exception {
return eventually(timeoutMillis, eval,
new FixedRetryInterval(intervalMillis));
}
/**
* Intercept an exception; throw an {@code AssertionError} if one not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
* <p>
* Example: expect deleting a nonexistent file to raise a
* {@code FileNotFoundException}.
* <pre>
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
* () -> {
* filesystem.delete(new Path("/missing"), false);
* });
* </pre>
*
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
*/
@SuppressWarnings("unchecked")
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
Callable<T> eval)
throws Exception {
try {
T result = eval.call();
throw new AssertionError("Expected an exception, got "
+ robustToString(result));
} catch (Throwable e) {
if (clazz.isAssignableFrom(e.getClass())) {
return (E)e;
}
throw e;
}
}
/**
* Intercept an exception; throw an {@code AssertionError} if one not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
* <p>
* Example: expect deleting a nonexistent file to raise a
* {@code FileNotFoundException} with the {@code toString()} value
* containing the text {@code "missing"}.
* <pre>
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
* "missing",
* () -> {
* filesystem.delete(new Path("/missing"), false);
* });
* </pre>
*
* @param clazz class of exception; the raised exception must be this class
* <i>or a subclass</i>.
* @param contained string which must be in the {@code toString()} value
* of the exception
* @param eval expression to eval
* @param <T> return type of expression
* @param <E> exception class
* @return the caught exception if it was of the expected type and contents
* @throws Exception any other exception raised
* @throws AssertionError if the evaluation call didn't raise an exception.
* The error includes the {@code toString()} value of the result, if this
* can be determined.
* @see GenericTestUtils#assertExceptionContains(String, Throwable)
*/
public static <T, E extends Throwable> E intercept(
Class<E> clazz,
String contained,
Callable<T> eval)
throws Exception {
E ex = intercept(clazz, eval);
GenericTestUtils.assertExceptionContains(contained, ex);
return ex;
}
/**
* Robust string converter for exception messages; if the {@code toString()}
* method throws an exception then that exception is caught and logged,
* then a simple string of the classname logged.
* This stops a {@code toString()} failure hiding underlying problems.
* @param o object to stringify
* @return a string for exception messages
*/
private static String robustToString(Object o) {
if (o == null) {
return NULL_RESULT;
} else {
try {
return o.toString();
} catch (Exception e) {
LOG.info("Exception calling toString()", e);
return o.getClass().toString();
}
}
}
/**
* Returns {@code TimeoutException} on a timeout. If
* there was a inner class passed in, includes it as the
* inner failure.
*/
public static class GenerateTimeout implements TimeoutHandler {
private final String message;
public GenerateTimeout(String message) {
this.message = message;
}
public GenerateTimeout() {
this("timeout");
}
/**
* Evaluate operation creates a new {@code TimeoutException}.
* @param timeoutMillis timeout in millis
* @param caught optional caught exception
* @return TimeoutException
*/
@Override
public Exception evaluate(int timeoutMillis, Exception caught)
throws Exception {
String s = String.format("%s: after %d millis", message,
timeoutMillis);
String caughtText = caught != null
? ("; " + robustToString(caught)) : "";
return (TimeoutException) (new TimeoutException(s + caughtText)
.initCause(caught));
}
}
/**
* Retry at a fixed time period between calls.
*/
public static class FixedRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private int invocationCount = 0;
public FixedRetryInterval(int intervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
this.intervalMillis = intervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
return intervalMillis;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"FixedRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* Gradually increase the sleep time by the initial interval, until
* the limit set by {@code maxIntervalMillis} is reached.
*/
public static class ProportionalRetryInterval implements Callable<Integer> {
private final int intervalMillis;
private final int maxIntervalMillis;
private int current;
private int invocationCount = 0;
public ProportionalRetryInterval(int intervalMillis,
int maxIntervalMillis) {
Preconditions.checkArgument(intervalMillis > 0);
Preconditions.checkArgument(maxIntervalMillis > 0);
this.intervalMillis = intervalMillis;
this.current = intervalMillis;
this.maxIntervalMillis = maxIntervalMillis;
}
@Override
public Integer call() throws Exception {
invocationCount++;
int last = current;
if (last < maxIntervalMillis) {
current += intervalMillis;
}
return last;
}
public int getInvocationCount() {
return invocationCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"ProportionalRetryInterval{");
sb.append("interval=").append(intervalMillis);
sb.append(", current=").append(current);
sb.append(", limit=").append(maxIntervalMillis);
sb.append(", invocationCount=").append(invocationCount);
sb.append('}');
return sb.toString();
}
}
/**
* An exception which triggers a fast exist from the
* {@link #eventually(int, Callable, Callable)} and
* {@link #await(int, Callable, Callable, TimeoutHandler)} loops.
*/
public static class FailFastException extends Exception {
public FailFastException(String detailMessage) {
super(detailMessage);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
/**
* Instantiate from a format string.
* @param format format string
* @param args arguments to format
* @return an instance with the message string constructed.
*/
public static FailFastException newInstance(String format, Object...args) {
return new FailFastException(String.format(format, args));
}
}
}

View File

@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import org.junit.Assert;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.test.LambdaTestUtils.*;
import static org.apache.hadoop.test.GenericTestUtils.*;
/**
* Test the logic in {@link LambdaTestUtils}.
* This test suite includes Java 8 and Java 7 code; the Java 8 code exists
* to verify that the API is easily used with Lambda expressions.
*/
public class TestLambdaTestUtils extends Assert {
public static final int INTERVAL = 10;
public static final int TIMEOUT = 50;
private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
/**
* Always evaluates to true.
*/
public static final Callable<Boolean> ALWAYS_TRUE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return true;
}
};
/**
* Always evaluates to false.
*/
public static final Callable<Boolean> ALWAYS_FALSE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return false;
}
};
/**
* Text in the raised FNFE.
*/
public static final String MISSING = "not found";
/**
* A predicate that always throws a FileNotFoundException.
*/
public static final Callable<Boolean> ALWAYS_FNFE =
new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
throw new FileNotFoundException(MISSING);
}
};
/**
* reusable timeout handler.
*/
public static final GenerateTimeout
TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
/**
* Always evaluates to 3L.
*/
public static final Callable<Long> EVAL_3L = new Callable<Long>() {
@Override
public Long call() throws Exception {
return 3L;
}
};
/**
* Always raises a {@code FileNotFoundException}.
*/
public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
@Override
public Long call() throws Exception {
throw new FileNotFoundException(MISSING);
}
};
/**
* Assert the retry count is as expected.
* @param expected expected value
*/
protected void assertRetryCount(int expected) {
assertEquals(retry.toString(), expected, retry.getInvocationCount());
}
/**
* Assert the retry count is as expected.
* @param minCount minimum value
*/
protected void assertMinRetryCount(int minCount) {
assertTrue("retry count of " + retry + " is not >= " + minCount,
minCount <= retry.getInvocationCount());
}
@Test
public void testAwaitAlwaysTrue() throws Throwable {
await(TIMEOUT,
ALWAYS_TRUE,
new FixedRetryInterval(INTERVAL),
TIMEOUT_FAILURE_HANDLER);
}
@Test
public void testAwaitAlwaysFalse() throws Throwable {
try {
await(TIMEOUT,
ALWAYS_FALSE,
retry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
assertTrue(retry.getInvocationCount() > 4);
}
}
@Test
public void testAwaitLinearRetry() throws Throwable {
ProportionalRetryInterval linearRetry =
new ProportionalRetryInterval(INTERVAL * 2, TIMEOUT * 2);
try {
await(TIMEOUT,
ALWAYS_FALSE,
linearRetry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
assertEquals(linearRetry.toString(),
2, linearRetry.getInvocationCount());
}
}
@Test
public void testAwaitFNFE() throws Throwable {
try {
await(TIMEOUT,
ALWAYS_FNFE,
retry,
TIMEOUT_FAILURE_HANDLER);
fail("should not have got here");
} catch (TimeoutException e) {
// inner clause is included
assertTrue(retry.getInvocationCount() > 0);
assertTrue(e.getCause() instanceof FileNotFoundException);
assertExceptionContains(MISSING, e);
}
}
@Test
public void testRetryInterval() throws Throwable {
ProportionalRetryInterval interval =
new ProportionalRetryInterval(200, 1000);
assertEquals(200, (int) interval.call());
assertEquals(400, (int) interval.call());
assertEquals(600, (int) interval.call());
assertEquals(800, (int) interval.call());
assertEquals(1000, (int) interval.call());
assertEquals(1000, (int) interval.call());
assertEquals(1000, (int) interval.call());
}
@Test
public void testInterceptSuccess() throws Throwable {
IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
assertExceptionContains(MISSING, ioe);
}
@Test
public void testInterceptContains() throws Throwable {
intercept(IOException.class, MISSING, ALWAYS_FNFE);
}
@Test
public void testInterceptContainsWrongString() throws Throwable {
try {
FileNotFoundException e =
intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
assertNotNull(e);
throw e;
} catch (AssertionError expected) {
assertExceptionContains(MISSING, expected);
}
}
@Test
public void testInterceptVoidCallable() throws Throwable {
intercept(AssertionError.class,
NULL_RESULT,
new Callable<IOException>() {
@Override
public IOException call() throws Exception {
return intercept(IOException.class,
new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
});
}
});
}
@Test
public void testEventually() throws Throwable {
long result = eventually(TIMEOUT, EVAL_3L, retry);
assertEquals(3, result);
assertEquals(0, retry.getInvocationCount());
}
@Test
public void testEventuallyFailuresRetry() throws Throwable {
try {
eventually(TIMEOUT, EVAL_FNFE, retry);
fail("should not have got here");
} catch (IOException expected) {
// expected
assertMinRetryCount(1);
}
}
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,8 +36,6 @@ import java.io.FileNotFoundException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/** /**
* Test S3A Failure translation, including a functional test * Test S3A Failure translation, including a functional test
@ -68,13 +68,15 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
// here the file length is less. Probe the file to see if this is true, // here the file length is less. Probe the file to see if this is true,
// with a spin and wait // with a spin and wait
eventually(30 *1000, new Callable<Void>() { LambdaTestUtils.eventually(30 * 1000, 1000,
@Override new Callable<Void>() {
public Void call() throws Exception { @Override
assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); public Void call() throws Exception {
return null; assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
} return null;
}); }
});
// here length is shorter. Assuming it has propagated to all replicas, // here length is shorter. Assuming it has propagated to all replicas,
// the position of the input stream is now beyond the EOF. // the position of the input stream is now beyond the EOF.
// An attempt to seek backwards to a position greater than the // An attempt to seek backwards to a position greater than the

View File

@ -28,7 +28,6 @@ import org.slf4j.Logger;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Callable;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
@ -135,32 +134,6 @@ public class S3ATestUtils {
return fc; return fc;
} }
/**
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
* @param timeout timeout
* @param callback callback to invoke
* @throws FailFastException any fast-failure
* @throws Exception the exception which caused the iterator to fail
*/
public static void eventually(int timeout, Callable<Void> callback)
throws Exception {
Exception lastException;
long endtime = System.currentTimeMillis() + timeout;
do {
try {
callback.call();
return;
} catch (InterruptedException | FailFastException e) {
throw e;
} catch (Exception e) {
lastException = e;
}
Thread.sleep(500);
} while (endtime > System.currentTimeMillis());
throw lastException;
}
/** /**
* patch the endpoint option so that irrespective of where other tests * patch the endpoint option so that irrespective of where other tests
* are working, the IO performance tests can work with the landsat * are working, the IO performance tests can work with the landsat
@ -290,27 +263,6 @@ public class S3ATestUtils {
? propval : confVal; ? propval : confVal;
} }
/**
* The exception to raise so as to exit fast from
* {@link #eventually(int, Callable)}.
*/
public static class FailFastException extends Exception {
public FailFastException() {
}
public FailFastException(String message) {
super(message);
}
public FailFastException(String message, Throwable cause) {
super(message, cause);
}
public FailFastException(Throwable cause) {
super(cause);
}
}
/** /**
* Verify the class of an exception. If it is not as expected, rethrow it. * Verify the class of an exception. If it is not as expected, rethrow it.
* Comparison is on the exact class, not subclass-of inference as * Comparison is on the exact class, not subclass-of inference as