HADOOP-11827. Speed-up distcp buildListing() using threadpool (Zoran Dimitrijevic via raviprak)

This commit is contained in:
Ravi Prakash 2015-04-21 16:43:02 -07:00
parent 2c14690368
commit cfba355052
14 changed files with 728 additions and 30 deletions

View File

@ -499,6 +499,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11819. HttpServerFunctionalTest#prepareTestWebapp should create web HADOOP-11819. HttpServerFunctionalTest#prepareTestWebapp should create web
app directory if it does not exist. (Rohith via vinayakumarb) app directory if it does not exist. (Rohith via vinayakumarb)
HADOOP-11827. Speed-up distcp buildListing() using threadpool
(Zoran Dimitrijevic via raviprak)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-11785. Reduce the number of listStatus operation in distcp HADOOP-11785. Reduce the number of listStatus operation in distcp

View File

@ -23,6 +23,9 @@
*/ */
public class DistCpConstants { public class DistCpConstants {
/* Default number of threads to use for building file listing */
public static final int DEFAULT_LISTSTATUS_THREADS = 1;
/* Default number of maps to use for DistCp */ /* Default number of maps to use for DistCp */
public static final int DEFAULT_MAPS = 20; public static final int DEFAULT_MAPS = 20;
@ -47,6 +50,7 @@ public class DistCpConstants {
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource"; public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy"; public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";

View File

@ -83,7 +83,14 @@ public enum DistCpOptionSwitch {
SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF, SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
new Option("mapredSslConf", true, "Configuration for ssl config file" + new Option("mapredSslConf", true, "Configuration for ssl config file" +
", to use with hftps://")), ", to use with hftps://")),
/**
* Number of threads for building source file listing (before map-reduce
* phase, max one listStatus per thread at a time).
*/
NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
new Option("numListstatusThreads", true, "Number of threads to " +
"use for building file listing (max " +
DistCpOptions.maxNumListstatusThreads + ").")),
/** /**
* Max number of maps to use during copy. DistCp will split work * Max number of maps to use during copy. DistCp will split work
* as equally as possible among these maps * as equally as possible among these maps

View File

@ -44,6 +44,8 @@ public class DistCpOptions {
private boolean blocking = true; private boolean blocking = true;
private boolean useDiff = false; private boolean useDiff = false;
public static final int maxNumListstatusThreads = 40;
private int numListstatusThreads = 0; // Indicates that flag is not set.
private int maxMaps = DistCpConstants.DEFAULT_MAPS; private int maxMaps = DistCpConstants.DEFAULT_MAPS;
private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
@ -124,6 +126,7 @@ public DistCpOptions(DistCpOptions that) {
this.overwrite = that.overwrite; this.overwrite = that.overwrite;
this.skipCRC = that.skipCRC; this.skipCRC = that.skipCRC;
this.blocking = that.blocking; this.blocking = that.blocking;
this.numListstatusThreads = that.numListstatusThreads;
this.maxMaps = that.maxMaps; this.maxMaps = that.maxMaps;
this.mapBandwidth = that.mapBandwidth; this.mapBandwidth = that.mapBandwidth;
this.sslConfigurationFile = that.getSslConfigurationFile(); this.sslConfigurationFile = that.getSslConfigurationFile();
@ -312,6 +315,30 @@ public void setSkipCRC(boolean skipCRC) {
this.skipCRC = skipCRC; this.skipCRC = skipCRC;
} }
/** Get the number of threads to use for listStatus
*
* @return Number of threads to do listStatus
*/
public int getNumListstatusThreads() {
return numListstatusThreads;
}
/** Set the number of threads to use for listStatus. We allow max 40
* threads. Setting numThreads to zero signify we should use the value
* from conf properties.
*
* @param numThreads - Number of threads
*/
public void setNumListstatusThreads(int numThreads) {
if (numThreads > maxNumListstatusThreads) {
this.numListstatusThreads = maxNumListstatusThreads;
} else if (numThreads > 0) {
this.numListstatusThreads = numThreads;
} else {
this.numListstatusThreads = 0;
}
}
/** Get the max number of maps to use for this copy /** Get the max number of maps to use for this copy
* *
* @return Max number of maps * @return Max number of maps

View File

@ -185,6 +185,18 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
} }
if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
try {
Integer numThreads = Integer.parseInt(getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
option.setNumListstatusThreads(numThreads);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Number of liststatus threads is invalid: " + getVal(command,
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
}
}
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
try { try {
Integer maps = Integer.parseInt( Integer maps = Integer.parseInt(

View File

@ -29,13 +29,17 @@
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute; import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import java.io.*; import java.io.*;
import java.util.Stack; import java.util.ArrayList;
import static org.apache.hadoop.tools.DistCpConstants import static org.apache.hadoop.tools.DistCpConstants
.HDFS_RESERVED_RAW_DIRECTORY_NAME; .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@ -50,7 +54,10 @@ public class SimpleCopyListing extends CopyListing {
private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class); private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
private long totalPaths = 0; private long totalPaths = 0;
private long totalDirs = 0;
private long totalBytesToCopy = 0; private long totalBytesToCopy = 0;
private int numListstatusThreads = 1;
private final int maxRetries = 3;
/** /**
* Protected constructor, to initialize configuration. * Protected constructor, to initialize configuration.
@ -61,6 +68,16 @@ public class SimpleCopyListing extends CopyListing {
*/ */
protected SimpleCopyListing(Configuration configuration, Credentials credentials) { protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
super(configuration, credentials); super(configuration, credentials);
numListstatusThreads = getConf().getInt(
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
}
@VisibleForTesting
protected SimpleCopyListing(Configuration configuration, Credentials credentials,
int numListstatusThreads) {
super(configuration, credentials);
this.numListstatusThreads = numListstatusThreads;
} }
@Override @Override
@ -160,6 +177,10 @@ public void doBuildListing(Path pathToListingFile, DistCpOptions options) throws
@VisibleForTesting @VisibleForTesting
public void doBuildListing(SequenceFile.Writer fileListWriter, public void doBuildListing(SequenceFile.Writer fileListWriter,
DistCpOptions options) throws IOException { DistCpOptions options) throws IOException {
if (options.getNumListstatusThreads() > 0) {
numListstatusThreads = options.getNumListstatusThreads();
}
try { try {
for (Path path: options.getSourcePaths()) { for (Path path: options.getSourcePaths()) {
FileSystem sourceFS = path.getFileSystem(getConf()); FileSystem sourceFS = path.getFileSystem(getConf());
@ -181,6 +202,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
sourcePathRoot, options); sourcePathRoot, options);
} }
if (explore) { if (explore) {
ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
for (FileStatus sourceStatus: sourceFiles) { for (FileStatus sourceStatus: sourceFiles) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
@ -195,15 +217,18 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
if (sourceStatus.isDirectory()) { if (sourceStatus.isDirectory()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Traversing source dir: " + sourceStatus.getPath()); LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
} }
traverseDirectory(fileListWriter, sourceFS, sourceStatus, sourceDirs.add(sourceStatus);
sourcePathRoot, options);
} }
} }
traverseDirectory(fileListWriter, sourceFS, sourceDirs,
sourcePathRoot, options);
} }
} }
fileListWriter.close(); fileListWriter.close();
printStats();
LOG.info("Build file listing completed.");
fileListWriter = null; fileListWriter = null;
} finally { } finally {
IOUtils.cleanup(LOG, fileListWriter); IOUtils.cleanup(LOG, fileListWriter);
@ -275,43 +300,135 @@ private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE)); SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
} }
private static FileStatus[] getChildren(FileSystem fileSystem, /*
FileStatus parent) throws IOException { * Private class to implement WorkRequestProcessor interface. It processes
return fileSystem.listStatus(parent.getPath()); * each directory (represented by FileStatus item) and returns a list of all
* file-system objects in that directory (files and directories). In case of
* retriable exceptions it increments retry counter and returns the same
* directory for later retry.
*/
private static class FileStatusProcessor
implements WorkRequestProcessor<FileStatus, FileStatus[]> {
private FileSystem fileSystem;
public FileStatusProcessor(FileSystem fileSystem) {
this.fileSystem = fileSystem;
}
/*
* Processor for FileSystem.listStatus().
*
* @param workRequest Input work item that contains FileStatus item which
* is a parent directory we want to list.
* @return Outputs WorkReport<FileStatus[]> with a list of objects in the
* directory (array of objects, empty if parent directory is
* empty). In case of intermittent exception we increment retry
* counter and return the list containing the parent directory).
*/
public WorkReport<FileStatus[]> processItem(
WorkRequest<FileStatus> workRequest) {
FileStatus parent = workRequest.getItem();
int retry = workRequest.getRetry();
WorkReport<FileStatus[]> result = null;
try {
if (retry > 0) {
int sleepSeconds = 2;
for (int i = 1; i < retry; i++) {
sleepSeconds *= 2;
}
try {
Thread.sleep(1000 * sleepSeconds);
} catch (InterruptedException ie) {
LOG.debug("Interrupted while sleeping in exponential backoff.");
}
}
result = new WorkReport<FileStatus[]>(
fileSystem.listStatus(parent.getPath()), 0, true);
} catch (FileNotFoundException fnf) {
LOG.error("FileNotFoundException exception in listStatus: " +
fnf.getMessage());
result = new WorkReport<FileStatus[]>(new FileStatus[0], 0, true, fnf);
} catch (Exception e) {
LOG.error("Exception in listStatus. Will send for retry.");
FileStatus[] parentList = new FileStatus[1];
parentList[0] = parent;
result = new WorkReport<FileStatus[]>(parentList, retry + 1, false, e);
}
return result;
}
}
private void printStats() {
LOG.info("Paths (files+dirs) cnt = " + totalPaths +
"; dirCnt = " + totalDirs);
}
private void maybePrintStats() {
if (totalPaths % 100000 == 0) {
printStats();
}
} }
private void traverseDirectory(SequenceFile.Writer fileListWriter, private void traverseDirectory(SequenceFile.Writer fileListWriter,
FileSystem sourceFS, FileSystem sourceFS,
FileStatus sourceStatus, ArrayList<FileStatus> sourceDirs,
Path sourcePathRoot, Path sourcePathRoot,
DistCpOptions options) DistCpOptions options)
throws IOException { throws IOException {
final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL); final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR); final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
final boolean preserveRawXattrs = options.shouldPreserveRawXattrs(); final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
Stack<FileStatus> pathStack = new Stack<FileStatus>();
pathStack.push(sourceStatus);
while (!pathStack.isEmpty()) { assert numListstatusThreads > 0;
for (FileStatus child: getChildren(sourceFS, pathStack.pop())) { LOG.debug("Starting thread pool of " + numListstatusThreads +
if (LOG.isDebugEnabled()) { " listStatus workers.");
LOG.debug("Recording source-path: " + child.getPath() + " for copy."); ProducerConsumer<FileStatus, FileStatus[]> workers =
} new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
CopyListingFileStatus childCopyListingStatus = for (int i = 0; i < numListstatusThreads; i++) {
DistCpUtils.toCopyListingFileStatus(sourceFS, child, workers.addWorker(
preserveAcls && child.isDirectory(), new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf())));
preserveXAttrs && child.isDirectory(), }
preserveRawXattrs && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus, for (FileStatus status : sourceDirs) {
sourcePathRoot, options); workers.put(new WorkRequest<FileStatus>(status, 0));
if (child.isDirectory()) { maybePrintStats();
}
while (workers.hasWork()) {
try {
WorkReport<FileStatus[]> workResult = workers.take();
int retry = workResult.getRetry();
for (FileStatus child: workResult.getItem()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Traversing into source dir: " + child.getPath()); LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
}
if (retry == 0) {
CopyListingFileStatus childCopyListingStatus =
DistCpUtils.toCopyListingFileStatus(sourceFS, child,
preserveAcls && child.isDirectory(),
preserveXAttrs && child.isDirectory(),
preserveRawXattrs && child.isDirectory());
writeToFileListing(fileListWriter, childCopyListingStatus,
sourcePathRoot, options);
}
if (retry < maxRetries) {
if (child.isDirectory()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Traversing into source dir: " + child.getPath());
}
workers.put(new WorkRequest<FileStatus>(child, retry));
maybePrintStats();
}
} else {
LOG.error("Giving up on " + child.getPath() +
" after " + retry + " retries.");
} }
pathStack.push(child);
} }
} catch (InterruptedException ie) {
LOG.error("Could not get item from childQueue. Retrying...");
} }
} }
workers.shutdown();
} }
private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
@ -351,6 +468,8 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
if (!fileStatus.isDirectory()) { if (!fileStatus.isDirectory()) {
totalBytesToCopy += fileStatus.getLen(); totalBytesToCopy += fileStatus.getLen();
} else {
totalDirs++;
} }
totalPaths++; totalPaths++;
} }

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
/**
* ProducerConsumer class encapsulates input and output queues and a
* thread-pool of Workers that loop on WorkRequest<T> inputQueue and for each
* consumed WorkRequest Workers invoke WorkRequestProcessor.processItem()
* and output resulting WorkReport<R> to the outputQueue.
*/
public class ProducerConsumer<T, R> {
private Log LOG = LogFactory.getLog(ProducerConsumer.class);
private LinkedBlockingQueue<WorkRequest<T>> inputQueue;
private LinkedBlockingQueue<WorkReport<R>> outputQueue;
private ExecutorService executor;
private AtomicInteger workCnt;
/**
* ProducerConsumer maintains input and output queues and a thread-pool of
* workers.
*
* @param numThreads Size of thread-pool to execute Workers.
*/
public ProducerConsumer(int numThreads) {
this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>();
this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>();
executor = Executors.newFixedThreadPool(numThreads);
workCnt = new AtomicInteger(0);
}
/**
* Add another worker that will consume WorkRequest<T> items from input
* queue, process each item using supplied processor, and for every
* processed item output WorkReport<R> to output queue.
*
* @param processor Processor implementing WorkRequestProcessor interface.
*
*/
public void addWorker(WorkRequestProcessor<T, R> processor) {
executor.execute(new Worker(processor));
}
/**
* Shutdown ProducerConsumer worker thread-pool without waiting for
* completion of any pending work.
*/
public void shutdown() {
executor.shutdown();
}
/**
* Returns number of pending ProducerConsumer items (submitted to input
* queue for processing via put() method but not yet consumed by take()
* or blockingTake().
*
* @return Number of items in ProducerConsumer (either pending for
* processing or waiting to be consumed).
*/
public int getWorkCnt() {
return workCnt.get();
}
/**
* Returns true if there are items in ProducerConsumer that are either
* pending for processing or waiting to be consumed.
*
* @return True if there were more items put() to ProducerConsumer than
* consumed by take() or blockingTake().
*/
public boolean hasWork() {
return workCnt.get() > 0;
}
/**
* Blocking put workRequest to ProducerConsumer input queue.
*
* @param WorkRequest<T> item to be processed.
*/
public void put(WorkRequest<T> workRequest) {
boolean isDone = false;
while (!isDone) {
try {
inputQueue.put(workRequest);
workCnt.incrementAndGet();
isDone = true;
} catch (InterruptedException ie) {
LOG.error("Could not put workRequest into inputQueue. Retrying...");
}
}
}
/**
* Blocking take from ProducerConsumer output queue that can be interrupted.
*
* @return WorkReport<R> item returned by processor's processItem().
*/
public WorkReport<R> take() throws InterruptedException {
WorkReport<R> report = outputQueue.take();
workCnt.decrementAndGet();
return report;
}
/**
* Blocking take from ProducerConsumer output queue (catches exceptions and
* retries forever).
*
* @return WorkReport<R> item returned by processor's processItem().
*/
public WorkReport<R> blockingTake() {
while (true) {
try {
WorkReport<R> report = outputQueue.take();
workCnt.decrementAndGet();
return report;
} catch (InterruptedException ie) {
LOG.debug("Retrying in blockingTake...");
}
}
}
private class Worker implements Runnable {
private WorkRequestProcessor<T, R> processor;
public Worker(WorkRequestProcessor<T, R> processor) {
this.processor = processor;
}
public void run() {
while (true) {
try {
WorkRequest<T> work = inputQueue.take();
WorkReport<R> result = processor.processItem(work);
boolean isDone = false;
while (!isDone) {
try {
outputQueue.put(result);
isDone = true;
} catch (InterruptedException ie) {
LOG.debug("Could not put report into outputQueue. Retrying...");
}
}
} catch (InterruptedException ie) {
LOG.debug("Interrupted while waiting for request from inputQueue.");
}
}
}
}
}

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
/**
* WorkReport<T> is a simple container for items of class T and its
* corresponding retry counter that indicates how many times this item
* was previously attempted to be processed.
*/
public class WorkReport<T> {
private T item;
private final boolean success;
private final int retry;
private final Exception exception;
/**
* @param item Object representing work report.
* @param retry Number of unsuccessful attempts to process work.
* @param success Indicates whether work was successfully completed.
*/
public WorkReport(T item, int retry, boolean success) {
this(item, retry, success, null);
}
/**
* @param item Object representing work report.
* @param retry Number of unsuccessful attempts to process work.
* @param success Indicates whether work was successfully completed.
* @param exception Exception thrown while processing work.
*/
public WorkReport(T item, int retry, boolean success, Exception exception) {
this.item = item;
this.retry = retry;
this.success = success;
this.exception = exception;
}
public T getItem() {
return item;
}
/**
* @return True if the work was processed successfully.
*/
public boolean getSuccess() {
return success;
}
/**
* @return Number of unsuccessful attempts to process work.
*/
public int getRetry() {
return retry;
}
/**
* @return Exception thrown while processing work.
*/
public Exception getException() {
return exception;
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
/**
* WorkRequest<T> is a simple container for items of class T and its
* corresponding retry counter that indicates how many times this item
* was previously attempted to be processed.
*/
public class WorkRequest<T> {
private int retry;
private T item;
public WorkRequest(T item) {
this(item, 0);
}
/**
* @param item Object representing WorkRequest input data.
* @param retry Number of previous attempts to process this work request.
*/
public WorkRequest(T item, int retry) {
this.item = item;
this.retry = retry;
}
public T getItem() {
return item;
}
/**
* @return Number of previous attempts to process this work request.
*/
public int getRetry() {
return retry;
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
/**
* Interface for ProducerConsumer worker loop.
*
*/
public interface WorkRequestProcessor<T, R> {
/**
* Work processor.
*
* @param workRequest Input work item.
* @return Outputs WorkReport after processing workRequest item.
*
*/
public WorkReport<R> processItem(WorkRequest<T> workRequest);
}

View File

@ -32,6 +32,9 @@
import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.Test; import org.junit.Test;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -40,9 +43,12 @@
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List; import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@RunWith(value = Parameterized.class)
public class TestCopyListing extends SimpleCopyListing { public class TestCopyListing extends SimpleCopyListing {
private static final Log LOG = LogFactory.getLog(TestCopyListing.class); private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
@ -63,9 +69,15 @@ public static void destroy() {
cluster.shutdown(); cluster.shutdown();
} }
} }
public TestCopyListing() { @Parameters
super(config, CREDENTIALS); public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 }, { 20} };
return Arrays.asList(data);
}
public TestCopyListing(int numListstatusThreads) {
super(config, CREDENTIALS, numListstatusThreads);
} }
protected TestCopyListing(Configuration configuration) { protected TestCopyListing(Configuration configuration) {

View File

@ -30,14 +30,19 @@
import org.apache.hadoop.tools.util.TestDistCpUtils; import org.apache.hadoop.tools.util.TestDistCpUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
@RunWith(value = Parameterized.class)
public class TestIntegration { public class TestIntegration {
private static final Log LOG = LogFactory.getLog(TestIntegration.class); private static final Log LOG = LogFactory.getLog(TestIntegration.class);
@ -46,6 +51,17 @@ public class TestIntegration {
private static Path listFile; private static Path listFile;
private static Path target; private static Path target;
private static String root; private static String root;
private int numListstatusThreads;
public TestIntegration(int numListstatusThreads) {
this.numListstatusThreads = numListstatusThreads;
}
@Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 } };
return Arrays.asList(data);
}
private static Configuration getConf() { private static Configuration getConf() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -597,6 +613,7 @@ private void runTest(Path listFile, Path target, boolean targetExists,
options.setDeleteMissing(delete); options.setDeleteMissing(delete);
options.setOverwrite(overwrite); options.setOverwrite(overwrite);
options.setTargetPathExists(targetExists); options.setTargetPathExists(targetExists);
options.setNumListstatusThreads(numListstatusThreads);
try { try {
new DistCp(getConf(), options).execute(); new DistCp(getConf(), options).execute();
} catch (Exception e) { } catch (Exception e) {

View File

@ -303,6 +303,48 @@ public void testParseMaps() {
} catch (IllegalArgumentException ignore) { } } catch (IllegalArgumentException ignore) { }
} }
@Test
public void testParseNumListstatusThreads() {
DistCpOptions options = OptionsParser.parse(new String[] {
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
// If command line argument isn't set, we expect .getNumListstatusThreads
// option to be zero (so that we know when to override conf properties).
Assert.assertEquals(0, options.getNumListstatusThreads());
options = OptionsParser.parse(new String[] {
"--numListstatusThreads",
"12",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertEquals(12, options.getNumListstatusThreads());
options = OptionsParser.parse(new String[] {
"--numListstatusThreads",
"0",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertEquals(0, options.getNumListstatusThreads());
try {
OptionsParser.parse(new String[] {
"--numListstatusThreads",
"hello",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.fail("Non numberic numListstatusThreads parsed");
} catch (IllegalArgumentException ignore) { }
// Ignore large number of threads.
options = OptionsParser.parse(new String[] {
"--numListstatusThreads",
"100",
"hdfs://localhost:8020/source/first",
"hdfs://localhost:8020/target/"});
Assert.assertEquals(DistCpOptions.maxNumListstatusThreads,
options.getNumListstatusThreads());
}
@Test @Test
public void testSourceListing() { public void testSourceListing() {
DistCpOptions options = OptionsParser.parse(new String[] { DistCpOptions options = OptionsParser.parse(new String[] {

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools.util;
import org.apache.hadoop.tools.util.ProducerConsumer;
import org.apache.hadoop.tools.util.WorkReport;
import org.apache.hadoop.tools.util.WorkRequest;
import org.apache.hadoop.tools.util.WorkRequestProcessor;
import org.junit.Assert;
import org.junit.Test;
import java.lang.Exception;
import java.lang.Integer;
public class TestProducerConsumer {
public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
Integer item = new Integer(workRequest.getItem());
return new WorkReport<Integer>(item, 0, true);
}
}
public class ExceptionProcessor implements WorkRequestProcessor<Integer, Integer> {
@SuppressWarnings("null")
public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
try {
Integer item = null;
item.intValue(); // Throw NULL pointer exception.
// We should never be here (null pointer exception above)
return new WorkReport<Integer>(item, 0, true);
} catch (Exception e) {
Integer item = new Integer(workRequest.getItem());
return new WorkReport<Integer>(item, 1, false, e);
}
}
}
@Test
public void testSimpleProducerConsumer() {
ProducerConsumer<Integer, Integer> worker =
new ProducerConsumer<Integer, Integer>(1);
worker.addWorker(new CopyProcessor());
worker.put(new WorkRequest<Integer>(42));
try {
WorkReport<Integer> report = worker.take();
Assert.assertEquals(42, report.getItem().intValue());
} catch (InterruptedException ie) {
Assert.assertTrue(false);
}
}
@Test
public void testMultipleProducerConsumer() {
ProducerConsumer<Integer, Integer> workers =
new ProducerConsumer<Integer, Integer>(10);
for (int i = 0; i < 10; i++) {
workers.addWorker(new CopyProcessor());
}
int sum = 0;
int numRequests = 2000;
for (int i = 0; i < numRequests; i++) {
workers.put(new WorkRequest<Integer>(i + 42));
sum += i + 42;
}
int numReports = 0;
while (workers.getWorkCnt() > 0) {
WorkReport<Integer> report = workers.blockingTake();
sum -= report.getItem().intValue();
numReports++;
}
Assert.assertEquals(0, sum);
Assert.assertEquals(numRequests, numReports);
}
@Test
public void testExceptionProducerConsumer() {
ProducerConsumer<Integer, Integer> worker =
new ProducerConsumer<Integer, Integer>(1);
worker.addWorker(new ExceptionProcessor());
worker.put(new WorkRequest<Integer>(42));
try {
WorkReport<Integer> report = worker.take();
Assert.assertEquals(42, report.getItem().intValue());
Assert.assertFalse(report.getSuccess());
Assert.assertNotNull(report.getException());
} catch (InterruptedException ie) {
Assert.assertTrue(false);
}
}
}