HDFS-5064. Standby checkpoints should not block concurrent readers. Contributed by Aaron Twining Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1575449 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-03-07 23:39:14 +00:00
parent 78f1a475c8
commit fd3e554ff0
8 changed files with 142 additions and 7 deletions

View File

@ -329,6 +329,9 @@ Release 2.4.0 - UNRELEASED
HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR HDFS-6065. HDFS zero-copy reads should return null on EOF when doing ZCR
(cmccabe) (cmccabe)
HDFS-5064. Standby checkpoints should not block concurrent readers.
(atm via wang)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9) HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -1262,7 +1262,11 @@ public class FSImage implements Closeable {
this.lastAppliedTxId = editLog.getLastWrittenTxId(); this.lastAppliedTxId = editLog.getLastWrittenTxId();
} }
public synchronized long getMostRecentCheckpointTxId() { // Should be OK for this to not be synchronized since all of the places which
// mutate this value are themselves synchronized so it shouldn't be possible
// to see this flop back and forth. In the worst case this will just return an
// old value.
public long getMostRecentCheckpointTxId() {
return storage.getMostRecentCheckpointTxId(); return storage.getMostRecentCheckpointTxId();
} }
} }

View File

@ -116,6 +116,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
@ -1294,20 +1295,47 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.fsLock.readLock().lock(); this.fsLock.readLock().lock();
} }
@Override @Override
public void longReadLockInterruptibly() throws InterruptedException {
this.fsLock.longReadLock().lockInterruptibly();
try {
this.fsLock.readLock().lockInterruptibly();
} catch (InterruptedException ie) {
// In the event we're interrupted while getting the normal FSNS read lock,
// release the long read lock.
this.fsLock.longReadLock().unlock();
throw ie;
}
}
@Override
public void longReadUnlock() {
this.fsLock.readLock().unlock();
this.fsLock.longReadLock().unlock();
}
@Override
public void readUnlock() { public void readUnlock() {
this.fsLock.readLock().unlock(); this.fsLock.readLock().unlock();
} }
@Override @Override
public void writeLock() { public void writeLock() {
this.fsLock.longReadLock().lock();
this.fsLock.writeLock().lock(); this.fsLock.writeLock().lock();
} }
@Override @Override
public void writeLockInterruptibly() throws InterruptedException { public void writeLockInterruptibly() throws InterruptedException {
this.fsLock.writeLock().lockInterruptibly(); this.fsLock.longReadLock().lockInterruptibly();
try {
this.fsLock.writeLock().lockInterruptibly();
} catch (InterruptedException ie) {
// In the event we're interrupted while getting the normal FSNS write
// lock, release the long read lock.
this.fsLock.longReadLock().unlock();
throw ie;
}
} }
@Override @Override
public void writeUnlock() { public void writeUnlock() {
this.fsLock.writeLock().unlock(); this.fsLock.writeLock().unlock();
this.fsLock.longReadLock().unlock();
} }
@Override @Override
public boolean hasWriteLock() { public boolean hasWriteLock() {
@ -6806,9 +6834,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} }
@VisibleForTesting @VisibleForTesting
ReentrantReadWriteLock getFsLockForTests() { public ReentrantReadWriteLock getFsLockForTests() {
return fsLock.coarseLock; return fsLock.coarseLock;
} }
@VisibleForTesting
public ReentrantLock getLongReadLockForTests() {
return fsLock.longReadLock;
}
@VisibleForTesting @VisibleForTesting
public SafeModeInfo getSafeModeInfoForTests() { public SafeModeInfo getSafeModeInfoForTests() {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -33,6 +34,24 @@ class FSNamesystemLock implements ReadWriteLock {
@VisibleForTesting @VisibleForTesting
protected ReentrantReadWriteLock coarseLock; protected ReentrantReadWriteLock coarseLock;
/**
* When locking the FSNS for a read that may take a long time, we take this
* lock before taking the regular FSNS read lock. All writers also take this
* lock before taking the FSNS write lock. Regular (short) readers do not
* take this lock at all, instead relying solely on the synchronization of the
* regular FSNS lock.
*
* This scheme ensures that:
* 1) In the case of normal (fast) ops, readers proceed concurrently and
* writers are not starved.
* 2) In the case of long read ops, short reads are allowed to proceed
* concurrently during the duration of the long read.
*
* See HDFS-5064 for more context.
*/
@VisibleForTesting
protected ReentrantLock longReadLock = new ReentrantLock(true);
FSNamesystemLock(boolean fair) { FSNamesystemLock(boolean fair) {
this.coarseLock = new ReentrantReadWriteLock(fair); this.coarseLock = new ReentrantReadWriteLock(fair);
} }
@ -46,6 +65,10 @@ class FSNamesystemLock implements ReadWriteLock {
public Lock writeLock() { public Lock writeLock() {
return coarseLock.writeLock(); return coarseLock.writeLock();
} }
public Lock longReadLock() {
return longReadLock;
}
public int getReadHoldCount() { public int getReadHoldCount() {
return coarseLock.getReadHoldCount(); return coarseLock.getReadHoldCount();
@ -58,4 +81,4 @@ class FSNamesystemLock implements ReadWriteLock {
public boolean isWriteLockedByCurrentThread() { public boolean isWriteLockedByCurrentThread() {
return coarseLock.isWriteLockedByCurrentThread(); return coarseLock.isWriteLockedByCurrentThread();
} }
} }

View File

@ -124,7 +124,7 @@ public class NNStorage extends Storage implements Closeable,
* recent fsimage file. This does not include any transactions * recent fsimage file. This does not include any transactions
* that have since been written to the edit log. * that have since been written to the edit log.
*/ */
protected long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID; protected volatile long mostRecentCheckpointTxId = HdfsConstants.INVALID_TXID;
/** /**
* Time of the last checkpoint, in milliseconds since the epoch. * Time of the last checkpoint, in milliseconds since the epoch.

View File

@ -145,7 +145,7 @@ public class StandbyCheckpointer {
assert canceler != null; assert canceler != null;
final long txid; final long txid;
namesystem.writeLockInterruptibly(); namesystem.longReadLockInterruptibly();
try { try {
assert namesystem.getEditLog().isOpenForRead() : assert namesystem.getEditLog().isOpenForRead() :
"Standby Checkpointer should only attempt a checkpoint when " + "Standby Checkpointer should only attempt a checkpoint when " +
@ -168,7 +168,7 @@ public class StandbyCheckpointer {
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid; thisCheckpointTxId + " but instead saved at txid=" + txid;
} finally { } finally {
namesystem.writeUnlock(); namesystem.longReadUnlock();
} }
// Upload the saved checkpoint back to the active // Upload the saved checkpoint back to the active

View File

@ -21,6 +21,15 @@ package org.apache.hadoop.hdfs.util;
public interface RwLock { public interface RwLock {
/** Acquire read lock. */ /** Acquire read lock. */
public void readLock(); public void readLock();
/**
* Acquire the long read lock, unless interrupted while waiting. The long
* read lock should also serve to block all concurrent writers.
**/
void longReadLockInterruptibly() throws InterruptedException;
/** Release the long read lock. */
public void longReadUnlock();
/** Release read lock. */ /** Release read lock. */
public void readUnlock(); public void readUnlock();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -25,6 +26,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.URI; import java.net.URI;
import java.net.URL;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImage; import org.apache.hadoop.hdfs.server.namenode.FSImage;
@ -310,6 +313,66 @@ public class TestStandbyCheckpoints {
assertTrue("SBN should have finished checkpointing.", assertTrue("SBN should have finished checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 1); answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
} }
@Test(timeout=300000)
public void testReadsAllowedDuringCheckpoint() throws Exception {
// Set it up so that we know when the SBN checkpoint starts and ends.
FSImage spyImage1 = NameNodeAdapter.spyOnFsImage(nn1);
DelayAnswer answerer = new DelayAnswer(LOG);
Mockito.doAnswer(answerer).when(spyImage1)
.saveNamespace(Mockito.any(FSNamesystem.class),
Mockito.any(Canceler.class));
// Perform some edits and wait for a checkpoint to start on the SBN.
doEdits(0, 1000);
nn0.getRpcServer().rollEditLog();
answerer.waitForCall();
assertTrue("SBN is not performing checkpoint but it should be.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
// Make sure that the lock has actually been taken by the checkpointing
// thread.
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
// Perform an RPC that needs to take the write lock.
Thread t = new Thread() {
@Override
public void run() {
try {
nn1.getRpcServer().restoreFailedStorage("false");
} catch (IOException e) {
e.printStackTrace();
}
}
};
t.start();
// Make sure that our thread is waiting for the lock.
ThreadUtil.sleepAtLeastIgnoreInterrupts(1000);
assertFalse(nn1.getNamesystem().getFsLockForTests().hasQueuedThreads());
assertFalse(nn1.getNamesystem().getFsLockForTests().isWriteLocked());
assertTrue(nn1.getNamesystem().getLongReadLockForTests().hasQueuedThreads());
// Get /jmx of the standby NN web UI, which will cause the FSNS read lock to
// be taken.
String pageContents = DFSTestUtil.urlGet(new URL("http://" +
nn1.getHttpAddress().getHostName() + ":" +
nn1.getHttpAddress().getPort() + "/jmx"));
assertTrue(pageContents.contains("NumLiveDataNodes"));
// Make sure that the checkpoint is still going on, implying that the client
// RPC to the SBN happened during the checkpoint.
assertTrue("SBN should have still been checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 0);
answerer.proceed();
answerer.waitForResult();
assertTrue("SBN should have finished checkpointing.",
answerer.getFireCount() == 1 && answerer.getResultCount() == 1);
t.join();
}
private void doEdits(int start, int stop) throws IOException { private void doEdits(int start, int stop) throws IOException {
for (int i = start; i < stop; i++) { for (int i = start; i < stop; i++) {