HADOOP-12502 SetReplication OutOfMemoryError. Contributed by Vinayakumar B.

This commit is contained in:
Aaron Fabbri 2018-04-11 17:19:56 -07:00
parent 18de6f2042
commit 0d898b7bb8
No known key found for this signature in database
GPG Key ID: B2EEFA9E78118A29
7 changed files with 161 additions and 12 deletions

View File

@ -677,7 +677,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
public FileStatus[] listStatus(Path f) throws IOException {
return fs.listStatus(f, DEFAULT_FILTER);
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws IOException {
// Not-using fs#listStatusIterator() since it includes crc files as well
return new DirListingIterator<>(p);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.

View File

@ -2147,7 +2147,7 @@ public abstract class FileSystem extends Configured implements Closeable {
/**
* Generic iterator for implementing {@link #listStatusIterator(Path)}.
*/
private class DirListingIterator<T extends FileStatus> implements
protected class DirListingIterator<T extends FileStatus> implements
RemoteIterator<T> {
private final Path path;

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -325,20 +326,68 @@ abstract public class Command extends Configured {
*/
protected void processPaths(PathData parent, PathData ... items)
throws IOException {
// TODO: this really should be iterative
for (PathData item : items) {
try {
processPath(item);
if (recursive && isPathRecursable(item)) {
recursePath(item);
}
postProcessPath(item);
processPathInternal(item);
} catch (IOException e) {
displayError(e);
}
}
}
/**
* Iterates over the given expanded paths and invokes
* {@link #processPath(PathData)} on each element. If "recursive" is true,
* will do a post-visit DFS on directories.
* @param parent if called via a recurse, will be the parent dir, else null
* @param itemsIterator a iterator of {@link PathData} objects to process
* @throws IOException if anything goes wrong...
*/
protected void processPaths(PathData parent,
RemoteIterator<PathData> itemsIterator) throws IOException {
int groupSize = getListingGroupSize();
if (groupSize == 0) {
// No grouping of contents required.
while (itemsIterator.hasNext()) {
processPaths(parent, itemsIterator.next());
}
} else {
List<PathData> items = new ArrayList<PathData>(groupSize);
while (itemsIterator.hasNext()) {
items.add(itemsIterator.next());
if (!itemsIterator.hasNext() || items.size() == groupSize) {
processPaths(parent, items.toArray(new PathData[items.size()]));
items.clear();
}
}
}
}
private void processPathInternal(PathData item) throws IOException {
processPath(item);
if (recursive && isPathRecursable(item)) {
recursePath(item);
}
postProcessPath(item);
}
/**
* Whether the directory listing for a path should be sorted.?
* @return true/false.
*/
protected boolean isSorted() {
return false;
}
/**
* While using iterator method for listing for a path, whether to group items
* and process as array? If so what is the size of array?
* @return size of the grouping array.
*/
protected int getListingGroupSize() {
return 0;
}
/**
* Determines whether a {@link PathData} item is recursable. Default
* implementation is to recurse directories but can be overridden to recurse
@ -384,7 +433,13 @@ abstract public class Command extends Configured {
protected void recursePath(PathData item) throws IOException {
try {
depth++;
processPaths(item, item.getDirectoryContents());
if (isSorted()) {
// use the non-iterative method for listing because explicit sorting is
// required. Iterators not guaranteed to return sorted elements
processPaths(item, item.getDirectoryContents());
} else {
processPaths(item, item.getDirectoryContentsIterator());
}
} finally {
depth--;
}

View File

@ -142,6 +142,12 @@ class CopyCommands {
srcs.add(src);
}
}
@Override
protected boolean isSorted() {
//Sort the children for merge
return true;
}
}
static class Cp extends CommandWithDestination {

View File

@ -230,8 +230,30 @@ class Ls extends FsCommand {
}
@Override
protected void processPaths(PathData parent, PathData ... items)
throws IOException {
protected boolean isSorted() {
// use the non-iterative method for listing because explicit sorting is
// required based on time/size/reverse or Total number of entries
// required to print summary first when non-recursive.
return !isRecursive() || isOrderTime() || isOrderSize() || isOrderReverse();
}
@Override
protected int getListingGroupSize() {
if (pathOnly) {
// If there is a need of printing only paths, then no grouping required
return 0;
}
/*
* LS output should be formatted properly. Grouping 100 items and formatting
* the output to reduce the creation of huge sized arrays. This method will
* be called only when recursive is set.
*/
return 100;
}
@Override
protected void processPaths(PathData parent, PathData... items)
throws IOException {
if (parent != null && !isRecursive() && items.length != 0) {
if (!pathOnly) {
out.println("Found " + items.length + " items");

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.fs.PathIsNotDirectoryException;
import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.fs.RemoteIterator;
/**
* Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
@ -276,6 +277,32 @@ public class PathData implements Comparable<PathData> {
return items;
}
/**
* Returns a RemoteIterator for PathData objects of the items contained in the
* given directory.
* @return remote iterator of PathData objects for its children
* @throws IOException if anything else goes wrong...
*/
public RemoteIterator<PathData> getDirectoryContentsIterator()
throws IOException {
checkIfExists(FileTypeRequirement.SHOULD_BE_DIRECTORY);
final RemoteIterator<FileStatus> stats = this.fs.listStatusIterator(path);
return new RemoteIterator<PathData>() {
@Override
public boolean hasNext() throws IOException {
return stats.hasNext();
}
@Override
public PathData next() throws IOException {
FileStatus file = stats.next();
String child = getStringForChildPath(file.getPath());
return new PathData(fs, child, file);
}
};
}
/**
* Creates a new object for a child entry in this directory
* @param child the basename will be appended to this object's path

View File

@ -19,18 +19,19 @@ package org.apache.hadoop.fs.shell.find;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.mockito.Matchers.*;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.shell.PathData;
import org.apache.hadoop.fs.shell.find.BaseExpression;
import org.apache.hadoop.fs.shell.find.Expression;
@ -42,6 +43,9 @@ import org.junit.Rule;
import org.junit.rules.Timeout;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestFind {
@ -861,6 +865,34 @@ public class TestFind {
when(mockFs.listStatus(eq(item5c.path))).thenReturn(
new FileStatus[] { item5ca.stat });
when(mockFs.listStatusIterator(Mockito.any(Path.class)))
.thenAnswer(new Answer<RemoteIterator<FileStatus>>() {
@Override
public RemoteIterator<FileStatus> answer(InvocationOnMock invocation)
throws Throwable {
final Path p = (Path) invocation.getArguments()[0];
final FileStatus[] stats = mockFs.listStatus(p);
return new RemoteIterator<FileStatus>() {
private int i = 0;
@Override
public boolean hasNext() throws IOException {
return i < stats.length;
}
@Override
public FileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more entry in " + p);
}
return stats[i++];
}
};
}
});
when(item1.stat.isSymlink()).thenReturn(false);
when(item1a.stat.isSymlink()).thenReturn(false);
when(item1aa.stat.isSymlink()).thenReturn(false);