From 0d898b7bb8d8d616133236da979a4316be4c1a6f Mon Sep 17 00:00:00 2001
From: Aaron Fabbri
Date: Wed, 11 Apr 2018 17:19:56 -0700
Subject: [PATCH] HADOOP-12502 SetReplication OutOfMemoryError. Contributed by Vinayakumar B.

---
 .../apache/hadoop/fs/ChecksumFileSystem.java  |  9 ++-
 .../java/org/apache/hadoop/fs/FileSystem.java |  2 +-
 .../org/apache/hadoop/fs/shell/Command.java   | 69 +++++++++++++++++--
 .../apache/hadoop/fs/shell/CopyCommands.java  |  6 ++
 .../java/org/apache/hadoop/fs/shell/Ls.java   | 26 ++++++-
 .../org/apache/hadoop/fs/shell/PathData.java  | 27 ++++++++
 .../apache/hadoop/fs/shell/find/TestFind.java | 34 ++++++++-
 7 files changed, 161 insertions(+), 12 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 14c190553af..663c9101d09 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -677,7 +677,14 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
   public FileStatus[] listStatus(Path f) throws IOException {
     return fs.listStatus(f, DEFAULT_FILTER);
   }
-  
+
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+      throws IOException {
+    // Not using fs#listStatusIterator() since it includes crc files as well
+    return new DirListingIterator<>(p);
+  }
+
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index facfe03ae61..707b921bf9f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2147,7 +2147,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Generic iterator for implementing {@link #listStatusIterator(Path)}.
    */
-  private class DirListingIterator<T extends FileStatus> implements
+  protected class DirListingIterator<T extends FileStatus> implements
       RemoteIterator<T> {
 
     private final Path path;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
index c292cf6a9dd..a4746cf76cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Command.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -325,20 +326,68 @@ abstract public class Command extends Configured {
    */
   protected void processPaths(PathData parent, PathData ... items)
   throws IOException {
-    // TODO: this really should be iterative
     for (PathData item : items) {
       try {
-        processPath(item);
-        if (recursive && isPathRecursable(item)) {
-          recursePath(item);
-        }
-        postProcessPath(item);
+        processPathInternal(item);
       } catch (IOException e) {
         displayError(e);
       }
     }
   }
 
+  /**
+   * Iterates over the given expanded paths and invokes
+   * {@link #processPath(PathData)} on each element. If "recursive" is true,
+   * will do a post-visit DFS on directories.
+   * @param parent if called via a recurse, will be the parent dir, else null
+   * @param itemsIterator an iterator of {@link PathData} objects to process
+   * @throws IOException if anything goes wrong...
+   */
+  protected void processPaths(PathData parent,
+      RemoteIterator<PathData> itemsIterator) throws IOException {
+    int groupSize = getListingGroupSize();
+    if (groupSize == 0) {
+      // No grouping of contents required.
+      while (itemsIterator.hasNext()) {
+        processPaths(parent, itemsIterator.next());
+      }
+    } else {
+      List<PathData> items = new ArrayList<>(groupSize);
+      while (itemsIterator.hasNext()) {
+        items.add(itemsIterator.next());
+        if (!itemsIterator.hasNext() || items.size() == groupSize) {
+          processPaths(parent, items.toArray(new PathData[items.size()]));
+          items.clear();
+        }
+      }
+    }
+  }
+
+  private void processPathInternal(PathData item) throws IOException {
+    processPath(item);
+    if (recursive && isPathRecursable(item)) {
+      recursePath(item);
+    }
+    postProcessPath(item);
+  }
+
+  /**
+   * Whether the directory listing for a path should be sorted?
+   * @return true/false.
+   */
+  protected boolean isSorted() {
+    return false;
+  }
+
+  /**
+   * While using the iterator method for listing a path, whether to group items
+   * and process them as an array? If so, what is the size of the array?
+   * @return size of the grouping array.
+   */
+  protected int getListingGroupSize() {
+    return 0;
+  }
+
   /**
    * Determines whether a {@link PathData} item is recursable. Default
    * implementation is to recurse directories but can be overridden to recurse
@@ -384,7 +433,13 @@ abstract public class Command extends Configured {
   protected void recursePath(PathData item) throws IOException {
     try {
       depth++;
-      processPaths(item, item.getDirectoryContents());
+      if (isSorted()) {
+        // use the non-iterative method for listing because explicit sorting is
+        // required. Iterators are not guaranteed to return sorted elements
+        processPaths(item, item.getDirectoryContents());
+      } else {
+        processPaths(item, item.getDirectoryContentsIterator());
+      }
     } finally {
       depth--;
     }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 11cb3d6c295..da7a2b2268d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -142,6 +142,12 @@ class CopyCommands {
         srcs.add(src);
       }
     }
+
+    @Override
+    protected boolean isSorted() {
+      // Sort the children for merge
+      return true;
+    }
   }
 
   static class Cp extends CommandWithDestination {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
index a2d5017f6fe..56504e4108d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Ls.java
@@ -230,8 +230,30 @@ class Ls extends FsCommand {
   }
 
   @Override
-  protected void processPaths(PathData parent, PathData ... items)
-  throws IOException {
+  protected boolean isSorted() {
+    // use the non-iterative method for listing because explicit sorting is
+    // required based on time/size/reverse, or the total number of entries
+    // is required to print the summary first when non-recursive.
+    return !isRecursive() || isOrderTime() || isOrderSize() || isOrderReverse();
+  }
+
+  @Override
+  protected int getListingGroupSize() {
+    if (pathOnly) {
+      // If only paths need to be printed, then no grouping is required
+      return 0;
+    }
+    /*
+     * LS output should be formatted properly. Grouping 100 items and formatting
+     * the output reduces the creation of huge arrays. This method will
+     * be called only when recursive is set.
+     */
+    return 100;
+  }
+
+  @Override
+  protected void processPaths(PathData parent, PathData... items)
+      throws IOException {
     if (parent != null && !isRecursive() && items.length != 0) {
       if (!pathOnly) {
         out.println("Found " + items.length + " items");
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
index 3cf3273fa20..adf17df2db8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIsDirectoryException;
 import org.apache.hadoop.fs.PathIsNotDirectoryException;
 import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.fs.RemoteIterator;
 
 /**
  * Encapsulates a Path (path), its FileStatus (stat), and its FileSystem (fs).
@@ -276,6 +277,32 @@ public class PathData implements Comparable<PathData> {
     return items;
   }
 
+  /**
+   * Returns a RemoteIterator for PathData objects of the items contained in the
+   * given directory.
+   * @return remote iterator of PathData objects for its children
+   * @throws IOException if anything else goes wrong...
+   */
+  public RemoteIterator<PathData> getDirectoryContentsIterator()
+      throws IOException {
+    checkIfExists(FileTypeRequirement.SHOULD_BE_DIRECTORY);
+    final RemoteIterator<FileStatus> stats = this.fs.listStatusIterator(path);
+    return new RemoteIterator<PathData>() {
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return stats.hasNext();
+      }
+
+      @Override
+      public PathData next() throws IOException {
+        FileStatus file = stats.next();
+        String child = getStringForChildPath(file.getPath());
+        return new PathData(fs, child, file);
+      }
+    };
+  }
+
   /**
    * Creates a new object for a child entry in this directory
    * @param child the basename will be appended to this object's path
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
index 716230aa4c4..de0e512618b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/find/TestFind.java
@@ -19,18 +19,19 @@ package org.apache.hadoop.fs.shell.find;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
-import static org.mockito.Matchers.*;
 
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.shell.PathData;
 import org.apache.hadoop.fs.shell.find.BaseExpression;
 import org.apache.hadoop.fs.shell.find.Expression;
@@ -42,6 +43,9 @@ import org.junit.Rule;
 import org.junit.rules.Timeout;
 import org.junit.Test;
 import org.mockito.InOrder;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestFind {
 
@@ -861,6 +865,34 @@ public class TestFind {
     when(mockFs.listStatus(eq(item5c.path))).thenReturn(
         new FileStatus[] { item5ca.stat });
 
+    when(mockFs.listStatusIterator(Mockito.any(Path.class)))
+        .thenAnswer(new Answer<RemoteIterator<FileStatus>>() {
+
+          @Override
+          public RemoteIterator<FileStatus> answer(InvocationOnMock invocation)
+              throws Throwable {
+            final Path p = (Path) invocation.getArguments()[0];
+            final FileStatus[] stats = mockFs.listStatus(p);
+
+            return new RemoteIterator<FileStatus>() {
+              private int i = 0;
+
+              @Override
+              public boolean hasNext() throws IOException {
+                return i < stats.length;
+              }
+
+              @Override
+              public FileStatus next() throws IOException {
+                if (!hasNext()) {
+                  throw new NoSuchElementException("No more entries in " + p);
+                }
+                return stats[i++];
+              }
+            };
+          }
+        });
+
     when(item1.stat.isSymlink()).thenReturn(false);
     when(item1a.stat.isSymlink()).thenReturn(false);
     when(item1aa.stat.isSymlink()).thenReturn(false);
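
To make the new control flow concrete, below is a minimal sketch (not part of the patch itself) of how a shell command would pick up the iterator-based listing added above. The CountEntries class and its batch size of 1000 are hypothetical and exist only to illustrate the isSorted(), getListingGroupSize(), and processPath() hooks that this patch adds to Command.

import java.io.IOException;

import org.apache.hadoop.fs.shell.FsCommand;
import org.apache.hadoop.fs.shell.PathData;

/** Hypothetical command used only to illustrate the hooks added by this patch. */
public class CountEntries extends FsCommand {
  private long entries;

  @Override
  protected boolean isSorted() {
    // No ordering requirement: recursePath() will then list children through
    // PathData#getDirectoryContentsIterator() instead of materializing the
    // whole FileStatus[] returned by getDirectoryContents().
    return false;
  }

  @Override
  protected int getListingGroupSize() {
    // Hand children to processPaths(parent, PathData...) in batches of 1000
    // PathData objects; returning 0 would process them one at a time.
    return 1000;
  }

  @Override
  protected void processPath(PathData item) throws IOException {
    entries++;
  }

  long getEntries() {
    return entries;
  }
}

Because nothing here forces sorting, a recursive walk holds at most one batch of PathData objects per directory being listed rather than the directory's entire listing, which is what avoids the OutOfMemoryError reported in HADOOP-12502 for directories with very large numbers of children.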