From 42f8a1d6eb5add3180094b126c4af2b3ddbc7cae Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Thu, 20 Oct 2016 12:47:57 -0700 Subject: [PATCH] HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran. --- .../AbstractContractRootDirectoryTest.java | 48 +- .../hadoop/fs/contract/ContractTestUtils.java | 6 +- .../apache/hadoop/test/LambdaTestUtils.java | 428 ++++++++++++++++++ .../hadoop/test/TestLambdaTestUtils.java | 240 ++++++++++ .../fs/s3a/ITestS3AFailureHandling.java | 8 +- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 47 -- 6 files changed, 707 insertions(+), 70 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java index 0a8f464486a..7295f98dc44 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java @@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.test.LambdaTestUtils; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats; import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren; import static org.apache.hadoop.fs.contract.ContractTestUtils.toList; import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; @@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk; public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase { private static final Logger LOG = LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class); + public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000; @Override public void setup() throws Exception { @@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra // extra sanity checks here to avoid support calls about complete loss // of data skipIfUnsupported(TEST_ROOT_TESTS_ENABLED); - Path root = new Path("/"); + final Path root = new Path("/"); assertIsDirectory(root); - // make sure it is clean - FileSystem fs = getFileSystem(); - deleteChildren(fs, root, true); - FileStatus[] children = listChildren(fs, root); - if (children.length > 0) { - StringBuilder error = new StringBuilder(); - error.append("Deletion of child entries failed, still have") - .append(children.length) - .append(System.lineSeparator()); - for (FileStatus child : children) { - error.append(" ").append(child.getPath()) - .append(System.lineSeparator()); - } - fail(error.toString()); - } + // make sure the directory is clean. This includes some retry logic + // to forgive blobstores whose listings can be out of sync with the file + // status; + final FileSystem fs = getFileSystem(); + final AtomicInteger iterations = new AtomicInteger(0); + final FileStatus[] originalChildren = listChildren(fs, root); + LambdaTestUtils.evaluate( + new Callable() { + @Override + public Void call() throws Exception { + FileStatus[] deleted = deleteChildren(fs, root, true); + FileStatus[] children = listChildren(fs, root); + if (children.length > 0) { + fail(String.format( + "After %d attempts: listing after rm /* not empty" + + "\n%s\n%s\n%s", + iterations.incrementAndGet(), + dumpStats("final", children), + dumpStats("deleted", deleted), + dumpStats("original", originalChildren))); + } + return null; + } + }, + OBJECTSTORE_RETRY_TIMEOUT, + new LambdaTestUtils.LinearRetryInterval(50, 1000)); // then try to delete the empty one boolean deleted = fs.delete(root, false); LOG.info("rm / of empty dir result is {}", deleted); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index 16bfb9a6962..73c8f1ce095 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert { * @param fileSystem filesystem * @param path path to delete * @param recursive flag to indicate child entry deletion should be recursive - * @return the number of child entries found and deleted (not including + * @return the immediate child entries found and deleted (not including * any recursive children of those entries) * @throws IOException problem in the deletion process. */ - public static int deleteChildren(FileSystem fileSystem, + public static FileStatus[] deleteChildren(FileSystem fileSystem, Path path, boolean recursive) throws IOException { FileStatus[] children = listChildren(fileSystem, path); for (FileStatus entry : children) { fileSystem.delete(entry.getPath(), recursive); } - return children.length; + return children; } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java new file mode 100644 index 00000000000..51cdb77bc75 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java @@ -0,0 +1,428 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.util.Time; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +/** + * Class containing methods and associated classes to make the most of Lambda + * expressions in Hadoop tests. + * + * The code has been designed from the outset to be Java-8 friendly, but still + * be usable in Java 7. + * In particular: support for waiting for a condition to be met. + * This is to avoid tests having hard-coded sleeps in them. + * + * The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)}, + * but also lifts concepts from Scalatest's {@code awaitResult} and + * its notion of pluggable retry logic (simple, backoff, maybe even things + * with jitter: test author gets to choose). + * The {@code intercept} method is also all credit due Scalatest, though + * it's been extended to also support a string message check; useful when + * checking the contents of the exception. + */ +public final class LambdaTestUtils { + public static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class); + + private LambdaTestUtils() { + } + + /** + * This is the string included in the assertion text in + * {@link #intercept(Class, Callable)} if + * the closure returned a null value. + */ + public static final String NULL_RESULT = "(null)"; + + /** + * Interface to implement for converting a timeout into some form + * of exception to raise. + */ + public interface TimeoutHandler { + + /** + * Create an exception (or throw one, if desired). + * @param timeoutMillis timeout which has arisen + * @param caught any exception which was caught; may be null + * @return an exception which will then be thrown + * @throws Exception if the handler wishes to raise an exception + * that way. + */ + Exception evaluate(int timeoutMillis, Exception caught) throws Exception; + } + + /** + * Wait for a condition to be met. + * @param check predicate to evaluate + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made. + * @param retry retry escalation logic + * @param failure handler invoked on failure; the returned exception + * will be thrown + * @return the number of iterations before the condition was satisfied + * @throws Exception returned by {@code failure} on timeout + * @throws FailFastException immediately if the evaluated operation raises it + * @throws InterruptedException if interrupted. + */ + public static int eventually(Callable check, + int timeoutMillis, + Callable retry, + TimeoutHandler failure) + throws Exception { + Preconditions.checkArgument(timeoutMillis >= 0, + "timeoutMillis must be > 0"); + + long endTime = Time.now() + timeoutMillis; + Exception ex = null; + boolean running = true; + int iterations = 0; + while (running) { + iterations++; + try { + if (check.call()) { + return iterations; + } + } catch (InterruptedException | FailFastException e) { + throw e; + } catch (Exception e) { + LOG.debug("eventually() iteration {}", iterations, e); + ex = e; + } + running = Time.now() < endTime; + if (running) { + int sleeptime = retry.call(); + if (sleeptime >= 0) { + Thread.sleep(sleeptime); + } else { + running = false; + } + } + } + // timeout + throw failure.evaluate(timeoutMillis, ex); + } + + /** + * Simplified {@code eventually()} clause; fixed interval + * and {@link GenerateTimeout} used to generate the timeout. + * @param check predicate to evaluate + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made. + * @param intervalMillis interval in milliseconds between checks + * @return the number of iterations before the condition was satisfied + * @throws Exception returned by {@code failure} on timeout + * @throws FailFastException immediately if the evaluated operation raises it + * @throws InterruptedException if interrupted. + */ + public static int eventually(Callable check, + int timeoutMillis, + int intervalMillis) throws Exception { + return eventually(check, + timeoutMillis, + new FixedRetryInterval(intervalMillis), + new GenerateTimeout()); + } + + /** + * Await a result; exceptions are caught and, with one exception, + * trigger a sleep and retry. This is similar of ScalaTest's + * {@code Await.result()} operation, though that lacks the ability to + * fail fast if the inner closure has determined that a failure condition + * is non-recoverable. + * @param eval expression to evaluate + * @param timeoutMillis timeout in milliseconds. + * Can be zero, in which case only one attempt is made. + * @param retry retry interval generator + * @param return type + * @return result of the first successful eval call + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static T evaluate(Callable eval, + int timeoutMillis, + Callable retry) throws Exception { + Preconditions.checkArgument(timeoutMillis >= 0, + "timeoutMillis must be >= 0"); + long endTime = Time.now() + timeoutMillis; + Exception ex; + boolean running; + int sleeptime; + int iterations = 0; + do { + iterations++; + try { + return eval.call(); + } catch (InterruptedException | FailFastException e) { + throw e; + } catch (Exception e) { + LOG.debug("evaluate() iteration {}", iterations, e); + ex = e; + } + running = Time.now() < endTime; + if (running && (sleeptime = retry.call()) >= 0) { + Thread.sleep(sleeptime); + } + } while (running); + // timeout. Throw the last exception raised + throw ex; + } + + /** + * Simplified {@code evaluate()} clause; fixed interval. + * @param check predicate to evaluate + * @param timeoutMillis wait interval between check failures + * @param intervalMillis interval in milliseconds + * @return result of the first successful eval call + * @throws Exception the last exception thrown before timeout was triggered + * @throws FailFastException if raised -without any retry attempt. + * @throws InterruptedException if interrupted during the sleep operation. + */ + public static T evaluate(Callable eval, + int timeoutMillis, + int intervalMillis) throws Exception { + return evaluate(eval, + timeoutMillis, + new FixedRetryInterval(intervalMillis)); + } + + /** + * Intercept an exception; raise an exception if it was not raised. + * Exceptions of the wrong class are also rethrown. + * @param clazz class of exception; the raised exception must be this class + * or a subclass. + * @param eval expression to eval + * @param return type of expression + * @param exception class + * @return the caught exception if it was of the expected type + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + * The error includes the {@code toString()} value of the result, if this + * can be determined. + */ + public static E intercept( + Class clazz, + Callable eval) + throws Exception { + try { + T result = eval.call(); + throw new AssertionError("Expected an exception, got " + + robustToString(result)); + } catch (Throwable e) { + if (clazz.isAssignableFrom(e.getClass())) { + return (E)e; + } else { + throw e; + } + } + } + + /** + * Intercept an exception; raise an exception if it was not raised. + * Exceptions of the wrong class are also rethrown. + * @param clazz class of exception; the raised exception must be this class + * or a subclass. + * @param contained string which must be in the {@code toString()} value + * of the exception + * @param eval expression to eval + * @param return type of expression + * @param exception class + * @return the caught exception if it was of the expected type and contents + * @throws Exception any other exception raised + * @throws AssertionError if the evaluation call didn't raise an exception. + * The error includes the {@code toString()} value of the result, if this + * can be determined. + */ + public static E intercept( + Class clazz, + String contained, + Callable eval) + throws Exception { + E ex = intercept(clazz, eval); + GenericTestUtils.assertExceptionContains(contained, ex); + return ex; + } + + /** + * Robust string converter for exception messages; if the {@code toString()} + * method throws an exception then that exception is caught and logged, + * then a simple string of the classname logged. + * This stops a toString() failure hiding underlying problems in the code. + * @param o object to stringify + * @return a string for exception messages + */ + private static String robustToString(Object o) { + if (o == null) { + return NULL_RESULT; + } else { + try { + return o.toString(); + } catch (Exception e) { + LOG.info("Exception calling toString()", e); + return o.getClass().toString(); + } + } + } + + /** + * Returns {@code TimeoutException} on a timeout. If + * there was a inner class passed in, includes it as the + * inner failure. + */ + public static class GenerateTimeout implements TimeoutHandler { + private final String message; + + public GenerateTimeout(String message) { + this.message = message; + } + + public GenerateTimeout() { + this("timeout"); + } + + /** + * Evaluate by creating a new Timeout Exception. + * @param timeoutMillis timeout in millis + * @param caught optional caught exception + * @return TimeoutException + */ + @Override + public Exception evaluate(int timeoutMillis, Exception caught) + throws Exception { + String s = String.format("%s: after %d millis", message, + timeoutMillis); + String caughtText = caught != null + ? ("; " + robustToString(caught)) : ""; + + return (TimeoutException) (new TimeoutException(s + caughtText) + .initCause(caught)); + } + } + + /** + * Retry at a fixed time period between calls. + */ + public static class FixedRetryInterval implements Callable { + private final int intervalMillis; + private int invocationCount = 0; + + public FixedRetryInterval(int intervalMillis) { + Preconditions.checkArgument(intervalMillis > 0); + this.intervalMillis = intervalMillis; + } + + @Override + public Integer call() throws Exception { + invocationCount++; + return intervalMillis; + } + + public int getInvocationCount() { + return invocationCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "FixedRetryInterval{"); + sb.append("interval=").append(intervalMillis); + sb.append(", invocationCount=").append(invocationCount); + sb.append('}'); + return sb.toString(); + } + } + + /** + * Gradually increase the sleep time by the initial interval, until + * the limit set by {@code maxIntervalMillis} is reached. + */ + public static class LinearRetryInterval implements Callable { + private final int intervalMillis; + private final int maxIntervalMillis; + private int current; + private int invocationCount = 0; + + public LinearRetryInterval(int intervalMillis, int maxIntervalMillis) { + Preconditions.checkArgument(intervalMillis > 0); + Preconditions.checkArgument(maxIntervalMillis > 0); + this.intervalMillis = intervalMillis; + this.current = intervalMillis; + this.maxIntervalMillis = maxIntervalMillis; + } + + @Override + public Integer call() throws Exception { + invocationCount++; + int last = current; + if (last < maxIntervalMillis) { + current += intervalMillis; + } + return last; + } + + public int getInvocationCount() { + return invocationCount; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "LinearRetryInterval{"); + sb.append("interval=").append(intervalMillis); + sb.append(", current=").append(current); + sb.append(", limit=").append(maxIntervalMillis); + sb.append(", invocationCount=").append(invocationCount); + sb.append('}'); + return sb.toString(); + } + } + + /** + * An exception which triggers a fast exist from the + * {@link #evaluate(Callable, int, Callable)} and + * {@link #eventually(Callable, int, Callable, TimeoutHandler)} loops. + */ + public static class FailFastException extends Exception { + + public FailFastException(String detailMessage) { + super(detailMessage); + } + + public FailFastException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Instantiate from a format string. + * @param format format string + * @param args arguments to format + * @return an instance with the message string constructed. + */ + public static FailFastException newInstance(String format, Object...args) { + return new FailFastException(String.format(format, args)); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java new file mode 100644 index 00000000000..a2af8647933 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.test.LambdaTestUtils.*; +import static org.apache.hadoop.test.GenericTestUtils.*; + +/** + * Test the logic in {@link LambdaTestUtils}. + * This test suite includes Java 8 and Java 7 code; the Java 8 code exists + * to verify that the API is easily used with Lambda expressions. + */ +public class TestLambdaTestUtils extends Assert { + + public static final int INTERVAL = 10; + public static final int TIMEOUT = 50; + private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL); + // counter for lambda expressions to use + private int count; + + /** + * Always evaluates to true. + */ + public static final Callable ALWAYS_TRUE = + new Callable() { + @Override + public Boolean call() throws Exception { + return true; + } + }; + + /** + * Always evaluates to false. + */ + public static final Callable ALWAYS_FALSE = + new Callable() { + @Override + public Boolean call() throws Exception { + return false; + } + }; + + /** + * Text in the raised FNFE. + */ + public static final String MISSING = "not found"; + + /** + * A predicate that always throws a FileNotFoundException. + */ + public static final Callable ALWAYS_FNFE = + new Callable() { + @Override + public Boolean call() throws Exception { + throw new FileNotFoundException(MISSING); + } + }; + + /** + * reusable timeout handler. + */ + public static final GenerateTimeout + TIMEOUT_FAILURE_HANDLER = new GenerateTimeout(); + + /** + * Always evaluates to 3L. + */ + public static final Callable EVAL_3L = new Callable() { + @Override + public Long call() throws Exception { + return 3L; + } + }; + + /** + * Always raises a {@code FileNotFoundException}. + */ + public static final Callable EVAL_FNFE = new Callable() { + @Override + public Long call() throws Exception { + throw new FileNotFoundException(MISSING); + } + }; + + @Test + public void testEventuallyAlwaysTrue() throws Throwable { + eventually( + ALWAYS_TRUE, + TIMEOUT, + new FixedRetryInterval(INTERVAL), + TIMEOUT_FAILURE_HANDLER); + } + + @Test + public void testEventuallyAlwaysFalse() throws Throwable { + try { + eventually( + ALWAYS_FALSE, + TIMEOUT, + retry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + assertTrue(retry.getInvocationCount() > 4); + } + } + + @Test + public void testEventuallyLinearRetry() throws Throwable { + LinearRetryInterval linearRetry = new LinearRetryInterval( + INTERVAL * 2, + TIMEOUT * 2); + try { + eventually( + ALWAYS_FALSE, + TIMEOUT, + linearRetry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + assertEquals(linearRetry.toString(), + 2, linearRetry.getInvocationCount()); + } + } + + @Test + public void testEventuallyFNFE() throws Throwable { + try { + eventually( + ALWAYS_FNFE, + TIMEOUT, + retry, + TIMEOUT_FAILURE_HANDLER); + fail("should not have got here"); + } catch (TimeoutException e) { + // inner clause is included + assertTrue(retry.getInvocationCount() > 0); + assertTrue(e.getCause() instanceof FileNotFoundException); + assertExceptionContains(MISSING, e); + } + } + + @Test + public void testEvaluate() throws Throwable { + long result = evaluate(EVAL_3L, + TIMEOUT, + retry); + assertEquals(3, result); + assertEquals(0, retry.getInvocationCount()); + } + + @Test + public void testEvalFailuresRetry() throws Throwable { + try { + evaluate(EVAL_FNFE, + TIMEOUT, + retry); + fail("should not have got here"); + } catch (IOException expected) { + // expected + assertMinRetryCount(1); + } + } + + @Test + public void testLinearRetryInterval() throws Throwable { + LinearRetryInterval interval = new LinearRetryInterval(200, 1000); + assertEquals(200, (int) interval.call()); + assertEquals(400, (int) interval.call()); + assertEquals(600, (int) interval.call()); + assertEquals(800, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + assertEquals(1000, (int) interval.call()); + } + + @Test + public void testInterceptSuccess() throws Throwable { + IOException ioe = intercept(IOException.class, ALWAYS_FNFE); + assertExceptionContains(MISSING, ioe); + } + + @Test + public void testInterceptContainsSuccess() throws Throwable { + intercept(IOException.class, MISSING, ALWAYS_FNFE); + } + + @Test + public void testInterceptContainsWrongString() throws Throwable { + try { + FileNotFoundException e = + intercept(FileNotFoundException.class, "404", ALWAYS_FNFE); + throw e; + } catch (AssertionError expected) { + assertExceptionContains(MISSING, expected); + } + } + + /** + * Assert the retry count is as expected. + * @param expected expected value + */ + protected void assertRetryCount(int expected) { + assertEquals(retry.toString(), expected, retry.getInvocationCount()); + } + + /** + * Assert the retry count is as expected. + * @param minCount minimum value + */ + protected void assertMinRetryCount(int minCount) { + assertTrue("retry count of " + retry + + " is not >= " + minCount, + minCount <= retry.getInvocationCount()); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java index 06864881e38..728b1a90ea9 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java @@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.apache.hadoop.test.LambdaTestUtils; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,8 +36,6 @@ import java.io.FileNotFoundException; import java.util.concurrent.Callable; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; -import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** * Test S3A Failure translation, including a functional test @@ -68,13 +68,13 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase { writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true); // here the file length is less. Probe the file to see if this is true, // with a spin and wait - eventually(30 *1000, new Callable() { + LambdaTestUtils.evaluate(new Callable() { @Override public Void call() throws Exception { assertEquals(shortLen, fs.getFileStatus(testpath).getLen()); return null; } - }); + }, 30 * 1000, 1000); // here length is shorter. Assuming it has propagated to all replicas, // the position of the input stream is now beyond the EOF. // An attempt to seek backwards to a position greater than the diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index c67e118e976..249ba10d6b8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -135,32 +135,6 @@ public class S3ATestUtils { return fc; } - /** - * Repeatedly attempt a callback until timeout or a {@link FailFastException} - * is raised. This is modeled on ScalaTests {@code eventually(Closure)} code. - * @param timeout timeout - * @param callback callback to invoke - * @throws FailFastException any fast-failure - * @throws Exception the exception which caused the iterator to fail - */ - public static void eventually(int timeout, Callable callback) - throws Exception { - Exception lastException; - long endtime = System.currentTimeMillis() + timeout; - do { - try { - callback.call(); - return; - } catch (InterruptedException | FailFastException e) { - throw e; - } catch (Exception e) { - lastException = e; - } - Thread.sleep(500); - } while (endtime > System.currentTimeMillis()); - throw lastException; - } - /** * patch the endpoint option so that irrespective of where other tests * are working, the IO performance tests can work with the landsat @@ -290,27 +264,6 @@ public class S3ATestUtils { ? propval : confVal; } - /** - * The exception to raise so as to exit fast from - * {@link #eventually(int, Callable)}. - */ - public static class FailFastException extends Exception { - public FailFastException() { - } - - public FailFastException(String message) { - super(message); - } - - public FailFastException(String message, Throwable cause) { - super(message, cause); - } - - public FailFastException(Throwable cause) { - super(cause); - } - } - /** * Verify the class of an exception. If it is not as expected, rethrow it. * Comparison is on the exact class, not subclass-of inference as