From 44864c68b59a72f27279212c1316adec37e5209a Mon Sep 17 00:00:00 2001 From: Uma Maheswara Rao G Date: Thu, 14 Aug 2014 04:20:00 +0000 Subject: [PATCH] HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. Contributed by Yi Liu and Colin Patrick McCabe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617872 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../CacheReplicationMonitor.java | 55 +++++++++++-------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 0a90370dc68..cffa1679fd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -506,6 +506,8 @@ Release 2.6.0 - UNRELEASED HDFS-6247. Avoid timeouts for replaceBlock() call by sending intermediate responses to Balancer (vinayakumarb) + HDFS-6783. Fix HDFS CacheReplicationMonitor rescan logic. (Yi Liu via umamahesh) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java index cf5a4a6504b..cfd6333469b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java @@ -103,22 +103,22 @@ public class CacheReplicationMonitor extends Thread implements Closeable { */ private final Condition scanFinished; - /** - * Whether there are pending CacheManager operations that necessitate a - * CacheReplicationMonitor rescan. Protected by the CRM lock. - */ - private boolean needsRescan = true; - - /** - * Whether we are currently doing a rescan. Protected by the CRM lock. - */ - private boolean isScanning = false; - /** * The number of rescans completed. Used to wait for scans to finish. * Protected by the CacheReplicationMonitor lock. */ - private long scanCount = 0; + private long completedScanCount = 0; + + /** + * The scan we're currently performing, or -1 if no scan is in progress. + * Protected by the CacheReplicationMonitor lock. + */ + private long curScanCount = -1; + + /** + * The number of rescans we need to complete. Protected by the CRM lock. + */ + private long neededScanCount = 0; /** * True if this monitor should terminate. Protected by the CRM lock. @@ -169,7 +169,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable { LOG.info("Shutting down CacheReplicationMonitor"); return; } - if (needsRescan) { + if (completedScanCount < neededScanCount) { LOG.info("Rescanning because of pending operations"); break; } @@ -182,8 +182,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable { doRescan.await(delta, TimeUnit.MILLISECONDS); curTimeMs = Time.monotonicNow(); } - isScanning = true; - needsRescan = false; } finally { lock.unlock(); } @@ -194,8 +192,8 @@ public class CacheReplicationMonitor extends Thread implements Closeable { // Update synchronization-related variables. lock.lock(); try { - isScanning = false; - scanCount++; + completedScanCount = curScanCount; + curScanCount = -1; scanFinished.signalAll(); } finally { lock.unlock(); @@ -226,16 +224,15 @@ public class CacheReplicationMonitor extends Thread implements Closeable { "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); - if (!needsRescan) { + if (neededScanCount <= completedScanCount) { return; } // If no scan is already ongoing, mark the CRM as dirty and kick - if (!isScanning) { + if (curScanCount < 0) { doRescan.signal(); } // Wait until the scan finishes and the count advances - final long startCount = scanCount; - while ((!shutdown) && (startCount >= scanCount)) { + while ((!shutdown) && (completedScanCount < neededScanCount)) { try { scanFinished.await(); } catch (InterruptedException e) { @@ -253,7 +250,14 @@ public class CacheReplicationMonitor extends Thread implements Closeable { public void setNeedsRescan() { Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when setting the needsRescan bit."); - this.needsRescan = true; + if (curScanCount >= 0) { + // If there is a scan in progress, we need to wait for the scan after + // that. + neededScanCount = curScanCount + 1; + } else { + // If there is no scan in progress, we need to wait for the next scan. + neededScanCount = completedScanCount + 1; + } } /** @@ -284,10 +288,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable { scannedBlocks = 0; namesystem.writeLock(); try { + lock.lock(); if (shutdown) { throw new InterruptedException("CacheReplicationMonitor was " + "shut down."); } + curScanCount = completedScanCount + 1; + } + finally { + lock.unlock(); + } + try { resetStatistics(); rescanCacheDirectives(); rescanCachedBlockMap();