HADOOP-17620. DistCp: Use Iterator for listing target directory as well. (#2861). Contributed by Ayush Saxena.

Signed-off-by: Vinayakumar B <vinayakumarb@apache.org>
This commit is contained in:
Ayush Saxena 2021-04-23 22:48:15 +05:30
parent 842714d99f
commit b743d56eb4
3 changed files with 17 additions and 6 deletions

View File

@ -65,7 +65,8 @@ import static org.apache.hadoop.tools.DistCpConstants
* Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths. * Note: The SimpleCopyListing doesn't handle wild-cards in the input-paths.
*/ */
public class SimpleCopyListing extends CopyListing { public class SimpleCopyListing extends CopyListing {
private static final Logger LOG = LoggerFactory.getLogger(SimpleCopyListing.class); public static final Logger LOG =
LoggerFactory.getLogger(SimpleCopyListing.class);
public static final int DEFAULT_FILE_STATUS_SIZE = 1000; public static final int DEFAULT_FILE_STATUS_SIZE = 1000;
public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true; public static final boolean DEFAULT_RANDOMIZE_FILE_LISTING = true;
@ -601,7 +602,7 @@ public class SimpleCopyListing extends CopyListing {
} }
private void printStats() { private void printStats() {
LOG.info("Paths (files+dirs) cnt = {}; dirCnt = ", totalPaths, totalDirs); LOG.info("Paths (files+dirs) cnt = {}; dirCnt = {}", totalPaths, totalDirs);
} }
private void maybePrintStats() { private void maybePrintStats() {

View File

@ -564,12 +564,15 @@ public class CopyCommitter extends FileOutputCommitter {
// thread count is picked up from the job // thread count is picked up from the job
int threads = conf.getInt(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, int threads = conf.getInt(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
DistCpConstants.DEFAULT_LISTSTATUS_THREADS); DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
boolean useIterator =
conf.getBoolean(DistCpConstants.CONF_LABEL_USE_ITERATOR, false);
LOG.info("Scanning destination directory {} with thread count: {}", LOG.info("Scanning destination directory {} with thread count: {}",
targetFinalPath, threads); targetFinalPath, threads);
DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath) DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
.withOverwrite(overwrite) .withOverwrite(overwrite)
.withSyncFolder(syncFolder) .withSyncFolder(syncFolder)
.withNumListstatusThreads(threads) .withNumListstatusThreads(threads)
.withUseIterator(useIterator)
.build(); .build();
DistCpContext distCpContext = new DistCpContext(options); DistCpContext distCpContext = new DistCpContext(options);
distCpContext.setTargetPathExists(targetPathExists); distCpContext.setTargetPathExists(targetPathExists);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp; import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.SimpleCopyListing;
import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.util.DistCpTestUtils; import org.apache.hadoop.tools.util.DistCpTestUtils;
import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.util.functional.RemoteIterators;
@ -628,11 +629,17 @@ public abstract class AbstractContractDistCpTest
GenericTestUtils GenericTestUtils
.createFiles(remoteFS, source, getDepth(), getWidth(), getWidth()); .createFiles(remoteFS, source, getDepth(), getWidth(), getWidth());
DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(), GenericTestUtils.LogCapturer log =
dest.toString(), "-useiterator", conf); GenericTestUtils.LogCapturer.captureLogs(SimpleCopyListing.LOG);
Assertions DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) dest.toString(), "-useiterator -update -delete", conf);
// Check the target listing was also done using iterator.
Assertions.assertThat(log.getOutput()).contains(
"Building listing using iterator mode for " + dest.toString());
Assertions.assertThat(RemoteIterators.toList(localFS.listFiles(dest, true)))
.describedAs("files").hasSize(getTotalFiles()); .describedAs("files").hasSize(getTotalFiles());
} }