MAPREDUCE-7435. Manifest Committer OOM on abfs (#5519)

This modifies the manifest committer so that the list of files
to rename is passed between stages as a file of
writable entries on the local filesystem.

The map of directories to create is still passed in memory;
this map is built across all tasks, so even if many tasks
created files, if they all write into the same set of directories
the memory needed is O(directories) with the
task count not a factor.

The _SUCCESS file reports on heap size through gauges.
This should give a warning if there are problems.

Contributed by Steve Loughran
This commit is contained in:
Steve Loughran 2023-06-09 17:00:59 +01:00 committed by GitHub
parent 9c989515ba
commit 7a45ef4164
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 2580 additions and 491 deletions

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
 * Setter for IOStatistics entries.
 * These operations have been in the read/write API
 * {@code IOStatisticsStore} since IOStatistics
 * was added; extracting into its own interface allows for
 * {@link IOStatisticsSnapshot} to also support it.
 * These are the simple setters; they don't provide for increments,
 * decrements, calculation of min/max/mean etc.
 * <p>
 * Implementations differ in how they handle unknown keys:
 * {@link IOStatisticsSnapshot} creates a new entry, while store
 * implementations treat the call as a no-op.
 * @since The interface and IOStatisticsSnapshot support was added <i>after</i> Hadoop 3.3.5
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface IOStatisticsSetters extends IOStatistics {

  /**
   * Set a counter.
   *
   * No-op if the counter is unknown.
   * @param key statistics key
   * @param value value to set
   */
  void setCounter(String key, long value);

  /**
   * Set a gauge.
   *
   * No-op if the gauge is unknown.
   * @param key statistics key
   * @param value value to set
   */
  void setGauge(String key, long value);

  /**
   * Set a maximum.
   *
   * No-op if the maximum is unknown.
   * @param key statistics key
   * @param value value to set
   */
  void setMaximum(String key, long value);

  /**
   * Set a minimum.
   *
   * No-op if the minimum is unknown.
   * @param key statistics key
   * @param value value to set
   */
  void setMinimum(String key, long value);

  /**
   * Set a mean statistic to a given value.
   *
   * No-op if the key is unknown.
   * @param key statistic key
   * @param value new value.
   */
  void setMeanStatistic(String key, MeanStatistic value);
}

View File

@ -62,7 +62,8 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class IOStatisticsSnapshot
implements IOStatistics, Serializable, IOStatisticsAggregator {
implements IOStatistics, Serializable, IOStatisticsAggregator,
IOStatisticsSetters {
private static final long serialVersionUID = -1762522703841538084L;
@ -222,6 +223,33 @@ public synchronized Map<String, MeanStatistic> meanStatistics() {
return meanStatistics;
}
@Override
public synchronized void setCounter(final String key, final long value) {
  // a direct map put: creates the entry if it did not already exist.
  counters().put(key, value);
}
@Override
public synchronized void setGauge(final String key, final long value) {
  // a direct map put: creates the entry if it did not already exist.
  gauges().put(key, value);
}
@Override
public synchronized void setMaximum(final String key, final long value) {
  // a direct map put: creates the entry if it did not already exist.
  maximums().put(key, value);
}
@Override
public synchronized void setMinimum(final String key, final long value) {
  // a direct map put: creates the entry if it did not already exist.
  minimums().put(key, value);
}
@Override
public synchronized void setMeanStatistic(final String key, final MeanStatistic value) {
  // synchronized to match the other setters in this class,
  // which all guard their map updates; previously this was the
  // only setter without the lock.
  meanStatistics().put(key, value);
}
@Override
public String toString() {
return ioStatisticsToString(this);

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.statistics.impl;
package org.apache.hadoop.fs.statistics.impl;
import javax.annotation.Nullable;
import java.time.Duration;
@ -25,7 +25,6 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.MeanStatistic;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
/**
* This may seem odd having an IOStatisticsStore which does nothing

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSetters;
import org.apache.hadoop.fs.statistics.MeanStatistic;
/**
@ -31,6 +32,7 @@
* use in classes which track statistics for reporting.
*/
public interface IOStatisticsStore extends IOStatistics,
IOStatisticsSetters,
IOStatisticsAggregator,
DurationTrackerFactory {
@ -56,24 +58,6 @@ default long incrementCounter(String key) {
*/
long incrementCounter(String key, long value);
/**
* Set a counter.
*
* No-op if the counter is unknown.
* @param key statistics key
* @param value value to set
*/
void setCounter(String key, long value);
/**
* Set a gauge.
*
* No-op if the gauge is unknown.
* @param key statistics key
* @param value value to set
*/
void setGauge(String key, long value);
/**
* Increment a gauge.
* <p>
@ -85,14 +69,6 @@ default long incrementCounter(String key) {
*/
long incrementGauge(String key, long value);
/**
* Set a maximum.
* No-op if the maximum is unknown.
* @param key statistics key
* @param value value to set
*/
void setMaximum(String key, long value);
/**
* Increment a maximum.
* <p>
@ -104,16 +80,6 @@ default long incrementCounter(String key) {
*/
long incrementMaximum(String key, long value);
/**
* Set a minimum.
* <p>
* No-op if the minimum is unknown.
* </p>
* @param key statistics key
* @param value value to set
*/
void setMinimum(String key, long value);
/**
* Increment a minimum.
* <p>
@ -147,16 +113,6 @@ default long incrementCounter(String key) {
*/
void addMaximumSample(String key, long value);
/**
* Set a mean statistic to a given value.
* <p>
* No-op if the key is unknown.
* </p>
* @param key statistic key
* @param value new value.
*/
void setMeanStatistic(String key, MeanStatistic value);
/**
* Add a sample to the mean statistics.
* <p>

View File

@ -67,6 +67,17 @@ public interface IOStatisticsStoreBuilder {
IOStatisticsStoreBuilder withDurationTracking(
String... prefixes);
/**
* A value which is tracked with counter/min/max/mean.
* Similar to {@link #withDurationTracking(String...)}
* but without the failure option and with the same name
* across all categories.
* @param prefixes prefixes to add.
* @return the builder
*/
IOStatisticsStoreBuilder withSampleTracking(
String... prefixes);
/**
* Build the collector.
* @return a new collector.

View File

@ -92,6 +92,18 @@ public IOStatisticsStoreBuilderImpl withDurationTracking(
return this;
}
@Override
public IOStatisticsStoreBuilderImpl withSampleTracking(
    final String... prefixes) {
  // register every prefix under each of the categories needed
  // for sample tracking: counter, minimum, maximum and mean.
  for (int i = 0; i < prefixes.length; i++) {
    final String prefix = prefixes[i];
    withCounters(prefix);
    withMinimums(prefix);
    withMaximums(prefix);
    withMeanStatistics(prefix);
  }
  return this;
}
@Override
public IOStatisticsStore build() {
return new IOStatisticsStoreImpl(counters, gauges, minimums,

View File

@ -191,6 +191,37 @@ public static <S> RemoteIterator<S> closingRemoteIterator(
return new CloseRemoteIterator<>(iterator, toClose);
}
/**
 * Wrap an iterator with one which adds a continuation probe.
 * This allows work to exit fast without complicated breakout logic.
 * The probe may be invoked more than once per element, so it
 * must be cheap and idempotent.
 * @param iterator source
 * @param continueWork predicate which will trigger a fast halt if it returns false.
 * @param <S> source type.
 * @return a new iterator
 */
public static <S> RemoteIterator<S> haltableRemoteIterator(
    final RemoteIterator<S> iterator,
    final CallableRaisingIOE<Boolean> continueWork) {
  return new HaltableRemoteIterator<>(iterator, continueWork);
}
/**
 * A remote iterator which simply counts up, stopping once the
 * value reaches the value of {@code excludedFinish}.
 * This is primarily for tests or when submitting work into a TaskPool.
 * equivalent to
 * <pre>
 *   for(long l = start; l &lt; excludedFinish; l++) yield l;
 * </pre>
 * @param start start value
 * @param excludedFinish excluded finish
 * @return an iterator which returns longs from [start, finish)
 */
public static RemoteIterator<Long> rangeExcludingIterator(
    final long start, final long excludedFinish) {
  return new RangeExcludingLongIterator(start, excludedFinish);
}
/**
* Build a list from a RemoteIterator.
* @param source source iterator
@ -391,10 +422,12 @@ public void close() throws IOException {
/**
* Wrapper of another remote iterator; IOStatistics
* and Closeable methods are passed down if implemented.
* This class may be subclassed within the hadoop codebase
* if custom iterators are needed.
* @param <S> source type
* @param <T> type of returned value
*/
private static abstract class WrappingRemoteIterator<S, T>
public static abstract class WrappingRemoteIterator<S, T>
implements RemoteIterator<T>, IOStatisticsSource, Closeable {
/**
@ -715,4 +748,93 @@ public void close() throws IOException {
}
}
}
/**
 * An iterator which allows for a fast exit predicate.
 * @param <S> source type
 */
private static final class HaltableRemoteIterator<S>
    extends WrappingRemoteIterator<S, S> {

  /**
   * Probe as to whether work should continue.
   */
  private final CallableRaisingIOE<Boolean> continueWork;

  /**
   * Wrap an iterator with one which adds a continuation probe.
   * The probe will be called in the {@link #hasNext()} method, before
   * the source iterator is itself checked and in {@link #next()}
   * before retrieval.
   * That is: it may be called multiple times per iteration.
   * @param source source iterator.
   * @param continueWork predicate which will trigger a fast halt if it returns false.
   */
  private HaltableRemoteIterator(
      final RemoteIterator<S> source,
      final CallableRaisingIOE<Boolean> continueWork) {
    super(source);
    this.continueWork = continueWork;
  }

  @Override
  public boolean hasNext() throws IOException {
    return sourceHasNext();
  }

  @Override
  public S next() throws IOException {
    return sourceNext();
  }

  @Override
  protected boolean sourceHasNext() throws IOException {
    // the predicate is probed first: a false result halts iteration
    // even when the source still has elements.
    return continueWork.apply() && super.sourceHasNext();
  }
}
/**
 * A half-open range {@code [start, excludedFinish)} of longs,
 * exposed as a remote iterator.
 * This is primarily for tests or when submitting work into a TaskPool.
 */
private static final class RangeExcludingLongIterator implements RemoteIterator<Long> {

  /** Exclusive upper bound of the range. */
  private final long excludedFinish;

  /** Next value to hand out. */
  private long cursor;

  /**
   * Construct.
   * @param start first value to return.
   * @param excludedFinish exclusive upper bound; iteration halts
   * once the cursor reaches this value.
   */
  private RangeExcludingLongIterator(final long start, final long excludedFinish) {
    this.cursor = start;
    this.excludedFinish = excludedFinish;
  }

  @Override
  public boolean hasNext() throws IOException {
    return cursor < excludedFinish;
  }

  @Override
  public Long next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // hand out the current value and advance the cursor.
    return cursor++;
  }
}
}

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.statistics;
import java.util.Arrays;
import java.util.Collection;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.hadoop.fs.statistics.impl.ForwardingIOStatisticsStore;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticGauge;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMean;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
/**
 * Test the {@link IOStatisticsSetters} interface implementations through
 * a parameterized run with each implementation.
 * For each of the setters, the value is set, verified,
 * updated, verified again.
 * An option known to be undefined in all created IOStatisticsStore instances
 * is set, to verify it is harmless.
 */
@RunWith(Parameterized.class)
public class TestIOStatisticsSetters extends AbstractHadoopTestBase {

  // statistic keys registered in every store built by createTestStore().
  public static final String COUNTER = "counter";

  public static final String GAUGE = "gauge";

  public static final String MAXIMUM = "max";

  public static final String MINIMUM = "min";

  public static final String MEAN = "mean";

  /** Implementation under test. */
  private final IOStatisticsSetters ioStatistics;

  /** Does this implementation create entries for unknown keys? */
  private final boolean createsNewEntries;

  @Parameterized.Parameters(name="{0}")
  public static Collection<Object[]> params() {
    return Arrays.asList(new Object[][]{
        {"IOStatisticsSnapshot", new IOStatisticsSnapshot(), true},
        {"IOStatisticsStore", createTestStore(), false},
        {"ForwardingIOStatisticsStore", new ForwardingIOStatisticsStore(createTestStore()), false},
    });
  }

  /**
   * Create a test store with the stats used for testing set up.
   * @return a set up store
   */
  private static IOStatisticsStore createTestStore() {
    return iostatisticsStore()
        .withCounters(COUNTER)
        .withGauges(GAUGE)
        .withMaximums(MAXIMUM)
        .withMinimums(MINIMUM)
        .withMeanStatistics(MEAN)
        .build();
  }

  /**
   * Parameterized constructor.
   * @param source test case name; used only by the runner via {@code name="{0}"}.
   * @param ioStatisticsSetters implementation under test.
   * @param createsNewEntries whether the setters create entries for unknown keys.
   */
  public TestIOStatisticsSetters(
      String source,
      IOStatisticsSetters ioStatisticsSetters,
      boolean createsNewEntries) {
    this.ioStatistics = ioStatisticsSetters;
    this.createsNewEntries = createsNewEntries;
  }

  @Test
  public void testCounter() throws Throwable {
    // write
    ioStatistics.setCounter(COUNTER, 1);
    assertThatStatisticCounter(ioStatistics, COUNTER)
        .isEqualTo(1);

    // update
    ioStatistics.setCounter(COUNTER, 2);
    assertThatStatisticCounter(ioStatistics, COUNTER)
        .isEqualTo(2);

    // unknown value: either created (snapshot) or silently ignored (stores)
    final String unknown = "unknown";
    ioStatistics.setCounter(unknown, 3);
    if (createsNewEntries) {
      assertThatStatisticCounter(ioStatistics, unknown)
          .isEqualTo(3);
    } else {
      Assertions.assertThat(ioStatistics.counters())
          .describedAs("Counter map in {}", ioStatistics)
          .doesNotContainKey(unknown);
    }
  }

  @Test
  public void testMaximum() throws Throwable {
    // write
    ioStatistics.setMaximum(MAXIMUM, 1);
    assertThatStatisticMaximum(ioStatistics, MAXIMUM)
        .isEqualTo(1);

    // update
    ioStatistics.setMaximum(MAXIMUM, 2);
    assertThatStatisticMaximum(ioStatistics, MAXIMUM)
        .isEqualTo(2);

    // unknown value: no assertion, just verifying the call is harmless
    ioStatistics.setMaximum("mm2", 3);
  }

  @Test
  public void testMinimum() throws Throwable {
    // write
    ioStatistics.setMinimum(MINIMUM, 1);
    assertThatStatisticMinimum(ioStatistics, MINIMUM)
        .isEqualTo(1);

    // update
    ioStatistics.setMinimum(MINIMUM, 2);
    assertThatStatisticMinimum(ioStatistics, MINIMUM)
        .isEqualTo(2);

    // unknown value: no assertion, just verifying the call is harmless
    ioStatistics.setMinimum("c2", 3);
  }

  @Test
  public void testGauge() throws Throwable {
    // write
    ioStatistics.setGauge(GAUGE, 1);
    assertThatStatisticGauge(ioStatistics, GAUGE)
        .isEqualTo(1);

    // update
    ioStatistics.setGauge(GAUGE, 2);
    assertThatStatisticGauge(ioStatistics, GAUGE)
        .isEqualTo(2);

    // unknown value: no assertion, just verifying the call is harmless
    ioStatistics.setGauge("g2", 3);
  }

  @Test
  public void testMean() throws Throwable {
    // write
    final MeanStatistic mean11 = new MeanStatistic(1, 1);
    ioStatistics.setMeanStatistic(MEAN, mean11);
    assertThatStatisticMean(ioStatistics, MEAN)
        .isEqualTo(mean11);

    // update
    final MeanStatistic mean22 = new MeanStatistic(2, 2);
    ioStatistics.setMeanStatistic(MEAN, mean22);
    assertThatStatisticMean(ioStatistics, MEAN)
        .isEqualTo(mean22);

    // unknown value: no assertion, just verifying the call is harmless
    ioStatistics.setMeanStatistic("m2", mean11);
  }
}

View File

@ -22,8 +22,10 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.util.Preconditions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +39,7 @@
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.*;
import static org.apache.hadoop.util.functional.RemoteIterators.haltableRemoteIterator;
import static org.assertj.core.api.Assertions.assertThat;
/**
@ -287,6 +290,44 @@ public void testJavaIterableCloseInNextLoop() throws Throwable {
}
@Test
public void testHaltableIterator() throws Throwable {
  final int limit = 4;
  AtomicInteger count = new AtomicInteger(limit);

  // the source iterates over [0, 10), but the halting predicate
  // fails once "count" (initially 4) is decremented to zero, so
  // only "limit" elements are returned.
  final RemoteIterator<Long> it =
      haltableRemoteIterator(
          rangeExcludingIterator(0, 10),
          () -> count.get() > 0);
  verifyInvoked(it, limit, (v) -> count.decrementAndGet());
}
@Test
public void testHaltableIteratorNoHalt() throws Throwable {

  // the predicate always returns true, so the full range
  // of 10 elements is iterated through.
  final int finish = 10;
  final RemoteIterator<Long> it =
      haltableRemoteIterator(
          rangeExcludingIterator(0, finish),
          () -> true);
  verifyInvoked(it, finish);
}
@Test
public void testRangeExcludingIterator() throws Throwable {
  // empty and reversed ranges yield no elements
  verifyInvoked(rangeExcludingIterator(0, 0), 0);
  verifyInvoked(rangeExcludingIterator(0, -1), 0);
  verifyInvoked(rangeExcludingIterator(0, 100), 100);
  // next() past the end must raise NoSuchElementException
  intercept(NoSuchElementException.class, () ->
      rangeExcludingIterator(0, 0).next());
}
/**
* assert that the string value of an object contains the
* expected text.
@ -327,6 +368,19 @@ protected <T> void verifyInvoked(final RemoteIterator<T> it,
.isEqualTo(length);
}
/**
 * Verify that the iteration completes with a given invocation count.
 * @param it iterator
 * @param <T> type.
 * @param length expected number of iterations
 * @throws IOException propagated from the iteration.
 */
protected <T> void verifyInvoked(
    final RemoteIterator<T> it,
    final int length)
    throws IOException {
  // delegate to the consumer-taking overload with a no-op consumer.
  verifyInvoked(it, length, (t) -> { });
}
/**
* Close an iterator if it is iterable.
* @param it iterator

View File

@ -58,6 +58,8 @@
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtDebug;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.CAPABILITY_DYNAMIC_PARTITIONING;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_IO_PROCESSORS_DEFAULT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_COMPLETED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASKS_FAILED_COUNT;
@ -393,7 +395,9 @@ public void commitJob(final JobContext jobContext) throws IOException {
marker = result.getJobSuccessData();
// update the cached success with the new report.
setSuccessReport(marker);
// patch in the #of threads as it is useful
marker.putDiagnostic(OPT_IO_PROCESSORS,
conf.get(OPT_IO_PROCESSORS, Long.toString(OPT_IO_PROCESSORS_DEFAULT)));
} catch (IOException e) {
// failure. record it for the summary
failure = e;
@ -688,7 +692,7 @@ public String toString() {
* to date.
* The report will updated with the current active stage,
* and if {@code thrown} is non-null, it will be added to the
* diagnistics (and the job tagged as a failure).
* diagnostics (and the job tagged as a failure).
* Static for testability.
* @param activeStage active stage
* @param config configuration to use.

View File

@ -148,6 +148,11 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
*/
private final boolean deleteTargetPaths;
/**
* Entry writer queue capacity.
*/
private final int writerQueueCapacity;
/**
* Constructor.
* @param outputPath destination path of the job.
@ -190,6 +195,9 @@ public final class ManifestCommitterConfig implements IOStatisticsSource {
this.deleteTargetPaths = conf.getBoolean(
OPT_DELETE_TARGET_FILES,
OPT_DELETE_TARGET_FILES_DEFAULT);
this.writerQueueCapacity = conf.getInt(
OPT_WRITER_QUEUE_CAPACITY,
DEFAULT_WRITER_QUEUE_CAPACITY);
// if constructed with a task attempt, build the task ID and path.
if (context instanceof TaskAttemptContext) {
@ -251,6 +259,8 @@ FileSystem getDestinationFileSystem() throws IOException {
StageConfig createStageConfig() {
StageConfig stageConfig = new StageConfig();
stageConfig
.withConfiguration(conf)
.withDeleteTargetPaths(deleteTargetPaths)
.withIOStatistics(iostatistics)
.withJobAttemptNumber(jobAttemptNumber)
.withJobDirectories(dirs)
@ -262,8 +272,7 @@ StageConfig createStageConfig() {
.withTaskAttemptDir(taskAttemptDir)
.withTaskAttemptId(taskAttemptId)
.withTaskId(taskId)
.withDeleteTargetPaths(deleteTargetPaths);
.withWriterQueueCapacity(writerQueueCapacity);
return stageConfig;
}
@ -323,6 +332,14 @@ public String getName() {
return name;
}
/**
 * Get writer queue capacity.
 * @return the number of queued manifest lists permitted before
 * enqueueing blocks.
 */
public int getWriterQueueCapacity() {
  return writerQueueCapacity;
}
@Override
public IOStatisticsStore getIOStatistics() {
return iostatistics;

View File

@ -151,7 +151,7 @@ public final class ManifestCommitterConstants {
/**
* Default value: {@value}.
*/
public static final int OPT_IO_PROCESSORS_DEFAULT = 64;
public static final int OPT_IO_PROCESSORS_DEFAULT = 32;
/**
* Directory for saving job summary reports.
@ -240,6 +240,26 @@ public final class ManifestCommitterConstants {
public static final String CAPABILITY_DYNAMIC_PARTITIONING =
"mapreduce.job.committer.dynamic.partitioning";
/**
* Queue capacity between task manifest loading and the entry file writer.
* If more than this number of manifest lists are waiting to be written,
* the enqueue is blocking.
* There's an expectation that writing to the local file is a lot faster
* than the parallelized buffer reads, therefore that this queue can
* be emptied at the same rate it is filled.
* Value {@value}.
*/
public static final String OPT_WRITER_QUEUE_CAPACITY =
OPT_PREFIX + "writer.queue.capacity";
/**
* Default value of {@link #OPT_WRITER_QUEUE_CAPACITY}.
* Value {@value}.
*/
public static final int DEFAULT_WRITER_QUEUE_CAPACITY = OPT_IO_PROCESSORS_DEFAULT;
private ManifestCommitterConstants() {
}

View File

@ -34,6 +34,9 @@ public final class DiagnosticKeys {
public static final String STAGE = "stage";
public static final String EXCEPTION = "exception";
public static final String STACKTRACE = "stacktrace";
public static final String TOTAL_MEMORY = "total.memory";
public static final String FREE_MEMORY = "free.memory";
public static final String HEAP_MEMORY = "heap.memory";
/** Directory where manifests were renamed: {@value}. */

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.files;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
@ -28,6 +30,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.marshallPath;
@ -37,12 +40,13 @@
/**
* A directory entry in the task manifest.
* Uses shorter field names for smaller files.
* Hash and equals are on dir name only; there's no real expectation
* that those operations are needed.
* Hash and equals are on dir name only.
* Can be serialized as a java object, json object
* or hadoop writable.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class DirEntry implements Serializable {
public final class DirEntry implements Serializable, Writable {
private static final long serialVersionUID = 5658520530209859765L;
@ -65,7 +69,7 @@ public final class DirEntry implements Serializable {
private int level;
/**
* Constructor only for use by jackson.
* Constructor for use by jackson/writable.
* Do Not Delete.
*/
private DirEntry() {
@ -177,6 +181,20 @@ public int hashCode() {
return Objects.hash(dir);
}
@Override
public void write(final DataOutput out) throws IOException {
  // NOTE(review): writeUTF stores a 2-byte length prefix, so a dir
  // path over 65535 encoded bytes raises UTFDataFormatException —
  // confirm paths stay below this limit.
  out.writeUTF(dir);
  out.writeInt(type);
  out.writeInt(level);
}
@Override
public void readFields(final DataInput in) throws IOException {
  // field order must exactly match write().
  dir = in.readUTF();
  type = in.readInt();
  level = in.readInt();
}
/**
* A directory entry.
* @param dest destination path.

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.files;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
@ -29,7 +31,11 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.marshallPath;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.unmarshallPath;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.AbstractManifestData.verify;
@ -37,12 +43,14 @@
/**
* A File entry in the task manifest.
* Uses shorter field names for smaller files.
* Used as a Hadoop writable when saved to in intermediate file
* during job commit.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@JsonInclude(JsonInclude.Include.NON_NULL)
public final class FileEntry implements Serializable {
public final class FileEntry implements Serializable, Writable {
private static final long serialVersionUID = -550288489009777867L;
@ -62,10 +70,10 @@ public final class FileEntry implements Serializable {
private String etag;
/**
* Constructor only for use by jackson.
* Constructor for serialization/deserialization.
* Do Not Delete.
*/
private FileEntry() {
public FileEntry() {
}
/**
@ -176,9 +184,10 @@ public boolean equals(Object o) {
return false;
}
FileEntry that = (FileEntry) o;
return size == that.size && source.equals(that.source) && dest.equals(
that.dest) &&
Objects.equals(etag, that.etag);
return size == that.size
&& Objects.equals(source, that.source)
&& Objects.equals(dest, that.dest)
&& Objects.equals(etag, that.etag);
}
@Override
@ -186,4 +195,19 @@ public int hashCode() {
return Objects.hash(source, dest);
}
@Override
public void write(final DataOutput out) throws IOException {
  // source and dest are required; a null etag is marshalled as the
  // empty string since Text.writeString() cannot store null.
  Text.writeString(out, requireNonNull(source, "null source"));
  Text.writeString(out, requireNonNull(dest, "null dest"));
  Text.writeString(out, etag != null ? etag : "");
  WritableUtils.writeVLong(out, size);
}
@Override
public void readFields(final DataInput in) throws IOException {
  source = Text.readString(in);
  dest = Text.readString(in);
  // NOTE(review): a null etag was written as ""; it is not mapped back
  // to null here, so equals() may differ across a write/read round trip.
  etag = Text.readString(in);
  size = WritableUtils.readVLong(in);
}
}

View File

@ -0,0 +1,569 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.FutureIO;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.util.Preconditions.checkState;
/**
* Read or write entry file.
* This can be used to create a simple reader, or to create
* a writer queue where different threads can queue data for
* writing.
* The entry file is a SequenceFile with KV = {NullWritable, FileEntry};
*/
public class EntryFileIO {
private static final Logger LOG = LoggerFactory.getLogger(
EntryFileIO.class);
/**
* How long should the writer shutdown take?
*/
public static final int WRITER_SHUTDOWN_TIMEOUT_SECONDS = 60;
/**
* How long should trying to queue a write block before giving up
* with an error?
* This is a safety feature to ensure that if something has gone wrong
* in the queue code the job fails with an error rather than just hangs
*/
public static final int WRITER_QUEUE_PUT_TIMEOUT_MINUTES = 10;
/** Configuration used to load filesystems. */
private final Configuration conf;
/**
 * Constructor.
 * @param conf Configuration used to load filesystems; must not be null.
 */
public EntryFileIO(final Configuration conf) {
  // fail fast rather than NPE later in createWriter()/createReader().
  this.conf = requireNonNull(conf, "null configuration");
}
/**
 * Create a writer to a local file.
 * @param file file to write to
 * @return the writer
 * @throws IOException failure to create the file
 */
public SequenceFile.Writer createWriter(File file) throws IOException {
  // delegate to the Path-based variant via a file: URI.
  return createWriter(toPath(file));
}
/**
 * Create a writer to a file on any FS.
 * The entry file format is a SequenceFile with
 * KV = {NullWritable, FileEntry}.
 * @param path path to write to.
 * @return the writer
 * @throws IOException failure to create the file
 */
public SequenceFile.Writer createWriter(Path path) throws IOException {
  return SequenceFile.createWriter(conf,
      SequenceFile.Writer.file(path),
      SequenceFile.Writer.keyClass(NullWritable.class),
      SequenceFile.Writer.valueClass(FileEntry.class));
}
/**
* Reader is created with sequential reads.
* @param file file
* @return the reader
* @throws IOException failure to open
*/
public SequenceFile.Reader createReader(File file) throws IOException {
return createReader(toPath(file));
}
/**
* Reader is created with sequential reads.
* @param path path
* @return the reader
* @throws IOException failure to open
*/
public SequenceFile.Reader createReader(Path path) throws IOException {
return new SequenceFile.Reader(conf,
SequenceFile.Reader.file(path));
}
/**
* Iterator to retrieve file entries from the sequence file.
* Closeable: cast and invoke to close the reader.
* @param reader reader;
* @return iterator
*/
public RemoteIterator<FileEntry> iterateOver(SequenceFile.Reader reader) {
return new EntryIterator(reader);
}
/**
* Create and start an entry writer.
* @param writer writer
* @param capacity queue capacity
* @return the writer.
*/
public EntryWriter launchEntryWriter(SequenceFile.Writer writer, int capacity) {
final EntryWriter ew = new EntryWriter(writer, capacity);
ew.start();
return ew;
}
/**
* Write a sequence of entries to the writer.
* @param writer writer
* @param entries entries
* @param close close the stream afterwards
* @return number of entries written
* @throws IOException write failure.
*/
public static int write(SequenceFile.Writer writer,
Collection<FileEntry> entries,
boolean close)
throws IOException {
try {
for (FileEntry entry : entries) {
writer.append(NullWritable.get(), entry);
}
writer.flush();
} finally {
if (close) {
writer.close();
}
}
return entries.size();
}
/**
* Given a file, create a Path.
* @param file file
* @return path to the file
*/
public static Path toPath(final File file) {
return new Path(file.toURI());
}
/**
* Actions in the queue.
*/
private enum Actions {
/** Write the supplied list of entries. */
write,
/** Stop the processor thread. */
stop
}
/**
* What gets queued: an action and a list of entries.
*/
private static final class QueueEntry {
private final Actions action;
private final List<FileEntry> entries;
private QueueEntry(final Actions action, List<FileEntry> entries) {
this.action = action;
this.entries = entries;
}
private QueueEntry(final Actions action) {
this(action, null);
}
}
/**
* A Writer thread takes reads from a queue containing
* list of entries to save; these are serialized via the writer to
* the output stream.
* Other threads can queue the file entry lists from loaded manifests
* for them to be written.
* These threads will be blocked when the queue capacity is reached.
* This is quite a complex process, with the main troublespots in the code
* being:
* - managing the shutdown
* - failing safely on write failures, restarting all blocked writers in the process
*/
public static final class EntryWriter implements Closeable {
/**
* The destination of the output.
*/
private final SequenceFile.Writer writer;
/**
* Blocking queue of actions.
*/
private final BlockingQueue<QueueEntry> queue;
/**
* stop flag.
*/
private final AtomicBoolean stop = new AtomicBoolean(false);
/**
* Is the processor thread active.
*/
private final AtomicBoolean active = new AtomicBoolean(false);
private final int capacity;
/**
* Executor of writes.
*/
private ExecutorService executor;
/**
* Future invoked.
*/
private Future<Integer> future;
/**
* count of file entries saved; only updated in one thread
* so volatile.
*/
private final AtomicInteger count = new AtomicInteger();
/**
* Any failure caught on the writer thread; this should be
* raised within the task/job thread as it implies that the
* entire write has failed.
*/
private final AtomicReference<IOException> failure = new AtomicReference<>();
/**
* Create.
* @param writer writer
* @param capacity capacity.
*/
private EntryWriter(SequenceFile.Writer writer, int capacity) {
checkState(capacity > 0, "invalid queue capacity %s", capacity);
this.writer = requireNonNull(writer);
this.capacity = capacity;
this.queue = new ArrayBlockingQueue<>(capacity);
}
/**
* Is the writer active?
* @return true if the processor thread is live
*/
public boolean isActive() {
return active.get();
}
/**
* Get count of files processed.
* @return the count
*/
public int getCount() {
return count.get();
}
/**
* Any failure.
* @return any IOException caught when writing the output
*/
public IOException getFailure() {
return failure.get();
}
/**
* Start the thread.
*/
private void start() {
checkState(executor == null, "already started");
active.set(true);
executor = HadoopExecutors.newSingleThreadExecutor();
future = executor.submit(this::processor);
LOG.debug("Started entry writer {}", this);
}
/**
* Add a list of entries to the queue.
* @param entries entries.
* @return whether the queue worked.
*/
public boolean enqueue(List<FileEntry> entries) {
if (entries.isEmpty()) {
LOG.debug("ignoring enqueue of empty list");
// exit fast, but return true.
return true;
}
if (active.get()) {
try {
LOG.debug("Queueing {} entries", entries.size());
final boolean enqueued = queue.offer(new QueueEntry(Actions.write, entries),
WRITER_QUEUE_PUT_TIMEOUT_MINUTES, TimeUnit.MINUTES);
if (!enqueued) {
LOG.warn("Timeout submitting entries to {}", this);
}
return enqueued;
} catch (InterruptedException e) {
Thread.interrupted();
return false;
}
} else {
LOG.warn("EntryFile write queue inactive; discarding {} entries submitted to {}",
entries.size(), this);
return false;
}
}
/**
* Queue and process entries until done.
* @return count of entries written.
* @throws UncheckedIOException on write failure
*/
private int processor() {
Thread.currentThread().setName("EntryIOWriter");
try {
while (!stop.get()) {
final QueueEntry queueEntry = queue.take();
switch (queueEntry.action) {
case stop: // stop the operation
LOG.debug("Stop processing");
stop.set(true);
break;
case write: // write data
default: // here to shut compiler up
// write
final List<FileEntry> entries = queueEntry.entries;
LOG.debug("Adding block of {} entries", entries.size());
for (FileEntry entry : entries) {
append(entry);
}
break;
}
}
} catch (IOException e) {
LOG.debug("Write failure", e);
failure.set(e);
throw new UncheckedIOException(e);
} catch (InterruptedException e) {
// being stopped implicitly
LOG.debug("interrupted", e);
} finally {
stop.set(true);
active.set(false);
// clear the queue, so wake up on any failure mode.
queue.clear();
}
return count.get();
}
/**
* write one entry.
* @param entry entry to write
* @throws IOException on write failure
*/
private void append(FileEntry entry) throws IOException {
writer.append(NullWritable.get(), entry);
final int c = count.incrementAndGet();
LOG.trace("Added entry #{}: {}", c, entry);
}
/**
* Close: stop accepting new writes, wait for queued writes to complete.
* @throws IOException failure closing that writer, or somehow the future
* raises an IOE which isn't caught for later.
*/
@Override
public void close() throws IOException {
// declare as inactive.
// this stops queueing more data, but leaves
// the worker thread still polling and writing.
if (!active.getAndSet(false)) {
// already stopped
return;
}
LOG.debug("Shutting down writer; entry lists in queue: {}",
capacity - queue.remainingCapacity());
// signal queue closure by queuing a stop option.
// this is added at the end of the list of queued blocks,
// of which are written.
try {
queue.put(new QueueEntry(Actions.stop));
} catch (InterruptedException e) {
Thread.interrupted();
}
try {
// wait for the op to finish.
int total = FutureIO.awaitFuture(future, WRITER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.debug("Processed {} files", total);
executor.shutdown();
} catch (TimeoutException e) {
LOG.warn("Timeout waiting for write thread to finish");
// trouble. force close
executor.shutdownNow();
// close the stream
} finally {
writer.close();
}
}
/**
* Raise any IOException caught during execution of the writer thread.
* @throws IOException if one was caught and saved.
*/
public void maybeRaiseWriteException() throws IOException {
final IOException f = failure.get();
if (f != null) {
throw f;
}
}
@Override
public String toString() {
return "EntryWriter{" +
"stop=" + stop.get() +
", active=" + active.get() +
", count=" + count.get() +
", queue depth=" + queue.size() +
", failure=" + failure +
'}';
}
}
/**
* Iterator to retrieve file entries from the sequence file.
* Closeable; it will close automatically when the last element is read.
* No thread safety.
*/
@VisibleForTesting
static final class EntryIterator implements RemoteIterator<FileEntry>, Closeable {
private final SequenceFile.Reader reader;
private FileEntry fetched;
private boolean closed;
private int count;
/**
* Create an iterator.
* @param reader the file to read from.
*/
private EntryIterator(final SequenceFile.Reader reader) {
this.reader = requireNonNull(reader);
}
@Override
public void close() throws IOException {
if (!closed) {
closed = true;
reader.close();
}
}
@Override
public String toString() {
return "EntryIterator{" +
"closed=" + closed +
", count=" + count +
", fetched=" + fetched +
'}';
}
@Override
public boolean hasNext() throws IOException {
return fetched != null || fetchNext();
}
/**
* Fetch the next entry.
* If there is none, then the reader is closed before `false`
* is returned.
* @return true if a record was retrieved.
* @throws IOException IO failure.
*/
private boolean fetchNext() throws IOException {
FileEntry readBack = new FileEntry();
if (reader.next(NullWritable.get(), readBack)) {
fetched = readBack;
count++;
return true;
} else {
fetched = null;
close();
return false;
}
}
@Override
public FileEntry next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
final FileEntry r = fetched;
fetched = null;
return r;
}
/**
* Is the stream closed.
* @return true if closed.
*/
public boolean isClosed() {
return closed;
}
int getCount() {
return count;
}
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.File;
import java.util.Collection;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage;
import static java.util.Objects.requireNonNull;
/**
 * Information about the loaded manifest data;
 * Returned from {@link LoadManifestsStage} and then
 * used for renaming the work.
 */
public final class LoadedManifestData {

  /**
   * Path of the intermediate cache of files to rename.
   * This is a SequenceFile of {NullWritable, FileEntry} records.
   */
  private final Path entrySequenceData;

  /**
   * Directories to create, aggregated across all manifests.
   */
  private final Collection<DirEntry> directories;

  /**
   * Number of files which will be renamed.
   */
  private final int fileCount;

  /**
   * Data about the loaded manifests.
   * @param directories directories
   * @param entrySequenceData Path in local fs to the entry sequence data.
   * @param fileCount number of files.
   */
  public LoadedManifestData(
      final Collection<DirEntry> directories,
      final Path entrySequenceData,
      final int fileCount) {
    this.entrySequenceData = requireNonNull(entrySequenceData);
    this.directories = requireNonNull(directories);
    this.fileCount = fileCount;
  }

  /**
   * Get the directories to create.
   * @return directory entries
   */
  public Collection<DirEntry> getDirectories() {
    return directories;
  }

  /**
   * Get the number of files to rename.
   * @return the file count
   */
  public int getFileCount() {
    return fileCount;
  }

  /**
   * Get the path to the entry sequence data file.
   * @return the path
   */
  public Path getEntrySequenceData() {
    return entrySequenceData;
  }

  /**
   * Get the entry sequence data as a local file.
   * @return file holding the entry sequence data
   */
  public File getEntrySequenceFile() {
    return new File(entrySequenceData.toUri());
  }

  /**
   * Delete the entry sequence file.
   * @return whether or not the delete was successful.
   */
  public boolean deleteEntrySequenceFile() {
    return getEntrySequenceFile().delete();
  }
}

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSetters;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
import org.apache.hadoop.mapreduce.JobContext;
@ -57,8 +58,11 @@
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SPARK_WRITE_UUID;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUMMARY_FILENAME_FORMAT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.TMP_SUFFIX;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.FREE_MEMORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.HEAP_MEMORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.PRINCIPAL;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.STAGE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.TOTAL_MEMORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.COUNTER_STATISTICS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.InternalConstants.DURATION_STATISTICS;
@ -82,10 +86,7 @@ public static IOStatisticsStoreBuilder createIOStatisticsStore() {
final IOStatisticsStoreBuilder store
= iostatisticsStore();
store.withCounters(COUNTER_STATISTICS);
store.withMaximums(COUNTER_STATISTICS);
store.withMinimums(COUNTER_STATISTICS);
store.withMeanStatistics(COUNTER_STATISTICS);
store.withSampleTracking(COUNTER_STATISTICS);
store.withDurationTracking(DURATION_STATISTICS);
return store;
}
@ -224,6 +225,21 @@ public static ManifestSuccessData createManifestOutcome(
return outcome;
}
/**
* Add heap information to IOStatisticSetters gauges, with a stage in front of every key.
* @param ioStatisticsSetters map to update
* @param stage stage
*/
public static void addHeapInformation(IOStatisticsSetters ioStatisticsSetters,
String stage) {
final long totalMemory = Runtime.getRuntime().totalMemory();
final long freeMemory = Runtime.getRuntime().freeMemory();
final String prefix = "stage.";
ioStatisticsSetters.setGauge(prefix + stage + "." + TOTAL_MEMORY, totalMemory);
ioStatisticsSetters.setGauge(prefix + stage + "." + FREE_MEMORY, freeMemory);
ioStatisticsSetters.setGauge(prefix + stage + "." + HEAP_MEMORY, totalMemory - freeMemory);
}
/**
* Create the filename for a report from the jobID.
* @param jobId jobId

View File

@ -161,7 +161,7 @@ protected AbstractJobOrTaskStage(
getRequiredTaskAttemptId();
getRequiredTaskAttemptDir();
stageName = String.format("[Task-Attempt %s]", getRequiredTaskAttemptId());
} else {
} else {
stageName = String.format("[Job-Attempt %s/%02d]",
stageConfig.getJobId(),
stageConfig.getJobAttemptNumber());
@ -312,6 +312,15 @@ private void noteAnyRateLimiting(String statistic, Duration wait) {
}
}
/**
* Get the operations callbacks.
* @return the operations invocable against the destination.
*/
public ManifestStoreOperations getOperations() {
return operations;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
@ -677,11 +686,18 @@ protected boolean storeSupportsResilientCommit() {
return operations.storeSupportsResilientCommit();
}
/**
* Maybe delete the destination.
* This routine is optimized for the data not existing, as HEAD seems to cost less
* than a DELETE; assuming most calls don't have data, this is faster.
* @param deleteDest should an attempt to delete the dest be made?
* @param dest destination path
* @throws IOException IO failure, including permissions.
*/
private void maybeDeleteDest(final boolean deleteDest, final Path dest) throws IOException {
if (deleteDest) {
// delete the destination, always, knowing that it's a no-op if
// the data isn't there. Skipping the change saves one round trip
// to actually look for the file/object
if (deleteDest && getFileStatusOrNull(dest) != null) {
boolean deleted = delete(dest, true);
// log the outcome in case of emergency diagnostics traces
// being needed.

View File

@ -295,7 +295,7 @@ public static final class Arguments {
* @param statisticName stage name to report
* @param enabled is the stage enabled?
* @param deleteTaskAttemptDirsInParallel delete task attempt dirs in
* parallel?
* parallel?
* @param suppressExceptions suppress exceptions?
*/
public Arguments(

View File

@ -18,27 +18,31 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import java.io.File;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_FILES_COMMITTED_COUNT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CREATE_TARGET_DIRS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_RENAME_FILES;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DiagnosticKeys.MANIFESTS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.addHeapInformation;
/**
* Commit the Job.
@ -67,98 +71,119 @@ protected CommitJobStage.Result executeStage(
getJobId(),
storeSupportsResilientCommit());
boolean createMarker = arguments.isCreateMarker();
// once the manifest has been loaded, a temp file needs to be
// deleted; so track the value.
LoadedManifestData loadedManifestData = null;
// load the manifests
final StageConfig stageConfig = getStageConfig();
LoadManifestsStage.Result result
= new LoadManifestsStage(stageConfig).apply(true);
List<TaskManifest> manifests = result.getManifests();
LoadManifestsStage.SummaryInfo summary = result.getSummary();
try {
boolean createMarker = arguments.isCreateMarker();
IOStatisticsSnapshot heapInfo = new IOStatisticsSnapshot();
addHeapInformation(heapInfo, "setup");
// load the manifests
final StageConfig stageConfig = getStageConfig();
LoadManifestsStage.Result result = new LoadManifestsStage(stageConfig).apply(
new LoadManifestsStage.Arguments(
File.createTempFile("manifest", ".list"),
/* do not cache manifests */
stageConfig.getWriterQueueCapacity()));
LoadManifestsStage.SummaryInfo loadedManifestSummary = result.getSummary();
loadedManifestData = result.getLoadedManifestData();
LOG.debug("{}: Job Summary {}", getName(), summary);
LOG.info("{}: Committing job with file count: {}; total size {} bytes",
getName(),
summary.getFileCount(),
byteCountToDisplaySize(summary.getTotalFileSize()));
LOG.debug("{}: Job Summary {}", getName(), loadedManifestSummary);
LOG.info("{}: Committing job with file count: {}; total size {} bytes",
getName(),
loadedManifestSummary.getFileCount(),
String.format("%,d", loadedManifestSummary.getTotalFileSize()));
addHeapInformation(heapInfo, OP_STAGE_JOB_LOAD_MANIFESTS);
// add in the manifest statistics to our local IOStatistics for
// reporting.
IOStatisticsStore iostats = getIOStatistics();
iostats.aggregate(loadedManifestSummary.getIOStatistics());
// add in the manifest statistics to our local IOStatistics for
// reporting.
IOStatisticsStore iostats = getIOStatistics();
iostats.aggregate(summary.getIOStatistics());
// prepare destination directories.
final CreateOutputDirectoriesStage.Result dirStageResults =
new CreateOutputDirectoriesStage(stageConfig)
.apply(loadedManifestData.getDirectories());
addHeapInformation(heapInfo, OP_STAGE_JOB_CREATE_TARGET_DIRS);
// prepare destination directories.
final CreateOutputDirectoriesStage.Result dirStageResults =
new CreateOutputDirectoriesStage(stageConfig)
.apply(manifests);
// commit all the tasks.
// The success data includes a snapshot of the IO Statistics
// and hence all aggregate stats from the tasks.
ManifestSuccessData successData;
successData = new RenameFilesStage(stageConfig).apply(
Pair.of(manifests, dirStageResults.getCreatedDirectories()));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: _SUCCESS file summary {}", getName(), successData.toJson());
}
// update the counter of bytes committed and files.
// use setCounter so as to ignore any values accumulated when
// aggregating tasks.
iostats.setCounter(
COMMITTER_FILES_COMMITTED_COUNT,
summary.getFileCount());
iostats.setCounter(
COMMITTER_BYTES_COMMITTED_COUNT,
summary.getTotalFileSize());
successData.snapshotIOStatistics(iostats);
// rename manifests. Only warn on failure here.
final String manifestRenameDir = arguments.getManifestRenameDir();
if (isNotBlank(manifestRenameDir)) {
Path manifestRenamePath = new Path(
new Path(manifestRenameDir),
getJobId());
LOG.info("{}: Renaming manifests to {}", getName(), manifestRenamePath);
try {
renameDir(getTaskManifestDir(), manifestRenamePath);
// save this path in the summary diagnostics
successData.getDiagnostics().put(MANIFESTS, manifestRenamePath.toUri().toString());
} catch (IOException | IllegalArgumentException e) {
// rename failure, including path for wrong filesystem
LOG.warn("{}: Failed to rename manifests to {}", getName(), manifestRenamePath, e);
// commit all the tasks.
// The success data includes a snapshot of the IO Statistics
// and hence all aggregate stats from the tasks.
ManifestSuccessData successData;
successData = new RenameFilesStage(stageConfig).apply(
Triple.of(loadedManifestData,
dirStageResults.getCreatedDirectories(),
stageConfig.getSuccessMarkerFileLimit()));
if (LOG.isDebugEnabled()) {
LOG.debug("{}: _SUCCESS file summary {}", getName(), successData.toJson());
}
addHeapInformation(heapInfo, OP_STAGE_JOB_RENAME_FILES);
// update the counter of bytes committed and files.
// use setCounter so as to ignore any values accumulated when
// aggregating tasks.
iostats.setCounter(
COMMITTER_FILES_COMMITTED_COUNT,
loadedManifestSummary.getFileCount());
iostats.setCounter(
COMMITTER_BYTES_COMMITTED_COUNT,
loadedManifestSummary.getTotalFileSize());
successData.snapshotIOStatistics(iostats);
successData.getIOStatistics().aggregate(heapInfo);
// rename manifests. Only warn on failure here.
final String manifestRenameDir = arguments.getManifestRenameDir();
if (isNotBlank(manifestRenameDir)) {
Path manifestRenamePath = new Path(
new Path(manifestRenameDir),
getJobId());
LOG.info("{}: Renaming manifests to {}", getName(), manifestRenamePath);
try {
renameDir(getTaskManifestDir(), manifestRenamePath);
// save this path in the summary diagnostics
successData.getDiagnostics().put(MANIFESTS, manifestRenamePath.toUri().toString());
} catch (IOException | IllegalArgumentException e) {
// rename failure, including path for wrong filesystem
LOG.warn("{}: Failed to rename manifests to {}", getName(), manifestRenamePath, e);
}
}
// save the _SUCCESS if the option is enabled.
Path successPath = null;
if (createMarker) {
// save a snapshot of the IO Statistics
successPath = new SaveSuccessFileStage(stageConfig)
.apply(successData);
LOG.debug("{}: Saving _SUCCESS file to {}", getName(), successPath);
}
// optional cleanup
new CleanupJobStage(stageConfig).apply(arguments.getCleanupArguments());
// and then, after everything else: optionally validate.
if (arguments.isValidateOutput()) {
// cache and restore the active stage field
LOG.info("{}: Validating output.", getName());
new ValidateRenamedFilesStage(stageConfig)
.apply(loadedManifestData.getEntrySequenceData());
}
// restore the active stage so that when the report is saved
// it is declared as job commit, not cleanup or validate.
stageConfig.enterStage(getStageName(arguments));
// the result
return new Result(successPath, successData);
} finally {
// cleanup; return code is ignored.
if (loadedManifestData != null) {
loadedManifestData.deleteEntrySequenceFile();
}
}
// save the _SUCCESS if the option is enabled.
Path successPath = null;
if (createMarker) {
// save a snapshot of the IO Statistics
successPath = new SaveSuccessFileStage(stageConfig)
.apply(successData);
LOG.debug("{}: Saving _SUCCESS file to {}", getName(), successPath);
}
// optional cleanup
new CleanupJobStage(stageConfig).apply(arguments.getCleanupArguments());
// and then, after everything else: optionally validate.
if (arguments.isValidateOutput()) {
// cache and restore the active stage field
LOG.info("{}: Validating output.", getName());
new ValidateRenamedFilesStage(stageConfig)
.apply(result.getManifests());
}
// restore the active stage so that when the report is saved
// it is declared as job commit, not cleanup or validate.
stageConfig.enterStage(getStageName(arguments));
// the result
return new CommitJobStage.Result(successPath, successData);
}
/**

View File

@ -21,7 +21,9 @@
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -35,9 +37,9 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.util.functional.TaskPool;
import static java.util.Objects.requireNonNull;
@ -75,16 +77,14 @@
*/
public class CreateOutputDirectoriesStage extends
AbstractJobOrTaskStage<
List<TaskManifest>,
Collection<DirEntry>,
CreateOutputDirectoriesStage.Result> {
private static final Logger LOG = LoggerFactory.getLogger(
CreateOutputDirectoriesStage.class);
/**
* Directories as a map of (path, path).
* Using a map rather than any set for efficient concurrency; the
* concurrent sets don't do lookups so fast.
* Directories as a map of (path, DirMapState).
*/
private final Map<Path, DirMapState> dirMap = new ConcurrentHashMap<>();
@ -101,20 +101,20 @@ public CreateOutputDirectoriesStage(final StageConfig stageConfig) {
@Override
protected Result executeStage(
final List<TaskManifest> taskManifests)
final Collection<DirEntry> manifestDirs)
throws IOException {
final List<Path> directories = createAllDirectories(taskManifests);
final List<Path> directories = createAllDirectories(manifestDirs);
LOG.debug("{}: Created {} directories", getName(), directories.size());
return new Result(new HashSet<>(directories), dirMap);
}
/**
* For each task, build the list of directories it wants.
* @param taskManifests task manifests
* Build the list of directories to create.
* @param manifestDirs dir entries from the manifests
* @return the list of paths which have been created.
*/
private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
private List<Path> createAllDirectories(final Collection<DirEntry> manifestDirs)
throws IOException {
// all directories which need to exist across all
@ -128,32 +128,27 @@ private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
// will be created at that path.
final Set<Path> filesToDelete = new HashSet<>();
// iterate through the task manifests
// and all output dirs into the set of dirs to
// create.
// hopefully there is a lot of overlap, so the
// final number of dirs to create is small.
for (TaskManifest task: taskManifests) {
final List<DirEntry> destDirectories = task.getDestDirectories();
Collections.sort(destDirectories, (o1, o2) ->
o1.getLevel() - o2.getLevel());
for (DirEntry entry: destDirectories) {
// add the dest entry
final Path path = entry.getDestPath();
if (!leaves.containsKey(path)) {
leaves.put(path, entry);
// sort the values of dir map by directory level: parent dirs will
// come first in the sorting
List<DirEntry> destDirectories = new ArrayList<>(manifestDirs);
// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
filesToDelete.add(path);
}
final Path parent = path.getParent();
if (parent != null && leaves.containsKey(parent)) {
// there's a parent dir, move it from the leaf list
// to parent list
parents.put(parent,
leaves.remove(parent));
}
Collections.sort(destDirectories, Comparator.comparingInt(DirEntry::getLevel));
// iterate through the directory map
for (DirEntry entry: destDirectories) {
// add the dest entry
final Path path = entry.getDestPath();
if (!leaves.containsKey(path)) {
leaves.put(path, entry);
// if it is a file to delete, record this.
if (entry.getStatus() == EntryStatus.file) {
filesToDelete.add(path);
}
final Path parent = path.getParent();
if (parent != null && leaves.containsKey(parent)) {
// there's a parent dir, move it from the leaf list
// to parent list
parents.put(parent, leaves.remove(parent));
}
}
}
@ -168,7 +163,9 @@ private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
// Now the real work.
final int createCount = leaves.size();
LOG.info("Preparing {} directory/directories", createCount);
LOG.info("Preparing {} directory/directories; {} parent dirs implicitly created",
createCount, parents.size());
// now probe for and create the leaf dirs, which are those at the
// bottom level
Duration d = measureDurationOfInvocation(getIOStatistics(), OP_CREATE_DIRECTORIES, () ->
@ -188,7 +185,7 @@ private List<Path> createAllDirectories(final List<TaskManifest> taskManifests)
/**
* report a single directory failure.
* @param path path which could not be deleted
* @param dirEntry dir which could not be deleted
* @param e exception raised.
*/
private void reportMkDirFailure(DirEntry dirEntry, Exception e) {
@ -246,6 +243,7 @@ private void deleteDirWithFile(Path dir) throws IOException {
* and, if the operation took place, the list of created dirs.
* Reports progress on invocation.
* @param dirEntry entry
* @throws PathIOException if after multiple attempts, the dest dir couldn't be created.
* @throws IOException failure.
*/
private void createOneDirectory(final DirEntry dirEntry) throws IOException {
@ -274,9 +272,17 @@ private void createOneDirectory(final DirEntry dirEntry) throws IOException {
* Try to efficiently and robustly create a directory in a method which is
* expected to be executed in parallel with operations creating
* peer directories.
* @param path path to create
* @return true if dir created/found
* @throws IOException IO Failure.
* A return value of {@link DirMapState#dirWasCreated} or
* {@link DirMapState#dirCreatedOnSecondAttempt} indicates
* this thread did the creation.
* Other outcomes imply it already existed; if the directory
* cannot be created/found then a {@link PathIOException} is thrown.
* The outcome should be added to the {@link #dirMap} to avoid further creation attempts.
* @param dirEntry dir to create
* @return Outcome of the operation, such as whether the entry was created, found in store.
* It will always be a success outcome of some form.
* @throws PathIOException if after multiple attempts, the dest dir couldn't be created.
* @throws IOException Other IO failure
*/
private DirMapState maybeCreateOneDirectory(DirEntry dirEntry) throws IOException {
final EntryStatus status = dirEntry.getStatus();

View File

@ -18,29 +18,42 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.commons.io.FileUtils.byteCountToDisplaySize;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.snapshotIOStatistics;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_DIRECTORY_COUNT_MEAN;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_FILE_COUNT_MEAN;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_TASK_MANIFEST_FILE_SIZE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_LOAD_ALL_MANIFESTS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_LOAD_MANIFESTS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.maybeAddIOStatistics;
import static org.apache.hadoop.util.functional.RemoteIterators.haltableRemoteIterator;
/**
* Stage to load all the task manifests in the job attempt directory.
@ -52,7 +65,7 @@
*/
public class LoadManifestsStage extends
AbstractJobOrTaskStage<
Boolean,
LoadManifestsStage.Arguments,
LoadManifestsStage.Result> {
private static final Logger LOG = LoggerFactory.getLogger(
@ -64,14 +77,14 @@ public class LoadManifestsStage extends
private final SummaryInfo summaryInfo = new SummaryInfo();
/**
* Should manifests be pruned of IOStatistics?
* Map of directories from manifests, coalesced to reduce duplication.
*/
private boolean pruneManifests;
private final Map<String, DirEntry> directories = new ConcurrentHashMap<>();
/**
* List of loaded manifests.
* Writer of entries.
*/
private final List<TaskManifest> manifests = new ArrayList<>();
private EntryFileIO.EntryWriter entryWriter;
public LoadManifestsStage(final StageConfig stageConfig) {
super(false, stageConfig, OP_STAGE_JOB_LOAD_MANIFESTS, true);
@ -79,43 +92,83 @@ public LoadManifestsStage(final StageConfig stageConfig) {
/**
* Load the manifests.
* @param prune should manifests be pruned of IOStatistics?
* @param arguments stage arguments
* @return the summary and a list of manifests.
* @throws IOException IO failure.
*/
@Override
protected LoadManifestsStage.Result executeStage(
final Boolean prune) throws IOException {
final LoadManifestsStage.Arguments arguments) throws IOException {
EntryFileIO entryFileIO = new EntryFileIO(getStageConfig().getConf());
final Path manifestDir = getTaskManifestDir();
LOG.info("{}: Executing Manifest Job Commit with manifests in {}",
getName(),
manifestDir);
pruneManifests = prune;
// build a list of all task manifests successfully committed
//
msync(manifestDir);
final RemoteIterator<FileStatus> manifestFiles = listManifests();
final List<TaskManifest> manifestList = loadAllManifests(manifestFiles);
LOG.info("{}: Summary of {} manifests loaded in {}: {}",
getName(),
manifestList.size(),
manifestDir,
summaryInfo);
final Path entrySequenceData = arguments.getEntrySequenceData();
// collect any stats
maybeAddIOStatistics(getIOStatistics(), manifestFiles);
return new LoadManifestsStage.Result(summaryInfo, manifestList);
// the entry writer for queuing data.
entryWriter = entryFileIO.launchEntryWriter(
entryFileIO.createWriter(entrySequenceData),
arguments.queueCapacity);
try {
// sync fs before the list
msync(manifestDir);
// build a list of all task manifests successfully committed,
// which will break out if the writing is stopped (due to any failure)
final RemoteIterator<FileStatus> manifestFiles =
haltableRemoteIterator(listManifests(),
() -> entryWriter.isActive());
processAllManifests(manifestFiles);
maybeAddIOStatistics(getIOStatistics(), manifestFiles);
LOG.info("{}: Summary of {} manifests loaded in {}: {}",
getName(),
summaryInfo.manifestCount,
manifestDir,
summaryInfo);
// close cleanly
entryWriter.close();
// if anything failed, raise it.
entryWriter.maybeRaiseWriteException();
// collect any stats
} catch (EntryWriteException e) {
// something went wrong while writing.
// raise anything on the write thread,
entryWriter.maybeRaiseWriteException();
// falling back to that from the worker thread
throw e;
} finally {
// close which is a no-op if the clean close was invoked;
// it is not a no-op if something went wrong with reading/parsing/processing
// the manifests.
entryWriter.close();
}
final LoadedManifestData loadedManifestData = new LoadedManifestData(
new ArrayList<>(directories.values()), // new array to free up the map
entrySequenceData,
entryWriter.getCount());
return new LoadManifestsStage.Result(summaryInfo, loadedManifestData);
}
/**
* Load all the manifests.
* Load and process all the manifests.
* @param manifestFiles list of manifest files.
* @return the loaded manifests.
* @throws IOException IO Failure.
* @throws IOException failure to load/parse/queue
*/
private List<TaskManifest> loadAllManifests(
private void processAllManifests(
final RemoteIterator<FileStatus> manifestFiles) throws IOException {
trackDurationOfInvocation(getIOStatistics(), OP_LOAD_ALL_MANIFESTS, () ->
@ -123,33 +176,73 @@ private List<TaskManifest> loadAllManifests(
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::processOneManifest));
return manifests;
}
/**
* Method invoked to process one manifest.
* @param status file to process.
* @throws IOException failure to load/parse
* @throws IOException failure to load/parse/queue
*/
private void processOneManifest(FileStatus status)
throws IOException {
updateAuditContext(OP_LOAD_ALL_MANIFESTS);
TaskManifest m = fetchTaskManifest(status);
TaskManifest manifest = fetchTaskManifest(status);
progress();
// update the manifest list in a synchronized block.
// update the directories
final int created = coalesceDirectories(manifest);
final String attemptID = manifest.getTaskAttemptID();
LOG.debug("{}: task attempt {} added {} directories",
getName(), attemptID, created);
synchronized (manifests) {
manifests.add(m);
// and the summary info in the same block, to
// eliminate the need to acquire a second lock.
summaryInfo.add(m);
// add to the summary.
summaryInfo.add(manifest);
// clear the manifest extra data so if
// blocked waiting for queue capacity,
// memory use is reduced.
manifest.setIOStatistics(null);
manifest.getExtraData().clear();
// queue those files.
final boolean enqueued = entryWriter.enqueue(manifest.getFilesToCommit());
if (!enqueued) {
LOG.warn("{}: Failed to write manifest for task {}",
getName(), attemptID);
throw new EntryWriteException(attemptID);
}
if (pruneManifests) {
m.setIOStatistics(null);
m.getExtraData().clear();
}
/**
 * Coalesce all directories from a manifest into the shared directory map,
 * clearing duplicates.
 * Only one writer holds the lock at a time, which it is hoped reduces
 * contention: the already-known entries are filtered out before the lock
 * is acquired, so if a manifest adds no new directories the lock is
 * never needed.
 * @param manifest manifest to process
 * @return the number of new directories queued for creation
 */
@VisibleForTesting
int coalesceDirectories(final TaskManifest manifest) {
  // build a list of dirs to create.
  // this scans the map; it is keyed by directory path string,
  // so the probe must use the entry's dir, not the entry object itself
  // (probing with the DirEntry would never match and so defeat the
  // whole point of the filter).
  final List<DirEntry> toCreate = manifest.getDestDirectories().stream()
      .filter(e -> !directories.containsKey(e.getDir()))
      .collect(Collectors.toList());
  if (!toCreate.isEmpty()) {
    // need to add more directories;
    // still a possibility that they may be created between the
    // filtering and this thread acquiring the lock, hence putIfAbsent.
    synchronized (directories) {
      toCreate.forEach(entry ->
          directories.putIfAbsent(entry.getDir(), entry));
    }
  }
  return toCreate.size();
}
/**
@ -173,55 +266,121 @@ private TaskManifest fetchTaskManifest(FileStatus status)
final long size = manifest.getTotalFileSize();
LOG.info("{}: Task Attempt {} file {}: File count: {}; data size={}",
getName(), id, status.getPath(), filecount, size);
// record file size for tracking of memory consumption.
getIOStatistics().addMeanStatisticSample(COMMITTER_TASK_MANIFEST_FILE_SIZE,
status.getLen());
// record file size for tracking of memory consumption, work etc.
final IOStatisticsStore iostats = getIOStatistics();
iostats.addSample(COMMITTER_TASK_MANIFEST_FILE_SIZE, status.getLen());
iostats.addSample(COMMITTER_TASK_FILE_COUNT_MEAN, filecount);
iostats.addSample(COMMITTER_TASK_DIRECTORY_COUNT_MEAN,
manifest.getDestDirectories().size());
return manifest;
}
/**
* Stage arguments.
*/
/**
 * Arguments for the load-manifests stage: where the entry sequence
 * data is to be written, and how large the writer queue may grow.
 */
public static final class Arguments {

  /**
   * Local file to which the listing of entries is saved.
   */
  private final File entrySequenceFile;

  /**
   * Capacity for queue between manifest loader and the writers.
   */
  private final int queueCapacity;

  /**
   * Create the stage arguments.
   * @param entrySequenceFile path to local file to create for storing entries
   * @param queueCapacity capacity of the loader/writer queue
   */
  public Arguments(
      final File entrySequenceFile,
      final int queueCapacity) {
    this.entrySequenceFile = entrySequenceFile;
    this.queueCapacity = queueCapacity;
  }

  /**
   * Map the entry sequence file to a filesystem path.
   * @return the file as a Path
   */
  private Path getEntrySequenceData() {
    return new Path(entrySequenceFile.toURI());
  }
}
/**
* Result of the stage.
*/
public static final class Result {
private final SummaryInfo summary;
private final List<TaskManifest> manifests;
/**
* Output of this stage to pass on to the subsequence stages.
*/
private final LoadedManifestData loadedManifestData;
public Result(SummaryInfo summary,
List<TaskManifest> manifests) {
/**
* Result.
* @param summary summary of jobs
* @param loadedManifestData all loaded manifest data
*/
public Result(
final SummaryInfo summary,
final LoadedManifestData loadedManifestData) {
this.summary = summary;
this.manifests = manifests;
this.loadedManifestData = loadedManifestData;
}
public SummaryInfo getSummary() {
return summary;
}
public List<TaskManifest> getManifests() {
return manifests;
public LoadedManifestData getLoadedManifestData() {
return loadedManifestData;
}
}
/**
 * IOException raised on a failure to queue/write a task's manifest
 * entries to the local entry file.
 */
public static final class EntryWriteException extends IOException {

  /**
   * Create with a message identifying the failed task attempt.
   * @param taskId ID of the task attempt whose manifest data
   * could not be written
   */
  private EntryWriteException(String taskId) {
    // a space is required before "to" so the message does not run
    // the task ID into the following word.
    super("Failed to write manifest data for task "
        + taskId + " to local file");
  }
}
/**
* Summary information.
* Implementation note: atomic counters are used here to keep spotbugs quiet,
* not because of any concurrency risks.
*/
public static final class SummaryInfo implements IOStatisticsSource {
/**
* Aggregate IOStatistics.
*/
private IOStatisticsSnapshot iostatistics = snapshotIOStatistics();
private final IOStatisticsSnapshot iostatistics = snapshotIOStatistics();
/**
* Task IDs.
*/
private final List<String> taskIDs = new ArrayList<>();
/**
* Task attempt IDs.
*/
private final List<String> taskAttemptIDs = new ArrayList<>();
/**
* How many manifests were loaded.
*/
private long manifestCount;
private AtomicLong manifestCount = new AtomicLong();
/**
* Total number of files to rename.
*/
private long fileCount;
private AtomicLong fileCount = new AtomicLong();
/**
* Total number of directories which may need
@ -229,12 +388,12 @@ public static final class SummaryInfo implements IOStatisticsSource {
* As there is no dedup, this is likely to be
* a (major) overestimate.
*/
private long directoryCount;
private AtomicLong directoryCount = new AtomicLong();
/**
* Total amount of data to be committed.
*/
private long totalFileSize;
private AtomicLong totalFileSize = new AtomicLong();
/**
* Get the IOStatistics.
@ -246,31 +405,41 @@ public IOStatisticsSnapshot getIOStatistics() {
}
public long getFileCount() {
return fileCount;
return fileCount.get();
}
public long getDirectoryCount() {
return directoryCount;
return directoryCount.get();
}
public long getTotalFileSize() {
return totalFileSize;
return totalFileSize.get();
}
public long getManifestCount() {
return manifestCount;
return manifestCount.get();
}
public List<String> getTaskIDs() {
return taskIDs;
}
public List<String> getTaskAttemptIDs() {
return taskAttemptIDs;
}
/**
* Add all statistics.
* Add all statistics; synchronized.
* @param manifest manifest to add.
*/
public void add(TaskManifest manifest) {
manifestCount++;
public synchronized void add(TaskManifest manifest) {
manifestCount.incrementAndGet();
iostatistics.aggregate(manifest.getIOStatistics());
fileCount += manifest.getFilesToCommit().size();
directoryCount += manifest.getDestDirectories().size();
totalFileSize += manifest.getTotalFileSize();
fileCount.addAndGet(manifest.getFilesToCommit().size());
directoryCount.addAndGet(manifest.getDestDirectories().size());
totalFileSize.addAndGet(manifest.getTotalFileSize());
taskIDs.add(manifest.getTaskID());
taskAttemptIDs.add(manifest.getTaskAttemptID());
}
/**
@ -281,11 +450,11 @@ public void add(TaskManifest manifest) {
public String toString() {
final StringBuilder sb = new StringBuilder(
"SummaryInfo{");
sb.append("manifestCount=").append(manifestCount);
sb.append(", fileCount=").append(fileCount);
sb.append(", directoryCount=").append(directoryCount);
sb.append("manifestCount=").append(getManifestCount());
sb.append(", fileCount=").append(getFileCount());
sb.append(", directoryCount=").append(getDirectoryCount());
sb.append(", totalFileSize=").append(
byteCountToDisplaySize(totalFileSize));
byteCountToDisplaySize(getTotalFileSize()));
sb.append('}');
return sb.toString();
}

View File

@ -27,23 +27,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_RENAME_FILES;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;
/**
* This stage renames all the files.
* Input: the manifests and the set of directories created, as returned by
* {@link CreateOutputDirectoriesStage}.
* Input:
* <ol>
* <li>{@link LoadedManifestData} from the {@link LoadManifestsStage}</li>
* <li>the set of directories created, as returned by
* {@link CreateOutputDirectoriesStage}.</li>
* </ol>
* The files to rename are determined by reading the entry file referenced
* in the {@link LoadedManifestData}; these are read and renamed incrementally.
*
* If the job is configured to delete target files, if the parent dir
* had to be created, the delete() call can be skipped.
* It returns a manifest success data file summarizing the
@ -51,7 +58,7 @@
*/
public class RenameFilesStage extends
AbstractJobOrTaskStage<
Pair<List<TaskManifest>, Set<Path>>,
Triple<LoadedManifestData, Set<Path>, Integer>,
ManifestSuccessData> {
private static final Logger LOG = LoggerFactory.getLogger(
@ -92,37 +99,36 @@ public synchronized long getTotalFileSize() {
/**
* Rename files in job commit.
* @param taskManifests a list of task manifests containing files.
* @param args tuple of (manifest data, set of created dirs)
* @return the job report.
* @throws IOException failure
*/
@Override
protected ManifestSuccessData executeStage(
Pair<List<TaskManifest>, Set<Path>> args)
Triple<LoadedManifestData, Set<Path>, Integer> args)
throws IOException {
final List<TaskManifest> taskManifests = args.getLeft();
createdDirectories = args.getRight();
final LoadedManifestData manifestData = args.getLeft();
createdDirectories = args.getMiddle();
final EntryFileIO entryFileIO = new EntryFileIO(getStageConfig().getConf());
final ManifestSuccessData success = createManifestOutcome(getStageConfig(),
OP_STAGE_JOB_COMMIT);
final int manifestCount = taskManifests.size();
LOG.info("{}: Executing Manifest Job Commit with {} manifests in {}",
getName(), manifestCount, getTaskManifestDir());
LOG.info("{}: Executing Manifest Job Commit with {} files",
getName(), manifestData.getFileCount());
// first step is to aggregate the output of all manifests into a single
// list of files to commit.
// Which Guava can do in a zero-copy concatenated iterator
// iterate over the entries in the file.
try (SequenceFile.Reader reader = entryFileIO.createReader(
manifestData.getEntrySequenceData())) {
final Iterable<FileEntry> filesToCommit = concat(taskManifests.stream()
.map(TaskManifest::getFilesToCommit)
.collect(Collectors.toList()));
TaskPool.foreach(filesToCommit)
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::commitOneFile);
TaskPool.foreach(entryFileIO.iterateOver(reader))
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::commitOneFile);
}
// synchronized block to keep spotbugs happy.
List<FileEntry> committed = getFilesCommitted();
@ -133,7 +139,7 @@ protected ManifestSuccessData executeStage(
// enough for simple testing
success.setFilenamePaths(
committed
.subList(0, Math.min(committed.size(), SUCCESS_MARKER_FILE_LIMIT))
.subList(0, Math.min(committed.size(), args.getRight()))
.stream().map(FileEntry::getDestPath)
.collect(Collectors.toList()));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
@ -28,7 +29,9 @@
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
/**
* Stage Config.
@ -152,6 +155,23 @@ public class StageConfig {
*/
private String name = "";
/**
* Configuration used where needed.
* Default value is a configuration with the normal constructor;
* jobs should override this with what was passed down.
*/
private Configuration conf = new Configuration();
/**
* Entry writer queue capacity.
*/
private int writerQueueCapacity = DEFAULT_WRITER_QUEUE_CAPACITY;
/**
* Number of marker files to include in success file.
*/
private int successMarkerFileLimit = SUCCESS_MARKER_FILE_LIMIT;
public StageConfig() {
}
@ -405,6 +425,42 @@ public String getName() {
return name;
}
/**
* Set configuration.
* @param value new value
* @return the builder
*/
public StageConfig withConfiguration(Configuration value) {
conf = value;
return this;
}
/**
* Get configuration.
* @return the configuration
*/
public Configuration getConf() {
return conf;
}
/**
* Get writer queue capacity.
* @return the queue capacity
*/
public int getWriterQueueCapacity() {
return writerQueueCapacity;
}
/**
 * Set writer queue capacity: the number of entries the
 * manifest-loader/entry-writer queue may hold before loaders block.
 * NOTE(review): unlike {@code withSuccessMarkerFileLimit(int)}, this
 * setter does not call {@code checkOpen()} — confirm whether mutation
 * after the config is frozen should be rejected here too.
 * @param value new capacity
 * @return the builder
 */
public StageConfig withWriterQueueCapacity(final int value) {
writerQueueCapacity = value;
return this;
}
/**
* Handler for stage entry events.
* @return the handler.
@ -532,6 +588,22 @@ public boolean getDeleteTargetPaths() {
return deleteTargetPaths;
}
/**
* Number of marker files to include in success file.
* @param value new value
* @return the builder
*/
public StageConfig withSuccessMarkerFileLimit(final int value) {
checkOpen();
successMarkerFileLimit = value;
return this;
}
/**
 * Get the number of marker filenames to include in the success file.
 * @return the filename limit
 */
public int getSuccessMarkerFileLimit() {
return successMarkerFileLimit;
}
/**
* Enter the stage; calls back to
* {@link #enterStageEventHandler} if non-null.

View File

@ -22,23 +22,21 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.OutputValidationException;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_VALIDATE_OUTPUT;
import static org.apache.hadoop.thirdparty.com.google.common.collect.Iterables.concat;
/**
* This stage validates all files by scanning the manifests
@ -50,17 +48,12 @@
*/
public class ValidateRenamedFilesStage extends
AbstractJobOrTaskStage<
List<TaskManifest>,
Path,
List<FileEntry>> {
private static final Logger LOG = LoggerFactory.getLogger(
ValidateRenamedFilesStage.class);
/**
* Set this to halt all workers.
*/
private final AtomicBoolean halt = new AtomicBoolean();
/**
* List of all files committed.
*/
@ -93,34 +86,27 @@ private synchronized void addFileCommitted(FileEntry entry) {
* has a file in the destination of the same size.
* If two tasks have both written the same file or
* a source file was changed after the task was committed,
* then a mistmatch will be detected -provided the file
* then a mismatch will be detected -provided the file
* length is now different.
* @param taskManifests list of manifests.
* @param entryFile path to entry file
* @return list of files committed.
*/
@Override
protected List<FileEntry> executeStage(
final List<TaskManifest> taskManifests)
final Path entryFile)
throws IOException {
// set the list of files to be as big as the number of tasks.
// synchronized to stop complaints.
synchronized (this) {
filesCommitted = new ArrayList<>(taskManifests.size());
final EntryFileIO entryFileIO = new EntryFileIO(getStageConfig().getConf());
try (SequenceFile.Reader reader = entryFileIO.createReader(entryFile)) {
// iterate over the entries in the file.
TaskPool.foreach(entryFileIO.iterateOver(reader))
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::validateOneFile);
return getFilesCommitted();
}
// validate all the files.
final Iterable<FileEntry> filesToCommit = concat(taskManifests.stream()
.map(TaskManifest::getFilesToCommit)
.collect(Collectors.toList()));
TaskPool.foreach(filesToCommit)
.executeWith(getIOProcessors())
.stopOnFailure()
.run(this::validateOneFile);
return getFilesCommitted();
}
/**
@ -132,10 +118,6 @@ protected List<FileEntry> executeStage(
private void validateOneFile(FileEntry entry) throws IOException {
updateAuditContext(OP_STAGE_JOB_VALIDATE_OUTPUT);
if (halt.get()) {
// told to stop
return;
}
// report progress back
progress();
// look validate the file.
@ -157,7 +139,8 @@ private void validateOneFile(FileEntry entry) throws IOException {
// etags, if the source had one.
final String sourceEtag = entry.getEtag();
if (isNotBlank(sourceEtag)) {
if (getOperations().storePreservesEtagsThroughRenames(destStatus.getPath())
&& isNotBlank(sourceEtag)) {
final String destEtag = ManifestCommitterSupport.getEtag(destStatus);
if (!sourceEtag.equals(destEtag)) {
LOG.warn("Etag of dest file {}: {} does not match that of manifest entry {}",

View File

@ -207,17 +207,18 @@ in the option `mapreduce.manifest.committer.io.threads`.
Larger values may be used.
XML
Hadoop XML configuration
```xml
<property>
<name>mapreduce.manifest.committer.io.threads</name>
<value>200</value>
<value>32</value>
</property>
```
spark-defaults.conf
```
spark.hadoop.mapreduce.manifest.committer.io.threads 200
In `spark-defaults.conf`
```properties
spark.hadoop.mapreduce.manifest.committer.io.threads 32
```
A larger value than that of the number of cores allocated to
@ -225,6 +226,10 @@ the MapReduce AM or Spark Driver does not directly overload
the CPUs, as the threads are normally waiting for (slow) IO
against the object store/filesystem to complete.
Manifest loading in job commit may be memory intensive;
the larger the number of threads, the more manifests which
will be loaded simultaneously.
Caveats
* In Spark, multiple jobs may be committed in the same process,
each of which will create their own thread pool during job
@ -234,6 +239,36 @@ Caveats
`mapreduce.manifest.committer.io.rate` can help avoid this.
### `mapreduce.manifest.committer.writer.queue.capacity`
This is a secondary scale option.
It controls the size of the queue holding the lists of files to rename
taken from the loaded manifests: manifests are loaded from the target
filesystem by a pool of worker threads, while a single thread saves the
entries from each manifest to an intermediate file in the local filesystem.
Once the queue is full, all manifest-loading threads will block.
```xml
<property>
<name>mapreduce.manifest.committer.writer.queue.capacity</name>
<value>32</value>
</property>
```
As the local filesystem is usually much faster to write to than any cloud store,
this queue size should not be a limit on manifest load performance.
It can help limit the amount of memory consumed during manifest load during
job commit.
The maximum number of loaded manifests will be:
```
mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads
```
## <a name="deleting"></a> Optional: deleting target files in Job Commit
The classic `FileOutputCommitter` deletes files at the destination paths
@ -611,13 +646,14 @@ spark.hadoop.mapreduce.manifest.committer.summary.report.directory (optional: U
There are some advanced options which are intended for development and testing,
rather than production use.
| Option | Meaning | Default Value |
|--------|---------|---------------|
| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` |
| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` |
| Option | Meaning | Default Value |
|--------|----------------------------------------------|---------------|
| `mapreduce.manifest.committer.store.operations.classname` | Classname for Manifest Store Operations | `""` |
| `mapreduce.manifest.committer.validate.output` | Perform output validation? | `false` |
| `mapreduce.manifest.committer.writer.queue.capacity` | Queue capacity for writing intermediate file | `32` |
## Validating output `mapreduce.manifest.committer.validate.output`
### Validating output `mapreduce.manifest.committer.validate.output`
The option `mapreduce.manifest.committer.validate.output` triggers a check of every renamed file to
verify it has the expected length.
@ -626,7 +662,7 @@ This adds the overhead of a `HEAD` request per file, and so is recommended for t
There is no verification of the actual contents.
## Controlling storage integration `mapreduce.manifest.committer.store.operations.classname`
### Controlling storage integration `mapreduce.manifest.committer.store.operations.classname`
The manifest committer interacts with filesystems through implementations of the interface
`ManifestStoreOperations`.

View File

@ -756,66 +756,74 @@ private void testConcurrentCommitTaskWithSubDir(int version)
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
final String fileImpl = "fs.file.impl";
final String fileImplClassname = "org.apache.hadoop.fs.LocalFileSystem";
conf.setClass(fileImpl, RLFS.class, FileSystem.class);
FileSystem.closeAll();
final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
final FileOutputCommitter amCommitter =
new FileOutputCommitter(outDir, jContext);
amCommitter.setupJob(jContext);
final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
final TextOutputFormat[] tof = new TextOutputFormat[2];
for (int i = 0; i < tof.length; i++) {
tof[i] = new TextOutputFormat() {
@Override
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException {
final FileOutputCommitter foc = (FileOutputCommitter)
getOutputCommitter(context);
return new Path(new Path(foc.getWorkPath(), SUB_DIR),
getUniqueFile(context, getOutputName(context), extension));
}
};
}
final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
executor.submit(new Callable<Void>() {
final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
final FileOutputCommitter amCommitter =
new FileOutputCommitter(outDir, jContext);
amCommitter.setupJob(jContext);
final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
final TextOutputFormat[] tof = new TextOutputFormat[2];
for (int i = 0; i < tof.length; i++) {
tof[i] = new TextOutputFormat() {
@Override
public Void call() throws IOException, InterruptedException {
final OutputCommitter outputCommitter =
tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
outputCommitter.setupTask(taCtx[taskIdx]);
final RecordWriter rw =
tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
writeOutput(rw, taCtx[taskIdx]);
outputCommitter.commitTask(taCtx[taskIdx]);
return null;
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException {
final FileOutputCommitter foc = (FileOutputCommitter)
getOutputCommitter(context);
return new Path(new Path(foc.getWorkPath(), SUB_DIR),
getUniqueFile(context, getOutputName(context), extension));
}
});
};
}
final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
final OutputCommitter outputCommitter =
tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
outputCommitter.setupTask(taCtx[taskIdx]);
final RecordWriter rw =
tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
writeOutput(rw, taCtx[taskIdx]);
outputCommitter.commitTask(taCtx[taskIdx]);
return null;
}
});
}
} finally {
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.info("Awaiting thread termination!");
}
}
amCommitter.commitJob(jContext);
final RawLocalFileSystem lfs = new RawLocalFileSystem();
lfs.setConf(conf);
assertFalse("Must not end up with sub_dir/sub_dir",
lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
// validate output
validateContent(OUT_SUB_DIR);
FileUtil.fullyDelete(new File(outDir.toString()));
} finally {
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.info("Awaiting thread termination!");
}
// needed to avoid this test contaminating others in the same JVM
FileSystem.closeAll();
conf.set(fileImpl, fileImplClassname);
}
amCommitter.commitJob(jContext);
final RawLocalFileSystem lfs = new RawLocalFileSystem();
lfs.setConf(conf);
assertFalse("Must not end up with sub_dir/sub_dir",
lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
// validate output
validateContent(OUT_SUB_DIR);
FileUtil.fullyDelete(new File(outDir.toString()));
}
@Test

View File

@ -74,6 +74,7 @@
import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConfig.createCloseableTaskSubmitter;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_FACTORY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_DIAGNOSTICS_MANIFEST_DIR;
@ -787,6 +788,7 @@ protected StageConfig createStageConfig(
jobId, jobAttemptNumber);
StageConfig config = new StageConfig();
config
.withConfiguration(getConfiguration())
.withIOProcessors(getSubmitter())
.withIOStatistics(getStageStatistics())
.withJobId(jobId)
@ -795,7 +797,9 @@ protected StageConfig createStageConfig(
.withJobDirectories(attemptDirs)
.withName(String.format(NAME_FORMAT_JOB_ATTEMPT, jobId))
.withOperations(getStoreOperations())
.withProgressable(getProgressCounter());
.withProgressable(getProgressCounter())
.withSuccessMarkerFileLimit(100_000)
.withWriterQueueCapacity(DEFAULT_WRITER_QUEUE_CAPACITY);
// if there's a task attempt ID set, set up its details
if (taskIndex >= 0) {

View File

@ -24,6 +24,7 @@
import java.io.PrintStream;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -37,6 +38,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -47,11 +49,15 @@
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.util.functional.RemoteIterators;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO.toPath;
import static org.assertj.core.api.Assertions.assertThat;
/**
@ -76,7 +82,14 @@ public final class ManifestCommitterTestSupport {
* default number of task attempts for some tests.
* Value: {@value}.
*/
public static final int NUMBER_OF_TASK_ATTEMPTS = 200;
public static final int NUMBER_OF_TASK_ATTEMPTS = 2000;
/**
* Smaller number of task attempts for some tests against object
* stores where IO overhead is higher.
* Value: {@value}.
*/
public static final int NUMBER_OF_TASK_ATTEMPTS_SMALL = 200;
private ManifestCommitterTestSupport() {
}
@ -193,29 +206,35 @@ public static Map<Path, LocatedFileStatus> validateGeneratedFiles(
Path destDir,
ManifestSuccessData successData,
boolean exclusive) throws IOException {
Map<Path, LocatedFileStatus> map = new HashMap<>();
Map<Path, LocatedFileStatus> fileListing = new HashMap<>();
RemoteIterators.foreach(fs.listFiles(destDir, true),
e -> {
if (!e.getPath().getName().startsWith("_")) {
map.put(e.getPath(), e);
fileListing.put(e.getPath(), e);
}
});
final List<Path> actual = fileListing.keySet().stream()
.sorted(Comparator.comparing(Path::getName))
.collect(Collectors.toList());
// map has all files other than temp ones and the success marker
// what do we expect
final List<Path> expected = filesInManifest(successData);
expected.sort(Comparator.comparing(Path::getName));
// all of those must be found
Assertions.assertThat(map.keySet())
.describedAs("Files in FS compared to manifest")
Assertions.assertThat(actual)
.describedAs("Files in FS expected to contain all listed in manifest")
.containsAll(expected);
// and if exclusive, that too
if (exclusive) {
Assertions.assertThat(map.keySet())
.describedAs("Files in FS compared to manifest")
Assertions.assertThat(actual)
.describedAs("Files in FS expected to be exclusively of the job")
.hasSize(expected.size())
.containsExactlyInAnyOrderElementsOf(expected);
}
return map;
return fileListing;
}
/**
@ -295,6 +314,24 @@ static void assertDirEntryMatch(
.isEqualTo(type);
}
/**
 * Save a manifest's file list to a new temp entry (sequence) file,
 * returning the loaded manifest data referencing it.
 * Caller MUST clean up the temp file once finished with it.
 * If the write fails, the temp file is deleted before the exception
 * is rethrown, as the caller never receives its path and so could
 * not clean it up.
 * @param entryFileIO IO class
 * @param manifest manifest to process.
 * @return info about the load
 * @throws IOException write failure
 */
public static LoadedManifestData saveManifest(EntryFileIO entryFileIO, TaskManifest manifest)
    throws IOException {
  final File tempFile = File.createTempFile("entries", ".seq");
  try {
    final SequenceFile.Writer writer = entryFileIO.createWriter(tempFile);
    return new LoadedManifestData(
        manifest.getDestDirectories(),
        toPath(tempFile),
        EntryFileIO.write(writer, manifest.getFilesToCommit(), true));
  } catch (IOException | RuntimeException e) {
    // don't leak the temp file on failure.
    tempFile.delete();
    throw e;
  }
}
/**
* Closeable which can be used to safely close writers in
* a try-with-resources block.

View File

@ -32,11 +32,9 @@
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.DirEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.EntryStatus;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CreateOutputDirectoriesStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import org.apache.hadoop.util.Lists;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupCounterStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@ -103,14 +101,14 @@ public void testPrepareSomeDirs() throws Throwable {
final long initialFileStatusCount = lookupCounterStatistic(iostats, OP_GET_FILE_STATUS);
final int dirCount = 8;
// add duplicate entries to the list even though in the current iteration
// that couldn't happen.
final List<Path> dirs = subpaths(destDir, dirCount);
final List<DirEntry> dirEntries = dirEntries(dirs, 1, EntryStatus.not_found);
dirEntries.addAll(dirEntries(dirs, 1, EntryStatus.not_found));
// two manifests with duplicate entries
final List<TaskManifest> manifests = Lists.newArrayList(
manifestWithDirsToCreate(dirEntries),
manifestWithDirsToCreate(dirEntries));
final CreateOutputDirectoriesStage.Result result = mkdirStage.apply(manifests);
final CreateOutputDirectoriesStage.Result result = mkdirStage.apply(dirEntries);
Assertions.assertThat(result.getCreatedDirectories())
.describedAs("output of %s", mkdirStage)
.containsExactlyInAnyOrderElementsOf(dirs);
@ -125,8 +123,7 @@ public void testPrepareSomeDirs() throws Throwable {
final CreateOutputDirectoriesStage s2 =
new CreateOutputDirectoriesStage(stageConfig);
final CreateOutputDirectoriesStage.Result r2 = s2.apply(
Lists.newArrayList(
manifestWithDirsToCreate(dirEntries(dirs, 1, EntryStatus.dir))));
dirEntries(dirs, 1, EntryStatus.dir));
// no directories are now created.
Assertions.assertThat(r2.getCreatedDirectories())
@ -157,19 +154,6 @@ protected List<DirEntry> dirEntries(Collection<Path> paths,
.collect(Collectors.toList());
}
/**
* Create a manifest with the list of directory entries added.
* Job commit requires the entries to have been probed for, and
* for the entire tree under the dest path to be included.
* @param dirEntries list of directory entries.
* @return the manifest.
*/
protected TaskManifest manifestWithDirsToCreate(List<DirEntry> dirEntries) {
final TaskManifest taskManifest = new TaskManifest();
taskManifest.getDestDirectories().addAll(dirEntries);
return taskManifest;
}
/**
* Assert the directory map status of a path.
* @param result stage result
@ -241,12 +225,9 @@ public void testPrepareDirtyTree() throws Throwable {
parentIsDir.setStatus(EntryStatus.dir);
leafIsFile.setStatus(EntryStatus.file);
final List<TaskManifest> manifests = Lists.newArrayList(
manifestWithDirsToCreate(directories));
// first attempt will succeed.
final CreateOutputDirectoriesStage.Result result =
mkdirStage.apply(manifests);
mkdirStage.apply(directories);
LOG.info("Job Statistics\n{}", ioStatisticsToPrettyString(iostats));
@ -270,7 +251,7 @@ public void testPrepareDirtyTree() throws Throwable {
// attempt will fail because one of the entries marked as
// a file to delete is now a non-empty directory
LOG.info("Executing failing attempt to create the directories");
intercept(IOException.class, () -> attempt2.apply(manifests));
intercept(IOException.class, () -> attempt2.apply(directories));
verifyStatisticCounterValue(iostats, OP_PREPARE_DIR_ANCESTORS + SUFFIX_FAILURES, 1);
verifyStatisticCounterValue(iostats, OP_DELETE + SUFFIX_FAILURES, 1);
@ -281,14 +262,12 @@ public void testPrepareDirtyTree() throws Throwable {
directories3.addAll(dirEntries(level2, 2, EntryStatus.dir));
directories3.addAll(dirEntries(level3, 3, EntryStatus.dir));
final List<TaskManifest> manifests3 = Lists.newArrayList(
manifestWithDirsToCreate(directories3));
CreateOutputDirectoriesStage attempt3 =
new CreateOutputDirectoriesStage(
createStageConfigForJob(JOB1, destDir)
.withDeleteTargetPaths(true));
final CreateOutputDirectoriesStage.Result r3 =
attempt3.apply(manifests3);
attempt3.apply(directories3);
assertDirMapStatus(r3, leafIsFile.getDestPath(),
CreateOutputDirectoriesStage.DirMapState.dirFoundInStore);
Assertions.assertThat(r3.getCreatedDirectories())

View File

@ -18,13 +18,14 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;
@ -36,6 +37,7 @@
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.OutputValidationException;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbortTaskStage;
@ -53,6 +55,7 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.JOB_ID_SOURCE_MAPREDUCE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.MANIFEST_COMMITTER_CLASSNAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.COMMITTER_BYTES_COMMITTED_COUNT;
@ -141,6 +144,14 @@ public class TestJobThroughManifestCommitter
*/
private StageConfig ta11Config;
/**
* Loaded manifest data, set in job commit and used in validation.
* This is static so it can be passed from where it is loaded
* {@link #test_0400_loadManifests()} to subsequent tests.
*/
private static LoadedManifestData
loadedManifestData;
@Override
public void setup() throws Exception {
super.setup();
@ -442,19 +453,24 @@ public void test_0340_setupThenAbortTask11() throws Throwable {
@Test
public void test_0400_loadManifests() throws Throwable {
describe("Load all manifests; committed must be TA01 and TA10");
File entryFile = File.createTempFile("entry", ".seq");
LoadManifestsStage.Arguments args = new LoadManifestsStage.Arguments(
entryFile, DEFAULT_WRITER_QUEUE_CAPACITY);
LoadManifestsStage.Result result
= new LoadManifestsStage(getJobStageConfig()).apply(true);
String summary = result.getSummary().toString();
= new LoadManifestsStage(getJobStageConfig()).apply(args);
loadedManifestData = result.getLoadedManifestData();
Assertions.assertThat(loadedManifestData)
.describedAs("manifest data from %s", result)
.isNotNull();
final LoadManifestsStage.SummaryInfo stageSummary = result.getSummary();
String summary = stageSummary.toString();
LOG.info("Manifest summary {}", summary);
List<TaskManifest> manifests = result.getManifests();
Assertions.assertThat(manifests)
.describedAs("Loaded manifests in %s", summary)
.hasSize(2);
Map<String, TaskManifest> manifestMap = toMap(manifests);
verifyManifestTaskAttemptID(
manifestMap.get(taskAttempt01), taskAttempt01);
verifyManifestTaskAttemptID(
manifestMap.get(taskAttempt10), taskAttempt10);
Assertions.assertThat(stageSummary.getTaskAttemptIDs())
.describedAs("Task attempts in %s", summary)
.hasSize(2)
.contains(taskAttempt01, taskAttempt10);
}
@Test
@ -473,19 +489,20 @@ public void test_0410_commitJob() throws Throwable {
public void test_0420_validateJob() throws Throwable {
describe("Validate the output of the job through the validation"
+ " stage");
Assumptions.assumeThat(loadedManifestData)
.describedAs("Loaded Manifest Data from earlier stage")
.isNotNull();
// load in the success data.
ManifestSuccessData successData = loadAndPrintSuccessData(
getFileSystem(),
getJobStageConfig().getJobSuccessMarkerPath());
// load manifests stage will load all the task manifests again
List<TaskManifest> manifests = new LoadManifestsStage(getJobStageConfig())
.apply(true).getManifests();
// Now verify their files exist, returning the list of renamed files.
List<String> committedFiles = new ValidateRenamedFilesStage(getJobStageConfig())
.apply(manifests)
final List<FileEntry> validatedEntries = new ValidateRenamedFilesStage(getJobStageConfig())
.apply(loadedManifestData.getEntrySequenceData());
List<String> committedFiles = validatedEntries
.stream().map(FileEntry::getDest)
.collect(Collectors.toList());
@ -497,24 +514,7 @@ public void test_0420_validateJob() throws Throwable {
Assertions.assertThat(committedFiles)
.containsAll(successData.getFilenames());
// now patch one of the manifest files by editing an entry
FileEntry entry = manifests.get(0).getFilesToCommit().get(0);
// no longer exists.
String oldName = entry.getDest();
String newName = oldName + ".missing";
entry.setDest(newName);
// validation will now fail
intercept(OutputValidationException.class, ".missing", () ->
new ValidateRenamedFilesStage(getJobStageConfig())
.apply(manifests));
// restore the name, but change the size
entry.setDest(oldName);
entry.setSize(128_000_000);
intercept(OutputValidationException.class, () ->
new ValidateRenamedFilesStage(getJobStageConfig())
.apply(manifests));
}
@Test
@ -558,7 +558,7 @@ public void test_0430_validateStatistics() throws Throwable {
}
@Test
public void test_440_validateSuccessFiles() throws Throwable {
public void test_0440_validateSuccessFiles() throws Throwable {
// load in the success data.
final FileSystem fs = getFileSystem();
@ -570,6 +570,30 @@ public void test_440_validateSuccessFiles() throws Throwable {
successData, false);
}
/**
 * Verify that the validation stage will correctly report a failure
 * if one of the files has a different name.
 */
@Test
public void test_0450_validationDetectsFailures() throws Throwable {
// rename one of the renamed files under the destination, then
// re-run validation, which must now detect the missing entry.
final List<FileEntry> validatedEntries = new ValidateRenamedFilesStage(getJobStageConfig())
.apply(loadedManifestData.getEntrySequenceData());
final Path path = validatedEntries.get(0).getDestPath();
final Path p2 = new Path(path.getParent(), path.getName() + "-renamed");
final FileSystem fs = getFileSystem();
fs.rename(path, p2);
try {
intercept(OutputValidationException.class, () ->
new ValidateRenamedFilesStage(getJobStageConfig())
.apply(loadedManifestData.getEntrySequenceData()));
} finally {
// rename back; if this doesn't happen, later stages will fail.
fs.rename(p2, path);
}
}
@Test
public void test_0900_cleanupJob() throws Throwable {
describe("Cleanup job");

View File

@ -18,19 +18,32 @@
package org.apache.hadoop.mapreduce.lib.output.committer.manifest;
import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestSuccessData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CleanupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.CreateOutputDirectoriesStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.LoadManifestsStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.SetupJobStage;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.StageConfig;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.DEFAULT_WRITER_QUEUE_CAPACITY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_SUMMARY_REPORT_DIR;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_COMMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.addHeapInformation;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createJobSummaryFilename;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.createManifestOutcome;
/**
* Test loading manifests from a store.
@ -43,8 +56,12 @@
*/
public class TestLoadManifestsStage extends AbstractManifestCommitterTest {
public static final int FILES_PER_TASK_ATTEMPT = 100;
private int taskAttemptCount;
private File entryFile;
/**
* How many task attempts to make?
* Override point.
@ -63,6 +80,18 @@ public void setup() throws Exception {
.isGreaterThan(0);
}
/**
 * Teardown: delete the temporary entry file, if one was created,
 * then continue with the superclass teardown.
 * @throws Exception on any failure
 */
@Override
public void teardown() throws Exception {
if (entryFile != null) {
entryFile.delete();
}
super.teardown();
}
/**
 * Report the heap space currently in use.
 * No GC is triggered first, so the value is a point-in-time sample.
 * @return total heap minus free heap, in bytes.
 */
public long heapSize() {
  final Runtime runtime = Runtime.getRuntime();
  return runtime.totalMemory() - runtime.freeMemory();
}
/**
* Build a large number of manifests, but without the real files
* and directories.
@ -79,28 +108,40 @@ public void testSaveThenLoadManyManifests() throws Throwable {
describe("Creating many manifests with fake file/dir entries,"
+ " load them and prepare the output dirs.");
int filesPerTaskAttempt = 10;
int filesPerTaskAttempt = FILES_PER_TASK_ATTEMPT;
LOG.info("Number of task attempts: {}, files per task attempt {}",
taskAttemptCount, filesPerTaskAttempt);
setJobStageConfig(createStageConfigForJob(JOB1, getDestDir()));
final StageConfig stageConfig = createStageConfigForJob(JOB1, getDestDir());
setJobStageConfig(stageConfig);
// set up the job.
new SetupJobStage(getJobStageConfig()).apply(false);
new SetupJobStage(stageConfig).apply(false);
LOG.info("Creating manifest files for {}", taskAttemptCount);
executeTaskAttempts(taskAttemptCount, filesPerTaskAttempt);
IOStatisticsSnapshot heapInfo = new IOStatisticsSnapshot();
heapinfo(heapInfo, "initial");
LOG.info("Loading in the manifests");
// Load in the manifests
LoadManifestsStage stage = new LoadManifestsStage(
getJobStageConfig());
stageConfig);
entryFile = File.createTempFile("entry", ".seq");
LoadManifestsStage.Arguments args = new LoadManifestsStage.Arguments(
entryFile, DEFAULT_WRITER_QUEUE_CAPACITY);
LoadManifestsStage.Result result = stage.apply(true);
LoadManifestsStage.SummaryInfo summary = result.getSummary();
List<TaskManifest> loadedManifests = result.getManifests();
LoadManifestsStage.Result loadManifestsResult = stage.apply(args);
LoadManifestsStage.SummaryInfo summary = loadManifestsResult.getSummary();
LOG.info("\nJob statistics after loading {}",
ioStatisticsToPrettyString(getStageStatistics()));
LOG.info("Heap size = {}", heapSize());
heapinfo(heapInfo, "load.manifests");
Assertions.assertThat(summary.getManifestCount())
.describedAs("Manifest count of %s", summary)
@ -112,19 +153,19 @@ public void testSaveThenLoadManyManifests() throws Throwable {
.describedAs("File Size of %s", summary)
.isEqualTo(getTotalDataSize());
// now that manifest list.
List<String> manifestTaskIds = loadedManifests.stream()
.map(TaskManifest::getTaskID)
.collect(Collectors.toList());
List<String> manifestTaskIds = summary.getTaskIDs();
Assertions.assertThat(getTaskIds())
.describedAs("Task IDs of all tasks")
.containsExactlyInAnyOrderElementsOf(manifestTaskIds);
// now let's see about aggregating a large set of directories
Set<Path> createdDirectories = new CreateOutputDirectoriesStage(
getJobStageConfig())
.apply(loadedManifests)
stageConfig)
.apply(loadManifestsResult.getLoadedManifestData().getDirectories())
.getCreatedDirectories();
heapinfo(heapInfo, "create.directories");
// but after the merge process, only one per generated file output
// dir exists
@ -134,8 +175,34 @@ public void testSaveThenLoadManyManifests() throws Throwable {
// and skipping the rename stage (which is going to fail),
// go straight to cleanup
new CleanupJobStage(getJobStageConfig()).apply(
new CleanupJobStage(stageConfig).apply(
new CleanupJobStage.Arguments("", true, true, false));
heapinfo(heapInfo, "cleanup");
ManifestSuccessData success = createManifestOutcome(stageConfig, OP_STAGE_JOB_COMMIT);
success.snapshotIOStatistics(getStageStatistics());
success.getIOStatistics().aggregate(heapInfo);
Configuration conf = getConfiguration();
enableManifestCommitter(conf);
String reportDir = conf.getTrimmed(OPT_SUMMARY_REPORT_DIR, "");
Path reportDirPath = new Path(reportDir);
Path path = new Path(reportDirPath,
createJobSummaryFilename("TestLoadManifestsStage"));
final FileSystem summaryFS = path.getFileSystem(conf);
success.save(summaryFS, path, true);
LOG.info("Saved summary to {}", path);
new ManifestPrinter().loadAndPrintManifest(summaryFS, path);
}
/**
 * Request a garbage collection, then record heap information
 * under the given stage name.
 * @param stats statistics snapshot to update
 * @param stage stage name to record heap info against
 */
private static void heapinfo(final IOStatisticsSnapshot stats, final String stage) {
  // equivalent to System.gc(); a GC request is only a hint to the JVM.
  Runtime.getRuntime().gc();
  addHeapInformation(stats, stage);
}
}

View File

@ -29,7 +29,7 @@
import org.junit.Assume;
import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -39,6 +39,8 @@
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.TaskManifest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.EntryFileIO;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.LoadedManifestData;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.RenameFilesStage;
@ -48,7 +50,9 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.SUCCESS_MARKER_FILE_LIMIT;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_COMMIT_FILE_RENAME;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport.saveManifest;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestCommitterSupport.getEtag;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.UnreliableManifestStoreOperations.SIMULATED_FAILURE;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages.AbstractJobOrTaskStage.FAILED_TO_RENAME_PREFIX;
@ -82,6 +86,11 @@ public class TestRenameStageFailure extends AbstractManifestCommitterTest {
/** resilient commit expected? */
private boolean resilientCommit;
/**
* Entry file IO.
*/
private EntryFileIO entryFileIO;
protected boolean isResilientCommit() {
return resilientCommit;
}
@ -109,6 +118,7 @@ public void setup() throws Exception {
= new UnreliableManifestStoreOperations(wrappedOperations);
setStoreOperations(failures);
resilientCommit = wrappedOperations.storeSupportsResilientCommit();
entryFileIO = new EntryFileIO(getConfiguration());
}
/**
@ -232,9 +242,15 @@ public void testDeleteTargetPaths() throws Throwable {
LOG.info("Exception raised: {}", ex.toString());
}
final LoadedManifestData manifestData = saveManifest(entryFileIO, manifest);
// delete target paths and it works
new RenameFilesStage(stageConfig.withDeleteTargetPaths(true))
.apply(Pair.of(manifests, Collections.emptySet()));
try {
new RenameFilesStage(stageConfig.withDeleteTargetPaths(true))
.apply(Triple.of(manifestData, Collections.emptySet(), SUCCESS_MARKER_FILE_LIMIT));
} finally {
manifestData.getEntrySequenceFile().delete();
}
// and the new data made it over
verifyFileContents(fs, dest, sourceData);
@ -348,9 +364,15 @@ private <E extends Throwable> E expectRenameFailure(
IOStatisticsStore iostatistics = stage.getIOStatistics();
long failures0 = iostatistics.counters().get(RENAME_FAILURES);
final LoadedManifestData manifestData = saveManifest(entryFileIO, manifest);
// rename MUST raise an exception.
E ex = intercept(exceptionClass, errorText, () ->
stage.apply(Pair.of(manifests, Collections.emptySet())));
E ex;
try {
ex = intercept(exceptionClass, errorText, () ->
stage.apply(Triple.of(manifestData, Collections.emptySet(), SUCCESS_MARKER_FILE_LIMIT)));
} finally {
manifestData.getEntrySequenceFile().delete();
}
LOG.info("Statistics {}", ioStatisticsToPrettyString(iostatistics));
// the IOStatistics record the rename as a failure.

View File

@ -0,0 +1,382 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.AbstractManifestCommitterTest;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.FileEntry;
import org.apache.hadoop.util.functional.TaskPool;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
import static org.apache.hadoop.util.functional.RemoteIterators.rangeExcludingIterator;
/**
* Test {@link EntryFileIO}.
*/
public class TestEntryFileIO extends AbstractManifestCommitterTest {

  private static final Logger LOG = LoggerFactory.getLogger(
      TestEntryFileIO.class);

  /**
   * Entry to save.
   */
  public static final FileEntry ENTRY = new FileEntry("source", "dest", 100, "etag");

  /**
   * Entry file instance.
   */
  private EntryFileIO entryFileIO;

  /**
   * Path to a test entry file.
   */
  private File entryFile;

  /**
   * Create an entry file during setup.
   */
  @Before
  public void setup() throws Exception {
    entryFileIO = new EntryFileIO(new Configuration());
    createEntryFile();
  }

  /**
   * Teardown deletes any entry file.
   * @throws Exception on any failure
   */
  @After
  public void teardown() throws Exception {
    Thread.currentThread().setName("teardown");
    if (getEntryFile() != null) {
      getEntryFile().delete();
    }
  }

  /**
   * Create a temp entry file and set the entryFile field to it.
   * @throws IOException creation failure
   */
  private void createEntryFile() throws IOException {
    setEntryFile(File.createTempFile("entry", ".seq"));
  }

  /**
   * Reference to any temp file created.
   * @return the entry file; may be null before setup.
   */
  private File getEntryFile() {
    return entryFile;
  }

  private void setEntryFile(File entryFile) {
    this.entryFile = entryFile;
  }

  /**
   * Create a file with one entry, then read it back
   * via all the mechanisms available.
   */
  @Test
  public void testCreateWriteReadFileOneEntry() throws Throwable {

    final FileEntry source = ENTRY;

    // do an explicit close to help isolate any failure.
    SequenceFile.Writer writer = createWriter();
    writer.append(NullWritable.get(), source);
    writer.flush();
    writer.close();

    FileEntry readBack = new FileEntry();
    try (SequenceFile.Reader reader = readEntryFile()) {
      reader.next(NullWritable.get(), readBack);
    }
    Assertions.assertThat(readBack)
        .describedAs("entry read back from sequence file")
        .isEqualTo(source);

    // now use the iterator to access it.
    final RemoteIterator<FileEntry> it =
        iterateOverEntryFile();
    List<FileEntry> files = new ArrayList<>();
    foreach(it, files::add);
    Assertions.assertThat(files)
        .describedAs("iteration over the entry file")
        .hasSize(1)
        .element(0)
        .isEqualTo(source);
    // the iterator must be closed after full iteration, with the
    // read count matching the number of entries in the file.
    final EntryFileIO.EntryIterator et = (EntryFileIO.EntryIterator) it;
    Assertions.assertThat(et)
        .describedAs("entry iterator %s", et)
        .matches(p -> p.isClosed())
        .extracting(p -> p.getCount())
        .isEqualTo(1);
  }

  /**
   * Create a writer.
   * @return a writer
   * @throws IOException failure to create the file.
   */
  private SequenceFile.Writer createWriter() throws IOException {
    return entryFileIO.createWriter(getEntryFile());
  }

  /**
   * Create an iterator over the records in the (non empty) entry file.
   * @return an iterator over entries.
   * @throws IOException failure to open the file
   */
  private RemoteIterator<FileEntry> iterateOverEntryFile() throws IOException {
    return entryFileIO.iterateOver(readEntryFile());
  }

  /**
   * Create a reader for the (non empty) entry file.
   * @return a reader.
   * @throws IOException failure to open the file
   */
  private SequenceFile.Reader readEntryFile() throws IOException {
    assertEntryFileNonEmpty();

    return entryFileIO.createReader(getEntryFile());
  }

  /**
   * Create an entry file with no entries; verify that iterating
   * over it yields nothing.
   */
  @Test
  public void testCreateEmptyFile() throws Throwable {

    final File file = getEntryFile();

    entryFileIO.createWriter(file).close();

    // now use the iterator to access it.
    List<FileEntry> files = new ArrayList<>();
    Assertions.assertThat(foreach(iterateOverEntryFile(), files::add))
        .describedAs("Count of iterations over entries in an entry file with no entries")
        .isEqualTo(0);
  }

  private void assertEntryFileNonEmpty() {
    Assertions.assertThat(getEntryFile().length())
        .describedAs("Length of file %s", getEntryFile())
        .isGreaterThan(0);
  }

  @Test
  public void testCreateInvalidWriter() throws Throwable {
    intercept(NullPointerException.class, () ->
        entryFileIO.launchEntryWriter(null, 1));
  }

  @Test
  public void testCreateInvalidWriterCapacity() throws Throwable {
    // capacity is validated before the writer, so a zero capacity
    // fails even with a null writer.
    intercept(IllegalStateException.class, () ->
        entryFileIO.launchEntryWriter(null, 0));
  }

  /**
   * Generate lots of data and write it.
   */
  @Test
  public void testLargeStreamingWrite() throws Throwable {

    // list of 100 entries at a time
    int listSize = 100;
    // and the number of block writes
    int writes = 100;
    List<FileEntry> list = buildEntryList(listSize);

    int total = listSize * writes;

    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(createWriter(), 2)) {
      Assertions.assertThat(out.isActive())
          .describedAs("out.isActive in %s", out)
          .isTrue();
      for (int i = 0; i < writes; i++) {
        Assertions.assertThat(out.enqueue(list))
            .describedAs("enqueue of list")
            .isTrue();
      }
      // double close is harmless; the explicit close lets the
      // write exception and final state be probed inside the block.
      out.close();
      out.maybeRaiseWriteException();
      Assertions.assertThat(out.isActive())
          .describedAs("out.isActive in %s", out)
          .isFalse();

      Assertions.assertThat(out.getCount())
          .describedAs("total elements written")
          .isEqualTo(total);
    }

    // now read it back
    AtomicInteger count = new AtomicInteger();
    foreach(iterateOverEntryFile(), e -> {
      final int elt = count.getAndIncrement();
      // entries repeat in blocks of listSize; map the element
      // number back to its index in the source list.
      final int index = elt % listSize;
      Assertions.assertThat(e)
          .describedAs("element %d in file mapping to index %d", elt, index)
          .isEqualTo(list.get(index));
    });
    Assertions.assertThat(count.get())
        .describedAs("total elements read")
        .isEqualTo(total);
  }

  /**
   * Build an entry list.
   * @param listSize size of the list
   * @return a list of entries
   */
  private static List<FileEntry> buildEntryList(final int listSize) {
    List<FileEntry> list = new ArrayList<>(listSize);
    for (int i = 0; i < listSize; i++) {
      list.add(new FileEntry("source" + i, "dest" + i, i, "etag-" + i));
    }
    // just for debugging/regression testing
    Assertions.assertThat(list).hasSize(listSize);
    return list;
  }

  /**
   * Write lists to the output, but the stream is going to fail after a
   * configured number of records have been written.
   * Verify that the (blocked) submitter is woken up
   * and that the exception was preserved for rethrowing.
   */
  @Test
  public void testFailurePropagation() throws Throwable {

    final int count = 4;
    final SequenceFile.Writer writer = spyWithFailingAppend(
        entryFileIO.createWriter(getEntryFile()), count);
    // single-entry lists are written, so each enqueue is one append()
    List<FileEntry> list = buildEntryList(1);

    // small queue ensures the posting thread is blocked
    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(writer, 2)) {
      boolean valid = true;
      // attempt more writes than the spy allows, to guarantee
      // the failure is triggered.
      for (int i = 0; valid && i < count * 2; i++) {
        valid = out.enqueue(list);
      }
      LOG.info("queue to {} finished valid={}", out, valid);
      out.close();

      // verify the exception is as expected
      intercept(IOException.class, "mocked", () ->
          out.maybeRaiseWriteException());

      // and verify the count of invocations.
      Assertions.assertThat(out.getCount())
          .describedAs("process count of %s", count)
          .isEqualTo(count);
    }
  }

  /**
   * Spy on a writer with the append operation to fail after the given count of calls
   * is reached.
   * @param writer writer to wrap with the spy.
   * @param count number of allowed append calls.
   * @return spied writer.
   * @throws IOException from the signature of the append() call mocked.
   */
  private static SequenceFile.Writer spyWithFailingAppend(final SequenceFile.Writer writer,
      final int count)
      throws IOException {
    AtomicLong limit = new AtomicLong(count);

    final SequenceFile.Writer spied = Mockito.spy(writer);
    Mockito.doAnswer((InvocationOnMock invocation) -> {
      final Writable k = invocation.getArgument(0);
      final Writable v = invocation.getArgument(1);
      // pass through to the real writer until the limit is hit,
      // then raise the mocked failure on every subsequent call.
      if (limit.getAndDecrement() > 0) {
        writer.append(k, v);
      } else {
        throw new IOException("mocked");
      }
      return null;
    }).when(spied).append(Mockito.any(Writable.class), Mockito.any(Writable.class));
    return spied;
  }

  /**
   * Multithreaded writing through a shared entry writer.
   */
  @Test
  public void testParallelWrite() throws Throwable {

    // list of 100 entries at a time
    int listSize = 100;
    // and the number of block writes
    int attempts = 100;
    List<FileEntry> list = buildEntryList(listSize);

    int total = listSize * attempts;

    try (EntryFileIO.EntryWriter out = entryFileIO.launchEntryWriter(createWriter(), 20)) {
      TaskPool.foreach(rangeExcludingIterator(0, attempts))
          .executeWith(getSubmitter())
          .stopOnFailure()
          .run(l -> {
            out.enqueue(list);
          });
      out.close();
      out.maybeRaiseWriteException();

      Assertions.assertThat(out.getCount())
          .describedAs("total elements written")
          .isEqualTo(total);
    }

    // now read it back; ordering across threads is undefined so
    // only the total record count is asserted.
    Assertions.assertThat(foreach(iterateOverEntryFile(), e -> { }))
        .describedAs("total elements read")
        .isEqualTo(total);
  }

}

View File

@ -37,7 +37,7 @@
import org.apache.hadoop.fs.s3a.statistics.StatisticTypeEnum;
import org.apache.hadoop.fs.s3a.statistics.impl.AbstractS3AStatisticsSource;
import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.impl.ForwardingIOStatisticsStore;
import org.apache.hadoop.fs.statistics.impl.ForwardingIOStatisticsStore;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

View File

@ -62,6 +62,11 @@ public class AbfsManifestStoreOperations extends
*/
private ResilientCommitByRename resilientCommitByRename;
/**
* Are etags preserved in renames?
*/
private boolean etagsPreserved;
@Override
public AzureBlobFileSystem getFileSystem() {
return (AzureBlobFileSystem) super.getFileSystem();
@ -83,15 +88,22 @@ public void bindToFileSystem(FileSystem filesystem, Path path) throws IOExceptio
super.bindToFileSystem(filesystem, path);
try {
resilientCommitByRename = getFileSystem().createResilientCommitSupport(path);
// this also means that etags are preserved.
etagsPreserved = true;
LOG.debug("Bonded to filesystem with resilient commits under path {}", path);
} catch (UnsupportedOperationException e) {
LOG.debug("No resilient commit support under path {}", path);
}
}
/**
* Etags are preserved through Gen2 stores, but not wasb stores.
* @param path path to probe.
* @return true if this store preserves etags.
*/
@Override
public boolean storePreservesEtagsThroughRenames(final Path path) {
return true;
return etagsPreserved;
}
/**

View File

@ -20,7 +20,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_STORE_OPERATIONS_CLASS;
/**
@ -43,6 +45,15 @@ static Configuration prepareTestConfiguration(
// use ABFS Store operations
conf.set(OPT_STORE_OPERATIONS_CLASS,
AbfsManifestStoreOperations.NAME);
// turn on small file read if not explicitly set to a value.
conf.setBooleanIfUnset(AZURE_READ_SMALL_FILES_COMPLETELY, true);
// use a larger thread pool to compensate for latencies
final String size = Integer.toString(192);
conf.setIfUnset(ManifestCommitterConstants.OPT_IO_PROCESSORS, size);
conf.setIfUnset(ManifestCommitterConstants.OPT_WRITER_QUEUE_CAPACITY, size);
// no need for parallel delete here as we aren't at the scale where unified delete
// is going to time out
conf.setBooleanIfUnset(ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_DELETE, false);
return conf;
}

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.fs.azurebfs.commit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
import org.apache.hadoop.fs.azurebfs.contract.ABFSContractTestBinding;
import org.apache.hadoop.fs.azurebfs.contract.AbfsFileSystemContract;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterTestSupport;
import org.apache.hadoop.mapreduce.lib.output.committer.manifest.TestLoadManifestsStage;
/**
@ -52,4 +54,16 @@ protected AbstractFSContract createContract(final Configuration conf) {
return new AbfsFileSystemContract(conf, binding.isSecureMode());
}
@Override
protected int getTestTimeoutMillis() {
return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
}
/**
* @return a smaller number of TAs than the base test suite does.
*/
@Override
protected int numberOfTaskAttempts() {
return ManifestCommitterTestSupport.NUMBER_OF_TASK_ATTEMPTS_SMALL;
}
}