HDFS-2507. Merge r1190060 from trunk to 0.23
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1298089 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d667cd871b
commit
b1dbd75065
|
@ -1248,6 +1248,8 @@ Release 0.23.0 - 2011-11-01
|
|||
HDFS-2308. NamenodeProtocol.endCheckpoint is vestigial and can be removed.
|
||||
(eli)
|
||||
|
||||
HDFS-2507. Allow saveNamespace operations to be canceled. (todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||
|
|
|
@ -83,7 +83,9 @@ public class FSImage implements Closeable {
|
|||
|
||||
final private Configuration conf;
|
||||
|
||||
private final NNStorageRetentionManager archivalManager;
|
||||
private final NNStorageRetentionManager archivalManager;
|
||||
|
||||
private SaveNamespaceContext curSaveNamespaceContext = null;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -715,14 +717,15 @@ public class FSImage implements Closeable {
|
|||
/**
|
||||
* Save the contents of the FS image to the file.
|
||||
*/
|
||||
void saveFSImage(FSNamesystem source, StorageDirectory sd, long txid)
|
||||
void saveFSImage(SaveNamespaceContext context, StorageDirectory sd)
|
||||
throws IOException {
|
||||
long txid = context.getTxId();
|
||||
File newFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
||||
File dstFile = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE, txid);
|
||||
|
||||
FSImageFormat.Saver saver = new FSImageFormat.Saver();
|
||||
FSImageFormat.Saver saver = new FSImageFormat.Saver(context);
|
||||
FSImageCompression compression = FSImageCompression.createCompression(conf);
|
||||
saver.save(newFile, txid, source, compression);
|
||||
saver.save(newFile, compression);
|
||||
|
||||
MD5FileUtils.saveMD5File(dstFile, saver.getSavedDigest());
|
||||
storage.setMostRecentCheckpointInfo(txid, Util.now());
|
||||
|
@ -740,25 +743,24 @@ public class FSImage implements Closeable {
|
|||
* and writing it out.
|
||||
*/
|
||||
private class FSImageSaver implements Runnable {
|
||||
private final SaveNamespaceContext context;
|
||||
private StorageDirectory sd;
|
||||
private List<StorageDirectory> errorSDs;
|
||||
private final long txid;
|
||||
private final FSNamesystem source;
|
||||
|
||||
FSImageSaver(FSNamesystem source, StorageDirectory sd,
|
||||
List<StorageDirectory> errorSDs, long txid) {
|
||||
this.source = source;
|
||||
|
||||
public FSImageSaver(SaveNamespaceContext context, StorageDirectory sd) {
|
||||
this.context = context;
|
||||
this.sd = sd;
|
||||
this.errorSDs = errorSDs;
|
||||
this.txid = txid;
|
||||
}
|
||||
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
saveFSImage(source, sd, txid);
|
||||
saveFSImage(context, sd);
|
||||
} catch (SaveNamespaceCancelledException snce) {
|
||||
LOG.info("Cancelled image saving for " + sd.getRoot() +
|
||||
": " + snce.getMessage());
|
||||
// don't report an error on the storage dir!
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Unable to save image for " + sd.getRoot(), t);
|
||||
errorSDs.add(sd);
|
||||
context.reportErrorOnStorageDirectory(sd);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -784,7 +786,7 @@ public class FSImage implements Closeable {
|
|||
* Save the contents of the FS image to a new image file in each of the
|
||||
* current storage directories.
|
||||
*/
|
||||
void saveNamespace(FSNamesystem source) throws IOException {
|
||||
synchronized void saveNamespace(FSNamesystem source) throws IOException {
|
||||
assert editLog != null : "editLog must be initialized";
|
||||
storage.attemptRestoreRemovedStorage();
|
||||
|
||||
|
@ -800,46 +802,71 @@ public class FSImage implements Closeable {
|
|||
} finally {
|
||||
if (editLogWasOpen) {
|
||||
editLog.startLogSegment(imageTxId + 1, true);
|
||||
// Take this opportunity to note the current transaction
|
||||
// Take this opportunity to note the current transaction.
|
||||
// Even if the namespace save was cancelled, this marker
|
||||
// is only used to determine what transaction ID is required
|
||||
// for startup. So, it doesn't hurt to update it unnecessarily.
|
||||
storage.writeTransactionIdFileToStorage(imageTxId + 1);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected void saveFSImageInAllDirs(FSNamesystem source, long txid)
|
||||
throws IOException {
|
||||
void cancelSaveNamespace(String reason)
|
||||
throws InterruptedException {
|
||||
SaveNamespaceContext ctx = curSaveNamespaceContext;
|
||||
if (ctx != null) {
|
||||
ctx.cancel(reason); // waits until complete
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
|
||||
throws IOException {
|
||||
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
|
||||
throw new IOException("No image directories available!");
|
||||
}
|
||||
|
||||
List<StorageDirectory> errorSDs =
|
||||
Collections.synchronizedList(new ArrayList<StorageDirectory>());
|
||||
|
||||
List<Thread> saveThreads = new ArrayList<Thread>();
|
||||
// save images into current
|
||||
for (Iterator<StorageDirectory> it
|
||||
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
FSImageSaver saver = new FSImageSaver(source, sd, errorSDs, txid);
|
||||
Thread saveThread = new Thread(saver, saver.toString());
|
||||
saveThreads.add(saveThread);
|
||||
saveThread.start();
|
||||
}
|
||||
waitForThreads(saveThreads);
|
||||
saveThreads.clear();
|
||||
storage.reportErrorsOnDirectories(errorSDs);
|
||||
|
||||
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
|
||||
throw new IOException(
|
||||
"Failed to save in any storage directories while saving namespace.");
|
||||
}
|
||||
|
||||
renameCheckpoint(txid);
|
||||
SaveNamespaceContext ctx = new SaveNamespaceContext(
|
||||
source, txid);
|
||||
curSaveNamespaceContext = ctx;
|
||||
|
||||
// Since we now have a new checkpoint, we can clean up some
|
||||
// old edit logs and checkpoints.
|
||||
purgeOldStorage();
|
||||
try {
|
||||
List<Thread> saveThreads = new ArrayList<Thread>();
|
||||
// save images into current
|
||||
for (Iterator<StorageDirectory> it
|
||||
= storage.dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
|
||||
StorageDirectory sd = it.next();
|
||||
FSImageSaver saver = new FSImageSaver(ctx, sd);
|
||||
Thread saveThread = new Thread(saver, saver.toString());
|
||||
saveThreads.add(saveThread);
|
||||
saveThread.start();
|
||||
}
|
||||
waitForThreads(saveThreads);
|
||||
saveThreads.clear();
|
||||
storage.reportErrorsOnDirectories(ctx.getErrorSDs());
|
||||
|
||||
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
|
||||
throw new IOException(
|
||||
"Failed to save in any storage directories while saving namespace.");
|
||||
}
|
||||
if (ctx.isCancelled()) {
|
||||
deleteCancelledCheckpoint(txid);
|
||||
ctx.checkCancelled(); // throws
|
||||
assert false : "should have thrown above!";
|
||||
}
|
||||
|
||||
renameCheckpoint(txid);
|
||||
|
||||
// Since we now have a new checkpoint, we can clean up some
|
||||
// old edit logs and checkpoints.
|
||||
purgeOldStorage();
|
||||
} finally {
|
||||
// Notify any threads waiting on the checkpoint to be canceled
|
||||
// that it is complete.
|
||||
ctx.markComplete();
|
||||
ctx = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -873,6 +900,24 @@ public class FSImage implements Closeable {
|
|||
}
|
||||
if(al != null) storage.reportErrorsOnDirectories(al);
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the checkpoint file in every storage directory,
|
||||
* since the checkpoint was cancelled.
|
||||
*/
|
||||
private void deleteCancelledCheckpoint(long txid) throws IOException {
|
||||
ArrayList<StorageDirectory> al = Lists.newArrayList();
|
||||
|
||||
for (StorageDirectory sd : storage.dirIterable(NameNodeDirType.IMAGE)) {
|
||||
File ckpt = NNStorage.getStorageFile(sd, NameNodeFile.IMAGE_NEW, txid);
|
||||
if (ckpt.exists() && !ckpt.delete()) {
|
||||
LOG.warn("Unable to delete cancelled checkpoint in " + sd);
|
||||
al.add(sd);
|
||||
}
|
||||
}
|
||||
storage.reportErrorsOnDirectories(al);
|
||||
}
|
||||
|
||||
|
||||
private void renameCheckpointInDir(StorageDirectory sd, long txid)
|
||||
throws IOException {
|
||||
|
@ -1049,4 +1094,5 @@ public class FSImage implements Closeable {
|
|||
public synchronized long getLastAppliedTxId() {
|
||||
return lastAppliedTxId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -508,6 +508,7 @@ class FSImageFormat {
|
|||
* functions may be used to retrieve information about the file that was written.
|
||||
*/
|
||||
static class Saver {
|
||||
private final SaveNamespaceContext context;
|
||||
/** Set to true once an image has been written */
|
||||
private boolean saved = false;
|
||||
|
||||
|
@ -529,6 +530,11 @@ class FSImageFormat {
|
|||
throw new IllegalStateException("FSImageSaver has already saved an image");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Saver(SaveNamespaceContext context) {
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the MD5 checksum of the image file that was saved.
|
||||
|
@ -539,12 +545,11 @@ class FSImageFormat {
|
|||
}
|
||||
|
||||
void save(File newFile,
|
||||
long txid,
|
||||
FSNamesystem sourceNamesystem,
|
||||
FSImageCompression compression)
|
||||
throws IOException {
|
||||
checkNotSaved();
|
||||
|
||||
final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
|
||||
FSDirectory fsDir = sourceNamesystem.dir;
|
||||
long startTime = now();
|
||||
//
|
||||
|
@ -565,7 +570,7 @@ class FSImageFormat {
|
|||
.getNamespaceID());
|
||||
out.writeLong(fsDir.rootDir.numItemsInTree());
|
||||
out.writeLong(sourceNamesystem.getGenerationStamp());
|
||||
out.writeLong(txid);
|
||||
out.writeLong(context.getTxId());
|
||||
|
||||
// write compression info and set up compressed stream
|
||||
out = compression.writeHeaderAndWrapStream(fos);
|
||||
|
@ -581,10 +586,12 @@ class FSImageFormat {
|
|||
saveImage(strbuf, fsDir.rootDir, out);
|
||||
// save files under construction
|
||||
sourceNamesystem.saveFilesUnderConstruction(out);
|
||||
context.checkCancelled();
|
||||
sourceNamesystem.saveSecretManagerState(out);
|
||||
strbuf = null;
|
||||
|
||||
context.checkCancelled();
|
||||
out.flush();
|
||||
context.checkCancelled();
|
||||
fout.getChannel().force(true);
|
||||
} finally {
|
||||
out.close();
|
||||
|
@ -603,9 +610,10 @@ class FSImageFormat {
|
|||
* This is a recursive procedure, which first saves all children of
|
||||
* a current directory and then moves inside the sub-directories.
|
||||
*/
|
||||
private static void saveImage(ByteBuffer currentDirName,
|
||||
private void saveImage(ByteBuffer currentDirName,
|
||||
INodeDirectory current,
|
||||
DataOutputStream out) throws IOException {
|
||||
context.checkCancelled();
|
||||
List<INode> children = current.getChildrenRaw();
|
||||
if (children == null || children.isEmpty())
|
||||
return;
|
||||
|
|
|
@ -2793,6 +2793,27 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an ongoing saveNamespace operation and wait for its
|
||||
* threads to exit, if one is currently in progress.
|
||||
*
|
||||
* If no such operation is in progress, this call does nothing.
|
||||
*
|
||||
* @param reason a reason to be communicated to the caller saveNamespace
|
||||
* @throws IOException
|
||||
*/
|
||||
void cancelSaveNamespace(String reason) throws IOException {
|
||||
readLock();
|
||||
try {
|
||||
checkSuperuserPrivilege();
|
||||
getFSImage().cancelSaveNamespace(reason);
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
|
||||
* Requires superuser privilege.
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
class SaveNamespaceCancelledException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
SaveNamespaceCancelledException(String cancelReason) {
|
||||
super(cancelReason);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
/**
|
||||
* Context for an ongoing SaveNamespace operation. This class
|
||||
* allows cancellation, and also is responsible for accumulating
|
||||
* failed storage directories.
|
||||
*/
|
||||
class SaveNamespaceContext {
|
||||
private final FSNamesystem sourceNamesystem;
|
||||
private final long txid;
|
||||
private final List<StorageDirectory> errorSDs =
|
||||
Collections.synchronizedList(new ArrayList<StorageDirectory>());
|
||||
|
||||
/**
|
||||
* If the operation has been canceled, set to the reason why
|
||||
* it has been canceled (eg standby moving to active)
|
||||
*/
|
||||
private volatile String cancelReason = null;
|
||||
|
||||
private CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
|
||||
SaveNamespaceContext(
|
||||
FSNamesystem sourceNamesystem,
|
||||
long txid) {
|
||||
this.sourceNamesystem = sourceNamesystem;
|
||||
this.txid = txid;
|
||||
}
|
||||
|
||||
FSNamesystem getSourceNamesystem() {
|
||||
return sourceNamesystem;
|
||||
}
|
||||
|
||||
long getTxId() {
|
||||
return txid;
|
||||
}
|
||||
|
||||
void reportErrorOnStorageDirectory(StorageDirectory sd) {
|
||||
errorSDs.add(sd);
|
||||
}
|
||||
|
||||
List<StorageDirectory> getErrorSDs() {
|
||||
return errorSDs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests that the current saveNamespace operation be
|
||||
* canceled if it is still running.
|
||||
* @param reason the reason why cancellation is requested
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
void cancel(String reason) throws InterruptedException {
|
||||
this.cancelReason = reason;
|
||||
completionLatch.await();
|
||||
}
|
||||
|
||||
void markComplete() {
|
||||
Preconditions.checkState(completionLatch.getCount() == 1,
|
||||
"Context already completed!");
|
||||
completionLatch.countDown();
|
||||
}
|
||||
|
||||
void checkCancelled() throws SaveNamespaceCancelledException {
|
||||
if (cancelReason != null) {
|
||||
throw new SaveNamespaceCancelledException(
|
||||
cancelReason);
|
||||
}
|
||||
}
|
||||
|
||||
boolean isCancelled() {
|
||||
return cancelReason != null;
|
||||
}
|
||||
}
|
|
@ -42,7 +42,7 @@ public abstract class MD5FileUtils {
|
|||
private static final Log LOG = LogFactory.getLog(
|
||||
MD5FileUtils.class);
|
||||
|
||||
private static final String MD5_SUFFIX = ".md5";
|
||||
public static final String MD5_SUFFIX = ".md5";
|
||||
private static final Pattern LINE_REGEX =
|
||||
Pattern.compile("([0-9a-f]{32}) [ \\*](.+)");
|
||||
|
||||
|
|
|
@ -31,6 +31,10 @@ import static org.mockito.Mockito.spy;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -49,6 +53,9 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
|
|||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.hdfs.util.MD5FileUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
@ -129,22 +136,25 @@ public class TestSaveNamespace {
|
|||
case SAVE_SECOND_FSIMAGE_RTE:
|
||||
// The spy throws a RuntimeException when writing to the second directory
|
||||
doAnswer(new FaultySaveImage(true)).
|
||||
when(spyImage).saveFSImage(Mockito.eq(fsn),
|
||||
(StorageDirectory)anyObject(), anyLong());
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_SECOND_FSIMAGE_IOE:
|
||||
// The spy throws an IOException when writing to the second directory
|
||||
doAnswer(new FaultySaveImage(false)).
|
||||
when(spyImage).saveFSImage(Mockito.eq(fsn),
|
||||
(StorageDirectory)anyObject(), anyLong());
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
shouldFail = false;
|
||||
break;
|
||||
case SAVE_ALL_FSIMAGES:
|
||||
// The spy throws IOException in all directories
|
||||
doThrow(new RuntimeException("Injected")).
|
||||
when(spyImage).saveFSImage(Mockito.eq(fsn),
|
||||
(StorageDirectory)anyObject(), anyLong());
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
shouldFail = true;
|
||||
break;
|
||||
case WRITE_STORAGE_ALL:
|
||||
|
@ -368,9 +378,9 @@ public class TestSaveNamespace {
|
|||
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
|
||||
doThrow(new IOException("Injected fault: saveFSImage")).
|
||||
when(spyImage).saveFSImage(
|
||||
Mockito.eq(fsn), (StorageDirectory)anyObject(),
|
||||
Mockito.anyLong());
|
||||
when(spyImage).saveFSImage(
|
||||
(SaveNamespaceContext)anyObject(),
|
||||
(StorageDirectory)anyObject());
|
||||
|
||||
try {
|
||||
doAnEdit(fsn, 1);
|
||||
|
@ -512,6 +522,84 @@ public class TestSaveNamespace {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=20000)
|
||||
public void testCancelSaveNamespace() throws Exception {
|
||||
Configuration conf = getConf();
|
||||
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
|
||||
DFSTestUtil.formatNameNode(conf);
|
||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||
|
||||
// Replace the FSImage with a spy
|
||||
final FSImage image = fsn.dir.fsImage;
|
||||
NNStorage storage = image.getStorage();
|
||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||
storage.setStorageDirectories(
|
||||
FSNamesystem.getNamespaceDirs(conf),
|
||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||
|
||||
FSNamesystem spyFsn = spy(fsn);
|
||||
final FSNamesystem finalFsn = spyFsn;
|
||||
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
|
||||
doAnswer(delayer).when(spyFsn).getGenerationStamp();
|
||||
|
||||
ExecutorService pool = Executors.newFixedThreadPool(2);
|
||||
|
||||
try {
|
||||
doAnEdit(fsn, 1);
|
||||
|
||||
// Save namespace
|
||||
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
|
||||
try {
|
||||
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
image.saveNamespace(finalFsn);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
||||
// Wait until saveNamespace calls getGenerationStamp
|
||||
delayer.waitForCall();
|
||||
// then cancel the saveNamespace
|
||||
Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
|
||||
public Void call() throws Exception {
|
||||
image.cancelSaveNamespace("cancelled");
|
||||
return null;
|
||||
}
|
||||
});
|
||||
// give the cancel call time to run
|
||||
Thread.sleep(500);
|
||||
|
||||
// allow saveNamespace to proceed - it should check the cancel flag after
|
||||
// this point and throw an exception
|
||||
delayer.proceed();
|
||||
|
||||
cancelFuture.get();
|
||||
saverFuture.get();
|
||||
fail("saveNamespace did not fail even though cancelled!");
|
||||
} catch (Throwable t) {
|
||||
GenericTestUtils.assertExceptionContains(
|
||||
"SaveNamespaceCancelledException", t);
|
||||
}
|
||||
LOG.info("Successfully cancelled a saveNamespace");
|
||||
|
||||
|
||||
// Check that we have only the original image and not any
|
||||
// cruft left over from half-finished images
|
||||
FSImageTestUtil.logStorageContents(LOG, storage);
|
||||
for (StorageDirectory sd : storage.dirIterable(null)) {
|
||||
File curDir = sd.getCurrentDir();
|
||||
GenericTestUtils.assertGlobEquals(curDir, "fsimage_.*",
|
||||
NNStorage.getImageFileName(0),
|
||||
NNStorage.getImageFileName(0) + MD5FileUtils.MD5_SUFFIX);
|
||||
}
|
||||
} finally {
|
||||
if (fsn != null) {
|
||||
fsn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doAnEdit(FSNamesystem fsn, int id) throws IOException {
|
||||
// Make an edit
|
||||
fsn.mkdirs(
|
||||
|
|
Loading…
Reference in New Issue