HBASE-11083 ExportSnapshot should provide capability to limit bandwidth consumption

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1590291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2014-04-26 20:15:14 +00:00
parent 6d291a55b4
commit d2c2792729
2 changed files with 22 additions and 4 deletions

View File

@ -307,6 +307,11 @@
<groupId>commons-collections</groupId> <groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId> <artifactId>commons-collections</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>${hadoop-two.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hbase</groupId> <groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId> <artifactId>hbase-hadoop-compat</artifactId>

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.snapshot; package org.apache.hadoop.hbase.snapshot;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -62,6 +64,7 @@ import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.tools.util.ThrottledInputStream;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -86,6 +89,7 @@ public final class ExportSnapshot extends Configured implements Tool {
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root"; private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size"; private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group"; private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure"; static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry"; static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
@ -225,7 +229,11 @@ public final class ExportSnapshot extends Configured implements Tool {
} }
} }
FSDataInputStream in = openSourceFile(context, inputPath); InputStream in = openSourceFile(context, inputPath);
// Read the per-map bandwidth limit in MB/s; Integer.MAX_VALUE means "do not throttle".
int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
if (Integer.MAX_VALUE != bandwidthMB) {
// Multiply as long: in int arithmetic bandwidthMB * 1024 * 1024 overflows for values >= 2048
// (2048 wraps to 0, larger values can go negative), handing the throttle a bogus limit.
in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024L * 1024L);
}
try { try {
context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen()); context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
@ -298,7 +306,7 @@ public final class ExportSnapshot extends Configured implements Tool {
} }
private void copyData(final Context context, private void copyData(final Context context,
final Path inputPath, final FSDataInputStream in, final Path inputPath, final InputStream in,
final Path outputPath, final FSDataOutputStream out, final Path outputPath, final FSDataOutputStream out,
final long inputFileSize) final long inputFileSize)
throws IOException { throws IOException {
@ -585,7 +593,8 @@ public final class ExportSnapshot extends Configured implements Tool {
private void runCopyJob(final Path inputRoot, final Path outputRoot, private void runCopyJob(final Path inputRoot, final Path outputRoot,
final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum, final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
final String filesUser, final String filesGroup, final int filesMode, final String filesUser, final String filesGroup, final int filesMode,
final int mappers) throws IOException, InterruptedException, ClassNotFoundException { final int mappers, final int bandwidthMB)
throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf(); Configuration conf = getConf();
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup); if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser); if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@ -594,6 +603,7 @@ public final class ExportSnapshot extends Configured implements Tool {
conf.set(CONF_OUTPUT_ROOT, outputRoot.toString()); conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
conf.set(CONF_INPUT_ROOT, inputRoot.toString()); conf.set(CONF_INPUT_ROOT, inputRoot.toString());
conf.setInt("mapreduce.job.maps", mappers); conf.setInt("mapreduce.job.maps", mappers);
conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
Job job = new Job(conf); Job job = new Job(conf);
job.setJobName("ExportSnapshot"); job.setJobName("ExportSnapshot");
@ -655,6 +665,7 @@ public final class ExportSnapshot extends Configured implements Tool {
String filesGroup = null; String filesGroup = null;
String filesUser = null; String filesUser = null;
Path outputRoot = null; Path outputRoot = null;
int bandwidthMB = Integer.MAX_VALUE;
int filesMode = 0; int filesMode = 0;
int mappers = 0; int mappers = 0;
@ -681,6 +692,8 @@ public final class ExportSnapshot extends Configured implements Tool {
filesUser = args[++i]; filesUser = args[++i];
} else if (cmd.equals("-chgroup")) { } else if (cmd.equals("-chgroup")) {
filesGroup = args[++i]; filesGroup = args[++i];
} else if (cmd.equals("-bandwidth")) {
bandwidthMB = Integer.parseInt(args[++i]);
} else if (cmd.equals("-chmod")) { } else if (cmd.equals("-chmod")) {
filesMode = Integer.parseInt(args[++i], 8); filesMode = Integer.parseInt(args[++i], 8);
} else if (cmd.equals("-overwrite")) { } else if (cmd.equals("-overwrite")) {
@ -773,7 +786,7 @@ public final class ExportSnapshot extends Configured implements Tool {
LOG.warn("There are 0 store file to be copied. There may be no data in the table."); LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
} else { } else {
runCopyJob(inputRoot, outputRoot, files, verifyChecksum, runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
filesUser, filesGroup, filesMode, mappers); filesUser, filesGroup, filesMode, mappers, bandwidthMB);
} }
// Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot> // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>