HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis jumps. Contributed by Andy Isaacson.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1347751 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
743cbf6ef3
commit
6d8efb7378
|
@ -313,6 +313,9 @@ Branch-2 ( Unreleased changes )
|
||||||
HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe
|
HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe
|
||||||
via todd)
|
via todd)
|
||||||
|
|
||||||
|
HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis
|
||||||
|
jumps (Andy Isaacson via todd)
|
||||||
|
|
||||||
Release 2.0.0-alpha - 05-23-2012
|
Release 2.0.0-alpha - 05-23-2012
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -34,13 +34,28 @@ public final class Util {
|
||||||
private final static Log LOG = LogFactory.getLog(Util.class.getName());
|
private final static Log LOG = LogFactory.getLog(Util.class.getName());
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Current system time.
|
* Current system time. Do not use this to calculate a duration or interval
|
||||||
|
* to sleep, because it will be broken by settimeofday. Instead, use
|
||||||
|
* monotonicNow.
|
||||||
* @return current time in msec.
|
* @return current time in msec.
|
||||||
*/
|
*/
|
||||||
public static long now() {
|
public static long now() {
|
||||||
return System.currentTimeMillis();
|
return System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Current time from some arbitrary time base in the past, counting in
|
||||||
|
* milliseconds, and not affected by settimeofday or similar system clock
|
||||||
|
* changes. This is appropriate to use when computing how much longer to
|
||||||
|
* wait for an interval to expire.
|
||||||
|
* @return a monotonic clock that counts in milliseconds.
|
||||||
|
*/
|
||||||
|
public static long monotonicNow() {
|
||||||
|
final long NANOSECONDS_PER_MILLISECOND = 1000000;
|
||||||
|
|
||||||
|
return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interprets the passed string as a URI. In case of error it
|
* Interprets the passed string as a URI. In case of error it
|
||||||
* assumes the specified string is a file.
|
* assumes the specified string is a file.
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.util;
|
package org.apache.hadoop.hdfs.util;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.server.common.Util.monotonicNow;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* a class to throttle the data transfers.
|
* a class to throttle the data transfers.
|
||||||
* This class is thread safe. It can be shared by multiple threads.
|
* This class is thread safe. It can be shared by multiple threads.
|
||||||
|
@ -45,7 +47,7 @@ public class DataTransferThrottler {
|
||||||
* @param bandwidthPerSec bandwidth allowed in bytes per second.
|
* @param bandwidthPerSec bandwidth allowed in bytes per second.
|
||||||
*/
|
*/
|
||||||
public DataTransferThrottler(long period, long bandwidthPerSec) {
|
public DataTransferThrottler(long period, long bandwidthPerSec) {
|
||||||
this.curPeriodStart = System.currentTimeMillis();
|
this.curPeriodStart = monotonicNow();
|
||||||
this.period = period;
|
this.period = period;
|
||||||
this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
|
this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
|
||||||
this.periodExtension = period*3;
|
this.periodExtension = period*3;
|
||||||
|
@ -87,7 +89,7 @@ public class DataTransferThrottler {
|
||||||
bytesAlreadyUsed += numOfBytes;
|
bytesAlreadyUsed += numOfBytes;
|
||||||
|
|
||||||
while (curReserve <= 0) {
|
while (curReserve <= 0) {
|
||||||
long now = System.currentTimeMillis();
|
long now = monotonicNow();
|
||||||
long curPeriodEnd = curPeriodStart + period;
|
long curPeriodEnd = curPeriodStart + period;
|
||||||
|
|
||||||
if ( now < curPeriodEnd ) {
|
if ( now < curPeriodEnd ) {
|
||||||
|
|
|
@ -28,8 +28,6 @@ import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import junit.framework.TestCase;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -52,14 +50,21 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests if block replacement request to data nodes work correctly.
|
* This class tests if block replacement request to data nodes work correctly.
|
||||||
*/
|
*/
|
||||||
public class TestBlockReplacement extends TestCase {
|
public class TestBlockReplacement {
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
"org.apache.hadoop.hdfs.TestBlockReplacement");
|
"org.apache.hadoop.hdfs.TestBlockReplacement");
|
||||||
|
|
||||||
MiniDFSCluster cluster;
|
MiniDFSCluster cluster;
|
||||||
|
@Test
|
||||||
public void testThrottler() throws IOException {
|
public void testThrottler() throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
|
||||||
|
@ -83,6 +88,7 @@ public class TestBlockReplacement extends TestCase {
|
||||||
assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
|
assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
public void testBlockReplacement() throws IOException, TimeoutException {
|
public void testBlockReplacement() throws IOException, TimeoutException {
|
||||||
final Configuration CONF = new HdfsConfiguration();
|
final Configuration CONF = new HdfsConfiguration();
|
||||||
final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
|
final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
|
||||||
|
|
Loading…
Reference in New Issue