From 155a644b1cce68b5ef7dcf20eca8ed29bf39c771 Mon Sep 17 00:00:00 2001 From: tedyu Date: Fri, 5 Dec 2014 06:52:04 -0800 Subject: [PATCH] HBASE-12632 ThrottledInputStream/ExportSnapshot does not throttle --- .../hadoopbackport/ThrottledInputStream.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java index dd6df0ccb0b..e1da69553cf 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hadoopbackport; import java.io.IOException; import java.io.InputStream; +import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** @@ -93,8 +94,32 @@ public class ThrottledInputStream extends InputStream { return readLen; } + /** + * Read bytes starting from the specified position. This requires rawStream is + * an instance of {@link PositionedReadable}. + * @param position + * @param buffer + * @param offset + * @param length + * @return the number of bytes read + */ + public int read(long position, byte[] buffer, int offset, int length) + throws IOException { + if (!(rawStream instanceof PositionedReadable)) { + throw new UnsupportedOperationException( + "positioned read is not supported by the internal stream"); + } + throttle(); + int readLen = ((PositionedReadable) rawStream).read(position, buffer, + offset, length); + if (readLen != -1) { + bytesRead += readLen; + } + return readLen; + } + private void throttle() throws IOException { - if (getBytesPerSec() > maxBytesPerSec) { + while (getBytesPerSec() > maxBytesPerSec) { try { Thread.sleep(SLEEP_DURATION_MS); totalSleepTime += SLEEP_DURATION_MS;