From 5e6cc6fe8a11a638ba98913ca402efdc988fe73a Mon Sep 17 00:00:00 2001
From: Chen Liang
Date: Wed, 17 Jul 2019 14:37:16 -0700
Subject: [PATCH] HDFS-12979. [SBN read] StandbyNode should upload FsImage to
 ObserverNode after checkpointing. Contributed by Chen Liang.

---
 .../hdfs/server/namenode/ImageServlet.java    |  63 +++++++-
 .../hadoop/hdfs/server/namenode/NameNode.java |   6 +
 .../namenode/ha/StandbyCheckpointer.java      | 146 +++++++++++-------
 .../apache/hadoop/hdfs/MiniDFSCluster.java    |   5 +
 .../hdfs/server/namenode/TestCheckpoint.java  |  55 +++++++
 .../namenode/ha/TestStandbyCheckpoints.java   |  35 ++++-
 6 files changed, 254 insertions(+), 56 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
index fcc0abfbf93..6f0a2ce3b29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java
@@ -17,8 +17,14 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
 import org.apache.hadoop.hdfs.server.common.Util;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.net.HttpURLConnection;
@@ -89,6 +95,10 @@ public class ImageServlet extends HttpServlet {
   private SortedSet<ImageUploadRequest> currentlyDownloadingCheckpoints = Collections
       .<ImageUploadRequest> synchronizedSortedSet(new TreeSet<ImageUploadRequest>());
 
+  public static final String RECENT_IMAGE_CHECK_ENABLED =
+      "recent.image.check.enabled";
+  public static final boolean RECENT_IMAGE_CHECK_ENABLED_DEFAULT = true;
+
   @Override
   public void doGet(final HttpServletRequest request,
       final HttpServletResponse response) throws ServletException, IOException {
@@ -507,6 +517,23 @@ public class ImageServlet extends HttpServlet {
       final PutImageParams parsedParams = new PutImageParams(request, response,
           conf);
       final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+      final boolean checkRecentImageEnable;
+      Object checkRecentImageEnableObj =
+          context.getAttribute(RECENT_IMAGE_CHECK_ENABLED);
+      if (checkRecentImageEnableObj != null) {
+        if (checkRecentImageEnableObj instanceof Boolean) {
+          checkRecentImageEnable = (boolean) checkRecentImageEnableObj;
+        } else {
+          // This is an error case, but crashing NN due to this
+          // seems more undesirable. Only log the error and set to default.
+          LOG.error("Expecting boolean obj for setting checking recent image, "
+              + "but got " + checkRecentImageEnableObj.getClass() + ". This is "
+              + "unexpected! Setting to default.");
+          checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+        }
+      } else {
+        checkRecentImageEnable = RECENT_IMAGE_CHECK_ENABLED_DEFAULT;
+      }
 
       validateRequest(context, conf, request, response, nnImage,
           parsedParams.getStorageInfoString());
 
@@ -520,7 +547,8 @@ public class ImageServlet extends HttpServlet {
               // target (regardless of the fact that we got the image)
               HAServiceProtocol.HAServiceState state = NameNodeHttpServer
                   .getNameNodeStateFromContext(getServletContext());
-              if (state != HAServiceProtocol.HAServiceState.ACTIVE) {
+              if (state != HAServiceProtocol.HAServiceState.ACTIVE &&
+                  state != HAServiceProtocol.HAServiceState.OBSERVER) {
                 // we need a different response type here so the client can differentiate this
                 // from the failure to upload due to (1) security, or (2) other checkpoints already
                 // present
@@ -554,6 +582,39 @@ public class ImageServlet extends HttpServlet {
                     + txid);
                 return null;
               }
+
+              long now = System.currentTimeMillis();
+              long lastCheckpointTime =
+                  nnImage.getStorage().getMostRecentCheckpointTime();
+              long lastCheckpointTxid =
+                  nnImage.getStorage().getMostRecentCheckpointTxId();
+
+              long checkpointPeriod =
+                  conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY,
+                      DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS);
+              long checkpointTxnCount =
+                  conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY,
+                      DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT);
+
+              long timeDelta = TimeUnit.MILLISECONDS.toSeconds(
+                  now - lastCheckpointTime);
+
+              if (checkRecentImageEnable &&
+                  timeDelta < checkpointPeriod &&
+                  txid - lastCheckpointTxid < checkpointTxnCount) {
+                // only when at least one of two conditions are met we accept
+                // a new fsImage
+                // 1. most recent image's txid is too far behind
+                // 2. last checkpoint time was too old
+                response.sendError(HttpServletResponse.SC_CONFLICT,
+                    "Most recent checkpoint is neither too far behind in "
+                        + "txid, nor too old. New txnid cnt is "
+                        + (txid - lastCheckpointTxid)
+                        + ", expecting at least " + checkpointTxnCount
+                        + " unless too long since last upload.");
+                return null;
+              }
+
               try {
                 if (nnImage.getStorage().findImageFile(nnf, txid) != null) {
                   response.sendError(HttpServletResponse.SC_CONFLICT,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 126ac0bde1c..ec6ce36cf85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.RetriableException;
@@ -440,6 +441,11 @@ public class NameNode extends ReconfigurableBase implements
     return rpcServer;
   }
 
+  @VisibleForTesting
+  public HttpServer2 getHttpServer() {
+    return httpServer.getHttpServer();
+  }
+
   public void queueExternalCall(ExternalCall extCall)
       throws IOException, InterruptedException {
     if (rpcServer == null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 6f8ce91ee4c..d3381550c9b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -19,14 +19,16 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -67,7 +69,6 @@ public class StandbyCheckpointer {
   private final Configuration conf;
   private final FSNamesystem namesystem;
   private long lastCheckpointTime;
-  private long lastUploadTime;
   private final CheckpointerThread thread;
   private final ThreadFactory uploadThreadFactory;
   private List<URL> activeNNAddresses;
@@ -75,11 +76,13 @@ public class StandbyCheckpointer {
   private final Object cancelLock = new Object();
   private Canceler canceler;
-  private boolean isPrimaryCheckPointer = true;
 
   // Keep track of how many checkpoints were canceled.
   // This is for use in tests.
   private static int canceledCount = 0;
+
+  // A map from NN url to the most recent image upload time.
+  private final HashMap<String, CheckpointReceiverEntry> checkpointReceivers;
 
   public StandbyCheckpointer(Configuration conf, FSNamesystem ns)
       throws IOException {
@@ -89,8 +92,38 @@ public class StandbyCheckpointer {
     this.thread = new CheckpointerThread();
     this.uploadThreadFactory = new ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat("TransferFsImageUpload-%d").build();
-
     setNameNodeAddresses(conf);
+    this.checkpointReceivers = new HashMap<>();
+    for (URL address : activeNNAddresses) {
+      this.checkpointReceivers.put(address.toString(),
+          new CheckpointReceiverEntry());
+    }
+  }
+
+  private static final class CheckpointReceiverEntry {
+    private long lastUploadTime;
+    private boolean isPrimary;
+
+    CheckpointReceiverEntry() {
+      this.lastUploadTime = 0L;
+      this.isPrimary = true;
+    }
+
+    void setLastUploadTime(long lastUploadTime) {
+      this.lastUploadTime = lastUploadTime;
+    }
+
+    void setIsPrimary(boolean isPrimaryFor) {
+      this.isPrimary = isPrimaryFor;
+    }
+
+    long getLastUploadTime() {
+      return lastUploadTime;
+    }
+
+    boolean isPrimary() {
+      return isPrimary;
+    }
   }
 
   /**
@@ -158,7 +191,7 @@ public class StandbyCheckpointer {
     thread.interrupt();
   }
 
-  private void doCheckpoint(boolean sendCheckpoint) throws InterruptedException, IOException {
+  private void doCheckpoint() throws InterruptedException, IOException {
     assert canceler != null;
     final long txid;
     final NameNodeFile imageType;
@@ -210,11 +243,6 @@ public class StandbyCheckpointer {
       namesystem.cpUnlock();
     }
 
-    //early exit if we shouldn't actually send the checkpoint to the ANN
-    if(!sendCheckpoint){
-      return;
-    }
-
     // Upload the saved checkpoint back to the active
     // Do this in a separate thread to avoid blocking transition to active, but don't allow more
     // than the expected number of tasks to run or queue up
@@ -224,56 +252,70 @@ public class StandbyCheckpointer {
         uploadThreadFactory);
     // for right now, just match the upload to the nn address by convention. There is no need to
     // directly tie them together by adding a pair class.
-    List<Future<TransferFsImage.TransferResult>> uploads =
-        new ArrayList<Future<TransferFsImage.TransferResult>>();
+    HashMap<String, Future<TransferFsImage.TransferResult>> uploads =
+        new HashMap<>();
     for (final URL activeNNAddress : activeNNAddresses) {
-      Future<TransferFsImage.TransferResult> upload =
-          executor.submit(new Callable<TransferFsImage.TransferResult>() {
-            @Override
-            public TransferFsImage.TransferResult call()
-                throws IOException, InterruptedException {
-              CheckpointFaultInjector.getInstance().duringUploadInProgess();
-              return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
-                  .getFSImage().getStorage(), imageType, txid, canceler);
-            }
-          });
-      uploads.add(upload);
+      // Upload image if at least 1 of 2 following conditions met:
+      // 1. has been quiet for long enough, try to contact the node.
+      // 2. this standby IS the primary checkpointer of target NN.
+      String addressString = activeNNAddress.toString();
+      assert checkpointReceivers.containsKey(addressString);
+      CheckpointReceiverEntry receiverEntry =
+          checkpointReceivers.get(addressString);
+      long secsSinceLastUpload =
+          TimeUnit.MILLISECONDS.toSeconds(
+              monotonicNow() - receiverEntry.getLastUploadTime());
+      boolean shouldUpload = receiverEntry.isPrimary() ||
+          secsSinceLastUpload >= checkpointConf.getQuietPeriod();
+      if (shouldUpload) {
+        Future<TransferFsImage.TransferResult> upload =
+            executor.submit(new Callable<TransferFsImage.TransferResult>() {
+              @Override
+              public TransferFsImage.TransferResult call()
+                  throws IOException, InterruptedException {
+                CheckpointFaultInjector.getInstance().duringUploadInProgess();
+                return TransferFsImage.uploadImageFromStorage(activeNNAddress,
+                    conf, namesystem.getFSImage().getStorage(), imageType, txid,
+                    canceler);
+              }
+            });
+        uploads.put(addressString, upload);
+      }
     }
 
     InterruptedException ie = null;
-    IOException ioe= null;
-    int i = 0;
-    boolean success = false;
-    for (; i < uploads.size(); i++) {
-      Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+    List<IOException> ioes = Lists.newArrayList();
+    for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+        uploads.entrySet()) {
+      String url = entry.getKey();
+      Future<TransferFsImage.TransferResult> upload = entry.getValue();
       try {
-        // TODO should there be some smarts here about retries nodes that are not the active NN?
+        // TODO should there be some smarts here about retries nodes that
+        // are not the active NN?
+        CheckpointReceiverEntry receiverEntry = checkpointReceivers.get(url);
         if (upload.get() == TransferFsImage.TransferResult.SUCCESS) {
-          success = true;
-          //avoid getting the rest of the results - we don't care since we had a successful upload
-          break;
+          receiverEntry.setLastUploadTime(monotonicNow());
+          receiverEntry.setIsPrimary(true);
+        } else {
+          receiverEntry.setIsPrimary(false);
         }
-
       } catch (ExecutionException e) {
-        ioe = new IOException("Exception during image upload", e);
-        break;
+        // Even if exception happens, still proceeds to next NN url.
+        // so that fail to upload to previous NN does not cause the
+        // remaining NN not getting the fsImage.
+        ioes.add(new IOException("Exception during image upload", e));
       } catch (InterruptedException e) {
         ie = e;
         break;
       }
     }
-    if (ie == null && ioe == null) {
-      //Update only when response from remote about success or
-      lastUploadTime = monotonicNow();
-      // we are primary if we successfully updated the ANN
-      this.isPrimaryCheckPointer = success;
-    }
     // cleaner than copying code for multiple catch statements and better than catching all
     // exceptions, so we just handle the ones we expect.
-    if (ie != null || ioe != null) {
+    if (ie != null) {
 
       // cancel the rest of the tasks, and close the pool
-      for (; i < uploads.size(); i++) {
-        Future<TransferFsImage.TransferResult> upload = uploads.get(i);
+      for (Map.Entry<String, Future<TransferFsImage.TransferResult>> entry :
+          uploads.entrySet()) {
+        Future<TransferFsImage.TransferResult> upload = entry.getValue();
         // The background thread may be blocked waiting in the throttler, so
         // interrupt it.
         upload.cancel(true);
@@ -286,11 +328,11 @@
       executor.awaitTermination(500, TimeUnit.MILLISECONDS);
 
       // re-throw the exception we got, since one of these two must be non-null
-      if (ie != null) {
-        throw ie;
-      } else if (ioe != null) {
-        throw ioe;
-      }
+      throw ie;
+    }
+
+    if (!ioes.isEmpty()) {
+      throw MultipleIOException.createIOException(ioes);
     }
   }
 
@@ -373,7 +415,6 @@
       // Reset checkpoint time so that we don't always checkpoint
       // on startup.
       lastCheckpointTime = monotonicNow();
-      lastUploadTime = monotonicNow();
       while (shouldRun) {
         boolean needRollbackCheckpoint = namesystem.isNeedRollbackFsImage();
         if (!needRollbackCheckpoint) {
@@ -426,10 +467,7 @@
           // on all nodes, we build the checkpoint. However, we only ship the checkpoint if have a
           // rollback request, are the checkpointer, are outside the quiet period.
-          final long secsSinceLastUpload = (now - lastUploadTime) / 1000;
-          boolean sendRequest = isPrimaryCheckPointer
-              || secsSinceLastUpload >= checkpointConf.getQuietPeriod();
-          doCheckpoint(sendRequest);
+          doCheckpoint();
 
           // reset needRollbackCheckpoint to false only when we finish a ckpt
           // for rollback image
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index cb4bdcc374f..94aae539850 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -80,6 +80,7 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
 import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.InMemoryLevelDBAliasMapClient;
+import org.apache.hadoop.hdfs.server.namenode.ImageServlet;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.slf4j.Logger;
@@ -984,6 +985,8 @@ public class MiniDFSCluster implements AutoCloseable {
         }
         copyKeys(conf, nnConf, nnInfo.nameserviceId, nnInfo.nnId);
       }
+      nn.nameNode.getHttpServer()
+          .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
     }
   }
 
@@ -2199,6 +2202,8 @@ public class MiniDFSCluster implements AutoCloseable {
     }
 
     NameNode nn = NameNode.createNameNode(args, info.conf);
+    nn.getHttpServer()
+        .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false);
     info.nameNode = nn;
     info.setStartOpt(startOpt);
     if (waitActive) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
index 4618f4bd107..572ad8bef7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.assertNNHasCheckpoints;
 import static org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil.getNameNodeCurrentDirs;
+import static org.apache.hadoop.hdfs.server.namenode.ImageServlet.RECENT_IMAGE_CHECK_ENABLED;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -2462,6 +2463,60 @@ public class TestCheckpoint {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testActiveRejectSmallerDeltaImage() throws Exception {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+    // Set the delta txid threshold to 10
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+    // Set the delta time threshold to some arbitrarily large value, so
+    // it does not trigger a checkpoint during this test.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, 900000);
+
+    SecondaryNameNode secondary = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+      // enable small delta rejection
+      NameNode active = cluster.getNameNode();
+      active.httpServer.getHttpServer()
+          .setAttribute(RECENT_IMAGE_CHECK_ENABLED, true);
+
+      secondary = startSecondaryNameNode(conf);
+
+      FileSystem fs = cluster.getFileSystem();
+      assertEquals(0, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+
+      // create 5 dir.
+      for (int i = 0; i < 5; i++) {
+        fs.mkdirs(new Path("dir-" + i));
+      }
+
+      // Checkpoint 1st
+      secondary.doCheckpoint();
+      // at this point, the txid delta is smaller than threshold 10.
+      // active does not accept this image.
+      assertEquals(0, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+
+      // create another 10 dir.
+      for (int i = 0; i < 10; i++) {
+        fs.mkdirs(new Path("dir2-" + i));
+      }
+
+      // Checkpoint 2nd
+      secondary.doCheckpoint();
+      // here the delta is large enough and active accepts this image.
+      assertEquals(21, active.getNamesystem().getFSImage()
+          .getMostRecentCheckpointTxId());
+    } finally {
+      cleanup(secondary);
+      cleanup(cluster);
+    }
+  }
+
   private static void cleanup(SecondaryNameNode snn) {
     if (snn != null) {
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
index 6804bb92fde..1730a662b29 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestStandbyCheckpoints.java
@@ -253,7 +253,40 @@ public class TestStandbyCheckpoints {
     dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
     FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of());
   }
-
+
+  /**
+   * Test for the case of when there are observer NameNodes, Standby node is
+   * able to upload fsImage to Observer node as well.
+   */
+  @Test(timeout = 300000)
+  public void testStandbyAndObserverState() throws Exception {
+    // Transition 2 to observer
+    cluster.transitionToObserver(2);
+    doEdits(0, 10);
+    // After a rollEditLog, Standby(nn1)'s next checkpoint would be
+    // ahead of observer(nn2).
+    nns[0].getRpcServer().rollEditLog();
+
+    // After standby creating a checkpoint, it will try to push the image to
+    // active and all observer, updating it's own txid to the most recent.
+    HATestUtil.waitForCheckpoint(cluster, 1, ImmutableList.of(12));
+    HATestUtil.waitForCheckpoint(cluster, 0, ImmutableList.of(12));
+    HATestUtil.waitForCheckpoint(cluster, 2, ImmutableList.of(12));
+
+    assertEquals(12, nns[2].getNamesystem().getFSImage()
+        .getMostRecentCheckpointTxId());
+    assertEquals(12, nns[1].getNamesystem().getFSImage()
+        .getMostRecentCheckpointTxId());
+
+    List<File> dirs = Lists.newArrayList();
+    // observer and standby both have this same image.
+ dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 2)); + dirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1)); + FSImageTestUtil.assertParallelFilesAreIdentical(dirs, ImmutableSet.of()); + // Restore 2 back to standby + cluster.transitionToStandby(2); + } + /** * Test for the case when the SBN is configured to checkpoint based * on a time period, but no transactions are happening on the