Revert "HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran."
This reverts commit 42f8a1d6eb
.
This commit is contained in:
parent
1f384b617d
commit
f0af3dee25
|
@ -27,16 +27,12 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
||||||
|
@ -49,7 +45,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
|
||||||
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
|
public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
|
LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
|
||||||
public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
|
@ -84,34 +79,23 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
|
||||||
// extra sanity checks here to avoid support calls about complete loss
|
// extra sanity checks here to avoid support calls about complete loss
|
||||||
// of data
|
// of data
|
||||||
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
|
||||||
final Path root = new Path("/");
|
Path root = new Path("/");
|
||||||
assertIsDirectory(root);
|
assertIsDirectory(root);
|
||||||
// make sure the directory is clean. This includes some retry logic
|
// make sure it is clean
|
||||||
// to forgive blobstores whose listings can be out of sync with the file
|
FileSystem fs = getFileSystem();
|
||||||
// status;
|
deleteChildren(fs, root, true);
|
||||||
final FileSystem fs = getFileSystem();
|
FileStatus[] children = listChildren(fs, root);
|
||||||
final AtomicInteger iterations = new AtomicInteger(0);
|
if (children.length > 0) {
|
||||||
final FileStatus[] originalChildren = listChildren(fs, root);
|
StringBuilder error = new StringBuilder();
|
||||||
LambdaTestUtils.evaluate(
|
error.append("Deletion of child entries failed, still have")
|
||||||
new Callable<Void>() {
|
.append(children.length)
|
||||||
@Override
|
.append(System.lineSeparator());
|
||||||
public Void call() throws Exception {
|
for (FileStatus child : children) {
|
||||||
FileStatus[] deleted = deleteChildren(fs, root, true);
|
error.append(" ").append(child.getPath())
|
||||||
FileStatus[] children = listChildren(fs, root);
|
.append(System.lineSeparator());
|
||||||
if (children.length > 0) {
|
}
|
||||||
fail(String.format(
|
fail(error.toString());
|
||||||
"After %d attempts: listing after rm /* not empty"
|
}
|
||||||
+ "\n%s\n%s\n%s",
|
|
||||||
iterations.incrementAndGet(),
|
|
||||||
dumpStats("final", children),
|
|
||||||
dumpStats("deleted", deleted),
|
|
||||||
dumpStats("original", originalChildren)));
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
OBJECTSTORE_RETRY_TIMEOUT,
|
|
||||||
new LambdaTestUtils.LinearRetryInterval(50, 1000));
|
|
||||||
// then try to delete the empty one
|
// then try to delete the empty one
|
||||||
boolean deleted = fs.delete(root, false);
|
boolean deleted = fs.delete(root, false);
|
||||||
LOG.info("rm / of empty dir result is {}", deleted);
|
LOG.info("rm / of empty dir result is {}", deleted);
|
||||||
|
|
|
@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
|
||||||
* @param fileSystem filesystem
|
* @param fileSystem filesystem
|
||||||
* @param path path to delete
|
* @param path path to delete
|
||||||
* @param recursive flag to indicate child entry deletion should be recursive
|
* @param recursive flag to indicate child entry deletion should be recursive
|
||||||
* @return the immediate child entries found and deleted (not including
|
* @return the number of child entries found and deleted (not including
|
||||||
* any recursive children of those entries)
|
* any recursive children of those entries)
|
||||||
* @throws IOException problem in the deletion process.
|
* @throws IOException problem in the deletion process.
|
||||||
*/
|
*/
|
||||||
public static FileStatus[] deleteChildren(FileSystem fileSystem,
|
public static int deleteChildren(FileSystem fileSystem,
|
||||||
Path path,
|
Path path,
|
||||||
boolean recursive) throws IOException {
|
boolean recursive) throws IOException {
|
||||||
FileStatus[] children = listChildren(fileSystem, path);
|
FileStatus[] children = listChildren(fileSystem, path);
|
||||||
for (FileStatus entry : children) {
|
for (FileStatus entry : children) {
|
||||||
fileSystem.delete(entry.getPath(), recursive);
|
fileSystem.delete(entry.getPath(), recursive);
|
||||||
}
|
}
|
||||||
return children;
|
return children.length;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,428 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.test;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hadoop.util.Time;
|
|
||||||
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Class containing methods and associated classes to make the most of Lambda
|
|
||||||
* expressions in Hadoop tests.
|
|
||||||
*
|
|
||||||
* The code has been designed from the outset to be Java-8 friendly, but still
|
|
||||||
* be usable in Java 7.
|
|
||||||
* In particular: support for waiting for a condition to be met.
|
|
||||||
* This is to avoid tests having hard-coded sleeps in them.
|
|
||||||
*
|
|
||||||
* The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
|
|
||||||
* but also lifts concepts from Scalatest's {@code awaitResult} and
|
|
||||||
* its notion of pluggable retry logic (simple, backoff, maybe even things
|
|
||||||
* with jitter: test author gets to choose).
|
|
||||||
* The {@code intercept} method is also all credit due Scalatest, though
|
|
||||||
* it's been extended to also support a string message check; useful when
|
|
||||||
* checking the contents of the exception.
|
|
||||||
*/
|
|
||||||
public final class LambdaTestUtils {
|
|
||||||
public static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class);
|
|
||||||
|
|
||||||
private LambdaTestUtils() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the string included in the assertion text in
|
|
||||||
* {@link #intercept(Class, Callable)} if
|
|
||||||
* the closure returned a null value.
|
|
||||||
*/
|
|
||||||
public static final String NULL_RESULT = "(null)";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Interface to implement for converting a timeout into some form
|
|
||||||
* of exception to raise.
|
|
||||||
*/
|
|
||||||
public interface TimeoutHandler {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create an exception (or throw one, if desired).
|
|
||||||
* @param timeoutMillis timeout which has arisen
|
|
||||||
* @param caught any exception which was caught; may be null
|
|
||||||
* @return an exception which will then be thrown
|
|
||||||
* @throws Exception if the handler wishes to raise an exception
|
|
||||||
* that way.
|
|
||||||
*/
|
|
||||||
Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wait for a condition to be met.
|
|
||||||
* @param check predicate to evaluate
|
|
||||||
* @param timeoutMillis timeout in milliseconds.
|
|
||||||
* Can be zero, in which case only one attempt is made.
|
|
||||||
* @param retry retry escalation logic
|
|
||||||
* @param failure handler invoked on failure; the returned exception
|
|
||||||
* will be thrown
|
|
||||||
* @return the number of iterations before the condition was satisfied
|
|
||||||
* @throws Exception returned by {@code failure} on timeout
|
|
||||||
* @throws FailFastException immediately if the evaluated operation raises it
|
|
||||||
* @throws InterruptedException if interrupted.
|
|
||||||
*/
|
|
||||||
public static int eventually(Callable<Boolean> check,
|
|
||||||
int timeoutMillis,
|
|
||||||
Callable<Integer> retry,
|
|
||||||
TimeoutHandler failure)
|
|
||||||
throws Exception {
|
|
||||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
|
||||||
"timeoutMillis must be > 0");
|
|
||||||
|
|
||||||
long endTime = Time.now() + timeoutMillis;
|
|
||||||
Exception ex = null;
|
|
||||||
boolean running = true;
|
|
||||||
int iterations = 0;
|
|
||||||
while (running) {
|
|
||||||
iterations++;
|
|
||||||
try {
|
|
||||||
if (check.call()) {
|
|
||||||
return iterations;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException | FailFastException e) {
|
|
||||||
throw e;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("eventually() iteration {}", iterations, e);
|
|
||||||
ex = e;
|
|
||||||
}
|
|
||||||
running = Time.now() < endTime;
|
|
||||||
if (running) {
|
|
||||||
int sleeptime = retry.call();
|
|
||||||
if (sleeptime >= 0) {
|
|
||||||
Thread.sleep(sleeptime);
|
|
||||||
} else {
|
|
||||||
running = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// timeout
|
|
||||||
throw failure.evaluate(timeoutMillis, ex);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simplified {@code eventually()} clause; fixed interval
|
|
||||||
* and {@link GenerateTimeout} used to generate the timeout.
|
|
||||||
* @param check predicate to evaluate
|
|
||||||
* @param timeoutMillis timeout in milliseconds.
|
|
||||||
* Can be zero, in which case only one attempt is made.
|
|
||||||
* @param intervalMillis interval in milliseconds between checks
|
|
||||||
* @return the number of iterations before the condition was satisfied
|
|
||||||
* @throws Exception returned by {@code failure} on timeout
|
|
||||||
* @throws FailFastException immediately if the evaluated operation raises it
|
|
||||||
* @throws InterruptedException if interrupted.
|
|
||||||
*/
|
|
||||||
public static int eventually(Callable<Boolean> check,
|
|
||||||
int timeoutMillis,
|
|
||||||
int intervalMillis) throws Exception {
|
|
||||||
return eventually(check,
|
|
||||||
timeoutMillis,
|
|
||||||
new FixedRetryInterval(intervalMillis),
|
|
||||||
new GenerateTimeout());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Await a result; exceptions are caught and, with one exception,
|
|
||||||
* trigger a sleep and retry. This is similar of ScalaTest's
|
|
||||||
* {@code Await.result()} operation, though that lacks the ability to
|
|
||||||
* fail fast if the inner closure has determined that a failure condition
|
|
||||||
* is non-recoverable.
|
|
||||||
* @param eval expression to evaluate
|
|
||||||
* @param timeoutMillis timeout in milliseconds.
|
|
||||||
* Can be zero, in which case only one attempt is made.
|
|
||||||
* @param retry retry interval generator
|
|
||||||
* @param <T> return type
|
|
||||||
* @return result of the first successful eval call
|
|
||||||
* @throws Exception the last exception thrown before timeout was triggered
|
|
||||||
* @throws FailFastException if raised -without any retry attempt.
|
|
||||||
* @throws InterruptedException if interrupted during the sleep operation.
|
|
||||||
*/
|
|
||||||
public static <T> T evaluate(Callable<T> eval,
|
|
||||||
int timeoutMillis,
|
|
||||||
Callable<Integer> retry) throws Exception {
|
|
||||||
Preconditions.checkArgument(timeoutMillis >= 0,
|
|
||||||
"timeoutMillis must be >= 0");
|
|
||||||
long endTime = Time.now() + timeoutMillis;
|
|
||||||
Exception ex;
|
|
||||||
boolean running;
|
|
||||||
int sleeptime;
|
|
||||||
int iterations = 0;
|
|
||||||
do {
|
|
||||||
iterations++;
|
|
||||||
try {
|
|
||||||
return eval.call();
|
|
||||||
} catch (InterruptedException | FailFastException e) {
|
|
||||||
throw e;
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.debug("evaluate() iteration {}", iterations, e);
|
|
||||||
ex = e;
|
|
||||||
}
|
|
||||||
running = Time.now() < endTime;
|
|
||||||
if (running && (sleeptime = retry.call()) >= 0) {
|
|
||||||
Thread.sleep(sleeptime);
|
|
||||||
}
|
|
||||||
} while (running);
|
|
||||||
// timeout. Throw the last exception raised
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simplified {@code evaluate()} clause; fixed interval.
|
|
||||||
* @param check predicate to evaluate
|
|
||||||
* @param timeoutMillis wait interval between check failures
|
|
||||||
* @param intervalMillis interval in milliseconds
|
|
||||||
* @return result of the first successful eval call
|
|
||||||
* @throws Exception the last exception thrown before timeout was triggered
|
|
||||||
* @throws FailFastException if raised -without any retry attempt.
|
|
||||||
* @throws InterruptedException if interrupted during the sleep operation.
|
|
||||||
*/
|
|
||||||
public static <T> T evaluate(Callable<T> eval,
|
|
||||||
int timeoutMillis,
|
|
||||||
int intervalMillis) throws Exception {
|
|
||||||
return evaluate(eval,
|
|
||||||
timeoutMillis,
|
|
||||||
new FixedRetryInterval(intervalMillis));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Intercept an exception; raise an exception if it was not raised.
|
|
||||||
* Exceptions of the wrong class are also rethrown.
|
|
||||||
* @param clazz class of exception; the raised exception must be this class
|
|
||||||
* <i>or a subclass</i>.
|
|
||||||
* @param eval expression to eval
|
|
||||||
* @param <T> return type of expression
|
|
||||||
* @param <E> exception class
|
|
||||||
* @return the caught exception if it was of the expected type
|
|
||||||
* @throws Exception any other exception raised
|
|
||||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
|
||||||
* The error includes the {@code toString()} value of the result, if this
|
|
||||||
* can be determined.
|
|
||||||
*/
|
|
||||||
public static <T, E extends Throwable> E intercept(
|
|
||||||
Class<E> clazz,
|
|
||||||
Callable<T> eval)
|
|
||||||
throws Exception {
|
|
||||||
try {
|
|
||||||
T result = eval.call();
|
|
||||||
throw new AssertionError("Expected an exception, got "
|
|
||||||
+ robustToString(result));
|
|
||||||
} catch (Throwable e) {
|
|
||||||
if (clazz.isAssignableFrom(e.getClass())) {
|
|
||||||
return (E)e;
|
|
||||||
} else {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Intercept an exception; raise an exception if it was not raised.
|
|
||||||
* Exceptions of the wrong class are also rethrown.
|
|
||||||
* @param clazz class of exception; the raised exception must be this class
|
|
||||||
* <i>or a subclass</i>.
|
|
||||||
* @param contained string which must be in the {@code toString()} value
|
|
||||||
* of the exception
|
|
||||||
* @param eval expression to eval
|
|
||||||
* @param <T> return type of expression
|
|
||||||
* @param <E> exception class
|
|
||||||
* @return the caught exception if it was of the expected type and contents
|
|
||||||
* @throws Exception any other exception raised
|
|
||||||
* @throws AssertionError if the evaluation call didn't raise an exception.
|
|
||||||
* The error includes the {@code toString()} value of the result, if this
|
|
||||||
* can be determined.
|
|
||||||
*/
|
|
||||||
public static <T, E extends Throwable> E intercept(
|
|
||||||
Class<E> clazz,
|
|
||||||
String contained,
|
|
||||||
Callable<T> eval)
|
|
||||||
throws Exception {
|
|
||||||
E ex = intercept(clazz, eval);
|
|
||||||
GenericTestUtils.assertExceptionContains(contained, ex);
|
|
||||||
return ex;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Robust string converter for exception messages; if the {@code toString()}
|
|
||||||
* method throws an exception then that exception is caught and logged,
|
|
||||||
* then a simple string of the classname logged.
|
|
||||||
* This stops a toString() failure hiding underlying problems in the code.
|
|
||||||
* @param o object to stringify
|
|
||||||
* @return a string for exception messages
|
|
||||||
*/
|
|
||||||
private static String robustToString(Object o) {
|
|
||||||
if (o == null) {
|
|
||||||
return NULL_RESULT;
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
return o.toString();
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.info("Exception calling toString()", e);
|
|
||||||
return o.getClass().toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns {@code TimeoutException} on a timeout. If
|
|
||||||
* there was a inner class passed in, includes it as the
|
|
||||||
* inner failure.
|
|
||||||
*/
|
|
||||||
public static class GenerateTimeout implements TimeoutHandler {
|
|
||||||
private final String message;
|
|
||||||
|
|
||||||
public GenerateTimeout(String message) {
|
|
||||||
this.message = message;
|
|
||||||
}
|
|
||||||
|
|
||||||
public GenerateTimeout() {
|
|
||||||
this("timeout");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Evaluate by creating a new Timeout Exception.
|
|
||||||
* @param timeoutMillis timeout in millis
|
|
||||||
* @param caught optional caught exception
|
|
||||||
* @return TimeoutException
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public Exception evaluate(int timeoutMillis, Exception caught)
|
|
||||||
throws Exception {
|
|
||||||
String s = String.format("%s: after %d millis", message,
|
|
||||||
timeoutMillis);
|
|
||||||
String caughtText = caught != null
|
|
||||||
? ("; " + robustToString(caught)) : "";
|
|
||||||
|
|
||||||
return (TimeoutException) (new TimeoutException(s + caughtText)
|
|
||||||
.initCause(caught));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retry at a fixed time period between calls.
|
|
||||||
*/
|
|
||||||
public static class FixedRetryInterval implements Callable<Integer> {
|
|
||||||
private final int intervalMillis;
|
|
||||||
private int invocationCount = 0;
|
|
||||||
|
|
||||||
public FixedRetryInterval(int intervalMillis) {
|
|
||||||
Preconditions.checkArgument(intervalMillis > 0);
|
|
||||||
this.intervalMillis = intervalMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Integer call() throws Exception {
|
|
||||||
invocationCount++;
|
|
||||||
return intervalMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getInvocationCount() {
|
|
||||||
return invocationCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
final StringBuilder sb = new StringBuilder(
|
|
||||||
"FixedRetryInterval{");
|
|
||||||
sb.append("interval=").append(intervalMillis);
|
|
||||||
sb.append(", invocationCount=").append(invocationCount);
|
|
||||||
sb.append('}');
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gradually increase the sleep time by the initial interval, until
|
|
||||||
* the limit set by {@code maxIntervalMillis} is reached.
|
|
||||||
*/
|
|
||||||
public static class LinearRetryInterval implements Callable<Integer> {
|
|
||||||
private final int intervalMillis;
|
|
||||||
private final int maxIntervalMillis;
|
|
||||||
private int current;
|
|
||||||
private int invocationCount = 0;
|
|
||||||
|
|
||||||
public LinearRetryInterval(int intervalMillis, int maxIntervalMillis) {
|
|
||||||
Preconditions.checkArgument(intervalMillis > 0);
|
|
||||||
Preconditions.checkArgument(maxIntervalMillis > 0);
|
|
||||||
this.intervalMillis = intervalMillis;
|
|
||||||
this.current = intervalMillis;
|
|
||||||
this.maxIntervalMillis = maxIntervalMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Integer call() throws Exception {
|
|
||||||
invocationCount++;
|
|
||||||
int last = current;
|
|
||||||
if (last < maxIntervalMillis) {
|
|
||||||
current += intervalMillis;
|
|
||||||
}
|
|
||||||
return last;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getInvocationCount() {
|
|
||||||
return invocationCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
final StringBuilder sb = new StringBuilder(
|
|
||||||
"LinearRetryInterval{");
|
|
||||||
sb.append("interval=").append(intervalMillis);
|
|
||||||
sb.append(", current=").append(current);
|
|
||||||
sb.append(", limit=").append(maxIntervalMillis);
|
|
||||||
sb.append(", invocationCount=").append(invocationCount);
|
|
||||||
sb.append('}');
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An exception which triggers a fast exist from the
|
|
||||||
* {@link #evaluate(Callable, int, Callable)} and
|
|
||||||
* {@link #eventually(Callable, int, Callable, TimeoutHandler)} loops.
|
|
||||||
*/
|
|
||||||
public static class FailFastException extends Exception {
|
|
||||||
|
|
||||||
public FailFastException(String detailMessage) {
|
|
||||||
super(detailMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
public FailFastException(String message, Throwable cause) {
|
|
||||||
super(message, cause);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Instantiate from a format string.
|
|
||||||
* @param format format string
|
|
||||||
* @param args arguments to format
|
|
||||||
* @return an instance with the message string constructed.
|
|
||||||
*/
|
|
||||||
public static FailFastException newInstance(String format, Object...args) {
|
|
||||||
return new FailFastException(String.format(format, args));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,240 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.test;
|
|
||||||
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.Callable;
|
|
||||||
import java.util.concurrent.TimeoutException;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.*;
|
|
||||||
import static org.apache.hadoop.test.GenericTestUtils.*;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test the logic in {@link LambdaTestUtils}.
|
|
||||||
* This test suite includes Java 8 and Java 7 code; the Java 8 code exists
|
|
||||||
* to verify that the API is easily used with Lambda expressions.
|
|
||||||
*/
|
|
||||||
public class TestLambdaTestUtils extends Assert {
|
|
||||||
|
|
||||||
public static final int INTERVAL = 10;
|
|
||||||
public static final int TIMEOUT = 50;
|
|
||||||
private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
|
|
||||||
// counter for lambda expressions to use
|
|
||||||
private int count;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Always evaluates to true.
|
|
||||||
*/
|
|
||||||
public static final Callable<Boolean> ALWAYS_TRUE =
|
|
||||||
new Callable<Boolean>() {
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws Exception {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Always evaluates to false.
|
|
||||||
*/
|
|
||||||
public static final Callable<Boolean> ALWAYS_FALSE =
|
|
||||||
new Callable<Boolean>() {
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws Exception {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Text in the raised FNFE.
|
|
||||||
*/
|
|
||||||
public static final String MISSING = "not found";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A predicate that always throws a FileNotFoundException.
|
|
||||||
*/
|
|
||||||
public static final Callable<Boolean> ALWAYS_FNFE =
|
|
||||||
new Callable<Boolean>() {
|
|
||||||
@Override
|
|
||||||
public Boolean call() throws Exception {
|
|
||||||
throw new FileNotFoundException(MISSING);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* reusable timeout handler.
|
|
||||||
*/
|
|
||||||
public static final GenerateTimeout
|
|
||||||
TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Always evaluates to 3L.
|
|
||||||
*/
|
|
||||||
public static final Callable<Long> EVAL_3L = new Callable<Long>() {
|
|
||||||
@Override
|
|
||||||
public Long call() throws Exception {
|
|
||||||
return 3L;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Always raises a {@code FileNotFoundException}.
|
|
||||||
*/
|
|
||||||
public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
|
|
||||||
@Override
|
|
||||||
public Long call() throws Exception {
|
|
||||||
throw new FileNotFoundException(MISSING);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEventuallyAlwaysTrue() throws Throwable {
|
|
||||||
eventually(
|
|
||||||
ALWAYS_TRUE,
|
|
||||||
TIMEOUT,
|
|
||||||
new FixedRetryInterval(INTERVAL),
|
|
||||||
TIMEOUT_FAILURE_HANDLER);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEventuallyAlwaysFalse() throws Throwable {
|
|
||||||
try {
|
|
||||||
eventually(
|
|
||||||
ALWAYS_FALSE,
|
|
||||||
TIMEOUT,
|
|
||||||
retry,
|
|
||||||
TIMEOUT_FAILURE_HANDLER);
|
|
||||||
fail("should not have got here");
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
assertTrue(retry.getInvocationCount() > 4);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEventuallyLinearRetry() throws Throwable {
|
|
||||||
LinearRetryInterval linearRetry = new LinearRetryInterval(
|
|
||||||
INTERVAL * 2,
|
|
||||||
TIMEOUT * 2);
|
|
||||||
try {
|
|
||||||
eventually(
|
|
||||||
ALWAYS_FALSE,
|
|
||||||
TIMEOUT,
|
|
||||||
linearRetry,
|
|
||||||
TIMEOUT_FAILURE_HANDLER);
|
|
||||||
fail("should not have got here");
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
assertEquals(linearRetry.toString(),
|
|
||||||
2, linearRetry.getInvocationCount());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEventuallyFNFE() throws Throwable {
|
|
||||||
try {
|
|
||||||
eventually(
|
|
||||||
ALWAYS_FNFE,
|
|
||||||
TIMEOUT,
|
|
||||||
retry,
|
|
||||||
TIMEOUT_FAILURE_HANDLER);
|
|
||||||
fail("should not have got here");
|
|
||||||
} catch (TimeoutException e) {
|
|
||||||
// inner clause is included
|
|
||||||
assertTrue(retry.getInvocationCount() > 0);
|
|
||||||
assertTrue(e.getCause() instanceof FileNotFoundException);
|
|
||||||
assertExceptionContains(MISSING, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEvaluate() throws Throwable {
|
|
||||||
long result = evaluate(EVAL_3L,
|
|
||||||
TIMEOUT,
|
|
||||||
retry);
|
|
||||||
assertEquals(3, result);
|
|
||||||
assertEquals(0, retry.getInvocationCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testEvalFailuresRetry() throws Throwable {
|
|
||||||
try {
|
|
||||||
evaluate(EVAL_FNFE,
|
|
||||||
TIMEOUT,
|
|
||||||
retry);
|
|
||||||
fail("should not have got here");
|
|
||||||
} catch (IOException expected) {
|
|
||||||
// expected
|
|
||||||
assertMinRetryCount(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testLinearRetryInterval() throws Throwable {
|
|
||||||
LinearRetryInterval interval = new LinearRetryInterval(200, 1000);
|
|
||||||
assertEquals(200, (int) interval.call());
|
|
||||||
assertEquals(400, (int) interval.call());
|
|
||||||
assertEquals(600, (int) interval.call());
|
|
||||||
assertEquals(800, (int) interval.call());
|
|
||||||
assertEquals(1000, (int) interval.call());
|
|
||||||
assertEquals(1000, (int) interval.call());
|
|
||||||
assertEquals(1000, (int) interval.call());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInterceptSuccess() throws Throwable {
|
|
||||||
IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
|
|
||||||
assertExceptionContains(MISSING, ioe);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInterceptContainsSuccess() throws Throwable {
|
|
||||||
intercept(IOException.class, MISSING, ALWAYS_FNFE);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testInterceptContainsWrongString() throws Throwable {
|
|
||||||
try {
|
|
||||||
FileNotFoundException e =
|
|
||||||
intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
|
|
||||||
throw e;
|
|
||||||
} catch (AssertionError expected) {
|
|
||||||
assertExceptionContains(MISSING, expected);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert the retry count is as expected.
|
|
||||||
* @param expected expected value
|
|
||||||
*/
|
|
||||||
protected void assertRetryCount(int expected) {
|
|
||||||
assertEquals(retry.toString(), expected, retry.getInvocationCount());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert the retry count is as expected.
|
|
||||||
* @param minCount minimum value
|
|
||||||
*/
|
|
||||||
protected void assertMinRetryCount(int minCount) {
|
|
||||||
assertTrue("retry count of " + retry
|
|
||||||
+ " is not >= " + minCount,
|
|
||||||
minCount <= retry.getInvocationCount());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -25,8 +25,6 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||||
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
|
||||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||||
import org.apache.hadoop.test.LambdaTestUtils;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -36,6 +34,8 @@ import java.io.FileNotFoundException;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test S3A Failure translation, including a functional test
|
* Test S3A Failure translation, including a functional test
|
||||||
|
@ -68,13 +68,13 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
|
||||||
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
|
writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
|
||||||
// here the file length is less. Probe the file to see if this is true,
|
// here the file length is less. Probe the file to see if this is true,
|
||||||
// with a spin and wait
|
// with a spin and wait
|
||||||
LambdaTestUtils.evaluate(new Callable<Void>() {
|
eventually(30 *1000, new Callable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}, 30 * 1000, 1000);
|
});
|
||||||
// here length is shorter. Assuming it has propagated to all replicas,
|
// here length is shorter. Assuming it has propagated to all replicas,
|
||||||
// the position of the input stream is now beyond the EOF.
|
// the position of the input stream is now beyond the EOF.
|
||||||
// An attempt to seek backwards to a position greater than the
|
// An attempt to seek backwards to a position greater than the
|
||||||
|
|
|
@ -135,6 +135,32 @@ public class S3ATestUtils {
|
||||||
return fc;
|
return fc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Repeatedly attempt a callback until timeout or a {@link FailFastException}
|
||||||
|
* is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
|
||||||
|
* @param timeout timeout
|
||||||
|
* @param callback callback to invoke
|
||||||
|
* @throws FailFastException any fast-failure
|
||||||
|
* @throws Exception the exception which caused the iterator to fail
|
||||||
|
*/
|
||||||
|
public static void eventually(int timeout, Callable<Void> callback)
|
||||||
|
throws Exception {
|
||||||
|
Exception lastException;
|
||||||
|
long endtime = System.currentTimeMillis() + timeout;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
callback.call();
|
||||||
|
return;
|
||||||
|
} catch (InterruptedException | FailFastException e) {
|
||||||
|
throw e;
|
||||||
|
} catch (Exception e) {
|
||||||
|
lastException = e;
|
||||||
|
}
|
||||||
|
Thread.sleep(500);
|
||||||
|
} while (endtime > System.currentTimeMillis());
|
||||||
|
throw lastException;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* patch the endpoint option so that irrespective of where other tests
|
* patch the endpoint option so that irrespective of where other tests
|
||||||
* are working, the IO performance tests can work with the landsat
|
* are working, the IO performance tests can work with the landsat
|
||||||
|
@ -264,6 +290,27 @@ public class S3ATestUtils {
|
||||||
? propval : confVal;
|
? propval : confVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The exception to raise so as to exit fast from
|
||||||
|
* {@link #eventually(int, Callable)}.
|
||||||
|
*/
|
||||||
|
public static class FailFastException extends Exception {
|
||||||
|
public FailFastException() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailFastException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailFastException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public FailFastException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verify the class of an exception. If it is not as expected, rethrow it.
|
* Verify the class of an exception. If it is not as expected, rethrow it.
|
||||||
* Comparison is on the exact class, not subclass-of inference as
|
* Comparison is on the exact class, not subclass-of inference as
|
||||||
|
|
Loading…
Reference in New Issue