HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis jumps. Contributed by Andy Isaacson.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1347753 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-06-07 18:43:45 +00:00
parent 1156fc41dc
commit 35b44b5387
4 changed files with 35 additions and 9 deletions

View File

@ -162,6 +162,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe
via todd) via todd)
HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis
jumps (Andy Isaacson via todd)
Release 2.0.0-alpha - 05-23-2012 Release 2.0.0-alpha - 05-23-2012
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -34,13 +34,28 @@ public final class Util {
private final static Log LOG = LogFactory.getLog(Util.class.getName()); private final static Log LOG = LogFactory.getLog(Util.class.getName());
/** /**
* Current system time. * Current system time. Do not use this to calculate a duration or interval
* to sleep, because it will be broken by settimeofday. Instead, use
* monotonicNow.
* @return current time in msec. * @return current time in msec.
*/ */
public static long now() { public static long now() {
return System.currentTimeMillis(); return System.currentTimeMillis();
} }
/**
* Current time from some arbitrary time base in the past, counting in
* milliseconds, and not affected by settimeofday or similar system clock
* changes. This is appropriate to use when computing how much longer to
* wait for an interval to expire.
* @return a monotonic clock that counts in milliseconds.
*/
public static long monotonicNow() {
final long NANOSECONDS_PER_MILLISECOND = 1000000;
return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
}
/** /**
* Interprets the passed string as a URI. In case of error it * Interprets the passed string as a URI. In case of error it
* assumes the specified string is a file. * assumes the specified string is a file.

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.util; package org.apache.hadoop.hdfs.util;
import static org.apache.hadoop.hdfs.server.common.Util.monotonicNow;
/** /**
* a class to throttle the data transfers. * a class to throttle the data transfers.
* This class is thread safe. It can be shared by multiple threads. * This class is thread safe. It can be shared by multiple threads.
@ -26,9 +28,9 @@ package org.apache.hadoop.hdfs.util;
public class DataTransferThrottler { public class DataTransferThrottler {
private long period; // period over which bw is imposed private long period; // period over which bw is imposed
private long periodExtension; // Max period over which bw accumulates. private long periodExtension; // Max period over which bw accumulates.
private long bytesPerPeriod; // total number of bytes can be sent in each period private long bytesPerPeriod; // total number of bytes can be sent in each period
private long curPeriodStart; // current period starting time private long curPeriodStart; // current period starting time
private long curReserve; // remaining bytes can be sent in the period private long curReserve; // remaining bytes can be sent in the period
private long bytesAlreadyUsed; private long bytesAlreadyUsed;
/** Constructor /** Constructor
@ -45,7 +47,7 @@ public class DataTransferThrottler {
* @param bandwidthPerSec bandwidth allowed in bytes per second. * @param bandwidthPerSec bandwidth allowed in bytes per second.
*/ */
public DataTransferThrottler(long period, long bandwidthPerSec) { public DataTransferThrottler(long period, long bandwidthPerSec) {
this.curPeriodStart = System.currentTimeMillis(); this.curPeriodStart = monotonicNow();
this.period = period; this.period = period;
this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000; this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
this.periodExtension = period*3; this.periodExtension = period*3;
@ -87,7 +89,7 @@ public class DataTransferThrottler {
bytesAlreadyUsed += numOfBytes; bytesAlreadyUsed += numOfBytes;
while (curReserve <= 0) { while (curReserve <= 0) {
long now = System.currentTimeMillis(); long now = monotonicNow();
long curPeriodEnd = curPeriodStart + period; long curPeriodEnd = curPeriodStart + period;
if ( now < curPeriodEnd ) { if ( now < curPeriodEnd ) {

View File

@ -28,8 +28,6 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -52,14 +50,21 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** /**
* This class tests if block replacement request to data nodes work correctly. * This class tests if block replacement request to data nodes work correctly.
*/ */
public class TestBlockReplacement extends TestCase { public class TestBlockReplacement {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestBlockReplacement"); "org.apache.hadoop.hdfs.TestBlockReplacement");
MiniDFSCluster cluster; MiniDFSCluster cluster;
@Test
public void testThrottler() throws IOException { public void testThrottler() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
@ -83,6 +88,7 @@ public class TestBlockReplacement extends TestCase {
assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
} }
@Test
public void testBlockReplacement() throws IOException, TimeoutException { public void testBlockReplacement() throws IOException, TimeoutException {
final Configuration CONF = new HdfsConfiguration(); final Configuration CONF = new HdfsConfiguration();
final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"}; final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};