HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.
This commit is contained in:
parent
2ab149f0e4
commit
385c1daa46
|
@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
||||
|
@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
|||
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
|
||||
public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
|
@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
// extra sanity checks here to avoid support calls about complete loss
|
||||
// of data
|
||||
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
||||
Path root = new Path("/");
|
||||
final Path root = new Path("/");
|
||||
assertIsDirectory(root);
|
||||
// make sure it is clean
|
||||
FileSystem fs = getFileSystem();
|
||||
deleteChildren(fs, root, true);
|
||||
FileStatus[] children = listChildren(fs, root);
|
||||
if (children.length > 0) {
|
||||
StringBuilder error = new StringBuilder();
|
||||
error.append("Deletion of child entries failed, still have")
|
||||
.append(children.length)
|
||||
.append(System.lineSeparator());
|
||||
for (FileStatus child : children) {
|
||||
error.append(" ").append(child.getPath())
|
||||
.append(System.lineSeparator());
|
||||
}
|
||||
fail(error.toString());
|
||||
}
|
||||
// make sure the directory is clean. This includes some retry logic
|
||||
// to forgive blobstores whose listings can be out of sync with the file
|
||||
// status;
|
||||
final FileSystem fs = getFileSystem();
|
||||
final AtomicInteger iterations = new AtomicInteger(0);
|
||||
final FileStatus[] originalChildren = listChildren(fs, root);
|
||||
LambdaTestUtils.eventually(
|
||||
OBJECTSTORE_RETRY_TIMEOUT,
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
FileStatus[] deleted = deleteChildren(fs, root, true);
|
||||
FileStatus[] children = listChildren(fs, root);
|
||||
if (children.length > 0) {
|
||||
fail(String.format(
|
||||
"After %d attempts: listing after rm /* not empty"
|
||||
+ "\n%s\n%s\n%s",
|
||||
iterations.incrementAndGet(),
|
||||
dumpStats("final", children),
|
||||
dumpStats("deleted", deleted),
|
||||
dumpStats("original", originalChildren)));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
new LambdaTestUtils.ProportionalRetryInterval(50, 1000));
|
||||
// then try to delete the empty one
|
||||
boolean deleted = fs.delete(root, false);
|
||||
LOG.info("rm / of empty dir result is {}", deleted);
|
||||
|
|
|
@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
|
|||
* @param fileSystem filesystem
|
||||
* @param path path to delete
|
||||
* @param recursive flag to indicate child entry deletion should be recursive
|
||||
* @return the number of child entries found and deleted (not including
|
||||
* @return the immediate child entries found and deleted (not including
|
||||
* any recursive children of those entries)
|
||||
* @throws IOException problem in the deletion process.
|
||||
*/
|
||||
public static int deleteChildren(FileSystem fileSystem,
|
||||
public static FileStatus[] deleteChildren(FileSystem fileSystem,
|
||||
Path path,
|
||||
boolean recursive) throws IOException {
|
||||
FileStatus[] children = listChildren(fileSystem, path);
|
||||
for (FileStatus entry : children) {
|
||||
fileSystem.delete(entry.getPath(), recursive);
|
||||
}
|
||||
return children.length;
|
||||
return children;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,521 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Class containing methods and associated classes to make the most of Lambda
|
||||
* expressions in Hadoop tests.
|
||||
*
|
||||
* The code has been designed from the outset to be Java-8 friendly, but
|
||||
* to still be usable in Java 7.
|
||||
*
|
||||
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
|
||||
* but also lifts concepts from Scalatest's {@code awaitResult} and
|
||||
* its notion of pluggable retry logic (simple, backoff, maybe even things
|
||||
* with jitter: test author gets to choose).
|
||||
* The {@link #intercept(Class, Callable)} method is also all credit due
|
||||
* Scalatest, though it's been extended to also support a string message
|
||||
* check; useful when checking the contents of the exception.
|
||||
*/
|
||||
public final class LambdaTestUtils {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(LambdaTestUtils.class);
|
||||
|
||||
private LambdaTestUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the string included in the assertion text in
|
||||
* {@link #intercept(Class, Callable)} if
|
||||
* the closure returned a null value.
|
||||
*/
|
||||
public static final String NULL_RESULT = "(null)";
|
||||
|
||||
/**
|
||||
* Interface to implement for converting a timeout into some form
|
||||
* of exception to raise.
|
||||
*/
|
||||
public interface TimeoutHandler {
|
||||
|
||||
/**
|
||||
* Create an exception (or throw one, if desired).
|
||||
* @param timeoutMillis timeout which has arisen
|
||||
* @param caught any exception which was caught; may be null
|
||||
* @return an exception which will then be thrown
|
||||
* @throws Exception if the handler wishes to raise an exception
|
||||
* that way.
|
||||
*/
|
||||
Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a condition to be met, with a retry policy returning the
|
||||
* sleep time before the next attempt is made. If, at the end
|
||||
* of the timeout period, the condition is still false (or failing with
|
||||
* an exception), the timeout handler is invoked, passing in the timeout
|
||||
* and any exception raised in the last invocation. The exception returned
|
||||
* by this timeout handler is then rethrown.
|
||||
* <p>
|
||||
* Example: Wait 30s for a condition to be met, with a sleep of 30s
|
||||
* between each probe.
|
||||
* If the operation is failing, then, after 30s, the timeout handler
|
||||
* is called. This returns the exception passed in (if any),
|
||||
* or generates a new one.
|
||||
* <pre>
|
||||
* await(
|
||||
* 30 * 1000,
|
||||
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); },
|
||||
* () -> 500),
|
||||
* (timeout, ex) -> ex != null ? ex : new TimeoutException("timeout"));
|
||||
* </pre>
|
||||
*
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made.
|
||||
* @param check predicate to evaluate
|
||||
* @param retry retry escalation logic
|
||||
* @param timeoutHandler handler invoked on timeout;
|
||||
* the returned exception will be thrown
|
||||
* @return the number of iterations before the condition was satisfied
|
||||
* @throws Exception the exception returned by {@code timeoutHandler} on
|
||||
* timeout
|
||||
* @throws FailFastException immediately if the evaluated operation raises it
|
||||
* @throws InterruptedException if interrupted.
|
||||
*/
|
||||
public static int await(int timeoutMillis,
|
||||
Callable<Boolean> check,
|
||||
Callable<Integer> retry,
|
||||
TimeoutHandler timeoutHandler)
|
||||
throws Exception {
|
||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
||||
"timeoutMillis must be >= 0");
|
||||
Preconditions.checkNotNull(timeoutHandler);
|
||||
|
||||
long endTime = Time.now() + timeoutMillis;
|
||||
Exception ex = null;
|
||||
boolean running = true;
|
||||
int iterations = 0;
|
||||
while (running) {
|
||||
iterations++;
|
||||
try {
|
||||
if (check.call()) {
|
||||
return iterations;
|
||||
}
|
||||
// the probe failed but did not raise an exception. Reset any
|
||||
// exception raised by a previous probe failure.
|
||||
ex = null;
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("eventually() iteration {}", iterations, e);
|
||||
ex = e;
|
||||
}
|
||||
running = Time.now() < endTime;
|
||||
if (running) {
|
||||
int sleeptime = retry.call();
|
||||
if (sleeptime >= 0) {
|
||||
Thread.sleep(sleeptime);
|
||||
} else {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
// timeout
|
||||
Exception evaluate = timeoutHandler.evaluate(timeoutMillis, ex);
|
||||
if (evaluate == null) {
|
||||
// bad timeout handler logic; fall back to GenerateTimeout so the
|
||||
// underlying problem isn't lost.
|
||||
LOG.error("timeout handler {} did not throw an exception ",
|
||||
timeoutHandler);
|
||||
evaluate = new GenerateTimeout().evaluate(timeoutMillis, ex);
|
||||
}
|
||||
throw evaluate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simplified {@link #await(int, Callable, Callable, TimeoutHandler)}
|
||||
* operation with a fixed interval
|
||||
* and {@link GenerateTimeout} handler to generate a {@code TimeoutException}.
|
||||
* <p>
|
||||
* Example: await for probe to succeed:
|
||||
* <pre>
|
||||
* await(
|
||||
* 30 * 1000, 500,
|
||||
* () -> { return 0 == filesystem.listFiles(new Path("/")).length); });
|
||||
* </pre>
|
||||
*
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made.
|
||||
* @param intervalMillis interval in milliseconds between checks
|
||||
* @param check predicate to evaluate
|
||||
* @return the number of iterations before the condition was satisfied
|
||||
* @throws Exception returned by {@code failure} on timeout
|
||||
* @throws FailFastException immediately if the evaluated operation raises it
|
||||
* @throws InterruptedException if interrupted.
|
||||
*/
|
||||
public static int await(int timeoutMillis,
|
||||
int intervalMillis,
|
||||
Callable<Boolean> check) throws Exception {
|
||||
return await(timeoutMillis, check,
|
||||
new FixedRetryInterval(intervalMillis),
|
||||
new GenerateTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly execute a closure until it returns a value rather than
|
||||
* raise an exception.
|
||||
* Exceptions are caught and, with one exception,
|
||||
* trigger a sleep and retry. This is similar of ScalaTest's
|
||||
* {@code eventually(timeout, closure)} operation, though that lacks
|
||||
* the ability to fail fast if the inner closure has determined that
|
||||
* a failure condition is non-recoverable.
|
||||
* <p>
|
||||
* Example: spin until an the number of files in a filesystem is non-zero,
|
||||
* returning the files found.
|
||||
* The sleep interval backs off by 500 ms each iteration to a maximum of 5s.
|
||||
* <pre>
|
||||
* FileStatus[] files = eventually( 30 * 1000,
|
||||
* () -> {
|
||||
* FileStatus[] f = filesystem.listFiles(new Path("/"));
|
||||
* assertEquals(0, f.length);
|
||||
* return f;
|
||||
* },
|
||||
* new ProportionalRetryInterval(500, 5000));
|
||||
* </pre>
|
||||
* This allows for a fast exit, yet reduces probe frequency over time.
|
||||
*
|
||||
* @param <T> return type
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made before failing.
|
||||
* @param eval expression to evaluate
|
||||
* @param retry retry interval generator
|
||||
* @return result of the first successful eval call
|
||||
* @throws Exception the last exception thrown before timeout was triggered
|
||||
* @throws FailFastException if raised -without any retry attempt.
|
||||
* @throws InterruptedException if interrupted during the sleep operation.
|
||||
*/
|
||||
public static <T> T eventually(int timeoutMillis,
|
||||
Callable<T> eval,
|
||||
Callable<Integer> retry) throws Exception {
|
||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
||||
"timeoutMillis must be >= 0");
|
||||
long endTime = Time.now() + timeoutMillis;
|
||||
Exception ex;
|
||||
boolean running;
|
||||
int sleeptime;
|
||||
int iterations = 0;
|
||||
do {
|
||||
iterations++;
|
||||
try {
|
||||
return eval.call();
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
// these two exceptions trigger an immediate exit
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("evaluate() iteration {}", iterations, e);
|
||||
ex = e;
|
||||
}
|
||||
running = Time.now() < endTime;
|
||||
if (running && (sleeptime = retry.call()) >= 0) {
|
||||
Thread.sleep(sleeptime);
|
||||
}
|
||||
} while (running);
|
||||
// timeout. Throw the last exception raised
|
||||
throw ex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simplified {@link #eventually(int, Callable, Callable)} method
|
||||
* with a fixed interval.
|
||||
* <p>
|
||||
* Example: wait 30s until an assertion holds, sleeping 1s between each
|
||||
* check.
|
||||
* <pre>
|
||||
* eventually( 30 * 1000, 1000,
|
||||
* () -> { assertEquals(0, filesystem.listFiles(new Path("/")).length); }
|
||||
* );
|
||||
* </pre>
|
||||
*
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made before failing.
|
||||
* @param intervalMillis interval in milliseconds
|
||||
* @param eval expression to evaluate
|
||||
* @return result of the first successful invocation of {@code eval()}
|
||||
* @throws Exception the last exception thrown before timeout was triggered
|
||||
* @throws FailFastException if raised -without any retry attempt.
|
||||
* @throws InterruptedException if interrupted during the sleep operation.
|
||||
*/
|
||||
public static <T> T eventually(int timeoutMillis,
|
||||
int intervalMillis,
|
||||
Callable<T> eval) throws Exception {
|
||||
return eventually(timeoutMillis, eval,
|
||||
new FixedRetryInterval(intervalMillis));
|
||||
}
|
||||
|
||||
/**
|
||||
* Intercept an exception; throw an {@code AssertionError} if one not raised.
|
||||
* The caught exception is rethrown if it is of the wrong class or
|
||||
* does not contain the text defined in {@code contained}.
|
||||
* <p>
|
||||
* Example: expect deleting a nonexistent file to raise a
|
||||
* {@code FileNotFoundException}.
|
||||
* <pre>
|
||||
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
|
||||
* () -> {
|
||||
* filesystem.delete(new Path("/missing"), false);
|
||||
* });
|
||||
* </pre>
|
||||
*
|
||||
* @param clazz class of exception; the raised exception must be this class
|
||||
* <i>or a subclass</i>.
|
||||
* @param eval expression to eval
|
||||
* @param <T> return type of expression
|
||||
* @param <E> exception class
|
||||
* @return the caught exception if it was of the expected type
|
||||
* @throws Exception any other exception raised
|
||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
||||
* The error includes the {@code toString()} value of the result, if this
|
||||
* can be determined.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T, E extends Throwable> E intercept(
|
||||
Class<E> clazz,
|
||||
Callable<T> eval)
|
||||
throws Exception {
|
||||
try {
|
||||
T result = eval.call();
|
||||
throw new AssertionError("Expected an exception, got "
|
||||
+ robustToString(result));
|
||||
} catch (Throwable e) {
|
||||
if (clazz.isAssignableFrom(e.getClass())) {
|
||||
return (E)e;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Intercept an exception; throw an {@code AssertionError} if one not raised.
|
||||
* The caught exception is rethrown if it is of the wrong class or
|
||||
* does not contain the text defined in {@code contained}.
|
||||
* <p>
|
||||
* Example: expect deleting a nonexistent file to raise a
|
||||
* {@code FileNotFoundException} with the {@code toString()} value
|
||||
* containing the text {@code "missing"}.
|
||||
* <pre>
|
||||
* FileNotFoundException ioe = intercept(FileNotFoundException.class,
|
||||
* "missing",
|
||||
* () -> {
|
||||
* filesystem.delete(new Path("/missing"), false);
|
||||
* });
|
||||
* </pre>
|
||||
*
|
||||
* @param clazz class of exception; the raised exception must be this class
|
||||
* <i>or a subclass</i>.
|
||||
* @param contained string which must be in the {@code toString()} value
|
||||
* of the exception
|
||||
* @param eval expression to eval
|
||||
* @param <T> return type of expression
|
||||
* @param <E> exception class
|
||||
* @return the caught exception if it was of the expected type and contents
|
||||
* @throws Exception any other exception raised
|
||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
||||
* The error includes the {@code toString()} value of the result, if this
|
||||
* can be determined.
|
||||
* @see GenericTestUtils#assertExceptionContains(String, Throwable)
|
||||
*/
|
||||
public static <T, E extends Throwable> E intercept(
|
||||
Class<E> clazz,
|
||||
String contained,
|
||||
Callable<T> eval)
|
||||
throws Exception {
|
||||
E ex = intercept(clazz, eval);
|
||||
GenericTestUtils.assertExceptionContains(contained, ex);
|
||||
return ex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Robust string converter for exception messages; if the {@code toString()}
|
||||
* method throws an exception then that exception is caught and logged,
|
||||
* then a simple string of the classname logged.
|
||||
* This stops a {@code toString()} failure hiding underlying problems.
|
||||
* @param o object to stringify
|
||||
* @return a string for exception messages
|
||||
*/
|
||||
private static String robustToString(Object o) {
|
||||
if (o == null) {
|
||||
return NULL_RESULT;
|
||||
} else {
|
||||
try {
|
||||
return o.toString();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Exception calling toString()", e);
|
||||
return o.getClass().toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code TimeoutException} on a timeout. If
|
||||
* there was a inner class passed in, includes it as the
|
||||
* inner failure.
|
||||
*/
|
||||
public static class GenerateTimeout implements TimeoutHandler {
|
||||
private final String message;
|
||||
|
||||
public GenerateTimeout(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public GenerateTimeout() {
|
||||
this("timeout");
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate operation creates a new {@code TimeoutException}.
|
||||
* @param timeoutMillis timeout in millis
|
||||
* @param caught optional caught exception
|
||||
* @return TimeoutException
|
||||
*/
|
||||
@Override
|
||||
public Exception evaluate(int timeoutMillis, Exception caught)
|
||||
throws Exception {
|
||||
String s = String.format("%s: after %d millis", message,
|
||||
timeoutMillis);
|
||||
String caughtText = caught != null
|
||||
? ("; " + robustToString(caught)) : "";
|
||||
|
||||
return (TimeoutException) (new TimeoutException(s + caughtText)
|
||||
.initCause(caught));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry at a fixed time period between calls.
|
||||
*/
|
||||
public static class FixedRetryInterval implements Callable<Integer> {
|
||||
private final int intervalMillis;
|
||||
private int invocationCount = 0;
|
||||
|
||||
public FixedRetryInterval(int intervalMillis) {
|
||||
Preconditions.checkArgument(intervalMillis > 0);
|
||||
this.intervalMillis = intervalMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
invocationCount++;
|
||||
return intervalMillis;
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"FixedRetryInterval{");
|
||||
sb.append("interval=").append(intervalMillis);
|
||||
sb.append(", invocationCount=").append(invocationCount);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gradually increase the sleep time by the initial interval, until
|
||||
* the limit set by {@code maxIntervalMillis} is reached.
|
||||
*/
|
||||
public static class ProportionalRetryInterval implements Callable<Integer> {
|
||||
private final int intervalMillis;
|
||||
private final int maxIntervalMillis;
|
||||
private int current;
|
||||
private int invocationCount = 0;
|
||||
|
||||
public ProportionalRetryInterval(int intervalMillis,
|
||||
int maxIntervalMillis) {
|
||||
Preconditions.checkArgument(intervalMillis > 0);
|
||||
Preconditions.checkArgument(maxIntervalMillis > 0);
|
||||
this.intervalMillis = intervalMillis;
|
||||
this.current = intervalMillis;
|
||||
this.maxIntervalMillis = maxIntervalMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
invocationCount++;
|
||||
int last = current;
|
||||
if (last < maxIntervalMillis) {
|
||||
current += intervalMillis;
|
||||
}
|
||||
return last;
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"ProportionalRetryInterval{");
|
||||
sb.append("interval=").append(intervalMillis);
|
||||
sb.append(", current=").append(current);
|
||||
sb.append(", limit=").append(maxIntervalMillis);
|
||||
sb.append(", invocationCount=").append(invocationCount);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An exception which triggers a fast exist from the
|
||||
* {@link #eventually(int, Callable, Callable)} and
|
||||
* {@link #await(int, Callable, Callable, TimeoutHandler)} loops.
|
||||
*/
|
||||
public static class FailFastException extends Exception {
|
||||
|
||||
public FailFastException(String detailMessage) {
|
||||
super(detailMessage);
|
||||
}
|
||||
|
||||
public FailFastException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiate from a format string.
|
||||
* @param format format string
|
||||
* @param args arguments to format
|
||||
* @return an instance with the message string constructed.
|
||||
*/
|
||||
public static FailFastException newInstance(String format, Object...args) {
|
||||
return new FailFastException(String.format(format, args));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,249 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
||||
import static org.apache.hadoop.test.GenericTestUtils.*;
|
||||
|
||||
/**
|
||||
* Test the logic in {@link LambdaTestUtils}.
|
||||
* This test suite includes Java 8 and Java 7 code; the Java 8 code exists
|
||||
* to verify that the API is easily used with Lambda expressions.
|
||||
*/
|
||||
public class TestLambdaTestUtils extends Assert {
|
||||
|
||||
public static final int INTERVAL = 10;
|
||||
public static final int TIMEOUT = 50;
|
||||
private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
|
||||
|
||||
/**
|
||||
* Always evaluates to true.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_TRUE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Always evaluates to false.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_FALSE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Text in the raised FNFE.
|
||||
*/
|
||||
public static final String MISSING = "not found";
|
||||
|
||||
/**
|
||||
* A predicate that always throws a FileNotFoundException.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_FNFE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
throw new FileNotFoundException(MISSING);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* reusable timeout handler.
|
||||
*/
|
||||
public static final GenerateTimeout
|
||||
TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
|
||||
|
||||
/**
|
||||
* Always evaluates to 3L.
|
||||
*/
|
||||
public static final Callable<Long> EVAL_3L = new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
return 3L;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Always raises a {@code FileNotFoundException}.
|
||||
*/
|
||||
public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
throw new FileNotFoundException(MISSING);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Assert the retry count is as expected.
|
||||
* @param expected expected value
|
||||
*/
|
||||
protected void assertRetryCount(int expected) {
|
||||
assertEquals(retry.toString(), expected, retry.getInvocationCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert the retry count is as expected.
|
||||
* @param minCount minimum value
|
||||
*/
|
||||
protected void assertMinRetryCount(int minCount) {
|
||||
assertTrue("retry count of " + retry + " is not >= " + minCount,
|
||||
minCount <= retry.getInvocationCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitAlwaysTrue() throws Throwable {
|
||||
await(TIMEOUT,
|
||||
ALWAYS_TRUE,
|
||||
new FixedRetryInterval(INTERVAL),
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitAlwaysFalse() throws Throwable {
|
||||
try {
|
||||
await(TIMEOUT,
|
||||
ALWAYS_FALSE,
|
||||
retry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
assertTrue(retry.getInvocationCount() > 4);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitLinearRetry() throws Throwable {
|
||||
ProportionalRetryInterval linearRetry =
|
||||
new ProportionalRetryInterval(INTERVAL * 2, TIMEOUT * 2);
|
||||
try {
|
||||
await(TIMEOUT,
|
||||
ALWAYS_FALSE,
|
||||
linearRetry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
assertEquals(linearRetry.toString(),
|
||||
2, linearRetry.getInvocationCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitFNFE() throws Throwable {
|
||||
try {
|
||||
await(TIMEOUT,
|
||||
ALWAYS_FNFE,
|
||||
retry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
// inner clause is included
|
||||
assertTrue(retry.getInvocationCount() > 0);
|
||||
assertTrue(e.getCause() instanceof FileNotFoundException);
|
||||
assertExceptionContains(MISSING, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetryInterval() throws Throwable {
|
||||
ProportionalRetryInterval interval =
|
||||
new ProportionalRetryInterval(200, 1000);
|
||||
assertEquals(200, (int) interval.call());
|
||||
assertEquals(400, (int) interval.call());
|
||||
assertEquals(600, (int) interval.call());
|
||||
assertEquals(800, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptSuccess() throws Throwable {
|
||||
IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
|
||||
assertExceptionContains(MISSING, ioe);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptContains() throws Throwable {
|
||||
intercept(IOException.class, MISSING, ALWAYS_FNFE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptContainsWrongString() throws Throwable {
|
||||
try {
|
||||
FileNotFoundException e =
|
||||
intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
|
||||
assertNotNull(e);
|
||||
throw e;
|
||||
} catch (AssertionError expected) {
|
||||
assertExceptionContains(MISSING, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptVoidCallable() throws Throwable {
|
||||
intercept(AssertionError.class,
|
||||
NULL_RESULT,
|
||||
new Callable<IOException>() {
|
||||
@Override
|
||||
public IOException call() throws Exception {
|
||||
return intercept(IOException.class,
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventually() throws Throwable {
|
||||
long result = eventually(TIMEOUT, EVAL_3L, retry);
|
||||
assertEquals(3, result);
|
||||
assertEquals(0, retry.getInvocationCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventuallyFailuresRetry() throws Throwable {
|
||||
try {
|
||||
eventually(TIMEOUT, EVAL_FNFE, retry);
|
||||
fail("should not have got here");
|
||||
} catch (IOException expected) {
|
||||
// expected
|
||||
assertMinRetryCount(1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -34,8 +36,6 @@ import java.io.FileNotFoundException;
|
|||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* Test S3A Failure translation, including a functional test
|
||||
|
@ -68,13 +68,15 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
|
|||
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
|
||||
// here the file length is less. Probe the file to see if this is true,
|
||||
// with a spin and wait
|
||||
eventually(30 *1000, new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
LambdaTestUtils.eventually(30 * 1000, 1000,
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// here length is shorter. Assuming it has propagated to all replicas,
|
||||
// the position of the input stream is now beyond the EOF.
|
||||
// An attempt to seek backwards to a position greater than the
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.slf4j.Logger;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
||||
|
@ -135,32 +134,6 @@ public class S3ATestUtils {
|
|||
return fc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
|
||||
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
|
||||
* @param timeout timeout
|
||||
* @param callback callback to invoke
|
||||
* @throws FailFastException any fast-failure
|
||||
* @throws Exception the exception which caused the iterator to fail
|
||||
*/
|
||||
public static void eventually(int timeout, Callable<Void> callback)
|
||||
throws Exception {
|
||||
Exception lastException;
|
||||
long endtime = System.currentTimeMillis() + timeout;
|
||||
do {
|
||||
try {
|
||||
callback.call();
|
||||
return;
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
} while (endtime > System.currentTimeMillis());
|
||||
throw lastException;
|
||||
}
|
||||
|
||||
/**
|
||||
* patch the endpoint option so that irrespective of where other tests
|
||||
* are working, the IO performance tests can work with the landsat
|
||||
|
@ -290,27 +263,6 @@ public class S3ATestUtils {
|
|||
? propval : confVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* The exception to raise so as to exit fast from
|
||||
* {@link #eventually(int, Callable)}.
|
||||
*/
|
||||
public static class FailFastException extends Exception {
|
||||
public FailFastException() {
|
||||
}
|
||||
|
||||
public FailFastException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FailFastException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public FailFastException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the class of an exception. If it is not as expected, rethrow it.
|
||||
* Comparison is on the exact class, not subclass-of inference as
|
||||
|
|
Loading…
Reference in New Issue