From 67406460f0b6c05edde1d1185aeb42b6324df202 Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Thu, 28 Jul 2016 13:32:00 -0700
Subject: [PATCH] HDFS-4176. EditLogTailer should call rollEdits with a timeout. (Lei Xu)

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  3 +
 .../server/namenode/ha/EditLogTailer.java     | 76 +++++++++++++++----
 .../src/main/resources/hdfs-default.xml       |  7 ++
 .../server/namenode/ha/TestEditLogTailer.java | 46 +++++++++++
 4 files changed, 116 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 231dea7c58c..3385751b59c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -731,6 +731,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY =
       "dfs.ha.tail-edits.in-progress";
   public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false;
+  public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY =
+      "dfs.ha.tail-edits.rolledits.timeout";
+  public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m
   public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout";
   public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
index 14473752f08..95c3d581163 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
@@ -27,16 +27,21 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -101,6 +106,17 @@ public class EditLogTailer {
    */
   private final long logRollPeriodMs;
 
+  /**
+   * The timeout in milliseconds for the rollEdits RPC call to the active NN.
+   * @see HDFS-4176.
+   */
+  private final long rollEditsTimeoutMs;
+
+  /**
+   * The executor that runs the rollEdits RPC call in a daemon thread.
+   */
+  private final ExecutorService rollEditsRpcExecutor;
+
   /**
    * How often the Standby should check if there are new finalized segment(s)
    * available to be read from.
@@ -159,6 +175,13 @@ public class EditLogTailer {
     sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000;
 
+    rollEditsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000;
+
+    rollEditsRpcExecutor = Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder().setDaemon(true).build());
+
     maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY,
         DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT);
     if (maxRetries <= 0) {
@@ -186,6 +209,7 @@ public class EditLogTailer {
   }
 
   public void stop() throws IOException {
+    rollEditsRpcExecutor.shutdown();
     tailerThread.setShouldRun(false);
     tailerThread.interrupt();
     try {
@@ -205,7 +229,7 @@
   public void setEditLog(FSEditLog editLog) {
     this.editLog = editLog;
   }
-  
+
   public void catchupDuringFailover() throws IOException {
     Preconditions.checkState(tailerThread == null ||
         !tailerThread.isAlive(),
@@ -299,31 +323,51 @@
         (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
   }
 
+  /**
+   * NameNodeProxy factory method.
+   * @return a Callable that rolls the edit log on the remote NameNode.
+   */
+  @VisibleForTesting
+  Callable<Void> getNameNodeProxy() {
+    return new MultipleNameNodeProxy<Void>() {
+      @Override
+      protected Void doWork() throws IOException {
+        cachedActiveProxy.rollEditLog();
+        return null;
+      }
+    };
+  }
+
   /**
    * Trigger the active node to roll its logs.
    */
-  private void triggerActiveLogRoll() {
+  @VisibleForTesting
+  void triggerActiveLogRoll() {
     LOG.info("Triggering log roll on remote NameNode");
+    Future<Void> future = null;
     try {
-      new MultipleNameNodeProxy<Void>() {
-        @Override
-        protected Void doWork() throws IOException {
-          cachedActiveProxy.rollEditLog();
-          return null;
-        }
-      }.call();
+      future = rollEditsRpcExecutor.submit(getNameNodeProxy());
+      future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
       lastRollTriggerTxId = lastLoadedTxnId;
-    } catch (IOException ioe) {
-      if (ioe instanceof RemoteException) {
-        ioe = ((RemoteException)ioe).unwrapRemoteException();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof RemoteException) {
+        IOException ioe = ((RemoteException) cause).unwrapRemoteException();
         if (ioe instanceof StandbyException) {
           LOG.info("Skipping log roll. Remote node is not in Active state: " +
               ioe.getMessage().split("\n")[0]);
           return;
         }
       }
-
-      LOG.warn("Unable to trigger a roll of the active NN", ioe);
+      LOG.warn("Unable to trigger a roll of the active NN", e);
+    } catch (TimeoutException e) {
+      if (future != null) {
+        future.cancel(true);
+      }
+      LOG.warn(String.format(
+          "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
+    } catch (InterruptedException e) {
+      LOG.warn("Unable to trigger a roll of the active NN", e);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b82fa31cce3..e6fde8c3930 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1495,6 +1495,13 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.ha.tail-edits.rolledits.timeout</name>
+  <value>60</value>
+  <description>The timeout in seconds for the rollEdits RPC call to the active NameNode.
+  </description>
+</property>
+
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>false</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
index 3af201d7c5b..0d0873d730a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java
@@ -17,15 +17,19 @@
  */
 package org.apache.hadoop.hdfs.server.namenode.ha;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -50,6 +54,7 @@ import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.base.Supplier;
+import org.mockito.Mockito;
 
 @RunWith(Parameterized.class)
 public class TestEditLogTailer {
@@ -249,4 +254,45 @@ public class TestEditLogTailer {
       }
     }, 100, 10000);
   }
+
+  @Test(timeout=20000)
+  public void testRollEditTimeoutForActiveNN() throws IOException {
+    Configuration conf = getConf();
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100);
+
+    HAUtil.setAllowStandbyReads(conf, true);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology())
+        .numDataNodes(0)
+        .build();
+    cluster.waitActive();
+
+    cluster.transitionToActive(0);
+
+    try {
+      EditLogTailer tailer = Mockito.spy(
+          cluster.getNamesystem(1).getEditLogTailer());
+      AtomicInteger flag = new AtomicInteger(0);
+
+      // Return a slow rollEdits call that outlives the 5-second timeout.
+      when(tailer.getNameNodeProxy()).thenReturn(
+          new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+              Thread.sleep(30000);  // sleep for 30 seconds.
+              assertTrue(Thread.currentThread().isInterrupted());
+              flag.addAndGet(1);
+              return null;
+            }
+          }
+      );
+      tailer.triggerActiveLogRoll();
+      assertEquals(0, flag.get());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
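
Illustration (not part of the patch): the core of the change is the bounded wait in triggerActiveLogRoll(). The rollEdits call is submitted to a single daemon-thread executor, waited on for at most dfs.ha.tail-edits.rolledits.timeout, and cancelled with an interrupt if it overruns, so an unresponsive active NameNode can no longer stall the standby's tailer thread indefinitely. Below is a minimal, self-contained sketch of the same pattern, with a few assumptions: the class name RollEditsTimeoutSketch and the simulated 30-second call are invented for this example, a plain ThreadFactory lambda stands in for Guava's ThreadFactoryBuilder, and the 5-second timeout mirrors the new test rather than the 60-second default.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RollEditsTimeoutSketch {
  public static void main(String[] args) throws InterruptedException {
    // Single daemon worker, mirroring rollEditsRpcExecutor in the patch
    // (which builds the same thing with Guava's ThreadFactoryBuilder).
    ExecutorService rollEditsRpcExecutor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "roll-edits-rpc");
      t.setDaemon(true);
      return t;
    });

    final long rollEditsTimeoutMs = 5 * 1000; // 5s, as in the new unit test.

    // Stand-in for getNameNodeProxy(): pretend the active NN hangs for 30s.
    Callable<Void> slowRollEdits = () -> {
      Thread.sleep(30 * 1000);
      return null;
    };

    Future<Void> future = rollEditsRpcExecutor.submit(slowRollEdits);
    try {
      // Bounded wait: this is the line that keeps the caller from hanging.
      future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS);
      System.out.println("rollEdits finished in time");
    } catch (TimeoutException e) {
      // Same recovery as the patch: interrupt the in-flight call and move on.
      future.cancel(true);
      System.out.println(String.format(
          "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs));
    } catch (ExecutionException e) {
      // The patch unwraps RemoteException/StandbyException here;
      // the sketch just reports the cause.
      System.out.println("rollEdits failed: " + e.getCause());
    } finally {
      rollEditsRpcExecutor.shutdown();
    }
  }
}

As in the patched EditLogTailer, the waiting thread gives up after the timeout and interrupts the in-flight call via future.cancel(true); the calling thread is therefore never blocked for longer than the configured timeout, and because the worker is a daemon thread it cannot hold up JVM shutdown either.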