From 4d07aa5f1c792bff7bab25ed9fc4ef301ce17fa8 Mon Sep 17 00:00:00 2001
From: Zhihong Yu
Date: Mon, 19 May 2014 16:17:53 +0000
Subject: [PATCH] HBASE-11090 Backport HBASE-11083 ExportSnapshot should
 provide capability to limit bandwidth consumption

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1595947 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoopbackport/ThrottledInputStream.java  | 144 ++++++++++++++++++
 hbase-server/pom.xml                          |   5 -
 .../hadoop/hbase/snapshot/ExportSnapshot.java |   2 +-
 3 files changed, 145 insertions(+), 6 deletions(-)
 create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
new file mode 100644
index 00000000000..369d71eadac
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hadoopbackport;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottledInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer is found to exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ */
+public class ThrottledInputStream extends InputStream {
+
+  private final InputStream rawStream;
+  private final long maxBytesPerSec;
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  private static final long SLEEP_DURATION_MS = 50;
+
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid";
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  private void throttle() throws IOException {
+    if (getBytesPerSec() > maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) {
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter for the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index a6bb4c4ebdc..60343abe854 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -307,11 +307,6 @@
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-distcp</artifactId>
-      <version>${hadoop-two.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-hadoop-compat</artifactId>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
index 09864f96d12..77baa757619 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.tools.util.ThrottledInputStream;
+import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
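Note (not part of the patch): a minimal usage sketch of the backported ThrottledInputStream, exercising only the API added above. The demo class name, the local file argument, and the 1 MB/s cap are hypothetical values chosen for illustration.

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;

    public class ThrottleDemo {  // hypothetical demo class, not part of the patch
      public static void main(String[] args) throws IOException {
        // Cap the average read rate at roughly 1 MB/s (illustrative value).
        ThrottledInputStream in = new ThrottledInputStream(
            new FileInputStream(args[0]), 1024L * 1024L);
        try {
          byte[] buf = new byte[8192];
          // Each read first calls throttle(), which sleeps in 50 ms steps
          // whenever bytesRead / elapsedSeconds exceeds maxBytesPerSec.
          while (in.read(buf) != -1) {
            // drain the stream
          }
          System.out.println("bytes read: " + in.getTotalBytesRead()
              + ", avg bytes/sec: " + in.getBytesPerSec()
              + ", slept: " + in.getTotalSleepTime() + " ms");
        } finally {
          in.close();
        }
      }
    }

Because throttle() compares the cumulative average rate rather than an instantaneous one, short bursts above the cap are possible, but the long-run rate converges on maxBytesPerSec, matching the behaviour described in the class Javadoc. ExportSnapshot picks the class up through the import change above, removing the dependency on hadoop-distcp.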