From 35b44b53877ae6689327f2983438b16dc9a2db76 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Thu, 7 Jun 2012 18:43:45 +0000 Subject: [PATCH] HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis jumps. Contributed by Andy Isaacson. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1347753 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../apache/hadoop/hdfs/server/common/Util.java | 17 ++++++++++++++++- .../hadoop/hdfs/util/DataTransferThrottler.java | 12 +++++++----- .../server/datanode/TestBlockReplacement.java | 12 +++++++++--- 4 files changed, 35 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3c35d4a9318..34c3bc27be0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -162,6 +162,9 @@ Release 2.0.1-alpha - UNRELEASED HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe via todd) + HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis + jumps (Andy Isaacson via todd) + Release 2.0.0-alpha - 05-23-2012 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java index 1f4e9741664..7c8c301180c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java @@ -34,13 +34,28 @@ public final class Util { private final static Log LOG = LogFactory.getLog(Util.class.getName()); /** - * Current system time. + * Current system time. Do not use this to calculate a duration or interval + * to sleep, because it will be broken by settimeofday. Instead, use + * monotonicNow. * @return current time in msec. */ public static long now() { return System.currentTimeMillis(); } + /** + * Current time from some arbitrary time base in the past, counting in + * milliseconds, and not affected by settimeofday or similar system clock + * changes. This is appropriate to use when computing how much longer to + * wait for an interval to expire. + * @return a monotonic clock that counts in milliseconds. + */ + public static long monotonicNow() { + final long NANOSECONDS_PER_MILLISECOND = 1000000; + + return System.nanoTime() / NANOSECONDS_PER_MILLISECOND; + } + /** * Interprets the passed string as a URI. In case of error it * assumes the specified string is a file. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java index 8ef570181ab..d452205cc69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DataTransferThrottler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.util; +import static org.apache.hadoop.hdfs.server.common.Util.monotonicNow; + /** * a class to throttle the data transfers. * This class is thread safe. It can be shared by multiple threads. @@ -26,9 +28,9 @@ package org.apache.hadoop.hdfs.util; public class DataTransferThrottler { private long period; // period over which bw is imposed private long periodExtension; // Max period over which bw accumulates. - private long bytesPerPeriod; // total number of bytes can be sent in each period - private long curPeriodStart; // current period starting time - private long curReserve; // remaining bytes can be sent in the period + private long bytesPerPeriod; // total number of bytes can be sent in each period + private long curPeriodStart; // current period starting time + private long curReserve; // remaining bytes can be sent in the period private long bytesAlreadyUsed; /** Constructor @@ -45,7 +47,7 @@ public class DataTransferThrottler { * @param bandwidthPerSec bandwidth allowed in bytes per second. */ public DataTransferThrottler(long period, long bandwidthPerSec) { - this.curPeriodStart = System.currentTimeMillis(); + this.curPeriodStart = monotonicNow(); this.period = period; this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000; this.periodExtension = period*3; @@ -87,7 +89,7 @@ public class DataTransferThrottler { bytesAlreadyUsed += numOfBytes; while (curReserve <= 0) { - long now = System.currentTimeMillis(); + long now = monotonicNow(); long curPeriodEnd = curPeriodStart + period; if ( now < curPeriodEnd ) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index 78d20ad655c..8584cc8266f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.Random; import java.util.concurrent.TimeoutException; -import junit.framework.TestCase; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -52,14 +50,21 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.net.NetUtils; + +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * This class tests if block replacement request to data nodes work correctly. */ -public class TestBlockReplacement extends TestCase { +public class TestBlockReplacement { private static final Log LOG = LogFactory.getLog( "org.apache.hadoop.hdfs.TestBlockReplacement"); MiniDFSCluster cluster; + @Test public void testThrottler() throws IOException { Configuration conf = new HdfsConfiguration(); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); @@ -83,6 +88,7 @@ public class TestBlockReplacement extends TestCase { assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec); } + @Test public void testBlockReplacement() throws IOException, TimeoutException { final Configuration CONF = new HdfsConfiguration(); final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};