HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.
This commit is contained in:
parent
b4edd115b4
commit
671d219c9c
|
@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
||||
|
@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
|||
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
|
||||
public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
|
||||
|
||||
@Override
|
||||
public void setup() throws Exception {
|
||||
|
@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
|||
// extra sanity checks here to avoid support calls about complete loss
|
||||
// of data
|
||||
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
||||
Path root = new Path("/");
|
||||
final Path root = new Path("/");
|
||||
assertIsDirectory(root);
|
||||
// make sure it is clean
|
||||
FileSystem fs = getFileSystem();
|
||||
deleteChildren(fs, root, true);
|
||||
FileStatus[] children = listChildren(fs, root);
|
||||
if (children.length > 0) {
|
||||
StringBuilder error = new StringBuilder();
|
||||
error.append("Deletion of child entries failed, still have")
|
||||
.append(children.length)
|
||||
.append(System.lineSeparator());
|
||||
for (FileStatus child : children) {
|
||||
error.append(" ").append(child.getPath())
|
||||
.append(System.lineSeparator());
|
||||
}
|
||||
fail(error.toString());
|
||||
}
|
||||
// make sure the directory is clean. This includes some retry logic
|
||||
// to forgive blobstores whose listings can be out of sync with the file
|
||||
// status;
|
||||
final FileSystem fs = getFileSystem();
|
||||
final AtomicInteger iterations = new AtomicInteger(0);
|
||||
final FileStatus[] originalChildren = listChildren(fs, root);
|
||||
LambdaTestUtils.evaluate(
|
||||
new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
FileStatus[] deleted = deleteChildren(fs, root, true);
|
||||
FileStatus[] children = listChildren(fs, root);
|
||||
if (children.length > 0) {
|
||||
fail(String.format(
|
||||
"After %d attempts: listing after rm /* not empty"
|
||||
+ "\n%s\n%s\n%s",
|
||||
iterations.incrementAndGet(),
|
||||
dumpStats("final", children),
|
||||
dumpStats("deleted", deleted),
|
||||
dumpStats("original", originalChildren)));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
OBJECTSTORE_RETRY_TIMEOUT,
|
||||
new LambdaTestUtils.LinearRetryInterval(50, 1000));
|
||||
// then try to delete the empty one
|
||||
boolean deleted = fs.delete(root, false);
|
||||
LOG.info("rm / of empty dir result is {}", deleted);
|
||||
|
|
|
@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
|
|||
* @param fileSystem filesystem
|
||||
* @param path path to delete
|
||||
* @param recursive flag to indicate child entry deletion should be recursive
|
||||
* @return the number of child entries found and deleted (not including
|
||||
* @return the immediate child entries found and deleted (not including
|
||||
* any recursive children of those entries)
|
||||
* @throws IOException problem in the deletion process.
|
||||
*/
|
||||
public static int deleteChildren(FileSystem fileSystem,
|
||||
public static FileStatus[] deleteChildren(FileSystem fileSystem,
|
||||
Path path,
|
||||
boolean recursive) throws IOException {
|
||||
FileStatus[] children = listChildren(fileSystem, path);
|
||||
for (FileStatus entry : children) {
|
||||
fileSystem.delete(entry.getPath(), recursive);
|
||||
}
|
||||
return children.length;
|
||||
return children;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,428 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* Class containing methods and associated classes to make the most of Lambda
|
||||
* expressions in Hadoop tests.
|
||||
*
|
||||
* The code has been designed from the outset to be Java-8 friendly, but still
|
||||
* be usable in Java 7.
|
||||
* In particular: support for waiting for a condition to be met.
|
||||
* This is to avoid tests having hard-coded sleeps in them.
|
||||
*
|
||||
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
|
||||
* but also lifts concepts from Scalatest's {@code awaitResult} and
|
||||
* its notion of pluggable retry logic (simple, backoff, maybe even things
|
||||
* with jitter: test author gets to choose).
|
||||
* The {@code intercept} method is also all credit due Scalatest, though
|
||||
* it's been extended to also support a string message check; useful when
|
||||
* checking the contents of the exception.
|
||||
*/
|
||||
public final class LambdaTestUtils {
|
||||
public static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class);
|
||||
|
||||
private LambdaTestUtils() {
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the string included in the assertion text in
|
||||
* {@link #intercept(Class, Callable)} if
|
||||
* the closure returned a null value.
|
||||
*/
|
||||
public static final String NULL_RESULT = "(null)";
|
||||
|
||||
/**
|
||||
* Interface to implement for converting a timeout into some form
|
||||
* of exception to raise.
|
||||
*/
|
||||
public interface TimeoutHandler {
|
||||
|
||||
/**
|
||||
* Create an exception (or throw one, if desired).
|
||||
* @param timeoutMillis timeout which has arisen
|
||||
* @param caught any exception which was caught; may be null
|
||||
* @return an exception which will then be thrown
|
||||
* @throws Exception if the handler wishes to raise an exception
|
||||
* that way.
|
||||
*/
|
||||
Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a condition to be met.
|
||||
* @param check predicate to evaluate
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made.
|
||||
* @param retry retry escalation logic
|
||||
* @param failure handler invoked on failure; the returned exception
|
||||
* will be thrown
|
||||
* @return the number of iterations before the condition was satisfied
|
||||
* @throws Exception returned by {@code failure} on timeout
|
||||
* @throws FailFastException immediately if the evaluated operation raises it
|
||||
* @throws InterruptedException if interrupted.
|
||||
*/
|
||||
public static int eventually(Callable<Boolean> check,
|
||||
int timeoutMillis,
|
||||
Callable<Integer> retry,
|
||||
TimeoutHandler failure)
|
||||
throws Exception {
|
||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
||||
"timeoutMillis must be > 0");
|
||||
|
||||
long endTime = Time.now() + timeoutMillis;
|
||||
Exception ex = null;
|
||||
boolean running = true;
|
||||
int iterations = 0;
|
||||
while (running) {
|
||||
iterations++;
|
||||
try {
|
||||
if (check.call()) {
|
||||
return iterations;
|
||||
}
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("eventually() iteration {}", iterations, e);
|
||||
ex = e;
|
||||
}
|
||||
running = Time.now() < endTime;
|
||||
if (running) {
|
||||
int sleeptime = retry.call();
|
||||
if (sleeptime >= 0) {
|
||||
Thread.sleep(sleeptime);
|
||||
} else {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
// timeout
|
||||
throw failure.evaluate(timeoutMillis, ex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Simplified {@code eventually()} clause; fixed interval
|
||||
* and {@link GenerateTimeout} used to generate the timeout.
|
||||
* @param check predicate to evaluate
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made.
|
||||
* @param intervalMillis interval in milliseconds between checks
|
||||
* @return the number of iterations before the condition was satisfied
|
||||
* @throws Exception returned by {@code failure} on timeout
|
||||
* @throws FailFastException immediately if the evaluated operation raises it
|
||||
* @throws InterruptedException if interrupted.
|
||||
*/
|
||||
public static int eventually(Callable<Boolean> check,
|
||||
int timeoutMillis,
|
||||
int intervalMillis) throws Exception {
|
||||
return eventually(check,
|
||||
timeoutMillis,
|
||||
new FixedRetryInterval(intervalMillis),
|
||||
new GenerateTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
* Await a result; exceptions are caught and, with one exception,
|
||||
* trigger a sleep and retry. This is similar of ScalaTest's
|
||||
* {@code Await.result()} operation, though that lacks the ability to
|
||||
* fail fast if the inner closure has determined that a failure condition
|
||||
* is non-recoverable.
|
||||
* @param eval expression to evaluate
|
||||
* @param timeoutMillis timeout in milliseconds.
|
||||
* Can be zero, in which case only one attempt is made.
|
||||
* @param retry retry interval generator
|
||||
* @param <T> return type
|
||||
* @return result of the first successful eval call
|
||||
* @throws Exception the last exception thrown before timeout was triggered
|
||||
* @throws FailFastException if raised -without any retry attempt.
|
||||
* @throws InterruptedException if interrupted during the sleep operation.
|
||||
*/
|
||||
public static <T> T evaluate(Callable<T> eval,
|
||||
int timeoutMillis,
|
||||
Callable<Integer> retry) throws Exception {
|
||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
||||
"timeoutMillis must be >= 0");
|
||||
long endTime = Time.now() + timeoutMillis;
|
||||
Exception ex;
|
||||
boolean running;
|
||||
int sleeptime;
|
||||
int iterations = 0;
|
||||
do {
|
||||
iterations++;
|
||||
try {
|
||||
return eval.call();
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("evaluate() iteration {}", iterations, e);
|
||||
ex = e;
|
||||
}
|
||||
running = Time.now() < endTime;
|
||||
if (running && (sleeptime = retry.call()) >= 0) {
|
||||
Thread.sleep(sleeptime);
|
||||
}
|
||||
} while (running);
|
||||
// timeout. Throw the last exception raised
|
||||
throw ex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Simplified {@code evaluate()} clause; fixed interval.
|
||||
* @param check predicate to evaluate
|
||||
* @param timeoutMillis wait interval between check failures
|
||||
* @param intervalMillis interval in milliseconds
|
||||
* @return result of the first successful eval call
|
||||
* @throws Exception the last exception thrown before timeout was triggered
|
||||
* @throws FailFastException if raised -without any retry attempt.
|
||||
* @throws InterruptedException if interrupted during the sleep operation.
|
||||
*/
|
||||
public static <T> T evaluate(Callable<T> eval,
|
||||
int timeoutMillis,
|
||||
int intervalMillis) throws Exception {
|
||||
return evaluate(eval,
|
||||
timeoutMillis,
|
||||
new FixedRetryInterval(intervalMillis));
|
||||
}
|
||||
|
||||
/**
|
||||
* Intercept an exception; raise an exception if it was not raised.
|
||||
* Exceptions of the wrong class are also rethrown.
|
||||
* @param clazz class of exception; the raised exception must be this class
|
||||
* <i>or a subclass</i>.
|
||||
* @param eval expression to eval
|
||||
* @param <T> return type of expression
|
||||
* @param <E> exception class
|
||||
* @return the caught exception if it was of the expected type
|
||||
* @throws Exception any other exception raised
|
||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
||||
* The error includes the {@code toString()} value of the result, if this
|
||||
* can be determined.
|
||||
*/
|
||||
public static <T, E extends Throwable> E intercept(
|
||||
Class<E> clazz,
|
||||
Callable<T> eval)
|
||||
throws Exception {
|
||||
try {
|
||||
T result = eval.call();
|
||||
throw new AssertionError("Expected an exception, got "
|
||||
+ robustToString(result));
|
||||
} catch (Throwable e) {
|
||||
if (clazz.isAssignableFrom(e.getClass())) {
|
||||
return (E)e;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Intercept an exception; raise an exception if it was not raised.
|
||||
* Exceptions of the wrong class are also rethrown.
|
||||
* @param clazz class of exception; the raised exception must be this class
|
||||
* <i>or a subclass</i>.
|
||||
* @param contained string which must be in the {@code toString()} value
|
||||
* of the exception
|
||||
* @param eval expression to eval
|
||||
* @param <T> return type of expression
|
||||
* @param <E> exception class
|
||||
* @return the caught exception if it was of the expected type and contents
|
||||
* @throws Exception any other exception raised
|
||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
||||
* The error includes the {@code toString()} value of the result, if this
|
||||
* can be determined.
|
||||
*/
|
||||
public static <T, E extends Throwable> E intercept(
|
||||
Class<E> clazz,
|
||||
String contained,
|
||||
Callable<T> eval)
|
||||
throws Exception {
|
||||
E ex = intercept(clazz, eval);
|
||||
GenericTestUtils.assertExceptionContains(contained, ex);
|
||||
return ex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Robust string converter for exception messages; if the {@code toString()}
|
||||
* method throws an exception then that exception is caught and logged,
|
||||
* then a simple string of the classname logged.
|
||||
* This stops a toString() failure hiding underlying problems in the code.
|
||||
* @param o object to stringify
|
||||
* @return a string for exception messages
|
||||
*/
|
||||
private static String robustToString(Object o) {
|
||||
if (o == null) {
|
||||
return NULL_RESULT;
|
||||
} else {
|
||||
try {
|
||||
return o.toString();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Exception calling toString()", e);
|
||||
return o.getClass().toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code TimeoutException} on a timeout. If
|
||||
* there was a inner class passed in, includes it as the
|
||||
* inner failure.
|
||||
*/
|
||||
public static class GenerateTimeout implements TimeoutHandler {
|
||||
private final String message;
|
||||
|
||||
public GenerateTimeout(String message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
public GenerateTimeout() {
|
||||
this("timeout");
|
||||
}
|
||||
|
||||
/**
|
||||
* Evaluate by creating a new Timeout Exception.
|
||||
* @param timeoutMillis timeout in millis
|
||||
* @param caught optional caught exception
|
||||
* @return TimeoutException
|
||||
*/
|
||||
@Override
|
||||
public Exception evaluate(int timeoutMillis, Exception caught)
|
||||
throws Exception {
|
||||
String s = String.format("%s: after %d millis", message,
|
||||
timeoutMillis);
|
||||
String caughtText = caught != null
|
||||
? ("; " + robustToString(caught)) : "";
|
||||
|
||||
return (TimeoutException) (new TimeoutException(s + caughtText)
|
||||
.initCause(caught));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry at a fixed time period between calls.
|
||||
*/
|
||||
public static class FixedRetryInterval implements Callable<Integer> {
|
||||
private final int intervalMillis;
|
||||
private int invocationCount = 0;
|
||||
|
||||
public FixedRetryInterval(int intervalMillis) {
|
||||
Preconditions.checkArgument(intervalMillis > 0);
|
||||
this.intervalMillis = intervalMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
invocationCount++;
|
||||
return intervalMillis;
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"FixedRetryInterval{");
|
||||
sb.append("interval=").append(intervalMillis);
|
||||
sb.append(", invocationCount=").append(invocationCount);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gradually increase the sleep time by the initial interval, until
|
||||
* the limit set by {@code maxIntervalMillis} is reached.
|
||||
*/
|
||||
public static class LinearRetryInterval implements Callable<Integer> {
|
||||
private final int intervalMillis;
|
||||
private final int maxIntervalMillis;
|
||||
private int current;
|
||||
private int invocationCount = 0;
|
||||
|
||||
public LinearRetryInterval(int intervalMillis, int maxIntervalMillis) {
|
||||
Preconditions.checkArgument(intervalMillis > 0);
|
||||
Preconditions.checkArgument(maxIntervalMillis > 0);
|
||||
this.intervalMillis = intervalMillis;
|
||||
this.current = intervalMillis;
|
||||
this.maxIntervalMillis = maxIntervalMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() throws Exception {
|
||||
invocationCount++;
|
||||
int last = current;
|
||||
if (last < maxIntervalMillis) {
|
||||
current += intervalMillis;
|
||||
}
|
||||
return last;
|
||||
}
|
||||
|
||||
public int getInvocationCount() {
|
||||
return invocationCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder(
|
||||
"LinearRetryInterval{");
|
||||
sb.append("interval=").append(intervalMillis);
|
||||
sb.append(", current=").append(current);
|
||||
sb.append(", limit=").append(maxIntervalMillis);
|
||||
sb.append(", invocationCount=").append(invocationCount);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* An exception which triggers a fast exist from the
|
||||
* {@link #evaluate(Callable, int, Callable)} and
|
||||
* {@link #eventually(Callable, int, Callable, TimeoutHandler)} loops.
|
||||
*/
|
||||
public static class FailFastException extends Exception {
|
||||
|
||||
public FailFastException(String detailMessage) {
|
||||
super(detailMessage);
|
||||
}
|
||||
|
||||
public FailFastException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiate from a format string.
|
||||
* @param format format string
|
||||
* @param args arguments to format
|
||||
* @return an instance with the message string constructed.
|
||||
*/
|
||||
public static FailFastException newInstance(String format, Object...args) {
|
||||
return new FailFastException(String.format(format, args));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,240 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
||||
import static org.apache.hadoop.test.GenericTestUtils.*;
|
||||
|
||||
/**
|
||||
* Test the logic in {@link LambdaTestUtils}.
|
||||
* This test suite includes Java 8 and Java 7 code; the Java 8 code exists
|
||||
* to verify that the API is easily used with Lambda expressions.
|
||||
*/
|
||||
public class TestLambdaTestUtils extends Assert {
|
||||
|
||||
public static final int INTERVAL = 10;
|
||||
public static final int TIMEOUT = 50;
|
||||
private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
|
||||
// counter for lambda expressions to use
|
||||
private int count;
|
||||
|
||||
/**
|
||||
* Always evaluates to true.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_TRUE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Always evaluates to false.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_FALSE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Text in the raised FNFE.
|
||||
*/
|
||||
public static final String MISSING = "not found";
|
||||
|
||||
/**
|
||||
* A predicate that always throws a FileNotFoundException.
|
||||
*/
|
||||
public static final Callable<Boolean> ALWAYS_FNFE =
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws Exception {
|
||||
throw new FileNotFoundException(MISSING);
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* reusable timeout handler.
|
||||
*/
|
||||
public static final GenerateTimeout
|
||||
TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
|
||||
|
||||
/**
|
||||
* Always evaluates to 3L.
|
||||
*/
|
||||
public static final Callable<Long> EVAL_3L = new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
return 3L;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Always raises a {@code FileNotFoundException}.
|
||||
*/
|
||||
public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
|
||||
@Override
|
||||
public Long call() throws Exception {
|
||||
throw new FileNotFoundException(MISSING);
|
||||
}
|
||||
};
|
||||
|
||||
@Test
|
||||
public void testEventuallyAlwaysTrue() throws Throwable {
|
||||
eventually(
|
||||
ALWAYS_TRUE,
|
||||
TIMEOUT,
|
||||
new FixedRetryInterval(INTERVAL),
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventuallyAlwaysFalse() throws Throwable {
|
||||
try {
|
||||
eventually(
|
||||
ALWAYS_FALSE,
|
||||
TIMEOUT,
|
||||
retry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
assertTrue(retry.getInvocationCount() > 4);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventuallyLinearRetry() throws Throwable {
|
||||
LinearRetryInterval linearRetry = new LinearRetryInterval(
|
||||
INTERVAL * 2,
|
||||
TIMEOUT * 2);
|
||||
try {
|
||||
eventually(
|
||||
ALWAYS_FALSE,
|
||||
TIMEOUT,
|
||||
linearRetry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
assertEquals(linearRetry.toString(),
|
||||
2, linearRetry.getInvocationCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventuallyFNFE() throws Throwable {
|
||||
try {
|
||||
eventually(
|
||||
ALWAYS_FNFE,
|
||||
TIMEOUT,
|
||||
retry,
|
||||
TIMEOUT_FAILURE_HANDLER);
|
||||
fail("should not have got here");
|
||||
} catch (TimeoutException e) {
|
||||
// inner clause is included
|
||||
assertTrue(retry.getInvocationCount() > 0);
|
||||
assertTrue(e.getCause() instanceof FileNotFoundException);
|
||||
assertExceptionContains(MISSING, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvaluate() throws Throwable {
|
||||
long result = evaluate(EVAL_3L,
|
||||
TIMEOUT,
|
||||
retry);
|
||||
assertEquals(3, result);
|
||||
assertEquals(0, retry.getInvocationCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvalFailuresRetry() throws Throwable {
|
||||
try {
|
||||
evaluate(EVAL_FNFE,
|
||||
TIMEOUT,
|
||||
retry);
|
||||
fail("should not have got here");
|
||||
} catch (IOException expected) {
|
||||
// expected
|
||||
assertMinRetryCount(1);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLinearRetryInterval() throws Throwable {
|
||||
LinearRetryInterval interval = new LinearRetryInterval(200, 1000);
|
||||
assertEquals(200, (int) interval.call());
|
||||
assertEquals(400, (int) interval.call());
|
||||
assertEquals(600, (int) interval.call());
|
||||
assertEquals(800, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
assertEquals(1000, (int) interval.call());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptSuccess() throws Throwable {
|
||||
IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
|
||||
assertExceptionContains(MISSING, ioe);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptContainsSuccess() throws Throwable {
|
||||
intercept(IOException.class, MISSING, ALWAYS_FNFE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterceptContainsWrongString() throws Throwable {
|
||||
try {
|
||||
FileNotFoundException e =
|
||||
intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
|
||||
throw e;
|
||||
} catch (AssertionError expected) {
|
||||
assertExceptionContains(MISSING, expected);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert the retry count is as expected.
|
||||
* @param expected expected value
|
||||
*/
|
||||
protected void assertRetryCount(int expected) {
|
||||
assertEquals(retry.toString(), expected, retry.getInvocationCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert the retry count is as expected.
|
||||
* @param minCount minimum value
|
||||
*/
|
||||
protected void assertMinRetryCount(int minCount) {
|
||||
assertTrue("retry count of " + retry
|
||||
+ " is not >= " + minCount,
|
||||
minCount <= retry.getInvocationCount());
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
import org.apache.hadoop.test.LambdaTestUtils;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -34,8 +36,6 @@ import java.io.FileNotFoundException;
|
|||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
|
||||
/**
|
||||
* Test S3A Failure translation, including a functional test
|
||||
|
@ -68,13 +68,13 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
|
|||
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
|
||||
// here the file length is less. Probe the file to see if this is true,
|
||||
// with a spin and wait
|
||||
eventually(30 *1000, new Callable<Void>() {
|
||||
LambdaTestUtils.evaluate(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}, 30 * 1000, 1000);
|
||||
// here length is shorter. Assuming it has propagated to all replicas,
|
||||
// the position of the input stream is now beyond the EOF.
|
||||
// An attempt to seek backwards to a position greater than the
|
||||
|
|
|
@ -135,32 +135,6 @@ public class S3ATestUtils {
|
|||
return fc;
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
|
||||
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
|
||||
* @param timeout timeout
|
||||
* @param callback callback to invoke
|
||||
* @throws FailFastException any fast-failure
|
||||
* @throws Exception the exception which caused the iterator to fail
|
||||
*/
|
||||
public static void eventually(int timeout, Callable<Void> callback)
|
||||
throws Exception {
|
||||
Exception lastException;
|
||||
long endtime = System.currentTimeMillis() + timeout;
|
||||
do {
|
||||
try {
|
||||
callback.call();
|
||||
return;
|
||||
} catch (InterruptedException | FailFastException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
lastException = e;
|
||||
}
|
||||
Thread.sleep(500);
|
||||
} while (endtime > System.currentTimeMillis());
|
||||
throw lastException;
|
||||
}
|
||||
|
||||
/**
|
||||
* patch the endpoint option so that irrespective of where other tests
|
||||
* are working, the IO performance tests can work with the landsat
|
||||
|
@ -290,27 +264,6 @@ public class S3ATestUtils {
|
|||
? propval : confVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* The exception to raise so as to exit fast from
|
||||
* {@link #eventually(int, Callable)}.
|
||||
*/
|
||||
public static class FailFastException extends Exception {
|
||||
public FailFastException() {
|
||||
}
|
||||
|
||||
public FailFastException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public FailFastException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public FailFastException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the class of an exception. If it is not as expected, rethrow it.
|
||||
* Comparison is on the exact class, not subclass-of inference as
|
||||
|
|
Loading…
Reference in New Issue