From e7c701586d8cfac73101b3358707c5ad22937879 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Mon, 8 Aug 2016 16:32:01 -0700 Subject: [PATCH] HDFS-4176. EditLogTailer should call rollEdits with a timeout. (lei) --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../server/namenode/ha/EditLogTailer.java | 73 ++++++++++++++++--- .../src/main/resources/hdfs-default.xml | 7 ++ .../server/namenode/ha/TestEditLogTailer.java | 45 ++++++++++++ 4 files changed, 118 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index f31eb0ad891..602c694e874 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -689,6 +689,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_HA_LOGROLL_PERIOD_DEFAULT = 2 * 60; // 2m public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period"; public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m + public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY = + "dfs.ha.tail-edits.rolledits.timeout"; + public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout"; public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 5730de49102..e13b5015032 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -24,6 +24,15 @@ import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -95,6 +104,17 @@ public class EditLogTailer { */ private final long logRollPeriodMs; + /** + * The timeout in milliseconds of calling rollEdits RPC to Active NN. + * @see HDFS-4176. + */ + private final long rollEditsTimeoutMs; + + /** + * The executor to run roll edit RPC call in a daemon thread. + */ + private final ExecutorService rollEditsRpcExecutor; + /** * How often the Standby should check if there are new finalized segment(s) * available to be read from. @@ -125,7 +145,14 @@ public class EditLogTailer { sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000; - + + rollEditsTimeoutMs = conf.getInt( + DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000; + + rollEditsRpcExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).build()); + LOG.debug("logRollPeriodMs=" + logRollPeriodMs + " sleepTime=" + sleepTimeMs); } @@ -154,6 +181,7 @@ public class EditLogTailer { } public void stop() throws IOException { + rollEditsRpcExecutor.shutdown(); tailerThread.setShouldRun(false); tailerThread.interrupt(); try { @@ -173,7 +201,7 @@ public class EditLogTailer { public void setEditLog(FSEditLog editLog) { this.editLog = editLog; } - + public void catchupDuringFailover() throws IOException { Preconditions.checkState(tailerThread == null || !tailerThread.isAlive(), @@ -266,25 +294,50 @@ public class EditLogTailer { (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ; } + /** + * @return a Callable to roll logs on remote NameNode. + */ + @VisibleForTesting + Callable getRollEditsTask() { + return new Callable() { + @Override + public Void call() throws IOException { + getActiveNodeProxy().rollEditLog(); + return null; + } + }; + } + /** * Trigger the active node to roll its logs. */ - private void triggerActiveLogRoll() { - LOG.info("Triggering log roll on remote NameNode " + activeAddr); + @VisibleForTesting + void triggerActiveLogRoll() { + LOG.info("Triggering log roll on remote NameNode"); + Future future = null; try { - getActiveNodeProxy().rollEditLog(); + future = rollEditsRpcExecutor.submit(getRollEditsTask()); + future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); lastRollTriggerTxId = lastLoadedTxnId; - } catch (IOException ioe) { - if (ioe instanceof RemoteException) { - ioe = ((RemoteException)ioe).unwrapRemoteException(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RemoteException) { + IOException ioe = ((RemoteException) cause).unwrapRemoteException(); if (ioe instanceof StandbyException) { LOG.info("Skipping log roll. Remote node is not in Active state: " + ioe.getMessage().split("\n")[0]); return; } } - - LOG.warn("Unable to trigger a roll of the active NN", ioe); + LOG.warn("Unable to trigger a roll of the active NN", e); + } catch (TimeoutException e) { + if (future != null) { + future.cancel(true); + } + LOG.warn(String.format( + "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs)); + } catch (InterruptedException e) { + LOG.warn("Unable to trigger a roll of the active NN", e); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 1692d1805c8..a893addfc79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1475,6 +1475,13 @@ + + dfs.ha.tail-edits.rolledits.timeout + 60 + The timeout in seconds of calling rollEdits RPC on Active NN. + + + dfs.ha.automatic-failover.enabled false diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index c400a096047..1d13bbe938b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -17,13 +17,17 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; @@ -48,6 +52,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Supplier; +import org.mockito.Mockito; @RunWith(Parameterized.class) public class TestEditLogTailer { @@ -194,4 +199,44 @@ public class TestEditLogTailer { } }, 100, 10000); } + + @Test(timeout=20000) + public void testRollEditTimeoutForActiveNN() throws IOException { + Configuration conf = getConf(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + + HAUtil.setAllowStandbyReads(conf, true); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + cluster.waitActive(); + + cluster.transitionToActive(0); + + try { + EditLogTailer tailer = Mockito.spy( + cluster.getNamesystem(1).getEditLogTailer()); + final AtomicInteger flag = new AtomicInteger(0); + + // Return a slow roll edit process. + when(tailer.getRollEditsTask()).thenReturn( + new Callable() { + @Override + public Void call() throws Exception { + Thread.sleep(30000); // sleep for 30 seconds. + assertTrue(Thread.currentThread().isInterrupted()); + flag.addAndGet(1); + return null; + } + } + ); + tailer.triggerActiveLogRoll(); + assertEquals(0, flag.get()); + } finally { + cluster.shutdown(); + } + } }