HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace. Contributed by Ming Ma.

This commit is contained in:
cnauroth 2015-01-22 16:26:21 -08:00
parent 792b7d337a
commit d3268c4b10
4 changed files with 58 additions and 21 deletions

View File

@ -765,6 +765,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7575. Upgrade should generate a unique storage ID for each HDFS-7575. Upgrade should generate a unique storage ID for each
volume. (Arpit Agarwal) volume. (Arpit Agarwal)
HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace.
(Ming Ma via cnauroth)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -29,9 +29,11 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -97,6 +99,15 @@ public class FSImage implements Closeable {
protected NNStorageRetentionManager archivalManager; protected NNStorageRetentionManager archivalManager;
/* Used to make sure there are no concurrent checkpoints for a given txid
* The checkpoint here could be one of the following operations.
* a. checkpoint when NN is in standby.
* b. admin saveNameSpace operation.
* c. download checkpoint file from any remote checkpointer.
*/
private final Set<Long> currentlyCheckpointing =
Collections.<Long>synchronizedSet(new HashSet<Long>());
/** /**
* Construct an FSImage * Construct an FSImage
* @param conf Configuration * @param conf Configuration
@ -1058,18 +1069,26 @@ public class FSImage implements Closeable {
editLog.endCurrentLogSegment(true); editLog.endCurrentLogSegment(true);
} }
long imageTxId = getLastAppliedOrWrittenTxId(); long imageTxId = getLastAppliedOrWrittenTxId();
if (!addToCheckpointing(imageTxId)) {
throw new IOException(
"FS image is being downloaded from another NN at txid " + imageTxId);
}
try { try {
saveFSImageInAllDirs(source, nnf, imageTxId, canceler); try {
storage.writeAll(); saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
} finally { storage.writeAll();
if (editLogWasOpen) { } finally {
editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1); if (editLogWasOpen) {
// Take this opportunity to note the current transaction. editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1);
// Even if the namespace save was cancelled, this marker // Take this opportunity to note the current transaction.
// is only used to determine what transaction ID is required // Even if the namespace save was cancelled, this marker
// for startup. So, it doesn't hurt to update it unnecessarily. // is only used to determine what transaction ID is required
storage.writeTransactionIdFileToStorage(imageTxId + 1); // for startup. So, it doesn't hurt to update it unnecessarily.
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
} }
} finally {
removeFromCheckpointing(imageTxId);
} }
} }
@ -1078,7 +1097,22 @@ public class FSImage implements Closeable {
*/ */
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid) protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException { throws IOException {
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); if (!addToCheckpointing(txid)) {
throw new IOException(("FS image is being downloaded from another NN"));
}
try {
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
} finally {
removeFromCheckpointing(txid);
}
}
public boolean addToCheckpointing(long txid) {
return currentlyCheckpointing.add(txid);
}
public void removeFromCheckpointing(long txid) {
currentlyCheckpointing.remove(txid);
} }
private synchronized void saveFSImageInAllDirs(FSNamesystem source, private synchronized void saveFSImageInAllDirs(FSNamesystem source,

View File

@ -81,9 +81,6 @@ public class ImageServlet extends HttpServlet {
private static final String LATEST_FSIMAGE_VALUE = "latest"; private static final String LATEST_FSIMAGE_VALUE = "latest";
private static final String IMAGE_FILE_TYPE = "imageFile"; private static final String IMAGE_FILE_TYPE = "imageFile";
private static final Set<Long> currentlyDownloadingCheckpoints =
Collections.synchronizedSet(new HashSet<Long>());
@Override @Override
public void doGet(final HttpServletRequest request, public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException { final HttpServletResponse response) throws ServletException, IOException {
@ -467,17 +464,20 @@ public class ImageServlet extends HttpServlet {
final NameNodeFile nnf = parsedParams.getNameNodeFile(); final NameNodeFile nnf = parsedParams.getNameNodeFile();
if (!currentlyDownloadingCheckpoints.add(txid)) { if (!nnImage.addToCheckpointing(txid)) {
response.sendError(HttpServletResponse.SC_CONFLICT, response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer is already in the process of uploading a" "Either current namenode is checkpointing or another"
+ " checkpoint made at transaction ID " + txid); + " checkpointer is already in the process of "
+ "uploading a checkpoint made at transaction ID "
+ txid);
return null; return null;
} }
try { try {
if (nnImage.getStorage().findImageFile(nnf, txid) != null) { if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
response.sendError(HttpServletResponse.SC_CONFLICT, response.sendError(HttpServletResponse.SC_CONFLICT,
"Another checkpointer already uploaded an checkpoint " "Either current namenode has checkpointed or "
+ "for txid " + txid); + "another checkpointer already uploaded an "
+ "checkpoint for txid " + txid);
return null; return null;
} }
@ -502,7 +502,7 @@ public class ImageServlet extends HttpServlet {
stream.close(); stream.close();
} }
} finally { } finally {
currentlyDownloadingCheckpoints.remove(txid); nnImage.removeFromCheckpointing(txid);
} }
return null; return null;
} }

View File

@ -466,7 +466,7 @@ public class TestStandbyCheckpoints {
throws IOException { throws IOException {
CompressionOutputStream ret = super.createOutputStream(out); CompressionOutputStream ret = super.createOutputStream(out);
CompressionOutputStream spy = Mockito.spy(ret); CompressionOutputStream spy = Mockito.spy(ret);
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2)) Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5))
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt()); .when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
return spy; return spy;
} }