From cfba355052df15f8eb6cc9b8e90e2d8492bec7d7 Mon Sep 17 00:00:00 2001
From: Ravi Prakash
Date: Tue, 21 Apr 2015 16:43:02 -0700
Subject: [PATCH] HADOOP-11827. Speed-up distcp buildListing() using
 threadpool (Zoran Dimitrijevic via raviprak)

---
 .../hadoop-common/CHANGES.txt                 |   3 +
 .../apache/hadoop/tools/DistCpConstants.java  |   4 +
 .../hadoop/tools/DistCpOptionSwitch.java      |   9 +-
 .../apache/hadoop/tools/DistCpOptions.java    |  27 +++
 .../apache/hadoop/tools/OptionsParser.java    |  12 ++
 .../hadoop/tools/SimpleCopyListing.java       | 169 ++++++++++++++---
 .../hadoop/tools/util/ProducerConsumer.java   | 177 ++++++++++++++++++
 .../apache/hadoop/tools/util/WorkReport.java  |  78 ++++++++
 .../apache/hadoop/tools/util/WorkRequest.java |  53 ++++++
 .../tools/util/WorkRequestProcessor.java      |  38 ++++
 .../apache/hadoop/tools/TestCopyListing.java  |  20 +-
 .../apache/hadoop/tools/TestIntegration.java  |  17 ++
 .../hadoop/tools/TestOptionsParser.java       |  42 +++++
 .../tools/util/TestProducerConsumer.java      | 109 +++++++++++
 14 files changed, 728 insertions(+), 30 deletions(-)
 create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
 create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
 create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
 create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
 create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java

diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 02066b678ee..a6814f8ee15 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -499,6 +499,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-11819. HttpServerFunctionalTest#prepareTestWebapp should create
     web app directory if it does not exist. (Rohith via vinayakumarb)
 
+    HADOOP-11827. Speed-up distcp buildListing() using threadpool
+    (Zoran Dimitrijevic via raviprak)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index a1af2af767f..7ecb6ce415b 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -23,6 +23,9 @@ package org.apache.hadoop.tools;
  */
 public class DistCpConstants {
 
+  /* Default number of threads to use for building file listing */
+  public static final int DEFAULT_LISTSTATUS_THREADS = 1;
+
   /* Default number of maps to use for DistCp */
   public static final int DEFAULT_MAPS = 20;
 
@@ -47,6 +50,7 @@ public class DistCpConstants {
   public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
   public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
   public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
+  public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
   public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
   public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
   public static final String CONF_LABEL_COPY_STRATEGY = "distcp.copy.strategy";
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index e9c7d46152e..f90319dab8d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -83,7 +83,14 @@ public enum DistCpOptionSwitch {
   SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
       new Option("mapredSslConf", true, "Configuration for ssl config file"
           + ", to use with hftps://")),
-
+  /**
+   * Number of threads for building source file listing (before map-reduce
+   * phase, max one listStatus per thread at a time).
+   */
+  NUM_LISTSTATUS_THREADS(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
+      new Option("numListstatusThreads", true, "Number of threads to " +
+          "use for building file listing (max " +
+          DistCpOptions.maxNumListstatusThreads + ").")),
   /**
    * Max number of maps to use during copy. DistCp will split work
    * as equally as possible among these maps
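For orientation before the parser changes below: the new switch surfaces as numListstatusThreads on the distcp command line and flows through OptionsParser into DistCpOptions. A minimal sketch of that round trip (the URIs are illustrative placeholders; this mirrors the parser test added near the end of this patch):

    // Parse a distcp command line that requests 12 listing threads.
    DistCpOptions options = OptionsParser.parse(new String[] {
        "--numListstatusThreads", "12",
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/"});
    Assert.assertEquals(12, options.getNumListstatusThreads());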
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index 709e5832bab..d8f3ff7162a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -44,6 +44,8 @@ public class DistCpOptions {
   private boolean blocking = true;
   private boolean useDiff = false;
 
+  public static final int maxNumListstatusThreads = 40;
+  private int numListstatusThreads = 0;  // Indicates that flag is not set.
   private int maxMaps = DistCpConstants.DEFAULT_MAPS;
   private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
 
@@ -124,6 +126,7 @@ public class DistCpOptions {
     this.overwrite = that.overwrite;
     this.skipCRC = that.skipCRC;
     this.blocking = that.blocking;
+    this.numListstatusThreads = that.numListstatusThreads;
     this.maxMaps = that.maxMaps;
     this.mapBandwidth = that.mapBandwidth;
     this.sslConfigurationFile = that.getSslConfigurationFile();
@@ -312,6 +315,30 @@ public class DistCpOptions {
     this.skipCRC = skipCRC;
   }
 
+  /** Get the number of threads to use for listStatus.
+   *
+   * @return Number of threads to do listStatus
+   */
+  public int getNumListstatusThreads() {
+    return numListstatusThreads;
+  }
+
+  /** Set the number of threads to use for listStatus. We allow a maximum of
+   *  40 threads. Setting numThreads to zero signifies that the value from
+   *  conf properties should be used.
+   *
+   * @param numThreads - Number of threads
+   */
+  public void setNumListstatusThreads(int numThreads) {
+    if (numThreads > maxNumListstatusThreads) {
+      this.numListstatusThreads = maxNumListstatusThreads;
+    } else if (numThreads > 0) {
+      this.numListstatusThreads = numThreads;
+    } else {
+      this.numListstatusThreads = 0;
+    }
+  }
+
   /** Get the max number of maps to use for this copy
    *
    * @return Max number of maps
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index a3a76ef5851..1729479ef89 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -185,6 +185,18 @@ public class OptionsParser {
           getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
     }
 
+    if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
+      try {
+        Integer numThreads = Integer.parseInt(getVal(command,
+            DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
+        option.setNumListstatusThreads(numThreads);
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "Number of liststatus threads is invalid: " + getVal(command,
+                DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
+      }
+    }
+
     if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
       try {
         Integer maps = Integer.parseInt(
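Note that setNumListstatusThreads() clamps rather than rejects out-of-range values; a short sketch of its three branches (values are illustrative, the parse() call reuses the placeholder URIs from above):

    DistCpOptions options = OptionsParser.parse(new String[] {
        "hdfs://localhost:8020/source/first",
        "hdfs://localhost:8020/target/"});
    options.setNumListstatusThreads(100);  // above the cap: stored as maxNumListstatusThreads (40)
    options.setNumListstatusThreads(12);   // within (0, 40]: stored as 12
    options.setNumListstatusThreads(-5);   // zero or negative: stored as 0, i.e. "use the conf value"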
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
index e8a23aa6ff0..b9ba099d8ee 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
@@ -29,13 +29,17 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
 import org.apache.hadoop.tools.util.DistCpUtils;
+import org.apache.hadoop.tools.util.ProducerConsumer;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.*;
-import java.util.Stack;
+import java.util.ArrayList;
 
 import static org.apache.hadoop.tools.DistCpConstants
     .HDFS_RESERVED_RAW_DIRECTORY_NAME;
@@ -50,7 +54,10 @@ public class SimpleCopyListing extends CopyListing {
   private static final Log LOG = LogFactory.getLog(SimpleCopyListing.class);
 
   private long totalPaths = 0;
+  private long totalDirs = 0;
   private long totalBytesToCopy = 0;
+  private int numListstatusThreads = 1;
+  private final int maxRetries = 3;
 
   /**
    * Protected constructor, to initialize configuration.
@@ -61,6 +68,16 @@ public class SimpleCopyListing extends CopyListing {
    */
   protected SimpleCopyListing(Configuration configuration, Credentials credentials) {
     super(configuration, credentials);
+    numListstatusThreads = getConf().getInt(
+        DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
+        DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
+  }
+
+  @VisibleForTesting
+  protected SimpleCopyListing(Configuration configuration, Credentials credentials,
+                              int numListstatusThreads) {
+    super(configuration, credentials);
+    this.numListstatusThreads = numListstatusThreads;
   }
 
   @Override
@@ -160,6 +177,10 @@ public class SimpleCopyListing extends CopyListing {
   @VisibleForTesting
   public void doBuildListing(SequenceFile.Writer fileListWriter,
       DistCpOptions options) throws IOException {
+    if (options.getNumListstatusThreads() > 0) {
+      numListstatusThreads = options.getNumListstatusThreads();
+    }
+
     try {
       for (Path path: options.getSourcePaths()) {
         FileSystem sourceFS = path.getFileSystem(getConf());
@@ -181,6 +202,7 @@ public class SimpleCopyListing extends CopyListing {
               sourcePathRoot, options);
         }
         if (explore) {
+          ArrayList<FileStatus> sourceDirs = new ArrayList<FileStatus>();
           for (FileStatus sourceStatus: sourceFiles) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy.");
             }
@@ -195,15 +217,18 @@ public class SimpleCopyListing extends CopyListing {
 
             if (sourceStatus.isDirectory()) {
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Traversing source dir: " + sourceStatus.getPath());
+                LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath());
               }
-              traverseDirectory(fileListWriter, sourceFS, sourceStatus,
-                  sourcePathRoot, options);
+              sourceDirs.add(sourceStatus);
             }
           }
+          traverseDirectory(fileListWriter, sourceFS, sourceDirs,
+              sourcePathRoot, options);
         }
       }
       fileListWriter.close();
+      printStats();
+      LOG.info("Build file listing completed.");
       fileListWriter = null;
     } finally {
       IOUtils.cleanup(LOG, fileListWriter);
@@ -275,43 +300,135 @@ public class SimpleCopyListing extends CopyListing {
         SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
   }
 
-  private static FileStatus[] getChildren(FileSystem fileSystem,
-      FileStatus parent) throws IOException {
-    return fileSystem.listStatus(parent.getPath());
+  /*
+   * Private class to implement the WorkRequestProcessor interface. It
+   * processes each directory (represented by a FileStatus item) and returns
+   * a list of all file-system objects in that directory (files and
+   * directories). In case of retriable exceptions it increments the retry
+   * counter and returns the same directory for later retry.
+   */
+  private static class FileStatusProcessor
+      implements WorkRequestProcessor<FileStatus, FileStatus[]> {
+    private FileSystem fileSystem;
+
+    public FileStatusProcessor(FileSystem fileSystem) {
+      this.fileSystem = fileSystem;
+    }
+
+    /*
+     * Processor for FileSystem.listStatus().
+     *
+     * @param workRequest Input work item containing the FileStatus of the
+     *                    parent directory we want to list.
+     * @return WorkReport<FileStatus[]> with the list of objects in the
+     *         directory (an empty array if the parent directory is empty).
+     *         In case of an intermittent exception we increment the retry
+     *         counter and return a list containing the parent directory.
+     */
+    public WorkReport<FileStatus[]> processItem(
+        WorkRequest<FileStatus> workRequest) {
+      FileStatus parent = workRequest.getItem();
+      int retry = workRequest.getRetry();
+      WorkReport<FileStatus[]> result = null;
+      try {
+        if (retry > 0) {
+          int sleepSeconds = 2;
+          for (int i = 1; i < retry; i++) {
+            sleepSeconds *= 2;
+          }
+          try {
+            Thread.sleep(1000 * sleepSeconds);
+          } catch (InterruptedException ie) {
+            LOG.debug("Interrupted while sleeping in exponential backoff.");
+          }
+        }
+        result = new WorkReport<FileStatus[]>(
+            fileSystem.listStatus(parent.getPath()), 0, true);
+      } catch (FileNotFoundException fnf) {
+        LOG.error("FileNotFoundException exception in listStatus: " +
+            fnf.getMessage());
+        result = new WorkReport<FileStatus[]>(new FileStatus[0], 0, true, fnf);
+      } catch (Exception e) {
+        LOG.error("Exception in listStatus. Will send for retry.");
+        FileStatus[] parentList = new FileStatus[1];
+        parentList[0] = parent;
+        result = new WorkReport<FileStatus[]>(parentList, retry + 1, false, e);
+      }
+      return result;
+    }
+  }
+
+  private void printStats() {
+    LOG.info("Paths (files+dirs) cnt = " + totalPaths +
+        "; dirCnt = " + totalDirs);
+  }
+
+  private void maybePrintStats() {
+    if (totalPaths % 100000 == 0) {
+      printStats();
+    }
   }
 
   private void traverseDirectory(SequenceFile.Writer fileListWriter,
                                  FileSystem sourceFS,
-                                 FileStatus sourceStatus,
+                                 ArrayList<FileStatus> sourceDirs,
                                  Path sourcePathRoot,
                                  DistCpOptions options)
                                  throws IOException {
     final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
     final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
     final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();
 
-    Stack<FileStatus> pathStack = new Stack<FileStatus>();
-    pathStack.push(sourceStatus);
-
-    while (!pathStack.isEmpty()) {
-      for (FileStatus child: getChildren(sourceFS, pathStack.pop())) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
-        }
-        CopyListingFileStatus childCopyListingStatus =
-          DistCpUtils.toCopyListingFileStatus(sourceFS, child,
-            preserveAcls && child.isDirectory(),
-            preserveXAttrs && child.isDirectory(),
-            preserveRawXattrs && child.isDirectory());
-        writeToFileListing(fileListWriter, childCopyListingStatus,
-            sourcePathRoot, options);
-        if (child.isDirectory()) {
+    assert numListstatusThreads > 0;
+    LOG.debug("Starting thread pool of " + numListstatusThreads +
+        " listStatus workers.");
+    ProducerConsumer<FileStatus, FileStatus[]> workers =
+        new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+    for (int i = 0; i < numListstatusThreads; i++) {
+      workers.addWorker(
+          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf())));
+    }
+
+    for (FileStatus status : sourceDirs) {
+      workers.put(new WorkRequest<FileStatus>(status, 0));
+      maybePrintStats();
+    }
+
+    while (workers.hasWork()) {
+      try {
+        WorkReport<FileStatus[]> workResult = workers.take();
+        int retry = workResult.getRetry();
+        for (FileStatus child: workResult.getItem()) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Traversing into source dir: " + child.getPath());
+            LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
+          }
+          if (retry == 0) {
+            CopyListingFileStatus childCopyListingStatus =
+                DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                    preserveAcls && child.isDirectory(),
+                    preserveXAttrs && child.isDirectory(),
+                    preserveRawXattrs && child.isDirectory());
+            writeToFileListing(fileListWriter, childCopyListingStatus,
+                sourcePathRoot, options);
+          }
+          if (retry < maxRetries) {
+            if (child.isDirectory()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Traversing into source dir: " + child.getPath());
+              }
+              workers.put(new WorkRequest<FileStatus>(child, retry));
+              maybePrintStats();
+            }
+          } else {
+            LOG.error("Giving up on " + child.getPath() +
+                " after " + retry + " retries.");
           }
-          pathStack.push(child);
         }
+      } catch (InterruptedException ie) {
+        LOG.error("Could not get item from childQueue. Retrying...");
       }
     }
+    workers.shutdown();
   }
 
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
@@ -351,6 +468,8 @@ public class SimpleCopyListing extends CopyListing {
 
     if (!fileStatus.isDirectory()) {
       totalBytesToCopy += fileStatus.getLen();
+    } else {
+      totalDirs++;
     }
     totalPaths++;
   }
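A note on the retry logic in FileStatusProcessor.processItem() above: the backoff sleep doubles starting from 2 seconds, i.e. it grows as 2^retry seconds (retry 1 waits 2s, retry 2 waits 4s, retry 3 waits 8s), and traverseDirectory() stops re-queueing a directory once its retry count reaches maxRetries (3). A standalone sketch of just the backoff computation (a hypothetical helper, not part of this patch):

    // Exponential backoff matching FileStatusProcessor: 2^retry seconds.
    static long backoffMillis(int retry) {
      int sleepSeconds = 2;
      for (int i = 1; i < retry; i++) {
        sleepSeconds *= 2;
      }
      return 1000L * sleepSeconds;
    }
    // backoffMillis(1) == 2000, backoffMillis(2) == 4000, backoffMillis(3) == 8000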
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
new file mode 100644
index 00000000000..3dad4e3b742
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ProducerConsumer.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * The ProducerConsumer class encapsulates input and output queues and a
+ * thread-pool of Workers that loop on the WorkRequest input queue and, for
+ * each consumed WorkRequest, invoke WorkRequestProcessor.processItem() and
+ * put the resulting WorkReport on the output queue.
+ */
+public class ProducerConsumer<T, R> {
+  private Log LOG = LogFactory.getLog(ProducerConsumer.class);
+  private LinkedBlockingQueue<WorkRequest<T>> inputQueue;
+  private LinkedBlockingQueue<WorkReport<R>> outputQueue;
+  private ExecutorService executor;
+  private AtomicInteger workCnt;
+
+  /**
+   * ProducerConsumer maintains input and output queues and a thread-pool of
+   * workers.
+   *
+   * @param numThreads Size of the thread-pool to execute Workers.
+   */
+  public ProducerConsumer(int numThreads) {
+    this.inputQueue = new LinkedBlockingQueue<WorkRequest<T>>();
+    this.outputQueue = new LinkedBlockingQueue<WorkReport<R>>();
+    executor = Executors.newFixedThreadPool(numThreads);
+    workCnt = new AtomicInteger(0);
+  }
+
+  /**
+   * Add another worker that will consume WorkRequest items from the input
+   * queue, process each item using the supplied processor, and for every
+   * processed item put a WorkReport on the output queue.
+   *
+   * @param processor Processor implementing the WorkRequestProcessor
+   *                  interface.
+   */
+  public void addWorker(WorkRequestProcessor<T, R> processor) {
+    executor.execute(new Worker(processor));
+  }
+
+  /**
+   * Shutdown the ProducerConsumer worker thread-pool without waiting for
+   * completion of any pending work.
+   */
+  public void shutdown() {
+    executor.shutdown();
+  }
+
+  /**
+   * Returns the number of pending ProducerConsumer items (submitted to the
+   * input queue for processing via the put() method but not yet consumed by
+   * take() or blockingTake()).
+   *
+   * @return Number of items in the ProducerConsumer (either pending for
+   *         processing or waiting to be consumed).
+   */
+  public int getWorkCnt() {
+    return workCnt.get();
+  }
+
+  /**
+   * Returns true if there are items in the ProducerConsumer that are either
+   * pending for processing or waiting to be consumed.
+   *
+   * @return True if more items were put() into the ProducerConsumer than
+   *         were consumed by take() or blockingTake().
+   */
+  public boolean hasWork() {
+    return workCnt.get() > 0;
+  }
+
+  /**
+   * Blocking put of a workRequest onto the ProducerConsumer input queue.
+   *
+   * @param workRequest Item to be processed.
+   */
+  public void put(WorkRequest<T> workRequest) {
+    boolean isDone = false;
+    while (!isDone) {
+      try {
+        inputQueue.put(workRequest);
+        workCnt.incrementAndGet();
+        isDone = true;
+      } catch (InterruptedException ie) {
+        LOG.error("Could not put workRequest into inputQueue. Retrying...");
+      }
+    }
+  }
+
+  /**
+   * Blocking take from the ProducerConsumer output queue that can be
+   * interrupted.
+   *
+   * @return WorkReport item returned by the processor's processItem().
+   */
+  public WorkReport<R> take() throws InterruptedException {
+    WorkReport<R> report = outputQueue.take();
+    workCnt.decrementAndGet();
+    return report;
+  }
+
+  /**
+   * Blocking take from the ProducerConsumer output queue (catches exceptions
+   * and retries forever).
+   *
+   * @return WorkReport item returned by the processor's processItem().
+   */
+  public WorkReport<R> blockingTake() {
+    while (true) {
+      try {
+        WorkReport<R> report = outputQueue.take();
+        workCnt.decrementAndGet();
+        return report;
+      } catch (InterruptedException ie) {
+        LOG.debug("Retrying in blockingTake...");
+      }
+    }
+  }
+
+  private class Worker implements Runnable {
+    private WorkRequestProcessor<T, R> processor;
+
+    public Worker(WorkRequestProcessor<T, R> processor) {
+      this.processor = processor;
+    }
+
+    public void run() {
+      while (true) {
+        try {
+          WorkRequest<T> work = inputQueue.take();
+          WorkReport<R> result = processor.processItem(work);
+
+          boolean isDone = false;
+          while (!isDone) {
+            try {
+              outputQueue.put(result);
+              isDone = true;
+            } catch (InterruptedException ie) {
+              LOG.debug("Could not put report into outputQueue. Retrying...");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.debug("Interrupted while waiting for request from inputQueue.");
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
new file mode 100644
index 00000000000..91c9805724a
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkReport.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+/**
+ * WorkReport<T> is a simple container for a result item of class T together
+ * with its retry counter (how many times the item was previously attempted),
+ * a success flag and an optional exception.
+ */
+public class WorkReport<T> {
+  private T item;
+  private final boolean success;
+  private final int retry;
+  private final Exception exception;
+
+  /**
+   * @param item Object representing the work report.
+   * @param retry Number of unsuccessful attempts to process the work.
+   * @param success Indicates whether the work was successfully completed.
+   */
+  public WorkReport(T item, int retry, boolean success) {
+    this(item, retry, success, null);
+  }
+
+  /**
+   * @param item Object representing the work report.
+   * @param retry Number of unsuccessful attempts to process the work.
+   * @param success Indicates whether the work was successfully completed.
+   * @param exception Exception thrown while processing the work.
+   */
+  public WorkReport(T item, int retry, boolean success, Exception exception) {
+    this.item = item;
+    this.retry = retry;
+    this.success = success;
+    this.exception = exception;
+  }
+
+  public T getItem() {
+    return item;
+  }
+
+  /**
+   * @return True if the work was processed successfully.
+   */
+  public boolean getSuccess() {
+    return success;
+  }
+
+  /**
+   * @return Number of unsuccessful attempts to process the work.
+   */
+  public int getRetry() {
+    return retry;
+  }
+
+  /**
+   * @return Exception thrown while processing the work.
+   */
+  public Exception getException() {
+    return exception;
+  }
+}
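To make the queue discipline concrete, here is a minimal, self-contained usage sketch of ProducerConsumer with an anonymous doubling processor (names and values are illustrative; the same pattern appears in TestProducerConsumer at the end of this patch):

    ProducerConsumer<Integer, Integer> pool =
        new ProducerConsumer<Integer, Integer>(4);  // thread-pool of 4 workers
    for (int i = 0; i < 4; i++) {
      pool.addWorker(new WorkRequestProcessor<Integer, Integer>() {
        public WorkReport<Integer> processItem(WorkRequest<Integer> req) {
          return new WorkReport<Integer>(req.getItem() * 2, 0, true);
        }
      });
    }
    pool.put(new WorkRequest<Integer>(21));
    while (pool.hasWork()) {  // workCnt pairs every put() with a take()
      WorkReport<Integer> report = pool.blockingTake();
      System.out.println(report.getItem());  // prints 42
    }
    pool.shutdown();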
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
new file mode 100644
index 00000000000..339a3ab0962
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+/**
+ * WorkRequest<T> is a simple container for an input item of class T and its
+ * corresponding retry counter that indicates how many times this item was
+ * previously attempted to be processed.
+ */
+public class WorkRequest<T> {
+  private int retry;
+  private T item;
+
+  public WorkRequest(T item) {
+    this(item, 0);
+  }
+
+  /**
+   * @param item Object representing the WorkRequest input data.
+   * @param retry Number of previous attempts to process this work request.
+   */
+  public WorkRequest(T item, int retry) {
+    this.item = item;
+    this.retry = retry;
+  }
+
+  public T getItem() {
+    return item;
+  }
+
+  /**
+   * @return Number of previous attempts to process this work request.
+   */
+  public int getRetry() {
+    return retry;
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
new file mode 100644
index 00000000000..b6d8a097c06
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/WorkRequestProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+
+/**
+ * Interface for the ProducerConsumer worker loop.
+ */
+public interface WorkRequestProcessor<T, R> {
+
+  /**
+   * Work processor.
+   *
+   * @param workRequest Input work item.
+   * @return WorkReport produced by processing the workRequest item.
+   */
+  public WorkReport<R> processItem(WorkRequest<T> workRequest);
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
index d8f7e0b5d85..8381c1b7032 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java
@@ -32,6 +32,9 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -40,9 +43,12 @@ import org.junit.AfterClass;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.List;
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 
+@RunWith(value = Parameterized.class)
 public class TestCopyListing extends SimpleCopyListing {
   private static final Log LOG = LogFactory.getLog(TestCopyListing.class);
 
@@ -63,9 +69,15 @@ public class TestCopyListing extends SimpleCopyListing {
       cluster.shutdown();
     }
   }
-
-  public TestCopyListing() {
-    super(config, CREDENTIALS);
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 }, { 20 } };
+    return Arrays.asList(data);
+  }
+
+  public TestCopyListing(int numListstatusThreads) {
+    super(config, CREDENTIALS, numListstatusThreads);
   }
 
   protected TestCopyListing(Configuration configuration) {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
index 67d885a7cc8..572634272fa 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java
@@ -30,14 +30,19 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.tools.util.TestDistCpUtils;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
+@RunWith(value = Parameterized.class)
 public class TestIntegration {
   private static final Log LOG = LogFactory.getLog(TestIntegration.class);
 
@@ -46,6 +51,17 @@ public class TestIntegration {
   private static Path listFile;
   private static Path target;
   private static String root;
+  private int numListstatusThreads;
+
+  public TestIntegration(int numListstatusThreads) {
+    this.numListstatusThreads = numListstatusThreads;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] { { 1 }, { 2 }, { 10 } };
+    return Arrays.asList(data);
+  }
 
   private static Configuration getConf() {
     Configuration conf = new Configuration();
@@ -597,6 +613,7 @@ public class TestIntegration {
     options.setDeleteMissing(delete);
     options.setOverwrite(overwrite);
     options.setTargetPathExists(targetExists);
+    options.setNumListstatusThreads(numListstatusThreads);
     try {
       new DistCp(getConf(), options).execute();
     } catch (Exception e) {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
index cc9da3351d2..6eddfb222c6 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java
@@ -303,6 +303,48 @@ public class TestOptionsParser {
     } catch (IllegalArgumentException ignore) { }
   }
 
+  @Test
+  public void testParseNumListstatusThreads() {
+    DistCpOptions options = OptionsParser.parse(new String[] {
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    // If the command line argument isn't set, we expect getNumListstatusThreads
+    // to be zero (so that we know when to override conf properties).
+    Assert.assertEquals(0, options.getNumListstatusThreads());
+
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "12",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(12, options.getNumListstatusThreads());
+
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "0",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(0, options.getNumListstatusThreads());
+
+    try {
+      OptionsParser.parse(new String[] {
+          "--numListstatusThreads",
+          "hello",
+          "hdfs://localhost:8020/source/first",
+          "hdfs://localhost:8020/target/"});
+      Assert.fail("Non-numeric numListstatusThreads parsed");
+    } catch (IllegalArgumentException ignore) { }
+
+    // Cap an excessively large number of threads.
+    options = OptionsParser.parse(new String[] {
+        "--numListstatusThreads",
+        "100",
+        "hdfs://localhost:8020/source/first",
+        "hdfs://localhost:8020/target/"});
+    Assert.assertEquals(DistCpOptions.maxNumListstatusThreads,
+        options.getNumListstatusThreads());
+  }
+
   @Test
   public void testSourceListing() {
     DistCpOptions options = OptionsParser.parse(new String[] {
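The parameterized pattern used by TestCopyListing and TestIntegration above reduces to this JUnit 4 skeleton (the class name is hypothetical; each row returned by data() triggers one constructor call, so every @Test method runs once per thread count):

    import java.util.Arrays;
    import java.util.Collection;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(value = Parameterized.class)
    public class ExampleParameterizedTest {
      private final int numListstatusThreads;

      public ExampleParameterizedTest(int numListstatusThreads) {
        this.numListstatusThreads = numListstatusThreads;
      }

      @Parameters
      public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] { { 1 }, { 2 }, { 10 } });
      }

      @Test
      public void threadCountIsPositive() {
        Assert.assertTrue(numListstatusThreads > 0);
      }
    }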
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
new file mode 100644
index 00000000000..de0fcfd6e39
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestProducerConsumer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.util;
+
+import org.apache.hadoop.tools.util.ProducerConsumer;
+import org.apache.hadoop.tools.util.WorkReport;
+import org.apache.hadoop.tools.util.WorkRequest;
+import org.apache.hadoop.tools.util.WorkRequestProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.Exception;
+import java.lang.Integer;
+
+public class TestProducerConsumer {
+  public class CopyProcessor implements WorkRequestProcessor<Integer, Integer> {
+    public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
+      Integer item = new Integer(workRequest.getItem());
+      return new WorkReport<Integer>(item, 0, true);
+    }
+  }
+
+  public class ExceptionProcessor implements WorkRequestProcessor<Integer, Integer> {
+    @SuppressWarnings("null")
+    public WorkReport<Integer> processItem(WorkRequest<Integer> workRequest) {
+      try {
+        Integer item = null;
+        item.intValue();  // Throws NullPointerException.
+
+        // We should never get here (null pointer exception above).
+        return new WorkReport<Integer>(item, 0, true);
+      } catch (Exception e) {
+        Integer item = new Integer(workRequest.getItem());
+        return new WorkReport<Integer>(item, 1, false, e);
+      }
+    }
+  }
+
+  @Test
+  public void testSimpleProducerConsumer() {
+    ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(1);
+    worker.addWorker(new CopyProcessor());
+    worker.put(new WorkRequest<Integer>(42));
+    try {
+      WorkReport<Integer> report = worker.take();
+      Assert.assertEquals(42, report.getItem().intValue());
+    } catch (InterruptedException ie) {
+      Assert.assertTrue(false);
+    }
+  }
+
+  @Test
+  public void testMultipleProducerConsumer() {
+    ProducerConsumer<Integer, Integer> workers =
+        new ProducerConsumer<Integer, Integer>(10);
+    for (int i = 0; i < 10; i++) {
+      workers.addWorker(new CopyProcessor());
+    }
+
+    int sum = 0;
+    int numRequests = 2000;
+    for (int i = 0; i < numRequests; i++) {
+      workers.put(new WorkRequest<Integer>(i + 42));
+      sum += i + 42;
+    }
+
+    int numReports = 0;
+    while (workers.getWorkCnt() > 0) {
+      WorkReport<Integer> report = workers.blockingTake();
+      sum -= report.getItem().intValue();
+      numReports++;
+    }
+    Assert.assertEquals(0, sum);
+    Assert.assertEquals(numRequests, numReports);
+  }
+
+  @Test
+  public void testExceptionProducerConsumer() {
+    ProducerConsumer<Integer, Integer> worker =
+        new ProducerConsumer<Integer, Integer>(1);
+    worker.addWorker(new ExceptionProcessor());
+    worker.put(new WorkRequest<Integer>(42));
+    try {
+      WorkReport<Integer> report = worker.take();
+      Assert.assertEquals(42, report.getItem().intValue());
+      Assert.assertFalse(report.getSuccess());
+      Assert.assertNotNull(report.getException());
+    } catch (InterruptedException ie) {
+      Assert.assertTrue(false);
+    }
+  }
+}
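One closing note on where the listing thread count comes from: SimpleCopyListing's constructor seeds it from distcp.liststatus.threads (default 1), and doBuildListing() overrides that whenever the parsed option is positive. A compact sketch of the effective precedence (the conf value is illustrative):

    Configuration conf = new Configuration();
    conf.setInt(DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, 8);
    // Effective thread count in SimpleCopyListing.doBuildListing():
    //   options.getNumListstatusThreads() > 0  -> CLI value (capped at 40)
    //   otherwise                              -> conf value (8 here),
    //                                             else DEFAULT_LISTSTATUS_THREADS (1)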