HDFS-2800. Fix cancellation of checkpoints in the standby node to be more reliable. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1339745 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-05-17 17:49:48 +00:00
parent 5ef725eb53
commit 15ddb6634f
9 changed files with 174 additions and 91 deletions

View File

@ -217,6 +217,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3434. InvalidProtocolBufferException when visiting DN
browseDirectory.jsp (eli)
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
reliable. (todd)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -89,9 +90,6 @@ public class FSImage implements Closeable {
private final NNStorageRetentionManager archivalManager;
private SaveNamespaceContext curSaveNamespaceContext = null;
/**
* Construct an FSImage
* @param conf Configuration
@ -804,17 +802,28 @@ public class FSImage implements Closeable {
try {
thread.join();
} catch (InterruptedException iex) {
LOG.error("Caught exception while waiting for thread " +
LOG.error("Caught interrupted exception while waiting for thread " +
thread.getName() + " to finish. Retrying join");
}
}
}
}
/**
* @see #saveNamespace(FSNamesystem, Canceler)
*/
public synchronized void saveNamespace(FSNamesystem source)
throws IOException {
saveNamespace(source, null);
}
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
* @param canceler
*/
public synchronized void saveNamespace(FSNamesystem source) throws IOException {
public synchronized void saveNamespace(FSNamesystem source,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@ -825,7 +834,7 @@ public class FSImage implements Closeable {
}
long imageTxId = getLastAppliedOrWrittenTxId();
try {
saveFSImageInAllDirs(source, imageTxId);
saveFSImageInAllDirs(source, imageTxId, canceler);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@ -837,27 +846,27 @@ public class FSImage implements Closeable {
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
}
public void cancelSaveNamespace(String reason)
throws InterruptedException {
SaveNamespaceContext ctx = curSaveNamespaceContext;
if (ctx != null) {
ctx.cancel(reason); // waits until complete
}
}
/**
* @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
saveFSImageInAllDirs(source, txid, null);
}
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
Canceler canceler)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
if (canceler == null) {
canceler = new Canceler();
}
SaveNamespaceContext ctx = new SaveNamespaceContext(
source, txid);
curSaveNamespaceContext = ctx;
source, txid, canceler);
try {
List<Thread> saveThreads = new ArrayList<Thread>();
@ -878,7 +887,7 @@ public class FSImage implements Closeable {
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
if (ctx.isCancelled()) {
if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";

View File

@ -540,7 +540,6 @@ class FSImageFormat {
private void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
context.checkCancelled();
List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty())
return;
@ -554,9 +553,13 @@ class FSImageFormat {
out.write(currentDirName.array(), 0, prefixLen);
}
out.writeInt(children.size());
int i = 0;
for(INode child : children) {
// print all children first
FSImageSerialization.saveINode2Image(child, out);
if (i++ % 50 == 0) {
context.checkCancelled();
}
}
for(INode child : children) {
if(!child.isDirectory())

View File

@ -702,7 +702,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void prepareToStopStandbyServices() throws ServiceFailedException {
if (standbyCheckpointer != null) {
standbyCheckpointer.cancelAndPreventCheckpoints();
standbyCheckpointer.cancelAndPreventCheckpoints(
"About to leave standby state");
}
}
@ -3372,27 +3373,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
/**
* Cancel an ongoing saveNamespace operation and wait for its
* threads to exit, if one is currently in progress.
*
* If no such operation is in progress, this call does nothing.
*
* @param reason a reason to be communicated to the caller saveNamespace
* @throws IOException
*/
void cancelSaveNamespace(String reason) throws IOException {
readLock();
try {
checkSuperuserPrivilege();
getFSImage().cancelSaveNamespace(reason);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
readUnlock();
}
}
/**
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
* Requires superuser privilege.

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.Canceler;
import com.google.common.base.Preconditions;
@ -37,19 +38,16 @@ class SaveNamespaceContext {
private final List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
private final Canceler canceller;
private CountDownLatch completionLatch = new CountDownLatch(1);
SaveNamespaceContext(
FSNamesystem sourceNamesystem,
long txid) {
long txid,
Canceler canceller) {
this.sourceNamesystem = sourceNamesystem;
this.txid = txid;
this.canceller = canceller;
}
FSNamesystem getSourceNamesystem() {
@ -68,17 +66,6 @@ class SaveNamespaceContext {
return errorSDs;
}
/**
* Requests that the current saveNamespace operation be
* canceled if it is still running.
* @param reason the reason why cancellation is requested
* @throws InterruptedException
*/
void cancel(String reason) throws InterruptedException {
this.cancelReason = reason;
completionLatch.await();
}
void markComplete() {
Preconditions.checkState(completionLatch.getCount() == 1,
"Context already completed!");
@ -86,13 +73,9 @@ class SaveNamespaceContext {
}
void checkCancelled() throws SaveNamespaceCancelledException {
if (cancelReason != null) {
if (canceller.isCancelled()) {
throw new SaveNamespaceCancelledException(
cancelReason);
canceller.getCancellationReason());
}
}
boolean isCancelled() {
return cancelReason != null;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -59,6 +60,9 @@ public class StandbyCheckpointer {
private String activeNNAddress;
private InetSocketAddress myNNAddress;
private Object cancelLock = new Object();
private Canceler canceler;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
private static int canceledCount = 0;
@ -123,6 +127,7 @@ public class StandbyCheckpointer {
}
public void stop() throws IOException {
cancelAndPreventCheckpoints("Stopping checkpointer");
thread.setShouldRun(false);
thread.interrupt();
try {
@ -134,6 +139,7 @@ public class StandbyCheckpointer {
}
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
long txid;
namesystem.writeLockInterruptibly();
@ -154,7 +160,7 @@ public class StandbyCheckpointer {
return;
}
img.saveNamespace(namesystem);
img.saveNamespace(namesystem, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
@ -173,16 +179,18 @@ public class StandbyCheckpointer {
* and prevent any new checkpoints from starting for the next
* minute or so.
*/
public void cancelAndPreventCheckpoints() throws ServiceFailedException {
try {
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
// TODO(HA): there is a really narrow race here if we are just
// about to start a checkpoint - this won't cancel it!
namesystem.getFSImage().cancelSaveNamespace(
"About to exit standby state");
} catch (InterruptedException e) {
throw new ServiceFailedException(
"Interrupted while trying to cancel checkpoint");
public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
synchronized (cancelLock) {
// Before beginning a checkpoint, the checkpointer thread
// takes this lock, and creates a canceler object.
// If the canceler is non-null, then a checkpoint is in
// progress and we need to cancel it. If it's null, then
// the operation has not started, meaning that the above
// time-based prevention will take effect.
if (canceler != null) {
canceler.cancel(msg);
}
}
}
@ -272,10 +280,18 @@ public class StandbyCheckpointer {
"exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true;
}
if (needCheckpoint && now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
} else if (needCheckpoint) {
synchronized (cancelLock) {
if (now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
continue;
}
assert canceler == null;
canceler = new Canceler();
}
if (needCheckpoint) {
doCheckpoint();
lastCheckpointTime = now;
}
@ -287,6 +303,10 @@ public class StandbyCheckpointer {
continue;
} catch (Throwable t) {
LOG.error("Exception in doCheckpoint", t);
} finally {
synchronized (cancelLock) {
canceler = null;
}
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Provides a simple interface where one thread can mark an operation
* for cancellation, and another thread can poll for whether the
* cancellation has occurred.
*/
@InterfaceAudience.Private
public class Canceler {
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
/**
* Requests that the current operation be canceled if it is still running.
* This does not block until the cancellation is successful.
* @param reason the reason why cancellation is requested
*/
public void cancel(String reason) {
this.cancelReason = reason;
}
public boolean isCancelled() {
return cancelReason != null;
}
public String getCancellationReason() {
return cancelReason;
}
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -517,6 +518,7 @@ public class TestSaveNamespace {
try {
doAnEdit(fsn, 1);
final Canceler canceler = new Canceler();
// Save namespace
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
@ -524,7 +526,7 @@ public class TestSaveNamespace {
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
image.saveNamespace(finalFsn);
image.saveNamespace(finalFsn, canceler);
return null;
}
});
@ -534,7 +536,7 @@ public class TestSaveNamespace {
// then cancel the saveNamespace
Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
public Void call() throws Exception {
image.cancelSaveNamespace("cancelled");
canceler.cancel("cancelled");
return null;
}
});

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
@ -36,6 +37,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -52,12 +58,18 @@ public class TestStandbyCheckpoints {
private NameNode nn0, nn1;
private FileSystem fs;
@SuppressWarnings("rawtypes")
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
@ -159,14 +171,15 @@ public class TestStandbyCheckpoints {
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1))
.saveNamespace((FSNamesystem) Mockito.anyObject());
.saveNamespace((FSNamesystem) Mockito.anyObject(),
(Canceler)Mockito.anyObject());
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint.
*/
@Test
@Test(timeout=120000)
public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0);
@ -191,16 +204,18 @@ public class TestStandbyCheckpoints {
cluster.transitionToActive(0);
for (int i = 0; i < 10; i++) {
boolean canceledOne = false;
for (int i = 0; i < 10 && !canceledOne; i++) {
doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
}
assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
assertTrue(canceledOne);
}
private void doEdits(int start, int stop) throws IOException {
@ -210,4 +225,21 @@ public class TestStandbyCheckpoints {
}
}
/**
* A codec which just slows down the saving of the image significantly
* by sleeping a few milliseconds on every write. This makes it easy to
* catch the standby in the middle of saving a checkpoint.
*/
public static class SlowCodec extends GzipCodec {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
CompressionOutputStream ret = super.createOutputStream(out);
CompressionOutputStream spy = Mockito.spy(ret);
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
return spy;
}
}
}