From bb6a3c83305f97090f980c53adaaf37baf18c698 Mon Sep 17 00:00:00 2001
From: Brahma Reddy Battula
Date: Sat, 26 Aug 2017 13:46:20 +0800
Subject: [PATCH] HDFS-12248. SNN will not upload fsimage on IOE and Interrupted exceptions. (Brahma Reddy Battula)

---
 .../namenode/CheckpointFaultInjector.java | 18 +++--
 .../namenode/ha/StandbyCheckpointer.java  | 16 ++---
 .../hadoop/hdfs/TestRollingUpgrade.java   | 48 +++++++++++++++++++
 3 files changed, 71 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java
index 18520944779..eeb082b8d94 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CheckpointFaultInjector.java
@@ -24,13 +24,17 @@ import java.io.IOException;
  * Utility class to faciliate some fault injection tests for the checkpointing
  * process.
  */
-class CheckpointFaultInjector {
-  static CheckpointFaultInjector instance = new CheckpointFaultInjector();
-
-  static CheckpointFaultInjector getInstance() {
+public class CheckpointFaultInjector {
+  public static CheckpointFaultInjector instance =
+      new CheckpointFaultInjector();
+
+  public static CheckpointFaultInjector getInstance() {
     return instance;
   }
-
+
+  public static void set(CheckpointFaultInjector instance) {
+    CheckpointFaultInjector.instance = instance;
+  }
   public void beforeGetImageSetsHeaders() throws IOException {}
   public void afterSecondaryCallsRollEditLog() throws IOException {}
   public void duringMerge() throws IOException {}
@@ -46,4 +50,8 @@ class CheckpointFaultInjector {
 
   public void afterMD5Rename() throws IOException {}
   public void beforeEditsRename() throws IOException {}
+
+  public void duringUploadInProgess() throws InterruptedException, IOException {
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
index 753447b0089..789ed9ca2f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/StandbyCheckpointer.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.namenode.CheckpointConf;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -228,7 +229,9 @@
       Future<TransferFsImage.TransferResult> upload =
           executor.submit(new Callable<TransferFsImage.TransferResult>() {
             @Override
-            public TransferFsImage.TransferResult call() throws IOException {
+            public TransferFsImage.TransferResult call()
+                throws IOException, InterruptedException {
+              CheckpointFaultInjector.getInstance().duringUploadInProgess();
               return TransferFsImage.uploadImageFromStorage(activeNNAddress, conf, namesystem
                   .getFSImage().getStorage(), imageType, txid, canceler);
             }
@@ -258,11 +261,12 @@ public class StandbyCheckpointer {
           break;
         }
       }
-      lastUploadTime = monotonicNow();
-
-      // we are primary if we successfully updated the ANN
-      this.isPrimaryCheckPointer = success;
-
+      if (ie == null && ioe == null) {
+        //Update only when response from remote about success or
+        lastUploadTime = monotonicNow();
+        // we are primary if we successfully updated the ANN
+        this.isPrimaryCheckPointer = success;
+      }
       // cleaner than copying code for multiple catch statements and better than catching all
       // exceptions, so we just handle the ones we expect.
       if (ie != null || ioe != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
index b356fb945d2..0545b040f3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestRollingUpgrade.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 
 import javax.management.AttributeNotFoundException;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.CheckpointFaultInjector;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
@@ -568,6 +570,52 @@ public class TestRollingUpgrade {
     testCheckpoint(3);
   }
 
+  @Test(timeout = 60000)
+  public void testRollBackImage() throws Exception {
+    final Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 10);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 2);
+    MiniQJMHACluster cluster = null;
+    CheckpointFaultInjector old = CheckpointFaultInjector.getInstance();
+    try {
+      cluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2).build();
+      MiniDFSCluster dfsCluster = cluster.getDfsCluster();
+      dfsCluster.waitActive();
+      dfsCluster.transitionToActive(0);
+      DistributedFileSystem dfs = dfsCluster.getFileSystem(0);
+      for (int i = 0; i <= 10; i++) {
+        Path foo = new Path("/foo" + i);
+        dfs.mkdirs(foo);
+      }
+      cluster.getDfsCluster().getNameNodeRpc(0).rollEdits();
+      CountDownLatch ruEdit = new CountDownLatch(1);
+      CheckpointFaultInjector.set(new CheckpointFaultInjector() {
+        @Override
+        public void duringUploadInProgess()
+            throws IOException, InterruptedException {
+          if (ruEdit.getCount() == 1) {
+            ruEdit.countDown();
+            Thread.sleep(180000);
+          }
+        }
+      });
+      ruEdit.await();
+      RollingUpgradeInfo info = dfs
+          .rollingUpgrade(RollingUpgradeAction.PREPARE);
+      Assert.assertTrue(info.isStarted());
+      FSImage fsimage = dfsCluster.getNamesystem(0).getFSImage();
+      queryForPreparation(dfs);
+      // The NN should have a copy of the fsimage in case of rollbacks.
+      Assert.assertTrue(fsimage.hasRollbackFSImage());
+    } finally {
+      CheckpointFaultInjector.set(old);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   public void testCheckpoint(int nnCount) throws IOException, InterruptedException {
     final Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
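
Note on the behavior change (not part of the patch itself): before this change, doCheckpoint() updated lastUploadTime and isPrimaryCheckPointer unconditionally, so a standby whose image upload hit an IOException or was interrupted still looked as if it had checkpointed successfully and would not retry promptly. The guard added above records the outcome only when the upload completed without either exception. A minimal, self-contained sketch of that guard follows; the class and member names are hypothetical stand-ins for the StandbyCheckpointer internals, not Hadoop APIs.

    import java.io.IOException;

    /** Illustrative sketch of the guarded bookkeeping update in HDFS-12248. */
    public class GuardedUploadUpdateSketch {

      // Hypothetical stand-ins for the checkpointer's bookkeeping fields.
      private long lastUploadTime;
      private boolean isPrimaryCheckPointer;

      // Mirrors the patched logic: bookkeeping is updated only when the upload
      // finished without an IOException or InterruptedException, so a failed or
      // interrupted upload no longer masquerades as a completed checkpoint.
      void recordUploadOutcome(boolean success, IOException ioe,
          InterruptedException ie, long now) {
        if (ie == null && ioe == null) {
          lastUploadTime = now;
          isPrimaryCheckPointer = success;
        }
      }

      public static void main(String[] args) {
        GuardedUploadUpdateSketch s = new GuardedUploadUpdateSketch();

        // Interrupted upload: nothing is recorded, so the standby retries later.
        s.recordUploadOutcome(false, null, new InterruptedException(), 1000L);
        System.out.println(s.lastUploadTime + " " + s.isPrimaryCheckPointer); // 0 false

        // Clean upload: both fields are updated, as before the patch.
        s.recordUploadOutcome(true, null, null, 2000L);
        System.out.println(s.lastUploadTime + " " + s.isPrimaryCheckPointer); // 2000 true
      }
    }

The new testRollBackImage case exercises the same path from the outside: it uses the now-settable CheckpointFaultInjector to stall duringUploadInProgess(), so the rolling-upgrade PREPARE arrives while an upload is still in flight and the rollback fsimage must still be produced.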