From fd3e554ff04658563e0ac3cd42738ad90a78cf95 Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Fri, 7 Mar 2014 23:39:14 +0000
Subject: [PATCH] HDFS-5064. Standby checkpoints should not block concurrent
 readers. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575449 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 .../hadoop/hdfs/server/namenode/FSImage.java  |  6 +-
 .../hdfs/server/namenode/FSNamesystem.java    | 37 ++++++++++-
 .../server/namenode/FSNamesystemLock.java     | 25 +++++++-
 .../hdfs/server/namenode/NNStorage.java       |  2 +-
 .../namenode/ha/StandbyCheckpointer.java      |  4 +-
 .../org/apache/hadoop/hdfs/util/RwLock.java   |  9 +++
 .../namenode/ha/TestStandbyCheckpoints.java   | 63 +++++++++++++++++++
 8 files changed, 142 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d06eb7eb550..c81ba2db604 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -329,6 +329,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR
     (cmccabe)
 
+    HDFS-5064. Standby checkpoints should not block concurrent readers.
+    (atm via wang)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
index 0e9779f5ea5..35df93cc708 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
@@ -1262,7 +1262,11 @@ public class FSImage implements Closeable {
     this.lastAppliedTxId = editLog.getLastWrittenTxId();
   }
 
-  public synchronized long getMostRecentCheckpointTxId() {
+  // Should be OK for this to not be synchronized since all of the places which
+  // mutate this value are themselves synchronized, so it shouldn't be possible
+  // to see this flop back and forth. In the worst case this will just return
+  // an old value.
+  public long getMostRecentCheckpointTxId() {
     return storage.getMostRecentCheckpointTxId();
   }
 }
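The comment above, together with the volatile qualifier added to
NNStorage.mostRecentCheckpointTxId later in this patch, relies on a standard
Java memory-model argument: a volatile long is read and written atomically
(JLS 17.7), and because every mutator holds a monitor, the unsynchronized
getter can at worst observe a slightly stale value, never a torn or
inconsistent one. A minimal standalone sketch of the pattern (hypothetical
class, not HDFS code):

public class CheckpointTxIdHolder {
  // volatile guarantees atomic, visible 64-bit reads and writes.
  private volatile long mostRecentCheckpointTxId = -1;

  // All mutators are synchronized, so updates are totally ordered.
  public synchronized void setMostRecentCheckpointTxId(long txid) {
    mostRecentCheckpointTxId = txid;
  }

  // Unsynchronized read: may lag a concurrent update, but cannot flop back
  // and forth, because every write happens under the same monitor.
  public long getMostRecentCheckpointTxId() {
    return mostRecentCheckpointTxId;
  }
}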
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f96ce7eccf4..e40c9a2059b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -116,6 +116,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.management.NotCompliantMBeanException;
@@ -1294,20 +1295,47 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     this.fsLock.readLock().lock();
   }
   @Override
+  public void longReadLockInterruptibly() throws InterruptedException {
+    this.fsLock.longReadLock().lockInterruptibly();
+    try {
+      this.fsLock.readLock().lockInterruptibly();
+    } catch (InterruptedException ie) {
+      // In the event we're interrupted while getting the normal FSNS read
+      // lock, release the long read lock.
+      this.fsLock.longReadLock().unlock();
+      throw ie;
+    }
+  }
+  @Override
+  public void longReadUnlock() {
+    this.fsLock.readLock().unlock();
+    this.fsLock.longReadLock().unlock();
+  }
+  @Override
   public void readUnlock() {
     this.fsLock.readLock().unlock();
   }
   @Override
   public void writeLock() {
+    this.fsLock.longReadLock().lock();
     this.fsLock.writeLock().lock();
   }
   @Override
   public void writeLockInterruptibly() throws InterruptedException {
-    this.fsLock.writeLock().lockInterruptibly();
+    this.fsLock.longReadLock().lockInterruptibly();
+    try {
+      this.fsLock.writeLock().lockInterruptibly();
+    } catch (InterruptedException ie) {
+      // In the event we're interrupted while getting the normal FSNS write
+      // lock, release the long read lock.
+      this.fsLock.longReadLock().unlock();
+      throw ie;
+    }
   }
   @Override
   public void writeUnlock() {
     this.fsLock.writeLock().unlock();
+    this.fsLock.longReadLock().unlock();
   }
   @Override
   public boolean hasWriteLock() {
@@ -6806,9 +6834,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   @VisibleForTesting
-  ReentrantReadWriteLock getFsLockForTests() {
+  public ReentrantReadWriteLock getFsLockForTests() {
     return fsLock.coarseLock;
   }
+
+  @VisibleForTesting
+  public ReentrantLock getLongReadLockForTests() {
+    return fsLock.longReadLock;
+  }
 
   @VisibleForTesting
   public SafeModeInfo getSafeModeInfoForTests() {
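The catch blocks added above implement an acquire-then-roll-back pattern: a
thread interrupted while waiting on the second lock must release the first
before the InterruptedException propagates, or that lock would be leaked
forever. A self-contained demonstration with plain ReentrantLocks
(hypothetical names, not the FSNS classes):

import java.util.concurrent.locks.ReentrantLock;

public class RollbackOnInterrupt {
  private static final ReentrantLock outer = new ReentrantLock(true);
  private static final ReentrantLock inner = new ReentrantLock(true);

  static void lockBothInterruptibly() throws InterruptedException {
    outer.lockInterruptibly();
    try {
      inner.lockInterruptibly();
    } catch (InterruptedException ie) {
      outer.unlock(); // roll back so other threads can still make progress
      throw ie;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    inner.lock(); // main holds the inner lock, so the worker must wait
    Thread worker = new Thread(() -> {
      try {
        lockBothInterruptibly();
      } catch (InterruptedException ie) {
        System.out.println("interrupted; outer still held? " + outer.isLocked());
      }
    });
    worker.start();
    Thread.sleep(200); // the worker is now blocked on the inner lock
    worker.interrupt();
    worker.join();     // prints "interrupted; outer still held? false"
    inner.unlock();
  }
}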
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
index a2a1648e458..a1b9477a694 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,6 +34,24 @@ class FSNamesystemLock implements ReadWriteLock {
   @VisibleForTesting
   protected ReentrantReadWriteLock coarseLock;
 
+  /**
+   * When locking the FSNS for a read that may take a long time, we take this
+   * lock before taking the regular FSNS read lock. All writers also take this
+   * lock before taking the FSNS write lock. Regular (short) readers do not
+   * take this lock at all, instead relying solely on the synchronization of
+   * the regular FSNS lock.
+   *
+   * This scheme ensures that:
+   * 1) In the case of normal (fast) ops, readers proceed concurrently and
+   *    writers are not starved.
+   * 2) In the case of long read ops, short reads are allowed to proceed
+   *    concurrently for the duration of the long read.
+   *
+   * See HDFS-5064 for more context.
+   */
+  @VisibleForTesting
+  protected ReentrantLock longReadLock = new ReentrantLock(true);
+
   FSNamesystemLock(boolean fair) {
     this.coarseLock = new ReentrantReadWriteLock(fair);
   }
@@ -46,6 +65,10 @@ class FSNamesystemLock implements ReadWriteLock {
   public Lock writeLock() {
     return coarseLock.writeLock();
   }
+
+  public Lock longReadLock() {
+    return longReadLock;
+  }
 
   public int getReadHoldCount() {
     return coarseLock.getReadHoldCount();
@@ -58,4 +81,4 @@ class FSNamesystemLock implements ReadWriteLock {
   public boolean isWriteLockedByCurrentThread() {
     return coarseLock.isWriteLockedByCurrentThread();
   }
-}
\ No newline at end of file
+}
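The javadoc above describes a two-level scheme: long readers and writers
serialize on a fair mutex taken before the coarse read-write lock, while
short readers take only the coarse read lock. Distilled into a standalone
sketch (illustrative names, not the HDFS API):

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LongReadAwareLock {
  private final ReentrantLock longReadLock = new ReentrantLock(true); // fair
  private final ReentrantReadWriteLock coarseLock =
      new ReentrantReadWriteLock(true);

  void shortReadLock()   { coarseLock.readLock().lock(); }
  void shortReadUnlock() { coarseLock.readLock().unlock(); }

  void longReadLock() {
    longReadLock.lock();          // excludes writers and other long reads
    coarseLock.readLock().lock(); // but still admits short readers
  }
  void longReadUnlock() {
    coarseLock.readLock().unlock();
    longReadLock.unlock();
  }

  void writeLock() {
    longReadLock.lock();           // wait out any in-flight long read first
    coarseLock.writeLock().lock(); // then exclude short readers too
  }
  void writeUnlock() {
    coarseLock.writeLock().unlock();
    longReadLock.unlock();
  }
}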
   */
-  protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
+  protected volatile long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
 
   /**
    * Time of the last checkpoint, in milliseconds since the epoch.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index fc19bd64539..1a8897864c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -145,7 +145,7 @@ public class StandbyCheckpointer {
     assert canceler != null;
     final long txid;
 
-    namesystem.writeLockInterruptibly();
+    namesystem.longReadLockInterruptibly();
     try {
       assert namesystem.getEditLog().isOpenForRead() :
         "Standby Checkpointer should only attempt a checkpoint when " +
@@ -168,7 +168,7 @@ public class StandbyCheckpointer {
       assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
         thisCheckpointTxId + " but instead saved at txid=" + txid;
     } finally {
-      namesystem.writeUnlock();
+      namesystem.longReadUnlock();
     }
 
     // Upload the saved checkpoint back to the active
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java
index 36e240108be..2792460d531 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RwLock.java
@@ -21,6 +21,15 @@ package org.apache.hadoop.hdfs.util;
 public interface RwLock {
   /** Acquire read lock. */
   public void readLock();
+
+  /**
+   * Acquire the long read lock, unless interrupted while waiting. The long
+   * read lock should also serve to block all concurrent writers.
+   **/
+  void longReadLockInterruptibly() throws InterruptedException;
+
+  /** Release the long read lock. */
+  public void longReadUnlock();
 
   /** Release read lock. */
   public void readUnlock();
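Given that contract, a checkpoint-style long read should admit short readers
but hold off writers until longReadUnlock(). A hypothetical smoke test of
that behavior, reusing the LongReadAwareLock sketch above:

public class LongReadDemo {
  public static void main(String[] args) throws InterruptedException {
    final LongReadAwareLock lock = new LongReadAwareLock();

    Thread longReader = new Thread(() -> {
      lock.longReadLock();
      try {
        System.out.println("long read: started");
        try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
      } finally {
        lock.longReadUnlock();
        System.out.println("long read: done");
      }
    });

    Thread writer = new Thread(() -> {
      lock.writeLock();
      try {
        System.out.println("writer: acquired (only after the long read)");
      } finally {
        lock.writeUnlock();
      }
    });

    Thread shortReader = new Thread(() -> {
      lock.shortReadLock();
      try {
        System.out.println("short read: proceeded during the long read");
      } finally {
        lock.shortReadUnlock();
      }
    });

    longReader.start();
    Thread.sleep(100);   // let the long read take both of its locks
    writer.start();
    Thread.sleep(100);   // the writer is now queued on the long read lock
    shortReader.start(); // still completes immediately
    longReader.join();
    writer.join();
    shortReader.join();
  }
}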
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 0ca112da5c3..9b472cdc4e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -25,6 +26,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.net.URL;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
@@ -310,6 +313,66 @@ public class TestStandbyCheckpoints {
     assertTrue("SBN should have finished checkpointing.",
         answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
   }
+
+  @Test(timeout=300000)
+  public void testReadsAllowedDuringCheckpoint() throws Exception {
+
+    // Set it up so that we know when the SBN checkpoint starts and ends.
+    FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
+    DelayAnswer answerer = new DelayAnswer(LOG);
+    Mockito.doAnswer(answerer).when(spyImage1)
+        .saveNamespace(Mockito.any(FSNamesystem.class),
+            Mockito.any(Canceler.class));
+
+    // Perform some edits and wait for a checkpoint to start on the SBN.
+    doEdits(0, 1000);
+    nn0.getRpcServer().rollEditLog();
+    answerer.waitForCall();
+    assertTrue("SBN is not performing checkpoint but it should be.",
+        answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
+
+    // Make sure that the lock has actually been taken by the checkpointing
+    // thread.
+    ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+
+    // Perform an RPC that needs to take the write lock.
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          nn1.getRpcServer().restoreFailedStorage("false");
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      }
+    };
+    t.start();
+
+    // Make sure that our thread is waiting for the lock.
+    ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
+
+    assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
+    assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
+    assertTrue(nn1.getNamesystem().getLongReadLockForTests().hasQueuedThreads());
+
+    // Get /jmx of the standby NN web UI, which will cause the FSNS read
+    // lock to be taken.
+    String pageContents = DFSTestUtil.urlGet(new URL("http://" +
+        nn1.getHttpAddress().getHostName() + ":" +
+        nn1.getHttpAddress().getPort() + "/jmx"));
+    assertTrue(pageContents.contains("NumLiveDataNodes"));
+
+    // Make sure that the checkpoint is still going on, implying that the
+    // client RPC to the SBN happened during the checkpoint.
+ assertTrue("SBN should have still been checkpointing.", + answerer.getFireCount() == 1 && answerer.getResultCount() == 0); + answerer.proceed(); + answerer.waitForResult(); + assertTrue("SBN should have finished checkpointing.", + answerer.getFireCount() == 1 && answerer.getResultCount() == 1); + + t.join(); + } private void doEdits(int start, int stop) throws IOException { for (int i = start; i < stop; i++) {