From 639c7b0ddc216395dc336f48c021ab0c4ce51ada Mon Sep 17 00:00:00 2001 From: cnauroth Date: Thu, 22 Jan 2015 16:03:17 -0800 Subject: [PATCH] HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace. Contributed by Ming Ma. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/namenode/FSImage.java | 56 +++++++++++++++---- .../hdfs/server/namenode/ImageServlet.java | 18 +++--- .../namenode/ha/TestStandbyCheckpoints.java | 2 +- 4 files changed, 58 insertions(+), 21 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d7833e5ea89..1d1dc951ea0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -478,6 +478,9 @@ Release 2.7.0 - UNRELEASED HDFS-7575. Upgrade should generate a unique storage ID for each volume. (Arpit Agarwal) + HDFS-3519. Checkpoint upload may interfere with a concurrent saveNamespace. + (Ming Ma via cnauroth) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 5a975541aca..3a374f47925 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -29,9 +29,11 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,6 +99,15 @@ public class FSImage implements Closeable { protected NNStorageRetentionManager archivalManager; + /* Used to make sure there are no 
concurrent checkpoints for a given txid + * The checkpoint here could be one of the following operations. + * a. checkpoint when NN is in standby. + * b. admin saveNameSpace operation. + * c. download checkpoint file from any remote checkpointer. + */ + private final Set&lt;Long&gt; currentlyCheckpointing = + Collections.&lt;Long&gt;synchronizedSet(new HashSet&lt;Long&gt;()); + /** * Construct an FSImage * @param conf Configuration @@ -1058,18 +1069,26 @@ public class FSImage implements Closeable { editLog.endCurrentLogSegment(true); } long imageTxId = getLastAppliedOrWrittenTxId(); + if (!addToCheckpointing(imageTxId)) { + throw new IOException( + "FS image is being downloaded from another NN at txid " + imageTxId); + } try { - saveFSImageInAllDirs(source, nnf, imageTxId, canceler); - storage.writeAll(); - } finally { - if (editLogWasOpen) { - editLog.startLogSegment(imageTxId + 1, true); - // Take this opportunity to note the current transaction. - // Even if the namespace save was cancelled, this marker - // is only used to determine what transaction ID is required - // for startup. So, it doesn't hurt to update it unnecessarily. + try { + saveFSImageInAllDirs(source, nnf, imageTxId, canceler); + storage.writeAll(); + } finally { + if (editLogWasOpen) { + editLog.startLogSegment(imageTxId + 1, true); + // Take this opportunity to note the current transaction. + // Even if the namespace save was cancelled, this marker + // is only used to determine what transaction ID is required + // for startup. So, it doesn't hurt to update it unnecessarily. 
+ storage.writeTransactionIdFileToStorage(imageTxId + 1); + } } + } finally { + removeFromCheckpointing(imageTxId); } } @@ -1078,7 +1097,22 @@ public class FSImage implements Closeable { */ protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid) throws IOException { - saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); + if (!addToCheckpointing(txid)) { + throw new IOException(("FS image is being downloaded from another NN")); + } + try { + saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); + } finally { + removeFromCheckpointing(txid); + } + } + + public boolean addToCheckpointing(long txid) { + return currentlyCheckpointing.add(txid); + } + + public void removeFromCheckpointing(long txid) { + currentlyCheckpointing.remove(txid); } private synchronized void saveFSImageInAllDirs(FSNamesystem source, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index d10aacc7768..702c8f14db1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -81,9 +81,6 @@ public class ImageServlet extends HttpServlet { private static final String LATEST_FSIMAGE_VALUE = "latest"; private static final String IMAGE_FILE_TYPE = "imageFile"; - private static final Set&lt;Long&gt; currentlyDownloadingCheckpoints = - Collections.&lt;Long&gt;synchronizedSet(new HashSet&lt;Long&gt;()); - @Override public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { @@ -467,17 +464,20 @@ public class ImageServlet extends HttpServlet { final NameNodeFile nnf = parsedParams.getNameNodeFile(); - if (!currentlyDownloadingCheckpoints.add(txid)) { + if (!nnImage.addToCheckpointing(txid)) { 
response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer is already in the process of uploading a" - + " checkpoint made at transaction ID " + txid); + "Either current namenode is checkpointing or another" + + " checkpointer is already in the process of " + + "uploading a checkpoint made at transaction ID " + + txid); return null; } try { if (nnImage.getStorage().findImageFile(nnf, txid) != null) { response.sendError(HttpServletResponse.SC_CONFLICT, - "Another checkpointer already uploaded an checkpoint " - + "for txid " + txid); + "Either current namenode has checkpointed or " + + "another checkpointer already uploaded an " + + "checkpoint for txid " + txid); return null; } @@ -502,7 +502,7 @@ stream.close(); } } finally { - currentlyDownloadingCheckpoints.remove(txid); + nnImage.removeFromCheckpointing(txid); } return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java index 1d75c30e3da..33af0e21979 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java @@ -466,7 +466,7 @@ public class TestStandbyCheckpoints { throws IOException { CompressionOutputStream ret = super.createOutputStream(out); CompressionOutputStream spy = Mockito.spy(ret); - Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2)) + Mockito.doAnswer(new GenericTestUtils.SleepAnswer(5)) .when(spy).write(Mockito.&lt;byte[]&gt;any(), Mockito.anyInt(), Mockito.anyInt()); return spy; }