diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7c5c63930ed..74eb1606956 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -765,6 +765,9 @@ Release 2.7.0 - UNRELEASED HDFS-7575. Upgrade should generate a unique storage ID for each volume. (Arpit Agarwal) + HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace. + (Ming Ma via cnauroth) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 8ac6926850e..3b5d2c39cc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -29,9 +29,11 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,6 +99,15 @@ public class FSImage implements Closeable { protected NNStorageRetentionManager archivalManager; + /* Used to make sure there are no concurrent checkpoints for a given txid + * The checkpoint here could be one of the following operations. + * a. checkpoint when NN is in standby. + * b. admin saveNameSpace operation. + * c. download checkpoint file from any remote checkpointer. 
+ */ + private final Set<Long> currentlyCheckpointing = + Collections.synchronizedSet(new HashSet<Long>()); + /** * Construct an FSImage * @param conf Configuration @@ -1058,18 +1069,26 @@ public class FSImage implements Closeable { editLog.endCurrentLogSegment(true); } long imageTxId = getLastAppliedOrWrittenTxId(); + if (!addToCheckpointing(imageTxId)) { + throw new IOException( + "FS image is being downloaded from another NN at txid " + imageTxId); + } try { - saveFSImageInAllDirs(source, nnf, imageTxId, canceler); - storage.writeAll(); - } finally { - if (editLogWasOpen) { - editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1); - // Take this opportunity to note the current transaction. - // Even if the namespace save was cancelled, this marker - // is only used to determine what transaction ID is required - // for startup. So, it doesn't hurt to update it unnecessarily. - storage.writeTransactionIdFileToStorage(imageTxId + 1); + try { + saveFSImageInAllDirs(source, nnf, imageTxId, canceler); + storage.writeAll(); + } finally { + if (editLogWasOpen) { + editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1); + // Take this opportunity to note the current transaction. + // Even if the namespace save was cancelled, this marker + // is only used to determine what transaction ID is required + // for startup. So, it doesn't hurt to update it unnecessarily. 
+ storage.writeTransactionIdFileToStorage(imageTxId + 1); + } } + } finally { + removeFromCheckpointing(imageTxId); + } } @@ -1078,7 +1097,22 @@ */ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid) throws IOException { - saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); + if (!addToCheckpointing(txid)) { + throw new IOException("FS image is being downloaded from another NN"); + } + try { + saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); + } finally { + removeFromCheckpointing(txid); + } + } + + public boolean addToCheckpointing(long txid) { + return currentlyCheckpointing.add(txid); + } + + public void removeFromCheckpointing(long txid) { + currentlyCheckpointing.remove(txid); } private synchronized void saveFSImageInAllDirs(FSNamesystem source, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index d10aacc7768..702c8f14db1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -81,9 +81,6 @@ public class ImageServlet extends HttpServlet { private static final String LATEST_FSIMAGE_VALUE = "latest"; private static final String IMAGE_FILE_TYPE = "imageFile"; - private static final Set<Long> currentlyDownloadingCheckpoints = - Collections.synchronizedSet(new HashSet<Long>()); - @Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { @@ -467,17 +464,20 @@ public class ImageServlet extends HttpServlet { final NameNodeFile nnf = parsedParams.getNameNodeFile(); - if (!currentlyDownloadingCheckpoints.add(txid)) { + if (!nnImage.addToCheckpointing(txid)) { 
response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer is already in the process of uploading a" - + " checkpoint made at transaction ID " + txid); + "Either current namenode is checkpointing or another" + + " checkpointer is already in the process of " + + "uploading a checkpoint made at transaction ID " + + txid); return null; } try { if (nnImage.getStorage().findImageFile(nnf, txid) != null) { response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer already uploaded an checkpoint " - + "for txid " + txid); + "Either current namenode has checkpointed or " + + "another checkpointer already uploaded a " + + "checkpoint for txid " + txid); return null; } @@ -502,7 +502,7 @@ stream.close(); } } finally { - currentlyDownloadingCheckpoints.remove(txid); + nnImage.removeFromCheckpointing(txid); } return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index 1d75c30e3da..33af0e21979 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -466,7 +466,7 @@ public class TestStandbyCheckpoints { throws IOException { CompressionOutputStream ret = super.createOutputStream(out); CompressionOutputStream spy = Mockito.spy(ret); - Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2)) + Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5)) .when(spy).write(Mockito.any(), Mockito.anyInt(), Mockito.anyInt()); return spy; }