HADOOP-18162. hadoop-common support for MAPREDUCE-7341 Manifest Committer

* New statistic names in StoreStatisticNames
  (for joint use with s3a committers)
* Improvements to IOStatistics implementation classes
* RateLimiting wrapper to guava RateLimiter
* S3A committer Tasks moved over as TaskPool and
  added support for RemoteIterator
* JsonSerialization.load() to fail fast if source does not exist

+ tests.

This commit is a prerequisite for the main MAPREDUCE-7341 Manifest Committer
patch.

Contributed by Steve Loughran

Change-Id: Ia92e2ab5083ac3d8d3d713a4d9cb3e9e0278f654
This commit is contained in:
Steve Loughran 2022-03-16 15:12:03 +00:00
parent 712d9bece8
commit e06ed88012
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
11 changed files with 1487 additions and 2 deletions

View File

@ -112,6 +112,9 @@ public final class StoreStatisticNames {
/** {@value}. */ /** {@value}. */
public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries"; public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";
/** {@value}. */
public static final String OP_MSYNC = "op_msync";
/** {@value}. */ /** {@value}. */
public static final String OP_OPEN = "op_open"; public static final String OP_OPEN = "op_open";
@ -172,6 +175,9 @@ public final class StoreStatisticNames {
public static final String STORE_IO_THROTTLED public static final String STORE_IO_THROTTLED
= "store_io_throttled"; = "store_io_throttled";
/** Rate limiting was reported {@value}. */
public static final String STORE_IO_RATE_LIMITED = "store_io_rate_limited";
/** Requests made of a store: {@value}. */ /** Requests made of a store: {@value}. */
public static final String STORE_IO_REQUEST public static final String STORE_IO_REQUEST
= "store_io_request"; = "store_io_request";

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.statistics.impl;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -450,12 +451,37 @@ public final class IOStatisticsBinding {
* @param factory factory of duration trackers * @param factory factory of duration trackers
* @param statistic statistic key * @param statistic statistic key
* @param input input callable. * @param input input callable.
* @throws IOException IO failure.
*/ */
public static void trackDurationOfInvocation( public static void trackDurationOfInvocation(
DurationTrackerFactory factory, DurationTrackerFactory factory,
String statistic, String statistic,
InvocationRaisingIOE input) throws IOException { InvocationRaisingIOE input) throws IOException {
measureDurationOfInvocation(factory, statistic, input);
}
/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic,
* returning the measured duration.
*
* {@link #trackDurationOfInvocation(DurationTrackerFactory, String, InvocationRaisingIOE)}
* with the duration returned for logging etc.; added as a new
* method to avoid linking problems with any code calling the existing
* method.
*
* @param factory factory of duration trackers
* @param statistic statistic key
* @param input input callable.
* @return the duration of the operation, as measured by the duration tracker.
* @throws IOException IO failure.
*/
public static Duration measureDurationOfInvocation(
DurationTrackerFactory factory,
String statistic,
InvocationRaisingIOE input) throws IOException {
// create the tracker outside try-with-resources so // create the tracker outside try-with-resources so
// that failures can be set in the catcher. // that failures can be set in the catcher.
DurationTracker tracker = createTracker(factory, statistic); DurationTracker tracker = createTracker(factory, statistic);
@ -473,6 +499,7 @@ public final class IOStatisticsBinding {
// set the failed flag. // set the failed flag.
tracker.close(); tracker.close();
} }
return tracker.asDuration();
} }
/** /**
@ -622,7 +649,7 @@ public final class IOStatisticsBinding {
* @param statistic statistic to track * @param statistic statistic to track
* @return a duration tracker. * @return a duration tracker.
*/ */
private static DurationTracker createTracker( public static DurationTracker createTracker(
@Nullable final DurationTrackerFactory factory, @Nullable final DurationTrackerFactory factory,
final String statistic) { final String statistic) {
return factory != null return factory != null

View File

@ -255,4 +255,15 @@ public interface IOStatisticsStore extends IOStatistics,
*/ */
void addTimedOperation(String prefix, Duration duration); void addTimedOperation(String prefix, Duration duration);
/**
* Add a statistics sample as a min, max and mean and count.
* @param key key to add.
* @param count count.
*/
default void addSample(String key, long count) {
incrementCounter(key, count);
addMeanStatisticSample(key, count);
addMaximumSample(key, count);
addMinimumSample(key, count);
}
} }

View File

@ -88,6 +88,11 @@ final class PairedDurationTrackerFactory implements DurationTrackerFactory {
public Duration asDuration() { public Duration asDuration() {
return firstDuration.asDuration(); return firstDuration.asDuration();
} }
@Override
public String toString() {
return firstDuration.toString();
}
} }
} }

View File

@ -103,4 +103,11 @@ public class StatisticDurationTracker extends OperationDuration
} }
iostats.addTimedOperation(name, asDuration()); iostats.addTimedOperation(name, asDuration());
} }
@Override
public String toString() {
return " Duration of " +
(failed? (key + StoreStatisticNames.SUFFIX_FAILURES) : key)
+ ": " + super.toString();
}
} }

View File

@ -173,6 +173,9 @@ public class JsonSerialization<T> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized T load(File jsonFile) public synchronized T load(File jsonFile)
throws IOException, JsonParseException, JsonMappingException { throws IOException, JsonParseException, JsonMappingException {
if (!jsonFile.exists()) {
throw new FileNotFoundException("No such file: " + jsonFile);
}
if (!jsonFile.isFile()) { if (!jsonFile.isFile()) {
throw new FileNotFoundException("Not a file: " + jsonFile); throw new FileNotFoundException("Not a file: " + jsonFile);
} }
@ -182,7 +185,7 @@ public class JsonSerialization<T> {
try { try {
return mapper.readValue(jsonFile, classType); return mapper.readValue(jsonFile, classType);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Exception while parsing json file {}", jsonFile, e); LOG.warn("Exception while parsing json file {}", jsonFile, e);
throw e; throw e;
} }
} }

View File

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.time.Duration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Minimal subset of google rate limiter class.
* Can be used to throttle use of object stores where excess load
* will trigger cluster-wide throttling, backoff etc. and so collapse
* performance.
* The time waited is returned as a Duration type.
* The google rate limiter implements this by allowing a caller to ask for
* more capacity than is available. This will be granted
* but the subsequent request will be blocked if the bucket of
* capacity hasn't let refilled to the point where there is
* capacity again.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface RateLimiting {
/**
* Acquire rate limiter capacity.
* If there is not enough space, the permits will be acquired,
* but the subsequent call will block until the capacity has been
* refilled.
* @param requestedCapacity capacity to acquire.
* @return time spent waiting for output.
*/
Duration acquire(int requestedCapacity);
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.time.Duration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter;
/**
* Factory for Rate Limiting.
* This should be only place in the code where the guava RateLimiter is imported.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class RateLimitingFactory {
private static final RateLimiting UNLIMITED = new NoRateLimiting();
/**
* No waiting took place.
*/
private static final Duration INSTANTLY = Duration.ofMillis(0);
private RateLimitingFactory() {
}
/**
* No Rate Limiting.
*/
private static class NoRateLimiting implements RateLimiting {
@Override
public Duration acquire(int requestedCapacity) {
return INSTANTLY;
}
}
/**
* Rate limiting restricted to that of a google rate limiter.
*/
private static final class RestrictedRateLimiting implements RateLimiting {
private final RateLimiter limiter;
/**
* Constructor.
* @param capacityPerSecond capacity in permits/second.
*/
private RestrictedRateLimiting(int capacityPerSecond) {
this.limiter = RateLimiter.create(capacityPerSecond);
}
@Override
public Duration acquire(int requestedCapacity) {
final double delayMillis = limiter.acquire(requestedCapacity);
return delayMillis == 0
? INSTANTLY
: Duration.ofMillis((long) (delayMillis * 1000));
}
}
/**
* Get the unlimited rate.
* @return a rate limiter which always has capacity.
*/
public static RateLimiting unlimitedRate() {
return UNLIMITED;
}
/**
* Create an instance.
* If the rate is 0; return the unlimited rate.
* @param capacity capacity in permits/second.
* @return limiter restricted to the given capacity.
*/
public static RateLimiting create(int capacity) {
return capacity == 0
? unlimitedRate()
: new RestrictedRateLimiting(capacity);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import static java.util.Objects.requireNonNull;
/**
* A task submitter which is closeable, and whose close() call
* shuts down the pool. This can help manage
* thread pool lifecycles.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
Closeable {
/** Executors. */
private ExecutorService pool;
/**
* Constructor.
* @param pool non-null executor.
*/
public CloseableTaskPoolSubmitter(final ExecutorService pool) {
this.pool = requireNonNull(pool);
}
/**
* Get the pool.
* @return the pool.
*/
public ExecutorService getPool() {
return pool;
}
/**
* Shut down the pool.
*/
@Override
public void close() {
if (pool != null) {
pool.shutdown();
pool = null;
}
}
@Override
public Future<?> submit(final Runnable task) {
return pool.submit(task);
}
}

View File

@ -0,0 +1,613 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.RemoteIterator;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
/**
* Utility class for parallel execution, takes closures for the various
* actions.
* There is no retry logic: it is expected to be handled by the closures.
* From {@code org.apache.hadoop.fs.s3a.commit.Tasks} which came from
* the Netflix committer patch.
* Apache Iceberg has its own version of this, with a common ancestor
* at some point in its history.
* A key difference with this class is that the iterator is always,
* internally, an {@link RemoteIterator}.
* This is to allow tasks to be scheduled while incremental operations
* such as paged directory listings are still collecting in results.
*
* While awaiting completion, this thread spins and sleeps a time of
* {@link #SLEEP_INTERVAL_AWAITING_COMPLETION}, which, being a
* busy-wait, is inefficient.
* There's an implicit assumption that remote IO is being performed, and
* so this is not impacting throughput/performance.
*
* History:
* This class came with the Netflix contributions to the S3A committers
* in HADOOP-13786.
* It was moved into hadoop-common for use in the manifest committer and
* anywhere else it is needed, and renamed in the process as
* "Tasks" has too many meanings in the hadoop source.
* The iterator was then changed from a normal java iterable
* to a hadoop {@link org.apache.hadoop.fs.RemoteIterator}.
* This allows a task pool to be supplied with incremental listings
* from object stores, scheduling work as pages of listing
* results come in, rather than blocking until the entire
* directory/directory tree etc has been enumerated.
*
* There is a variant of this in Apache Iceberg in
* {@code org.apache.iceberg.util.Tasks}
* That is not derived from any version in the hadoop codebase, it
* just shares a common ancestor somewhere in the Netflix codebase.
* It is the more sophisticated version.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class TaskPool {
private static final Logger LOG =
LoggerFactory.getLogger(TaskPool.class);
/**
* Interval in milliseconds to await completion.
*/
private static final int SLEEP_INTERVAL_AWAITING_COMPLETION = 10;
private TaskPool() {
}
/**
* Callback invoked to process an item.
* @param <I> item type being processed
* @param <E> exception class which may be raised
*/
@FunctionalInterface
public interface Task<I, E extends Exception> {
void run(I item) throws E;
}
/**
* Callback invoked on a failure.
* @param <I> item type being processed
* @param <E> exception class which may be raised
*/
@FunctionalInterface
public interface FailureTask<I, E extends Exception> {
/**
* process a failure.
* @param item item the task is processing
* @param exception the exception which was raised.
* @throws E Exception of type E
*/
void run(I item, Exception exception) throws E;
}
/**
* Builder for task execution.
* @param <I> item type
*/
public static class Builder<I> {
private final RemoteIterator<I> items;
private Submitter service = null;
private FailureTask<I, ?> onFailure = null;
private boolean stopOnFailure = false;
private boolean suppressExceptions = false;
private Task<I, ?> revertTask = null;
private boolean stopRevertsOnFailure = false;
private Task<I, ?> abortTask = null;
private boolean stopAbortsOnFailure = false;
private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;
/**
* Create the builder.
* @param items items to process
*/
Builder(RemoteIterator<I> items) {
this.items = requireNonNull(items, "items");
}
/**
* Create the builder.
* @param items items to process
*/
Builder(Iterable<I> items) {
this(remoteIteratorFromIterable(items));
}
/**
* Declare executor service: if null, the tasks are executed in a single
* thread.
* @param submitter service to schedule tasks with.
* @return this builder.
*/
public Builder<I> executeWith(@Nullable Submitter submitter) {
this.service = submitter;
return this;
}
/**
* Task to invoke on failure.
* @param task task
* @return the builder
*/
public Builder<I> onFailure(FailureTask<I, ?> task) {
this.onFailure = task;
return this;
}
public Builder<I> stopOnFailure() {
this.stopOnFailure = true;
return this;
}
/**
* Suppress exceptions from tasks.
* RemoteIterator exceptions are not suppressable.
* @return the builder.
*/
public Builder<I> suppressExceptions() {
return suppressExceptions(true);
}
/**
* Suppress exceptions from tasks.
* RemoteIterator exceptions are not suppressable.
* @param suppress new value
* @return the builder.
*/
public Builder<I> suppressExceptions(boolean suppress) {
this.suppressExceptions = suppress;
return this;
}
/**
* Task to revert with after another task failed.
* @param task task to execute
* @return the builder
*/
public Builder<I> revertWith(Task<I, ?> task) {
this.revertTask = task;
return this;
}
/**
* Stop trying to revert if one operation fails.
* @return the builder
*/
public Builder<I> stopRevertsOnFailure() {
this.stopRevertsOnFailure = true;
return this;
}
/**
* Task to abort with after another task failed.
* @param task task to execute
* @return the builder
*/
public Builder<I> abortWith(Task<I, ?> task) {
this.abortTask = task;
return this;
}
/**
* Stop trying to abort if one operation fails.
* @return the builder
*/
public Builder<I> stopAbortsOnFailure() {
this.stopAbortsOnFailure = true;
return this;
}
/**
* Set the sleep interval.
* @param value new value
* @return the builder
*/
public Builder sleepInterval(final int value) {
sleepInterval = value;
return this;
}
/**
* Execute the task across the data.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
public <E extends Exception> boolean run(Task<I, E> task) throws E, IOException {
requireNonNull(items, "items");
if (!items.hasNext()) {
// if there are no items, return without worrying about
// execution pools, errors etc.
return true;
}
if (service != null) {
// thread pool, so run in parallel
return runParallel(task);
} else {
// single threaded execution.
return runSingleThreaded(task);
}
}
/**
* Single threaded execution.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
private <E extends Exception> boolean runSingleThreaded(Task<I, E> task)
throws E, IOException {
List<I> succeeded = new ArrayList<>();
List<Exception> exceptions = new ArrayList<>();
RemoteIterator<I> iterator = items;
boolean threw = true;
try {
while (iterator.hasNext()) {
I item = iterator.next();
try {
task.run(item);
succeeded.add(item);
} catch (Exception e) {
exceptions.add(e);
if (onFailure != null) {
try {
onFailure.run(item, e);
} catch (Exception failException) {
LOG.error("Failed to clean up on failure", e);
// keep going
}
}
if (stopOnFailure) {
break;
}
}
}
threw = false;
} catch (IOException iteratorIOE) {
// an IOE is reaised here during iteration
LOG.debug("IOException when iterating through {}", iterator, iteratorIOE);
throw iteratorIOE;
} finally {
// threw handles exceptions that were *not* caught by the catch block,
// and exceptions that were caught and possibly handled by onFailure
// are kept in exceptions.
if (threw || !exceptions.isEmpty()) {
if (revertTask != null) {
boolean failed = false;
for (I item : succeeded) {
try {
revertTask.run(item);
} catch (Exception e) {
LOG.error("Failed to revert task", e);
failed = true;
// keep going
}
if (stopRevertsOnFailure && failed) {
break;
}
}
}
if (abortTask != null) {
boolean failed = false;
while (iterator.hasNext()) {
try {
abortTask.run(iterator.next());
} catch (Exception e) {
failed = true;
LOG.error("Failed to abort task", e);
// keep going
}
if (stopAbortsOnFailure && failed) {
break;
}
}
}
}
}
if (!suppressExceptions && !exceptions.isEmpty()) {
TaskPool.<E>throwOne(exceptions);
}
return exceptions.isEmpty();
}
/**
* Parallel execution.
* @param task task to execute
* @param <E> exception which may be raised in execution.
* @return true if the operation executed successfully
* @throws E any exception raised.
* @throws IOException IOExceptions raised by remote iterator or in execution.
*/
private <E extends Exception> boolean runParallel(final Task<I, E> task)
throws E, IOException {
final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
final Queue<Exception> exceptions = new ConcurrentLinkedQueue<>();
final AtomicBoolean taskFailed = new AtomicBoolean(false);
final AtomicBoolean abortFailed = new AtomicBoolean(false);
final AtomicBoolean revertFailed = new AtomicBoolean(false);
List<Future<?>> futures = new ArrayList<>();
IOException iteratorIOE = null;
final RemoteIterator<I> iterator = this.items;
try {
while(iterator.hasNext()) {
final I item = iterator.next();
// submit a task for each item that will either run or abort the task
futures.add(service.submit(() -> {
if (!(stopOnFailure && taskFailed.get())) {
// run the task
boolean threw = true;
try {
LOG.debug("Executing task");
task.run(item);
succeeded.add(item);
LOG.debug("Task succeeded");
threw = false;
} catch (Exception e) {
taskFailed.set(true);
exceptions.add(e);
LOG.info("Task failed {}", e.toString());
LOG.debug("Task failed", e);
if (onFailure != null) {
try {
onFailure.run(item, e);
} catch (Exception failException) {
LOG.warn("Failed to clean up on failure", e);
// swallow the exception
}
}
} finally {
if (threw) {
taskFailed.set(true);
}
}
} else if (abortTask != null) {
// abort the task instead of running it
if (stopAbortsOnFailure && abortFailed.get()) {
return;
}
boolean failed = true;
try {
LOG.info("Aborting task");
abortTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to abort task", e);
// swallow the exception
} finally {
if (failed) {
abortFailed.set(true);
}
}
}
}));
}
} catch (IOException e) {
// iterator failure.
LOG.debug("IOException when iterating through {}", iterator, e);
iteratorIOE = e;
// mark as a task failure so all submitted tasks will halt/abort
taskFailed.set(true);
}
// let the above tasks complete (or abort)
waitFor(futures, sleepInterval);
int futureCount = futures.size();
futures.clear();
if (taskFailed.get() && revertTask != null) {
// at least one task failed, revert any that succeeded
LOG.info("Reverting all {} succeeded tasks from {} futures",
succeeded.size(), futureCount);
for (final I item : succeeded) {
futures.add(service.submit(() -> {
if (stopRevertsOnFailure && revertFailed.get()) {
return;
}
boolean failed = true;
try {
revertTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to revert task", e);
// swallow the exception
} finally {
if (failed) {
revertFailed.set(true);
}
}
}));
}
// let the revert tasks complete
waitFor(futures, sleepInterval);
}
// give priority to execution exceptions over
// iterator exceptions.
if (!suppressExceptions && !exceptions.isEmpty()) {
// there's an exception list to build up, cast and throw.
TaskPool.<E>throwOne(exceptions);
}
// raise any iterator exception.
// this can not be suppressed.
if (iteratorIOE != null) {
throw iteratorIOE;
}
// return true if all tasks succeeded.
return !taskFailed.get();
}
}
/**
* Wait for all the futures to complete; there's a small sleep between
* each iteration; enough to yield the CPU.
* @param futures futures.
* @param sleepInterval Interval in milliseconds to await completion.
*/
private static void waitFor(Collection<Future<?>> futures, int sleepInterval) {
int size = futures.size();
LOG.debug("Waiting for {} tasks to complete", size);
int oldNumFinished = 0;
while (true) {
int numFinished = (int) futures.stream().filter(Future::isDone).count();
if (oldNumFinished != numFinished) {
LOG.debug("Finished count -> {}/{}", numFinished, size);
oldNumFinished = numFinished;
}
if (numFinished == size) {
// all of the futures are done, stop looping
break;
} else {
try {
Thread.sleep(sleepInterval);
} catch (InterruptedException e) {
futures.forEach(future -> future.cancel(true));
Thread.currentThread().interrupt();
break;
}
}
}
}
/**
* Create a task builder for the iterable.
* @param items item source.
* @param <I> type of result.
* @return builder.
*/
public static <I> Builder<I> foreach(Iterable<I> items) {
return new Builder<>(requireNonNull(items, "items"));
}
/**
* Create a task builder for the remote iterator.
* @param items item source.
* @param <I> type of result.
* @return builder.
*/
public static <I> Builder<I> foreach(RemoteIterator<I> items) {
return new Builder<>(items);
}
public static <I> Builder<I> foreach(I[] items) {
return new Builder<>(Arrays.asList(requireNonNull(items, "items")));
}
/**
* Throw one exception, adding the others as suppressed
* exceptions attached to the one thrown.
* This method never completes normally.
* @param exceptions collection of exceptions
* @param <E> class of exceptions
* @throws E an extracted exception.
*/
private static <E extends Exception> void throwOne(
Collection<Exception> exceptions)
throws E {
Iterator<Exception> iter = exceptions.iterator();
Exception e = iter.next();
Class<? extends Exception> exceptionClass = e.getClass();
while (iter.hasNext()) {
Exception other = iter.next();
if (!exceptionClass.isInstance(other)) {
e.addSuppressed(other);
}
}
TaskPool.<E>castAndThrow(e);
}
/**
* Raise an exception of the declared type.
* This method never completes normally.
* @param e exception
* @param <E> class of exceptions
* @throws E a recast exception.
*/
@SuppressWarnings("unchecked")
private static <E extends Exception> void castAndThrow(Exception e) throws E {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw (E) e;
}
/**
* Interface to whatever lets us submit tasks.
*/
public interface Submitter {
/**
* Submit work.
* @param task task to execute
* @return the future of the submitted task.
*/
Future<?> submit(Runnable task);
}
}

View File

@ -0,0 +1,585 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util.functional;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.test.HadoopTestBase;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test Task Pool class.
* This is pulled straight out of the S3A version.
*/
@RunWith(Parameterized.class)
public class TestTaskPool extends HadoopTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestTaskPool.class);
public static final int ITEM_COUNT = 16;
private static final int FAILPOINT = 8;
private final int numThreads;
/**
* Thread pool for task execution.
*/
private ExecutorService threadPool;
/**
* Task submitter bonded to the thread pool, or
* null for the 0-thread case.
*/
private TaskPool.Submitter submitter;
private final CounterTask failingTask
= new CounterTask("failing committer", FAILPOINT, Item::commit);
private final FailureCounter failures
= new FailureCounter("failures", 0, null);
private final CounterTask reverter
= new CounterTask("reverter", 0, Item::revert);
private final CounterTask aborter
= new CounterTask("aborter", 0, Item::abort);
/**
* Test array for parameterized test runs: how many threads and
* to use. Threading makes some of the assertions brittle; there are
* more checks on single thread than parallel ops.
* @return a list of parameter tuples.
*/
@Parameterized.Parameters(name = "threads={0}")
public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{
{0},
{1},
{3},
{8},
{16},
});
}
private List<Item> items;
/**
* Construct the parameterized test.
* @param numThreads number of threads
*/
public TestTaskPool(int numThreads) {
this.numThreads = numThreads;
}
/**
* In a parallel test run there is more than one thread doing the execution.
* @return true if the threadpool size is >1
*/
public boolean isParallel() {
return numThreads > 1;
}
@Before
public void setup() {
items = IntStream.rangeClosed(1, ITEM_COUNT)
.mapToObj(i -> new Item(i,
String.format("With %d threads", numThreads)))
.collect(Collectors.toList());
if (numThreads > 0) {
threadPool = Executors.newFixedThreadPool(numThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(getMethodName() + "-pool-%d")
.build());
submitter = new PoolSubmitter();
} else {
submitter = null;
}
}
@After
public void teardown() {
if (threadPool != null) {
threadPool.shutdown();
threadPool = null;
}
}
private class PoolSubmitter implements TaskPool.Submitter {
@Override
public Future<?> submit(final Runnable task) {
return threadPool.submit(task);
}
}
/**
* create the builder.
* @return pre-inited builder
*/
private TaskPool.Builder<Item> builder() {
return TaskPool.foreach(items).executeWith(submitter);
}
private void assertRun(TaskPool.Builder<Item> builder,
CounterTask task) throws IOException {
boolean b = builder.run(task);
assertTrue("Run of " + task + " failed", b);
}
private void assertFailed(TaskPool.Builder<Item> builder,
CounterTask task) throws IOException {
boolean b = builder.run(task);
assertFalse("Run of " + task + " unexpectedly succeeded", b);
}
private String itemsToString() {
return "[" + items.stream().map(Item::toString)
.collect(Collectors.joining("\n")) + "]";
}
@Test
public void testSimpleInvocation() throws Throwable {
CounterTask t = new CounterTask("simple", 0, Item::commit);
assertRun(builder(), t);
t.assertInvoked("", ITEM_COUNT);
}
@Test
public void testFailNoStoppingSuppressed() throws Throwable {
assertFailed(builder().suppressExceptions(), failingTask);
failingTask.assertInvoked("Continued through operations", ITEM_COUNT);
items.forEach(Item::assertCommittedOrFailed);
}
@Test
public void testFailFastSuppressed() throws Throwable {
assertFailed(builder()
.suppressExceptions()
.stopOnFailure(),
failingTask);
if (isParallel()) {
failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
} else {
failingTask.assertInvoked("stop fast", FAILPOINT);
}
}
@Test
public void testFailedCallAbortSuppressed() throws Throwable {
assertFailed(builder()
.stopOnFailure()
.suppressExceptions()
.abortWith(aborter),
failingTask);
failingTask.assertInvokedAtLeast("success", FAILPOINT);
if (!isParallel()) {
aborter.assertInvokedAtLeast("abort", 1);
// all uncommitted items were aborted
items.stream().filter(i -> !i.committed)
.map(Item::assertAborted);
items.stream().filter(i -> i.committed)
.forEach(i -> assertFalse(i.toString(), i.aborted));
}
}
@Test
public void testFailedCalledWhenNotStoppingSuppressed() throws Throwable {
assertFailed(builder()
.suppressExceptions()
.onFailure(failures),
failingTask);
failingTask.assertInvokedAtLeast("success", FAILPOINT);
// only one failure was triggered
failures.assertInvoked("failure event", 1);
}
@Test
public void testFailFastCallRevertSuppressed() throws Throwable {
assertFailed(builder()
.stopOnFailure()
.revertWith(reverter)
.abortWith(aborter)
.suppressExceptions()
.onFailure(failures),
failingTask);
failingTask.assertInvokedAtLeast("success", FAILPOINT);
if (!isParallel()) {
aborter.assertInvokedAtLeast("abort", 1);
// all uncommitted items were aborted
items.stream().filter(i -> !i.committed)
.filter(i -> !i.failed)
.forEach(Item::assertAborted);
}
// all committed were reverted
items.stream().filter(i -> i.committed && !i.failed)
.forEach(Item::assertReverted);
// all reverted items are committed
items.stream().filter(i -> i.reverted)
.forEach(Item::assertCommitted);
// only one failure was triggered
failures.assertInvoked("failure event", 1);
}
@Test
public void testFailSlowCallRevertSuppressed() throws Throwable {
assertFailed(builder()
.suppressExceptions()
.revertWith(reverter)
.onFailure(failures),
failingTask);
failingTask.assertInvokedAtLeast("success", FAILPOINT);
// all committed were reverted
// identify which task failed from the set
int failing = failures.getItem().id;
items.stream()
.filter(i -> i.id != failing)
.filter(i -> i.committed)
.forEach(Item::assertReverted);
// all reverted items are committed
items.stream().filter(i -> i.reverted)
.forEach(Item::assertCommitted);
// only one failure was triggered
failures.assertInvoked("failure event", 1);
}
@Test
public void testFailFastExceptions() throws Throwable {
intercept(IOException.class,
() -> builder()
.stopOnFailure()
.run(failingTask));
if (isParallel()) {
failingTask.assertInvokedAtLeast("stop fast", FAILPOINT);
} else {
failingTask.assertInvoked("stop fast", FAILPOINT);
}
}
@Test
public void testFailSlowExceptions() throws Throwable {
intercept(IOException.class,
() -> builder()
.run(failingTask));
failingTask.assertInvoked("continued through operations", ITEM_COUNT);
items.forEach(Item::assertCommittedOrFailed);
}
@Test
public void testFailFastExceptionsWithAbortFailure() throws Throwable {
CounterTask failFirst = new CounterTask("task", 1, Item::commit);
CounterTask a = new CounterTask("aborter", 1, Item::abort);
intercept(IOException.class,
() -> builder()
.stopOnFailure()
.abortWith(a)
.run(failFirst));
if (!isParallel()) {
// expect the other tasks to be aborted
a.assertInvokedAtLeast("abort", ITEM_COUNT - 1);
}
}
@Test
public void testFailFastExceptionsWithAbortFailureStopped() throws Throwable {
CounterTask failFirst = new CounterTask("task", 1, Item::commit);
CounterTask a = new CounterTask("aborter", 1, Item::abort);
intercept(IOException.class,
() -> builder()
.stopOnFailure()
.stopAbortsOnFailure()
.abortWith(a)
.run(failFirst));
if (!isParallel()) {
// expect the other tasks to be aborted
a.assertInvoked("abort", 1);
}
}
/**
* Fail the last one committed, all the rest will be reverted.
* The actual ID of the last task has to be picke dup from the
* failure callback, as in the pool it may be one of any.
*/
@Test
public void testRevertAllSuppressed() throws Throwable {
CounterTask failLast = new CounterTask("task", ITEM_COUNT, Item::commit);
assertFailed(builder()
.suppressExceptions()
.stopOnFailure()
.revertWith(reverter)
.abortWith(aborter)
.onFailure(failures),
failLast);
failLast.assertInvoked("success", ITEM_COUNT);
int abCount = aborter.getCount();
int revCount = reverter.getCount();
assertEquals(ITEM_COUNT, 1 + abCount + revCount);
// identify which task failed from the set
int failing = failures.getItem().id;
// all committed were reverted
items.stream()
.filter(i -> i.id != failing)
.filter(i -> i.committed)
.forEach(Item::assertReverted);
items.stream()
.filter(i -> i.id != failing)
.filter(i -> !i.committed)
.forEach(Item::assertAborted);
// all reverted items are committed
items.stream().filter(i -> i.reverted)
.forEach(Item::assertCommitted);
// only one failure was triggered
failures.assertInvoked("failure event", 1);
}
/**
* The Item which tasks process.
*/
private final class Item {
private final int id;
private final String text;
private volatile boolean committed, aborted, reverted, failed;
private Item(int item, String text) {
this.id = item;
this.text = text;
}
boolean commit() {
committed = true;
return true;
}
boolean abort() {
aborted = true;
return true;
}
boolean revert() {
reverted = true;
return true;
}
boolean fail() {
failed = true;
return true;
}
public Item assertCommitted() {
assertTrue(toString() + " was not committed in\n"
+ itemsToString(),
committed);
return this;
}
public Item assertCommittedOrFailed() {
assertTrue(toString() + " was not committed nor failed in\n"
+ itemsToString(),
committed || failed);
return this;
}
public Item assertAborted() {
assertTrue(toString() + " was not aborted in\n"
+ itemsToString(),
aborted);
return this;
}
public Item assertReverted() {
assertTrue(toString() + " was not reverted in\n"
+ itemsToString(),
reverted);
return this;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Item{");
sb.append(String.format("[%02d]", id));
sb.append(", committed=").append(committed);
sb.append(", aborted=").append(aborted);
sb.append(", reverted=").append(reverted);
sb.append(", failed=").append(failed);
sb.append(", text=").append(text);
sb.append('}');
return sb.toString();
}
}
/**
* Class which can count invocations and, if limit > 0, will raise
* an exception on the specific invocation of {@link #note(Object)}
* whose count == limit.
*/
private class BaseCounter {
private final AtomicInteger counter = new AtomicInteger(0);
private final int limit;
private final String name;
private Item item;
private final Optional<Function<Item, Boolean>> action;
/**
* Base counter, tracks items.
* @param name name for string/exception/logs.
* @param limit limit at which an exception is raised, 0 == never
* @param action optional action to invoke after the increment,
* before limit check
*/
BaseCounter(String name,
int limit,
Function<Item, Boolean> action) {
this.name = name;
this.limit = limit;
this.action = Optional.ofNullable(action);
}
/**
* Apply the action to an item; log at info afterwards with both the
* before and after string values of the item.
* @param i item to process.
* @throws IOException failure in the action
*/
void process(Item i) throws IOException {
this.item = i;
int count = counter.incrementAndGet();
if (limit == count) {
i.fail();
LOG.info("{}: Failed {}", this, i);
throw new IOException(String.format("%s: Limit %d reached for %s",
this, limit, i));
}
String before = i.toString();
action.map(a -> a.apply(i));
LOG.info("{}: {} -> {}", this, before, i);
}
int getCount() {
return counter.get();
}
Item getItem() {
return item;
}
void assertInvoked(String text, int expected) {
assertEquals(toString() + ": " + text, expected, getCount());
}
void assertInvokedAtLeast(String text, int expected) {
int actual = getCount();
assertTrue(toString() + ": " + text
+ "-expected " + expected
+ " invocations, but got " + actual
+ " in " + itemsToString(),
expected <= actual);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"BaseCounter{");
sb.append("name='").append(name).append('\'');
sb.append(", count=").append(counter.get());
sb.append(", limit=").append(limit);
sb.append(", item=").append(item);
sb.append('}');
return sb.toString();
}
}
private final class CounterTask
extends BaseCounter implements TaskPool.Task<Item, IOException> {
private CounterTask(String name, int limit,
Function<Item, Boolean> action) {
super(name, limit, action);
}
@Override
public void run(Item item) throws IOException {
process(item);
}
}
private final class FailureCounter
extends BaseCounter
implements TaskPool.FailureTask<Item, IOException> {
private Exception exception;
private FailureCounter(String name, int limit,
Function<Item, Boolean> action) {
super(name, limit, action);
}
@Override
public void run(Item item, Exception ex) throws IOException {
process(item);
this.exception = ex;
}
private Exception getException() {
return exception;
}
}
}