diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index b2975dc481c..59017948c1a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -53,6 +53,9 @@ Trunk (Unreleased) IMPROVEMENTS + HADOOP-11203. Allow ditscp to accept bandwitdh in fraction MegaBytes + (Raju Bairishetti via amareshwari) + HADOOP-8017. Configure hadoop-main pom to get rid of M2E plugin execution not covered (Eric Charles via bobby) diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 21dca628f78..93d6a621cf2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -30,7 +30,7 @@ public class DistCpConstants { public static final int DEFAULT_MAPS = 20; /* Default bandwidth if none specified */ - public static final int DEFAULT_BANDWIDTH_MB = 100; + public static final float DEFAULT_BANDWIDTH_MB = 100; /* Default strategy for copying. Implementation looked up from distcp-default.xml diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index ed4a0b2e888..f16a5d29062 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -174,10 +174,11 @@ public enum DistCpOptionSwitch { "copied to <= n bytes")), /** - * Specify bandwidth per map in MB + * Specify bandwidth per map in MB, accepts bandwidth as a fraction */ BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, - new Option("bandwidth", true, "Specify bandwidth per map in MB")), + new Option("bandwidth", true, "Specify bandwidth per map in MB," + + " accepts bandwidth as a fraction.")), /** * Path containing a list of strings, which when found in the path of diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 302b626b8af..5b4ccf95e4f 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -47,7 +47,7 @@ public class DistCpOptions { public static final int maxNumListstatusThreads = 40; private int numListstatusThreads = 0; // Indicates that flag is not set. private int maxMaps = DistCpConstants.DEFAULT_MAPS; - private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; + private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; private String sslConfigurationFile; @@ -366,7 +366,7 @@ public class DistCpOptions { * * @return Bandwidth in MB */ - public int getMapBandwidth() { + public float getMapBandwidth() { return mapBandwidth; } @@ -375,7 +375,7 @@ public class DistCpOptions { * * @param mapBandwidth - per map bandwidth */ - public void setMapBandwidth(int mapBandwidth) { + public void setMapBandwidth(float mapBandwidth) { assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)"; this.mapBandwidth = mapBandwidth; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 37add1edac4..b41451346df 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -293,7 +293,7 @@ public class OptionsParser { DistCpOptions option) { if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { - Integer mapBandwidth = Integer.parseInt( + Float mapBandwidth = Float.parseFloat( getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); if (mapBandwidth <= 0) { throw new IllegalArgumentException("Bandwidth specified is not " + diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java index cca36df67f5..f75fe76178c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyMapper.java @@ -62,6 +62,8 @@ public class CopyMapper extends Mapper BYTESEXPECTED,// Number of bytes expected to be copied. BYTESFAILED, // Number of bytes that failed to be copied. BYTESSKIPPED, // Number of bytes that were skipped from copy. + SLEEP_TIME_MS, // Time map slept while trying to honor bandwidth cap. + BANDWIDTH_IN_BYTES, // Effective transfer rate in B/s. } /** @@ -85,7 +87,9 @@ public class CopyMapper extends Mapper private EnumSet preserve = EnumSet.noneOf(FileAttribute.class); private FileSystem targetFS = null; - private Path targetWorkPath = null; + private Path targetWorkPath = null; + private long startEpoch; + private long totalBytesCopied = 0; /** * Implementation of the Mapper::setup() method. This extracts the DistCp- @@ -118,6 +122,7 @@ public class CopyMapper extends Mapper if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) { initializeSSLConf(context); } + startEpoch = System.currentTimeMillis(); } /** @@ -288,6 +293,7 @@ public class CopyMapper extends Mapper incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen()); incrementCounter(context, Counter.BYTESCOPIED, bytesCopied); incrementCounter(context, Counter.COPY, 1); + totalBytesCopied += bytesCopied; } private void createTargetDirsWithRetry(String description, @@ -373,4 +379,13 @@ public class CopyMapper extends Mapper return false; } } + + @Override + protected void cleanup(Context context) + throws IOException, InterruptedException { + super.cleanup(context); + long secs = (System.currentTimeMillis() - startEpoch) / 1000; + incrementCounter(context, Counter.BANDWIDTH_IN_BYTES, + totalBytesCopied / ((secs == 0 ? 1 : secs))); + } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 65d644bfefa..6b5078c29ba 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -293,7 +293,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { Configuration conf) throws IOException { try { FileSystem fs = path.getFileSystem(conf); - long bandwidthMB = conf.getInt(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, + float bandwidthMB = conf.getFloat(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, DistCpConstants.DEFAULT_BANDWIDTH_MB); FSDataInputStream in = fs.open(path); return new ThrottledInputStream(in, bandwidthMB * 1024 * 1024); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 9e435d99afd..2be8ef043c2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -39,7 +39,7 @@ import com.google.common.base.Preconditions; public class ThrottledInputStream extends InputStream { private final InputStream rawStream; - private final long maxBytesPerSec; + private final float maxBytesPerSec; private final long startTime = System.currentTimeMillis(); private long bytesRead = 0; @@ -51,8 +51,8 @@ public class ThrottledInputStream extends InputStream { this(rawStream, Long.MAX_VALUE); } - public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) { - assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; + public ThrottledInputStream(InputStream rawStream, float maxBytesPerSec) { + assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; this.rawStream = rawStream; this.maxBytesPerSec = maxBytesPerSec; } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index b9d9ada066d..616872b5821 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -32,6 +32,8 @@ import java.util.NoSuchElementException; public class TestOptionsParser { + private static final float DELTA = 0.001f; + @Test public void testParseIgnoreFailure() { DistCpOptions options = OptionsParser.parse(new String[] { @@ -104,14 +106,14 @@ public class TestOptionsParser { DistCpOptions options = OptionsParser.parse(new String[] { "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); - Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB); + Assert.assertEquals(options.getMapBandwidth(), DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA); options = OptionsParser.parse(new String[] { "-bandwidth", - "11", + "11.2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); - Assert.assertEquals(options.getMapBandwidth(), 11); + Assert.assertEquals(options.getMapBandwidth(), 11.2, DELTA); } @Test(expected=IllegalArgumentException.class) @@ -585,8 +587,8 @@ public class TestOptionsParser { options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.ATOMIC_COMMIT.getConfigLabel(), false)); - Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), - DistCpConstants.DEFAULT_BANDWIDTH_MB); + Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), + DistCpConstants.DEFAULT_BANDWIDTH_MB, DELTA); conf = new Configuration(); Assert.assertFalse(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); @@ -597,14 +599,14 @@ public class TestOptionsParser { "-delete", "-pu", "-bandwidth", - "11", + "11.2", "hdfs://localhost:8020/source/first", "hdfs://localhost:8020/target/"}); options.appendToConf(conf); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false)); Assert.assertTrue(conf.getBoolean(DistCpOptionSwitch.DELETE_MISSING.getConfigLabel(), false)); Assert.assertEquals(conf.get(DistCpOptionSwitch.PRESERVE_STATUS.getConfigLabel()), "U"); - Assert.assertEquals(conf.getInt(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11); + Assert.assertEquals(conf.getFloat(DistCpOptionSwitch.BANDWIDTH.getConfigLabel(), -1), 11.2, DELTA); } @Test